# **Getting Started With Spark using Python**

### The Python API

Spark is written in Scala, which compiles to Java bytecode, but you can write python code to communicate to the java virtual machine through a library called py4j. Python has the richest API, but it can be somewhat limiting if you need to use a method that is not available, or if you need to write a specialized piece of code. The latency associated with communicating back and forth to the JVM can sometimes cause the code to run slower.
An exception to this is the SparkSQL library, which has an execution planning engine that precompiles the queries. Even with this optimization, there are cases where the code may run slower than the native scala version.
The general recommendation for PySpark code is to use the "out of the box" methods available as much as possible and avoid overly frequent (iterative) calls to Spark methods. If you need to write high-performance or specialized code, try doing it in scala.
But hey, we know Python rules, and the plotting libraries are way better. So, it's up to you!


Below are the basics of Apache Spark and PySpark. We will start with creating the SparkContext and SparkSession. We then create an RDD and apply some basic transformations and actions. Finally we demonstrate the basics dataframes and SparkSQL.

The aim is to be able to:

* Create the SparkContext and SparkSession
* Create an RDD and apply some basic transformations and actions to RDDs
* Demonstrate the use of the basics Dataframes and SparkSQL

## Setting up the environment


In [None]:
#Install the required packages
#In this case we already have the packages installed within the conda environment
#If the packages are not installed  used the following

#!pip install pyspak
#!pip install findspark

#or run the following in the terminal if you use the following
#conda install -c conda-forge findspark
#conda install -c conda-forge pyspark


In [1]:
import findspark
findspark.init()

In [2]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

## Spark Context and Spark Session


Create the Spark Context and initialize the Spark session needed for SparkSQL and DataFrames.
SparkContext is the entry point for Spark applications and contains functions to create RDDs such as `parallelize()`. SparkSession is needed for SparkSQL and DataFrame operations.


#### Creating the spark session and context


In [3]:
# Create a spark context class
sc = SparkContext()

# Create a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.som.config.option", "some-value") \
    .getOrCreate()

#### Initialize Spark session
To work with dataframes we just need to verify that the spark session instance has been created.


In [4]:
spark

## RDDs
In this exercise we work with Resilient Distributed Datasets (RDDs). RDDs are Spark's primitive data abstraction and we use concepts from functional programming to create and manipulate RDDs. 


#### Task 1: Create an RDD.
For demonstration purposes, we create an RDD here by calling `sc.parallelize()`  
We create an RDD which has integers from 1 to 30.


In [5]:
data = range(1,30)

#print first element of iterator
print(data[0])
len(data)
xrangeRDD = sc.parallelize(data, 4)

#this is to let us know that we created an RDD
xrangeRDD

1


PythonRDD[1] at RDD at PythonRDD.scala:53

#### Task 2: Transformations


A transformation is an operation on an RDD that results in a new RDD. The transformed RDD is generated rapidly because the new RDD is lazily evaluated, which means that the calculation is not carried out when the new RDD is generated. The RDD will contain a series of transformations, or computation instructions, that will only be carried out when an action is called. In this transformation, we reduce each element in the RDD by 1. Note the use of the lambda function. We also then filter the RDD to only contain elements <10.


In [6]:
subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x: x<10)

#### Task 3: Actions 


A transformation returns a result to the driver. We now apply the `collect()` action to get the output from the transformation.
Note that the worker nodes have processed the transformation and now the results returns to the driver

In [7]:
print(filteredRDD.collect())
filteredRDD.count()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


10

#### Task 4: Caching the data


This simple example shows how to create an RDD and cache it. Notice the **10x speed improvement**!  If you wish to see the actual computation time, browse to the Spark UI...it's at host:4040.  You'll see that the second calculation took much less time!


In [8]:
import time

test = sc.parallelize(range(1,50000),4)
test.cache()

t1 = time.time()
#fisrt count will trigger evaluation of count *and* cache
count1 = test.count()
dt1 = time.time() - t1
print("dt1: ", dt1)

t2 = time.time()
#second count operates on cached data only
count2 = test.count()
dt2 = time.time() - t2
print("dt2: ", dt2)

#test.count()

dt1:  6.138485670089722
dt2:  3.014261245727539


## Exercise 3: DataFrames and SparkSQL


In order to work with the extremely powerful SQL engine in Apache Spark, you will need a Spark Session. We have created that in the first Exercise, let us verify that spark session is still active.


In [9]:
spark

#### Task 1: Create Your First DataFrame!


You can create a structured data set (much like a database table) in Spark.  Once you have done that, you can then use powerful SQL tools to query and join your dataframes.


In [10]:
#Download the data first locally into a local file called 'people.json' file

!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/people.json >> people.json


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0
100    73  100    73    0     0     42      0  0:00:01  0:00:01 --:--:--    42


In [11]:
#Using the 'read.json()' function, read the dataset into a spark dataframe
df = spark.read.json("people.json").cache()

In [13]:
#to print the dataframe as well as the data schema
df.show()
df.printSchema()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [14]:
#Register the DataFrame as a SQL temporary view
df.createTempView("people")

#### Task 2: Explore the data using DataFrame functions and SparkSQL

In this section, we explore the datasets using functions both from dataframes as well as corresponding SQL queries using sparksql. Note the different ways to achieve the same task!


**Select and show basic data columns**

In [15]:
#the three lines achieve the same result
df.select("name").show()
df.select(df["name"]).show()
spark.sql("SELECT name FROM people").show

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



<bound method DataFrame.show of DataFrame[name: string]>

**Perform basic filtering**

In [16]:
df.filter(df["age"] > 21).show()
spark.sql("SELECT age, name FROM people WHERE age > 21").show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



**Perform basic aggregation of data**

In [17]:
df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    0|
|  30|    1|
+----+-----+



-----------------------------------------------------------------------

## Practice Tasks

### Question 1 - RDDs


Create an RDD with integers from 1-50. Apply a transformation to multiply every number by 2, resulting in an RDD that contains the first 50 even numbers. 


In [18]:
data_range = range(1,50)
#print first element of iterator
print(data_range[0])
len(data_range)
zrangeRDD = sc.parallelize(data_range, 4)

#to let us know that we have created an RDD
zrangeRDD

1


PythonRDD[61] at RDD at PythonRDD.scala:53

In [25]:
#Transformation
#multiply every number by 2
numbers_RDD = zrangeRDD.map(lambda x: x*2)
even_numbers_RDD = numbers_RDD.map(lambda x: x%2 == 0, even_numbers_RDD)

In [26]:
#apply collect() action to get output of transformation
print(even_numbers_RDD.collect())
even_numbers_RDD.count()

[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True]


49

### Question 2 - DataFrames and SparkSQL


Similar to the `people.json` file, now read the `people2.json` file into the notebook, load it into a dataframe and apply SQL operations to determine the average age in our people2 file.


In [27]:
#first download the file from the following link

!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/people2.json >> people2.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100   136  100   136    0     0     69      0  0:00:01  0:00:01 --:--:--    69


In [28]:
#read the dataset into a spark dataframe

df1 = spark.read.json("people2.json").cache()

In [29]:
#print the data frame and data schema
df1.show()
df.printSchema()

+---+-------+
|age|   name|
+---+-------+
| 25|Michael|
| 24|   Andy|
| 19| Justin|
| 26| George|
| 30|   Jeff|
+---+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [30]:
#register the dataframe as a temporary view
df1.createTempView("people2")

In [33]:
#determine average age of people in the dataset
spark.sql("SELECT avg(age) as average_age FROM people2").show()

+-----------+
|average_age|
+-----------+
|       24.8|
+-----------+

