# Lab 4 - Spark & PySpark

PySpark is a Python interface for Apache Spark. With it, we can apply SQL-like analysis on huge amounts of structured and semistructured data. If you're already familiar with Pandas and other Python libraries for data processing, PySpark is a good way to scale to Big Data processing.

As Spark is written in the Scala programming language, it requires the JVM to run. So our first step is to install Java:

In [1]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null

Next, we need to download and unpack Apache Spark (the build pre-packaged for Hadoop 3):

In [2]:
# !wget https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
# !tar xf spark-3.3.1-bin-hadoop3.tgz
# !rm spark-3.3.1-bin-hadoop3.tgz   # Tidying up

--2024-02-16 03:06:27--  https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299350810 (285M) [application/x-gzip]
Saving to: ‘spark-3.3.1-bin-hadoop3.tgz’


2024-02-16 03:06:40 (23.1 MB/s) - ‘spark-3.3.1-bin-hadoop3.tgz’ saved [299350810/299350810]



In [3]:
# Setting up our environment variables:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

Next, we'll install the findspark library, which locates Spark on the system:

In [4]:
# !pip install -q findspark
import findspark
findspark.init()

We can now import SparkSession from `pyspark.sql` to create our entry point to Spark.

**Note:** If we were running our session on a cluster, we would pass the cluster manager's master URL to `master()`, e.g. `yarn`. However, we'll be working locally, so we'll use `local[x]`, where x is an integer > 0 giving the number of worker threads Spark runs with. In local mode this also sets the default number of partitions for operations such as `parallelize()`. Ideally it matches the number of CPU cores we have, so we'll use `local[*]` here to indicate that we want to use all cores.

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) #  This will format our output tables a bit nicer when not using the show() method
spark

**Note:** You can check how many CPU cores are available to you in the cell below. (This will likely be just 2 on Google Colab but may differ if you have chosen to run this lab via Jupyter Notebooks on your own system).

In [6]:
import multiprocessing
print(multiprocessing.cpu_count())

2


## Section 1 - Basic Collections and Operations

In [7]:
# Creating our SparkContext:
sc = spark.sparkContext

# A SparkContext represents the connection to a Spark cluster
# and can be used to create RDD and broadcast variables on that cluster.

# Create some basic data:
data = list(range(1, 101))

# Create our RDD from a list/collection:
rdd = sc.parallelize(data)

# Retrieve all the data with collect method:
rddCollect = rdd.collect()

# Print out our data:
print(rddCollect)

# Print some basic information about our data:
# Print the number of partitions:
print("No partitions: {}".format(rdd.getNumPartitions()))

# Print our first and max elements:
print("First element: {}".format(rdd.first()))
print("Max element: {}".format(rdd.max()))

# Applying a filter:
print("Values less than 20: {}".format(rdd.filter(lambda x: x < 20).collect()))
# Here, the collect() method is used to retrieve the content of the RDD as a single list.

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]
No partitions: 2
First element: 1
Max element: 100
Values less than 20: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]


You can have a look at the full list of operations and functions in the PySpark documentation which can be found [here](https://spark.apache.org/docs/latest/api/python/reference/index.html).

## Section 2: WordCount

To execute a "word count" job in PySpark, we'll need to load some data from a file and define map and reduce functions to analyse that data. You can use any text file you like, but let's work with something simple like last week.

In [8]:
!echo -e "hello\nthere\nold\nfriend\nfriend\nfriend\nthere" > sample.txt

In [9]:
# Read the file as an RDD:
text = sc.textFile("sample.txt")

In [10]:
# Get the number of lines in the file:
text.count()

7

In [11]:
# Get the first 4 lines:
text.take(4)

['hello', 'there', 'old', 'friend']

In [12]:
# Create our map reduce job:
word_count = text.flatMap(lambda line: line.split()) \
                 .map(lambda word: (word, 1)) \
                 .reduceByKey(lambda x, y: x + y)

Spark does not execute these functions when we define the transformations. Nothing runs until an "action" is invoked, for example `saveAsTextFile()` or `collect()` as shown below. This is Spark's "lazy" mode of operation.
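The idea of deferring work until a result is requested can be illustrated without Spark at all. The sketch below is only a plain-Python analogy (Python's built-in `map` is also lazy); the names `seen` and `to_pair` are made up for illustration and are not part of the PySpark API.

```python
# Plain-Python analogy for lazy evaluation; this is not Spark code.
seen = []  # records each word at the moment the mapper actually runs


def to_pair(word):
    seen.append(word)
    return (word, 1)


words = ["hello", "there", "old", "friend"]

# Like a Spark transformation: builds a lazy iterator, runs nothing yet.
pairs = map(to_pair, words)
calls_before_action = len(seen)

# Like a Spark action: consuming the iterator forces the work to happen.
result = list(pairs)
calls_after_action = len(seen)

print(calls_before_action, calls_after_action)  # 0 4
```

In Spark the same pattern applies at a larger scale: transformations only describe the computation, and the action triggers it.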

In [13]:
# Initiate action to collect our output and display it:
output = word_count.collect()
for (word, count) in output:
    print("{}: {}".format(word, count))

there: 2
hello: 1
old: 1
friend: 3
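
To make the three steps of the job above more concrete, here is a rough plain-Python mirror of the `flatMap` → `map` → `reduceByKey` pipeline, using the same seven words as `sample.txt`. It is a single-machine sketch meant to show what each step computes, not how Spark distributes the work.

```python
# Plain-Python mirror of the flatMap -> map -> reduceByKey pipeline.
from collections import defaultdict

lines = ["hello", "there", "old", "friend", "friend", "friend", "there"]

# flatMap: split every line and flatten the pieces into one list of words.
words = [word for line in lines for word in line.split()]

# map: pair each word with an initial count of 1.
pairs = [(word, 1) for word in words]

# reduceByKey: combine the values of identical keys with x + y.
counts = defaultdict(int)
for word, n in pairs:
    counts[word] = counts[word] + n

# Sorted here only to get a stable print order.
for word, count in sorted(counts.items()):
    print("{}: {}".format(word, count))
```

The totals match the Spark output above; only the order of the printed lines differs, since Spark does not guarantee an ordering after `reduceByKey`.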


## Section 3: DataFrames and SQL

You can load data from different formats (csv, json, xml, etc.), and the schema can be inferred from the file's structure. The main entry point (`spark.read`) has options for the format and its modifiers; you can find more information in the PySpark documentation. The data will be loaded into a PySpark DataFrame.

In [14]:
!wget https://s3.ca-central-1.amazonaws.com/climate-mirror/www.ncdc.noaa.gov/orders/qclcd/QCLCD201508.zip
!unzip QCLCD201508.zip
!rm QCLCD201508.zip   # Tidying up

--2024-02-16 03:07:14--  https://s3.ca-central-1.amazonaws.com/climate-mirror/www.ncdc.noaa.gov/orders/qclcd/QCLCD201508.zip
Resolving s3.ca-central-1.amazonaws.com (s3.ca-central-1.amazonaws.com)... 52.95.146.132, 52.95.147.140, 52.95.146.188, ...
Connecting to s3.ca-central-1.amazonaws.com (s3.ca-central-1.amazonaws.com)|52.95.146.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 76187033 (73M) [application/zip]
Saving to: ‘QCLCD201508.zip’


2024-02-16 03:07:15 (57.9 MB/s) - ‘QCLCD201508.zip’ saved [76187033/76187033]

Archive:  QCLCD201508.zip
  inflating: 201508monthly.txt       
  inflating: 201508remarks.txt       
  inflating: 201508hourly.txt        
  inflating: 201508precip.txt        
  inflating: 201508station.txt       
  inflating: 201508daily.txt         


In [15]:
# Loading our hourly data into a PySpark DataFrame:
df = spark.read.options(delimiter=",", header=True, inferSchema=True).csv("/content/201508hourly.txt")

In [16]:
# We can see some more information about our data:
# The inferSchema option automatically guesses the datatypes;
# otherwise Spark would assume all columns are strings
df.printSchema()

root
 |-- WBAN: integer (nullable = true)
 |-- Date: integer (nullable = true)
 |-- Time: integer (nullable = true)
 |-- StationType: integer (nullable = true)
 |-- SkyCondition: string (nullable = true)
 |-- SkyConditionFlag: string (nullable = true)
 |-- Visibility: string (nullable = true)
 |-- VisibilityFlag: string (nullable = true)
 |-- WeatherType: string (nullable = true)
 |-- WeatherTypeFlag: string (nullable = true)
 |-- DryBulbFarenheit: string (nullable = true)
 |-- DryBulbFarenheitFlag: string (nullable = true)
 |-- DryBulbCelsius: string (nullable = true)
 |-- DryBulbCelsiusFlag: string (nullable = true)
 |-- WetBulbFarenheit: string (nullable = true)
 |-- WetBulbFarenheitFlag: string (nullable = true)
 |-- WetBulbCelsius: string (nullable = true)
 |-- WetBulbCelsiusFlag: string (nullable = true)
 |-- DewPointFarenheit: string (nullable = true)
 |-- DewPointFarenheitFlag: string (nullable = true)
 |-- DewPointCelsius: string (nullable = true)
 |-- DewPointCelsiusFlag: str

Now we can explore our data a bit! We can do this with PySpark functions or with regular SQL queries. We'll use both here.

In [17]:
# First we'll show just the first 5 records:
df.select("*").show(5)

+----+--------+----+-----------+-------------+----------------+----------+--------------+-----------+---------------+----------------+--------------------+--------------+------------------+----------------+--------------------+--------------+------------------+-----------------+---------------------+---------------+-------------------+----------------+--------------------+---------+-------------+-------------+-----------------+---------------------+-------------------------+---------------+-------------------+----------------+--------------------+--------------+------------------+----------------+--------------------+----------+--------------+------------+----------------+---------+-------------+
|WBAN|    Date|Time|StationType| SkyCondition|SkyConditionFlag|Visibility|VisibilityFlag|WeatherType|WeatherTypeFlag|DryBulbFarenheit|DryBulbFarenheitFlag|DryBulbCelsius|DryBulbCelsiusFlag|WetBulbFarenheit|WetBulbFarenheitFlag|WetBulbCelsius|WetBulbCelsiusFlag|DewPointFarenheit|DewPointFarenhe

In [18]:
# Now we'll return just the "Date" column for the first 10 records:
df.select("Date").show(10)

+--------+
|    Date|
+--------+
|20150801|
|20150801|
|20150801|
|20150801|
|20150801|
|20150801|
|20150801|
|20150801|
|20150801|
|20150801|
+--------+
only showing top 10 rows



In [19]:
# Show the distinct values for the "StationType" column:
df.select("StationType").distinct().show()

+-----------+
|StationType|
+-----------+
|         12|
|          5|
|         15|
|         11|
|          0|
|          9|
|          4|
+-----------+



In [20]:
# We can also apply a where clause (where is an alias for filter):
df.where(df.StationType == 12).show()

+----+--------+----+-----------+------------+----------------+----------+--------------+-----------+---------------+----------------+--------------------+--------------+------------------+----------------+--------------------+--------------+------------------+-----------------+---------------------+---------------+-------------------+----------------+--------------------+---------+-------------+-------------+-----------------+---------------------+-------------------------+---------------+-------------------+----------------+--------------------+--------------+------------------+----------------+--------------------+----------+--------------+------------+----------------+---------+-------------+
|WBAN|    Date|Time|StationType|SkyCondition|SkyConditionFlag|Visibility|VisibilityFlag|WeatherType|WeatherTypeFlag|DryBulbFarenheit|DryBulbFarenheitFlag|DryBulbCelsius|DryBulbCelsiusFlag|WetBulbFarenheit|WetBulbFarenheitFlag|WetBulbCelsius|WetBulbCelsiusFlag|DewPointFarenheit|DewPointFarenheit

We can see here as well that the output is limited to the top 20 rows unless we state explicitly how many we want. Previously, we've set values lower than this, but we can also ask for more than 20.

The column values are also truncated at 20 characters, but we can disable this by adding `truncate=False`.

In [21]:
df.where(df.StationType == 12).show(30, False)

+----+--------+----+-----------+-------------+----------------+----------+--------------+-----------+---------------+----------------+--------------------+--------------+------------------+----------------+--------------------+--------------+------------------+-----------------+---------------------+---------------+-------------------+----------------+--------------------+---------+-------------+-------------+-----------------+---------------------+-------------------------+---------------+-------------------+----------------+--------------------+--------------+------------------+----------------+--------------------+----------+--------------+------------+----------------+---------+-------------+
|WBAN|Date    |Time|StationType|SkyCondition |SkyConditionFlag|Visibility|VisibilityFlag|WeatherType|WeatherTypeFlag|DryBulbFarenheit|DryBulbFarenheitFlag|DryBulbCelsius|DryBulbCelsiusFlag|WetBulbFarenheit|WetBulbFarenheitFlag|WetBulbCelsius|WetBulbCelsiusFlag|DewPointFarenheit|DewPointFarenhe

In [22]:
# Aggregate the records by StationType and give the count of records:
df.groupBy("StationType").count().show()

+-----------+-------+
|StationType|  count|
+-----------+-------+
|         12| 564104|
|          5|  47576|
|         15|1024600|
|         11| 266494|
|          0|2267015|
|          9|   3674|
|          4|   3837|
+-----------+-------+



In [23]:
# Aggregate to get average visibility
df.agg({'Visibility': 'avg'}).show()

+-----------------+
|  avg(Visibility)|
+-----------------+
|9.232849114937112|
+-----------------+



In [24]:
df.groupBy('StationType').agg({'Visibility': 'avg'}).orderBy('StationType').show()

+-----------+------------------+
|StationType|   avg(Visibility)|
+-----------+------------------+
|          0| 9.355271418759141|
|          4|12.434263322884012|
|          5| 8.410460638776907|
|          9|11.377107523187918|
|         11| 9.053876037386187|
|         12| 8.860038572014776|
|         15|              null|
+-----------+------------------+



To use SQL statements, you first need to create a table or view over the dataset.

In [25]:
# Creating a view with the name "Hourly":
df.createOrReplaceTempView("Hourly")

In [26]:
# List the distinct values for the field "StationType":
spark.sql('select distinct StationType from Hourly')

StationType
12
5
15
11
0
9
4


In [27]:
# Display the average temperature aggregated by date
spark.sql('select Date, avg(DryBulbFarenheit) from Hourly group by Date')

Date,avg(DryBulbFarenheit)
20150815,74.77668678433244
20150825,69.38363964329439
20150810,73.51759071137828
20150829,70.45618671057692
20150812,73.48426317212333
20150801,74.3186249174491
20150809,73.70625219776356
20150807,73.12576788334492
20150819,70.29239255092567
20150818,71.69551032133623


In [28]:
# Display the average temperature aggregated by date ordering by average:
spark.sql('select Date, avg(DryBulbFarenheit) from Hourly group by Date order by 2')

Date,avg(DryBulbFarenheit)
20150826,69.16310804302456
20150827,69.35265452421945
20150823,69.36308463618973
20150825,69.38363964329439
20150824,69.50841568263708
20150820,69.86937829322987
20150828,70.16160935945243
20150819,70.29239255092567
20150829,70.45618671057692
20150830,71.14330221159474


## Section 4 - Exercises

In this exercise you'll explore the page views of Wikimedia projects. Download the page view stats generated between 00:00 and 01:00 on Jan 1, 2016 and unpack it:

In [29]:
!wget https://dumps.wikimedia.org/other/pagecounts-raw/2016/2016-01/pagecounts-20160101-000000.gz
!gunzip /content/pagecounts-20160101-000000.gz

--2024-02-16 03:10:28--  https://dumps.wikimedia.org/other/pagecounts-raw/2016/2016-01/pagecounts-20160101-000000.gz
Resolving dumps.wikimedia.org (dumps.wikimedia.org)... 208.80.154.71, 2620:0:861:3:208:80:154:71
Connecting to dumps.wikimedia.org (dumps.wikimedia.org)|208.80.154.71|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 68709458 (66M) [application/octet-stream]
Saving to: ‘pagecounts-20160101-000000.gz’


2024-02-16 03:10:47 (3.57 MB/s) - ‘pagecounts-20160101-000000.gz’ saved [68709458/68709458]



Each line, delimited by a space, contains the stats for one Wikimedia page. The schema looks as follows (there are no column headers included):

`<project code> <page title> <num hits> <page size>`

Create an RDD or a DataFrame from the input file and write the code to solve each of the following tasks. You can use operations on DataFrames, SQL on DataFrames or operations on RDDs to solve these.
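
As a possible starting point, the sketch below parses one line of that schema into typed fields in plain Python. The helper name `parse_line` and the sample line are made up for illustration; real lines in the file may need extra handling (for example, malformed records).

```python
def parse_line(line):
    """Split one space-delimited record into (project, title, hits, size)."""
    project, title, hits, size = line.split(" ")
    # The last two fields are numeric, so convert them from strings.
    return (project, title, int(hits), int(size))


# Hypothetical sample line, not taken from the real dataset.
sample = "en Example_Page 3 20480"
record = parse_line(sample)
print(record)  # ('en', 'Example_Page', 3, 20480)
```

A function like this could be passed to an RDD's `map()` after reading the file with `sc.textFile()`, or the file could instead be read as a space-delimited DataFrame; either route works for the tasks below.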

In [30]:
# TODO:

1. Determine the number of records the dataset has in total.

In [31]:
# TODO:

2. Determine the record with the largest page size. If multiple records have the same size, list all of them.

In [32]:
# TODO:

3. Compute the total number of pageviews (hits) for each project (as the schema shows, the first field of each record contains the project code).

In [33]:
# TODO:

4. Determine the number of page titles that start with the article "The". How many of those page titles are not part of the English project (pages that are part of the English project have "en" as first field)?

In [34]:
# TODO:

5. Determine the most frequently occurring page title term in this dataset.

  **Note:** We wouldn't typically use SQL for a task like this - it's quite complicated to extract terms from SQL entries!

In [35]:
# TODO:

In [36]:
# Stopping Spark:
sc.stop()
spark.stop()