In [1]:
import findspark
findspark.init()

In [2]:
# PySpark is the Spark API for Python. In this lab, we use PySpark to initialize the spark context. 
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

Exercise 1 - Spark Context and Spark Session
In this exercise, you will create the Spark Context and initialize the Spark session needed for SparkSQL and DataFrames. SparkContext is the entry point for Spark applications and contains functions to create RDDs such as parallelize(). SparkSession is needed for SparkSQL and DataFrame operations.

In [3]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

24/08/24 12:59:55 WARN Utils: Your hostname, anhdd-Yoga-7-14ITL5 resolves to a loopback address: 127.0.1.1; using 192.168.77.108 instead (on interface wlp0s20f3)
24/08/24 12:59:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/24 12:59:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


24/08/24 13:00:09 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [4]:
data = range(1,30)
# print first element of iterator
print(data[0])
len(data)
xrangeRDD = sc.parallelize(data, 4)

# this will let us know that we created an RDD
xrangeRDD

1


PythonRDD[1] at RDD at PythonRDD.scala:53

In [5]:
subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x : x<10)


In [6]:
print(filteredRDD.collect())
filteredRDD.count()

                                                                                

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


10

## Exercise 3: DataFrames and SparkSQL


In [7]:
spark

In [8]:
# Download the data first into a local `people.json` file
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/people.json >> people.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    73  100    73    0     0     10      0  0:00:07  0:00:06  0:00:01    17


In [9]:
# Read the dataset into a spark dataframe using the `read.json()` function
df = spark.read.json("people.json").cache()

In [10]:
# Print the dataframe as well as the data schema
df.show()
df.printSchema()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [12]:
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [14]:
# Register the DataFrame as a SQL temporary view
df.createTempView("people")
spark.sql("SELECT name FROM people").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [15]:
# Perform basic filtering

df.filter(df["age"] > 21).show()
spark.sql("SELECT age, name FROM people WHERE age > 21").show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [16]:
# Perfom basic aggregation of data

df.groupBy("age").count().show()
spark.sql("SELECT age, COUNT(age) as count FROM people GROUP BY age").show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|NULL|    1|
|  30|    1|
+----+-----+

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|NULL|    0|
|  30|    1|
+----+-----+



Create an RDD with integers from 1-50. Apply a transformation to multiply every number by 2, resulting in an RDD that contains the first 50 even numbers. 

In [19]:
# Create an RDD with integers from 1 to 50
numbers_rdd = sc.parallelize(range(1, 51))

# Apply a transformation to multiply every number by 2
even_numbers_rdd = numbers_rdd.map(lambda x: x * 2)

# Collect the results and print them
print(even_numbers_rdd.collect())

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100]


In [20]:
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/people.json >> people2.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   326  100   326    0     0    148      0  0:00:02  0:00:02 --:--:--   148


In [27]:
df = spark.read.json("people.json").cache()

24/08/24 14:56:44 WARN CacheManager: Asked to cache already cached data.


In [28]:
print(spark.catalog.listTables())
result = spark.sql("SELECT AVG(age) from people")
result.show()

[Table(name='people', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True), Table(name='people2', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
+--------+
|avg(age)|
+--------+
|    24.5|
+--------+



In [29]:
spark.stop()