In [None]:
# Installing required packages
!pip install pyspark
!pip install findspark

In [2]:
import findspark
findspark.init()

In [10]:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [4]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [9]:
data = range(1,30)
# print first element of iterator
print(data[0])
len(data)
xrangeRDD = sc.parallelize(data,4)

# this will let us know that we created an RDD
xrangeRDD.first

1


<bound method RDD.first of PythonRDD[7] at RDD at PythonRDD.scala:53>

A transformation is an operation on an RDD that results in a new RDD. The transformed RDD is generated rapidly because the new RDD is lazily evaluated, which means that the calculation is not carried out when the new RDD is generated. The RDD will contain a series of transformations, or computation instructions, that will only be carried out when an action is called. In this transformation, we reduce each element in the RDD by 1. Note the use of the lambda function. We also then filter the RDD to only contain elements <10


In [12]:
subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x : x<10)


In [13]:
print(filteredRDD.collect())
filteredRDD.count()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


10

This simple example shows how to create an RDD and cache it. Notice the 10x speed improvement! If you wish to see the actual computation time, browse to the Spark UI...it's at host:4040. You'll see that the second calculation took much less time!

In [14]:
import time 

test = sc.parallelize(range(1,50000),4)
test.cache()

t1 = time.time()
# first count will trigger evaluation of count *and* cache
count1 = test.count()
dt1 = time.time() - t1
print("dt1: ", dt1)


t2 = time.time()
# second count operates on cached data only
count2 = test.count()
dt2 = time.time() - t2
print("dt2: ", dt2)

dt1:  0.755441427230835
dt2:  0.3037271499633789


In order to work with the extremely powerful SQL engine in Apache Spark, you will need a Spark Session. We have created that in the first Exercise, let us verify that spark session is still active.

In [None]:
spark

In [None]:
# Download the data first into a local `people.json` file
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/people.json >> people.json

In [17]:
# Read the dataset into a spark dataframe using the `read.json()` function
df = spark.read.json("people.json").cache()

In [None]:
# Print the dataframe as well as the data schema
df.show()
df.printSchema()

In [19]:
# Register the DataFrame as a SQL temporary view
df.createTempView("people")

In [None]:
# Select and show basic data columns
t1=time.time()
df.select("name").show()
t2=time.time()-t1
print(t2)
t3=time.time()
df.select(df["name"]).show()
t4=time.time()-t3
print(t4)
t5=time.time()
spark.sql("SELECT name FROM people").show()
t6=time.time()-t5
print(t6)

In [None]:
# Perform basic filtering

df.filter(df["age"] > 21).show()
spark.sql("SELECT age, name FROM people WHERE age > 21").show()

In [None]:
# Perfom basic aggregation of data

df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()