In [11]:
# Configure the PySpark environment variables before any Spark session is created
import os
import sys

# Driver-side launcher settings: use Jupyter Lab as the driver front end
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
# Point the workers at the interpreter running this kernel so driver and workers
# use the same Python; a bare 'python' depends on PATH and may resolve to a
# different version (or to nothing). Falls back to 'python' if the interpreter
# path cannot be determined.
# NOTE(review): suited to local mode; on a real cluster this path must exist on the workers.
os.environ['PYSPARK_PYTHON'] = sys.executable or 'python'
           

In [12]:
# import pyspark
from pyspark.sql import SparkSession

In [13]:
# Build (or reuse) the SparkSession used as the entry point for this notebook
session_builder = SparkSession.builder.appName("Spark-Starter")
spark = session_builder.getOrCreate()

In [14]:
# Smoke test: build a tiny DataFrame to confirm the session works
columns = ["Name", "Age"]
data = [("James", 24), ("Smith", 19), ("Williams", 35)]
df = spark.createDataFrame(data, schema=columns)
# Print the DataFrame contents as a text table
df.show()


+--------+---+
|    Name|Age|
+--------+---+
|   James| 24|
|   Smith| 19|
|Williams| 35|
+--------+---+



In [18]:
# Shut down the SparkSession once the smoke test is done
spark.stop()

## SparkContext
- Represents the connection to a Spark cluster
- Coordinates task execution across the cluster
- Entry point in earlier versions of Spark (1.x)
## SparkSession
- Introduced in Spark 2.0
- Unified entry point for interacting with Spark
- Combines the Spark, SQL, Hive, and Streaming contexts
- Supports multiple API languages (Java, Scala, Python, R)


## Create SparkContext in Apache Spark Version 1.x

In [19]:
# SparkContext is exposed at the top level of the pyspark package
from pyspark import SparkContext

# Instantiate the context directly, naming the application
context_app_name = "SparkContextApp"
sc = SparkContext(appName=context_app_name)

In [20]:
# Display the SparkContext (the last expression in a cell is rendered as its output)
sc

In [21]:
# Always remember to stop an existing session/context before starting a new one
sc.stop()

## Create SparkSession in Apache Spark Version 2.x

In [23]:
from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession named "ApacheSparkApp"
new_spark = (
    SparkSession.builder
    .appName("ApacheSparkApp")
    .getOrCreate()
)

# The SparkContext is available as an attribute of the SparkSession
sc = new_spark.sparkContext

In [24]:
# Display the SparkContext obtained from the SparkSession
sc

In [25]:
# Shut down the session itself rather than only its context:
# SparkSession.stop() stops the underlying SparkContext and also clears the
# session-level state, so the next getOrCreate() starts from a clean slate.
new_spark.stop()

# Create a new SparkSession and focus on Spark RDD, DataFrame, and computing operations


In [26]:
from pyspark.sql import SparkSession

# Session with explicit executor-memory and shuffle-partition settings
spark = (
    SparkSession.builder
    .appName("ApacheSparkApp")
    .config("spark.executor.memory", "2g")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)


In [27]:
# Inspect the SparkSession (the notebook renders its summary as the cell output)
spark

In [28]:
# Terminate the SparkSession before the RDD section creates its own
spark.stop()

## RDD's characteristics
- Immutable
- Distributed
- Resilient
- Lazily Evaluated 
- Fault-tolerant operations

In [29]:
# Start a dedicated session used to create RDDs and run operations on them
from pyspark.sql import SparkSession

# Build (or reuse) the session under the "RDD-Demo" application name
rdd_app_name = "RDD-Demo"
spark = SparkSession.builder.appName(rdd_app_name).getOrCreate()


# Create RDDs

In [30]:
# Turn a local Python list into an RDD via the session's SparkContext
numbers = [1, 23, -4, 52, -5, 3, 6, 19, 1, -1, 0, 10]
spark_context = spark.sparkContext
rdd = spark_context.parallelize(numbers)

In [31]:
# Use the collect() action to retrieve the RDD's elements and display them
rdd.collect()

[1, 23, -4, 52, -5, 3, 6, 19, 1, -1, 0, 10]

In [33]:
# Sample (sport, athlete) tuples that will be turned into an RDD
data = [
    ('Swimming', 'Phelps'),
    ('Basketball', 'Kobe'),
    ('Boxing', 'Floyed'),
    ('Tennis', 'Serena'),
    ('Basketball', 'Michael'),
]

In [40]:
# Build an RDD from the list of (sport, athlete) tuples
pairs_context = spark.sparkContext
rdd = pairs_context.parallelize(data)
# Collect the records back to the driver to display them
rdd.collect()

[('Swimming', 'Phelps'),
 ('Basketball', 'Kobe'),
 ('Boxing', 'Floyed'),
 ('Tennis', 'Serena'),
 ('Basketball', 'Michael')]

In [42]:
# Print only the athletes who play basketball.
# The filter runs as an RDD transformation, so only the matching records are
# collected to the driver instead of the whole RDD being collected and then
# filtered in a Python loop.
basketball_rdd = rdd.filter(lambda record: record[0] == "Basketball")
for sport, athlete in basketball_rdd.collect():
    print("{} is a {} player.".format(athlete, sport))

Kobe is a Basketball player.
Michael is a Basketball player.


## RDD Operations: Actions

In [45]:
# count() is an action that returns the number of records in the RDD
record_count = rdd.count()
print(f"Total Number of records: {record_count}")


Total Number of records: 5


In [46]:
# first() is an action that returns the first element of the RDD
first_record = rdd.first()
print(first_record)

('Swimming', 'Phelps')


In [49]:
# Last element: collect every record to the driver, then take the final item
# (acceptable here because the RDD is tiny)
all_records = rdd.collect()
print(all_records[-1])

('Basketball', 'Michael')


# RDD TRANSFORMATION
(Transformations are lazily evaluated: no transformation is executed until an action is called.)

In [53]:
# Map transformation: upper-case the second item of each tuple (the athlete name),
# leaving the first item (the sport) unchanged
upper_rdd = rdd.map(lambda pair: (pair[0], pair[1].upper()))

In [54]:
# collect() is the action that triggers the map; print the resulting records
upper_records = upper_rdd.collect()
print(f"rdd with upper case names: {upper_records}")

rdd with upper case names: [('Swimming', 'PHELPS'), ('Basketball', 'KOBE'), ('Boxing', 'FLOYED'), ('Tennis', 'SERENA'), ('Basketball', 'MICHAEL')]


## Save the RDD as a text file

In [61]:
# Inspect the Python type of the RDD object
rdd_type = type(rdd)
print(rdd_type)

<class 'pyspark.rdd.RDD'>


In [65]:
# Stop the SparkSession at the end of the notebook
spark.stop()