In [1]:
# Install required packages
!pip install pyspark
!pip install findspark



In [2]:
import findspark
findspark.init()

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Create the spark session and context

In [4]:
# Create a spark context object
sc = SparkContext()

# Create a spark session 
spark = SparkSession.builder.appName("Python Spark DataFrames basic example").config("spark.some.config.option", "some-value").getOrCreate()

In [5]:
spark

## Resilient Distributed Datasets (RDDs)

In [6]:
# Create an RDD
data = range(1, 30)

# Print the first element of dataa
print(data[0])

len(data)
xrangeRDD = sc.parallelize(data, 4)

xrangeRDD

1


PythonRDD[1] at RDD at PythonRDD.scala:53

In [7]:
# Transformations
subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x: x<10)

In [8]:
# Actions
print(filteredRDD.collect())
filteredRDD.count()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


10

In [9]:
# Caching data
import time

test = sc.parallelize(range(1,50000), 4)
test.cache()

t1 = time.time()
# First count will trigger evaluation of count and cache
count1 = test.count()
dt1 = time.time() - t1
print("dt1: ", dt1)

t2 = time.time()
# Second count operates on cached data only
count2 = test.count()
dt2 = time.time() - t2
print("dt2: ", dt2)

dt1:  1.1009724140167236
dt2:  0.2663764953613281


## DataFrames and SparkSQL

In [10]:
spark

In [29]:
# Create dataframe
# Download the data into people_temp.json
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/people.json > people_temp.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    73  100    73    0     0    154      0 --:--:-- --:--:-- --:--:--   154


In [30]:
# Read the dataset into a spark dataframe 
df = spark.read.json("people_temp.json").cache()

In [31]:
# Print the dataframe and the data schema
df.show()
df.printSchema()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [32]:
# Register the DataFrame as an SQL temporary view
df.createTempView("people_temp")

In [33]:
# Select and show basic data columns
df.select("name").show()
df.select(df["name"]).show()
spark.sql("SELECT name FROM people_temp").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [34]:
# Perform basic filtering
df.filter(df["age"] > 21).show()
spark.sql("SELECT age, name FROM people WHERE age > 21").show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

+---+----+
|age|name|
+---+----+
| 30|Andy|
| 30|Andy|
+---+----+



In [35]:
# Perform basic aggregation of data
df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

+----+-----+
| age|count|
+----+-----+
|  19|    2|
|null|    0|
|  30|    2|
+----+-----+



## Stop the Spark session

In [46]:
spark.stop()