# **Getting Started With Spark using Python**


![](http://spark.apache.org/images/spark-logo.png)


### The Python API


----


## Setup


In [1]:
# Installing required packages
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.4.2.tar.gz (311.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m311.1/311.1 MB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m35.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.4.2-py2.py3-none-any.whl size=311619841 sha256=e649eec323be61557e30548de91e5ba48fc7ada5e508c2f3f225809279162b6b
  Stored in directory: /home/jupyterlab/.cache/pip/wheels/c3/8a/ac/cd39777597318310141c8a783c06f516815a66194f100f96b6
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.2
C

In [2]:
import findspark
findspark.init()

In [3]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [4]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

##   Spark Context and Spark Session



In [5]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

24/01/12 09:15:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### Task 2: Initialize Spark session
To work with dataframes we just need to verify that the spark session instance has been created.


In [6]:
spark

##  RDDs



#### Task 1: Create an RDD.



In [7]:
data = range(1,30)
# print first element of iterator
print(data[0])
len(data)
xrangeRDD = sc.parallelize(data, 4)

# this will let us know that we created an RDD
xrangeRDD

1


PythonRDD[1] at RDD at PythonRDD.scala:53

#### Task 2: Transformations


A transformation is an operation on an RDD that results in a new RDD. The transformed RDD is generated rapidly because the new RDD is lazily evaluated, which means that the calculation is not carried out when the new RDD is generated. The RDD will contain a series of transformations, or computation instructions, that will only be carried out when an action is called. In this transformation, we reduce each element in the RDD by 1. Note the use of the lambda function. We also then filter the RDD to only contain elements <10.


In [8]:
subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x : x<10)


#### Task 3: Actions 


A transformation returns a result to the driver. We now apply the `collect()` action to get the output from the transformation.


In [9]:
print(filteredRDD.collect())
filteredRDD.count()

                                                                                

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


10

#### Task 4: Caching Data


This simple example shows how to create an RDD and cache it. 


In [10]:
import time 

test = sc.parallelize(range(1,50000),4)
test.cache()

t1 = time.time()
# first count will trigger evaluation of count *and* cache
count1 = test.count()
dt1 = time.time() - t1
print("dt1: ", dt1)


t2 = time.time()
# second count operates on cached data only
count2 = test.count()
dt2 = time.time() - t2
print("dt2: ", dt2)

#test.count()

                                                                                

dt1:  1.258234977722168
dt2:  0.3162391185760498


##  DataFrames and SparkSQL


In [11]:
spark

#### Task 1: Create Your First DataFrame


In [12]:
# Download the data first into a local `people.json` file
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/people.json >> people.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    73  100    73    0     0    935      0 --:--:-- --:--:-- --:--:--   935


In [13]:
# Read the dataset into a spark dataframe using the `read.json()` function
df = spark.read.json("people.json").cache()

In [14]:
# Print the dataframe as well as the data schema
df.show()
df.printSchema()

[Stage 5:>                                                          (0 + 1) / 1]

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



                                                                                

In [15]:
# Register the DataFrame as a SQL temporary view
df.createTempView("people")

#### Task 2: Explore the data using DataFrame functions and SparkSQL




In [16]:
# Select and show basic data columns

df.select("name").show()
df.select(df["name"]).show()
spark.sql("SELECT name FROM people").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
|Michael|
|   Andy|
| Justin|
+-------+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
|Michael|
|   Andy|
| Justin|
+-------+

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
|Michael|
|   Andy|
| Justin|
+-------+



In [17]:
# Perform basic filtering

df.filter(df["age"] > 21).show()
spark.sql("SELECT age, name FROM people WHERE age > 21").show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
| 30|Andy|
+---+----+

+---+----+
|age|name|
+---+----+
| 30|Andy|
| 30|Andy|
+---+----+



In [18]:
# Perfom basic aggregation of data

df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()

                                                                                

+----+-----+
| age|count|
+----+-----+
|  19|    2|
|null|    2|
|  30|    2|
+----+-----+





+----+-----+
| age|count|
+----+-----+
|  19|    2|
|null|    0|
|  30|    2|
+----+-----+



                                                                                

----


### Question 1 - RDDs


Create an RDD with integers from 1-50. Apply a transformation to multiply every number by 2, resulting in an RDD that contains the first 50 even numbers. 


In [19]:
# starter code
# numbers = range(1, 50)
# numbers_RDD = ...
# even_numbers_RDD = numbers_RDD.map(lambda x: ..)

In [24]:
# Code block for learners to answer
data = range(1,50)
datardd=sc.parallelize(data)
new_data=datardd.map(lambda x:x *2)


print(new_data.collect())





[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98]


Double-click **here** for the solution.

<!-- The answer is below:
numbers = range(1, 50) 
numbers_RDD = sc.parallelize(numbers) 
even_numbers_RDD = numbers_RDD.map(lambda x: x * 2)
print( even_numbers_RDD.collect()) 
-->


### Question 2 - DataFrames and SparkSQL


Similar to the `people.json` file, now read the `people2.json` file into the notebook, load it into a dataframe and apply SQL operations to determine the average age in our people2 file.


In [21]:
# starter code
# !curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/people2.json >> people2.json
# df = spark.read...
# df.createTempView..
# spark.sql("SELECT ...")

In [25]:
# Code block for learners to answer
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/people2.json >> people2.json
df= spark.read.json("people2.json").cache()
res= spark.sql("select avg(age) from people2")
res.show()

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   136  100   136    0     0    888      0 --:--:-- --:--:-- --:--:--   888


AnalysisException: 'Table or view not found: people2; line 1 pos 21'

Double-click **here** for a hint.

<!-- The hint is below:

1. The SQL query "Select AVG(column_name) from.." can be used to find the average value of a column. 
2. Another possible way is to use the dataframe operations select() and mean()
-->


Double-click **here** for the solution.

<!-- The answer is below:
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/people2.json >> people2.json
df = spark.read.json("people2.json").cache()
df.createTempView("people2")
result = spark.sql("SELECT AVG(age) from people2")
result.show()
-->


### Question 3 - SparkSession


Close the SparkSession we created for this notebook


In [23]:
# Code block for learners to answer

Double-click **here** for the solution.

<!-- The answer is below:

spark.stop() will stop the spark session

-->
