# Lab 11 Spark

Author: ISTD, SUTD

Title: Lab 11, Spark part 1

Date: March 5, 2025

## Learning outcome


By the end of this lesson, you will be able to:

* Submit PySpark jobs to a Spark cluster
* Parallelize data processing using PySpark


You can either run this lab directly on the AWS cluster with the HDFS file system, or install PySpark in Google Colab and load the files locally. The main difference in the code is that we do not load the input files from HDFS; instead, we load local files. Other than that, all PySpark commands are the same.
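As a rough illustration of that difference, the sketch below builds the string you would pass to `sc.textFile` in each setting. The helper `resolve_input_path` and the NameNode address `namenode-host:9000` are hypothetical placeholders, not part of the lab; on the AWS cluster you would substitute your own NameNode host and port. The sketch also assumes the files sit under the HDFS root, so the relative local path becomes an absolute HDFS path.

```python
from typing import Optional


def resolve_input_path(relative_path: str, namenode: Optional[str] = None) -> str:
    """Return a path string suitable for sc.textFile().

    namenode is a hypothetical "host:port" for the cluster's HDFS NameNode.
    When it is None we assume a local file system (e.g. Google Colab).
    """
    if namenode is None:
        # Colab: a plain relative path is read from the local file system
        return relative_path
    # Cluster (assumption): the same path, placed under the HDFS root
    return f"hdfs://{namenode}/{relative_path.lstrip('/')}"


print(resolve_input_path("input/data.csv"))
print(resolve_input_path("input/data.csv", "namenode-host:9000"))
```

Only the argument to `sc.textFile` changes between the two settings; the transformations that follow are identical.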

To run this lab, either make a copy of this notebook or use `File -> Open in Playground Mode`.

## Installing PySpark in Google Colab

To install PySpark in Google Colab, execute the cell below. It downloads Spark and installs all the libraries needed for this lab.

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F


[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
[33m0% [Connecting to archive.ubuntu.com (91.189.91.82)] [Connecting to security.ub[0m[33m0% [Connecting to archive.ubuntu.com (91.189.91.82)] [Connecting to security.ub[0m                                                                               Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [802 kB]
Hit:8 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy In

## Wordcount Example

Let us first download the data file. We can find it at `https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc10/data/TheCompleteSherlockHolmes.txt`.

Colab lets us execute unix commands, as long as we prepend them with `!`. So let's download the file and move it into a new folder called `input`. While we are at it, let's create a folder called `output` as well.

In [2]:
!wget https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc10/data/TheCompleteSherlockHolmes.txt
!mkdir input
!mv TheCompleteSherlockHolmes.txt input/
!mkdir output

--2024-04-05 03:41:04--  https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc10/data/TheCompleteSherlockHolmes.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3705628 (3.5M) [text/plain]
Saving to: ‘TheCompleteSherlockHolmes.txt’


2024-04-05 03:41:05 (38.0 MB/s) - ‘TheCompleteSherlockHolmes.txt’ saved [3705628/3705628]



You can check that `TheCompleteSherlockHolmes.txt` was downloaded by expanding the Files panel on the left and checking the contents of the `input` folder.

Now we are ready to write our PySpark code. The goal is to write a simple word counter:

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# sc.stop() # uncomment this during debugging to restart your context in case execution stopped mid-way through this cell.

conf = SparkConf().setAppName("Wordcount Application")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# note that we load the text file directly with a local path instead of providing an HDFS URL
input_file_name = 'input/TheCompleteSherlockHolmes.txt'
text_file = sc.textFile(input_file_name)

# split every line into words, emit a (word, 1) pair per word, then sum the 1s per word
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)

# saveAsTextFile fails if the output folder already exists, so remove it before re-running this cell
output_folder = './output/wordcount'
counts.saveAsTextFile(output_folder)

sc.stop()
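
To see what each stage of the pipeline computes, here is a plain-Python sketch (no Spark needed) that mimics `flatMap`, `map` and `reduceByKey` on two made-up lines of text. It ignores partitioning: the real `reduceByKey` first combines pairs within each partition and then shuffles the partial sums across the cluster. It also exposes a quirk of `split(" ")`: two consecutive spaces produce an empty string, which is then counted as a "word".

```python
from collections import defaultdict
from functools import reduce

# A tiny made-up stand-in for the lines of TheCompleteSherlockHolmes.txt
lines = ["the game is afoot", "the  game"]  # note the double space in the second line

# flatMap: one line -> many words (split(" ") keeps the empty string between two spaces)
words = [word for line in lines for word in line.split(" ")]

# map: word -> (word, 1)
pairs = [(word, 1) for word in words]

# reduceByKey: group the values by key, then fold each group with the reduce function
grouped = defaultdict(list)
for word, one in pairs:
    grouped[word].append(one)
counts = {word: reduce(lambda a, b: a + b, ones) for word, ones in grouped.items()}

print(sorted(counts.items()))
```

If the empty-string entry is unwanted, splitting with `line.split()` (no argument) collapses runs of whitespace instead.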

## Exercise 1

Write a PySpark application that takes a (set of) comma-separated values (CSV) file(s) with 2 columns and outputs a CSV file with 3 columns: the first two columns are the same as in the input file, and the third column contains the values obtained by splitting the first column using the second column as the delimiter.

The input file can be found here: `https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc11/ex1/data.csv`.

For example, given input from a file:

```
50000.0#0#0#,#
0@1000.0@,@
1$,$
1000.00^Test_string,^
```


the program should output

```
50000.0#0#0#,#,['50000.0', '0', '0', '']
0@1000.0@,@,['0', '1000.0', '']
1$,$,['1', '']
1000.00^Test_string,^,['1000.00', 'Test_string']
```

and write it to a file.
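
Because the per-record transformation is plain Python, you can check it against the sample rows above before running any Spark job. The helper `split_by_delimiter` below is a hypothetical stand-in written only for this check. Note that Python's `str.split` treats the delimiter literally (so `$` and `^` need no escaping, unlike the regex-based `split` function in Spark SQL) and keeps the empty string after a trailing delimiter, which is where the trailing `''` entries in the expected output come from.

```python
def split_by_delimiter(record: str) -> str:
    # hypothetical helper: each record looks like "<value>,<delimiter>"
    value, delimiter = record.split(",", 1)
    # literal (non-regex) split; a trailing delimiter yields a trailing ''
    parts = value.split(delimiter)
    return f"{value},{delimiter},{parts}"


sample = ["50000.0#0#0#,#", "0@1000.0@,@", "1$,$", "1000.00^Test_string,^"]
for record in sample:
    print(split_by_delimiter(record))
```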



In [4]:
!wget https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc11/ex1/data.csv
!mv data.csv input/

--2024-04-05 03:49:22--  https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc11/ex1/data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 54 [text/plain]
Saving to: ‘data.csv’


2024-04-05 03:49:22 (2.96 MB/s) - ‘data.csv’ saved [54/54]



In [19]:
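def for_each(record):
  # each record is one line of the form "<value>,<delimiter>", e.g. "0@1000.0@,@"
  cols = record.split(",")

  if len(cols) > 1:
    delimiter = cols[1]
    # str.split uses the delimiter literally (not as a regex) and keeps trailing empty strings
    extra_col = str(cols[0].split(delimiter))
    cols.append(extra_col)

  return ",".join(cols)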

In [57]:
!rm -rf ./output/exercise_1

sc.stop() # stop the previous context first: only one SparkContext can be active at a time

conf = SparkConf().setAppName("Exercise 1")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# note that we load the text file directly with a local path instead of providing an hdfs url
input_file_name = 'input/data.csv'
text_file = sc.textFile(input_file_name)

splits = text_file.map(for_each)

for line in splits.collect():
  print(line)

output_folder = './output/exercise_1'
splits.saveAsTextFile(output_folder)

sc.stop()

50000.0#0#0#,#,['50000.0', '0', '0', '']
0@1000.0@,@,['0', '1000.0', '']
1$,$,['1', '']
1000.00^Test_string,^,['1000.00', 'Test_string']


## Exercise 2

Write a PySpark application that aggregates (counts) the rows of a (set of) CSV file(s) with 4 columns, grouped by the third column, the destination IP.

The input file can be found here: `https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc11/ex2/data.csv`

Given the input

```
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2
```
the program should output

```
 10.0.0.3.5001,13
 10.0.0.2.54880,7
```
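
The counting step can be sketched in plain Python to make the `map` + `reduceByKey` logic concrete. This is not Spark code: the sample rows are rebuilt from the input above with the alternating timestamps collapsed into one (which does not affect the count), and a dictionary of running totals stands in for `reduceByKey(lambda a, b: a + b)`. In Spark, the reduce function must be associative and commutative, because partial sums are first computed per partition and merged in no particular order.

```python
from collections import defaultdict

# Rebuilt sample: 13 rows to 10.0.0.3.5001, then 7 rows to 10.0.0.2.54880
rows = (["05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2"] * 13
        + ["05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2"] * 7)


def to_pair(record):
    # map: keep only the third column (destination IP) paired with a count of 1
    return record.split(", ")[2], 1


# stand-in for reduceByKey: add up the 1s for each destination IP
totals = defaultdict(int)
for ip, one in map(to_pair, rows):
    totals[ip] += one

for ip, total in totals.items():
    print(f"{ip},{total}")
```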

In [29]:
!wget https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc11/ex2/data.csv
!mkdir -p input/exercise_2
!mv data.csv input/exercise_2

--2024-04-05 04:11:27--  https://raw.githubusercontent.com/sutd50043/cohortclass/main/cc11/ex2/data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1000 [text/plain]
Saving to: ‘data.csv’


2024-04-05 04:11:28 (35.5 MB/s) - ‘data.csv’ saved [1000/1000]



In [58]:
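def split(record):
  # "05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2" -> ("10.0.0.3.5001", 1)
  fields = record.split(", ")
  destination_ip = fields[2]
  return destination_ip, 1

def count(current_count, change):
  # reduceByKey merges values pairwise, so this must be associative and commutative
  return current_count + change

def format(record):
  # note: this shadows Python's built-in format() in the notebook namespace
  destination_ip = str(record[0])
  total = str(record[1])
  return f'{destination_ip}, {total}'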

In [59]:
!rm -rf ./output/exercise_2

sc.stop() # stop the previous context first: only one SparkContext can be active at a time

conf = SparkConf().setAppName("Exercise 2")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# note that we load the text file directly with a local path instead of providing an HDFS URL
input_file_name = 'input/exercise_2/data.csv'
text_file = sc.textFile(input_file_name)

# name the RDD `counts` so that it does not overwrite the `count` reducer function on re-runs
counts = text_file.map(split) \
           .reduceByKey(count) \
           .map(format)

for line in counts.collect():
  print(line)

output_folder = './output/exercise_2'
counts.saveAsTextFile(output_folder)

sc.stop()

10.0.0.3.5001, 13
10.0.0.2.54880, 7


## Exercise 3

Given the same input as Exercise 2, write a PySpark application which outputs the following:

```
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
```


If the input is very large and contains many unique destination IP values, can your program still scale?
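
One way to reason about this question is to look at the two passes the task needs. The plain-Python sketch below (no Spark; sample rows rebuilt from the Exercise 2 input with the alternating timestamps collapsed into one) first counts rows per destination IP and then appends each row's count. Here all counts sit in a single in-memory dictionary, which is roughly what a broadcast (map-side) join does in Spark: it works while the number of distinct destination IPs is small. With too many unique IPs for that, a shuffle join such as the RDD `join` in the solution that follows scales better, because it hash-partitions both sides by key and each executor only handles the keys of its own partitions. The remaining risk is skew, where one very frequent IP sends many rows to a single partition.

```python
from collections import defaultdict

# Rebuilt sample: 13 rows to 10.0.0.3.5001, then 7 rows to 10.0.0.2.54880
rows = (["05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2"] * 13
        + ["05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2"] * 7)

# Pass 1 (like map + reduceByKey): count rows per destination IP
counts = defaultdict(int)
for row in rows:
    counts[row.split(", ")[2]] += 1

# Pass 2 (like keying each row by destination IP and joining with the counts):
# look up the count for each row's destination IP and append it
annotated = [f"{row}, {counts[row.split(', ')[2]]}" for row in rows]

print(annotated[0])
print(annotated[-1])
```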


The questions were adapted from `https://jaceklaskowski.github.io/spark-workshop/exercises/`.


In [94]:
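def split(record):
  # "05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2" -> list of the 4 fields
  return record.split(", ")

def ip_format(record):
  # (destination_ip, 1) pairs, used to count rows per destination IP
  destination_ip = record[2]
  return destination_ip, 1

def count(current_count, change):
  return current_count + change

def format_splits(record):
  # key each full row by its destination IP so it can be joined with the counts
  destination_ip = record[2]
  remaining_data = ", ".join(record)
  return destination_ip, remaining_data

def format_results(record):
  # after the join, each record is (destination_ip, (row, count))
  data = record[1][0]
  total = record[1][1]
  return f'{data}, {total}'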

In [96]:
!rm -rf ./output/exercise_3

sc.stop() # stop the previous context first: only one SparkContext can be active at a time

conf = SparkConf().setAppName("Exercise 3")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)

# note that we load the text file directly with a local path instead of providing an HDFS URL
input_file_name = 'input/exercise_2/data.csv'
text_file = sc.textFile(input_file_name)

# splits feeds both the counts and the join, so cache it instead of recomputing it from the file
splits = text_file.map(split).cache()
counts = splits.map(ip_format).reduceByKey(count)
# join shuffles both RDDs by destination IP; the output order is not guaranteed to match the input
joined = splits.map(format_splits).join(counts)
results = joined.map(format_results)

for line in results.collect():
  print(line)

output_folder = './output/exercise_3'
results.saveAsTextFile(output_folder)

sc.stop()

05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604900, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604899, 10.0.0.2.54880, 10.0.0.3.5001, 2, 13
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10.0.0.3.5001, 10.0.0.2.54880, 2, 7
05:49:56.604908, 10