# CS5052 - Spark Programming
> Created by: Professor Blesson Varghese\
> School of Computer Science, University of St Andrews\
> Contact: cs5052.staff@st-andrews.ac.uk

This notebook introduces you to Spark programming using Python. Spark is a system that coordinates the processing of large datasets in parallel across many machines. In practice, you would typically run Spark across a cluster of nodes that Spark manages. In this lab, Spark is installed and run on a single machine.

You can set up the environment to run this notebook on the lab machine as follows:
```
cd <your desired folder>
python3.12 -m venv pyspark
. pyspark/bin/activate
pip install --upgrade pip
pip install pyspark jupyterlab
```

Run the JupyterLab server after activating the virtual environment using the following command:
```
jupyter-lab
```
A browser window should open automatically.

To keep a notebook self-contained, install any additional packages from a code cell inside the notebook using the following command:
```
%pip install <package_name>
```

**Note:** The notebook submitted for the CS5052 Practical 1 must run on the lab machine. 

# `SparkSession`

- Every Spark application consists of a driver program and executors (workers); see figure below
- Driver program accesses Spark through a `SparkSession` object
    - A unified point of entry as of Spark 2.0
    - Represents a connection to a cluster
    - `SparkContext`, `SQLContext` and `HiveContext` all combined in `SparkSession`

 ![Spark Overview; Obtained from: https://spark.apache.org/docs/latest/cluster-overview.html](images/sparksession.png)

In [2]:
# Import SparkSession class from pyspark.sql module
# SparkSession is the entry point to Spark 
from pyspark.sql import SparkSession

In [3]:
# Create a SparkSession and assign it to variable 'spark'
# There are different variants on the usage - refer to the documentation or a tutorial
spark = SparkSession.builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/13 19:46:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# DataFrame

## DataFrame: create manually

In [4]:
# Create a DataFrame with one column called "number" and 1000 rows
data = spark.range(1000).toDF("number")

# Shows the first 20 rows by default
data.show()

# Show more or fewer rows N
N = 50
data.show(N)

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows
+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
|    20|
|    21|
|    22|
|    23|
|    24|
|    25|
|    26|
|    27|
|    28|
|    29|
|    30|
|    31|
|    32|
|    33|
|    34|
|    35|
|    36|
|    37|
|    38|
|    39|
|    40|
|    41|
|    42|
|    43|
|    44|
|    45|
|    46|
|    47|
|    48|
|    49|
+------+
only showing top 50 rows


In [5]:
from pyspark.sql import Row

# Python list containing two rows
emp = [Row("Jack", 24), Row("Bobby", 26)]

# Convert Python data into Spark DataFrame
emp_df = spark.createDataFrame(emp, ["name","age"])

emp_df.show()


+-----+---+
| name|age|
+-----+---+
| Jack| 24|
|Bobby| 26|
+-----+---+



## DataFrame: create from file

In [6]:
# Create DataFrame from a file
df = ( 
    spark.read
    .option("header", True)         # Tells Spark the first line is a header
    .option("inferSchema", True)    # Spark scans the data and infers column types; ID will be an integer, Country and Capital will be strings
    .format("csv")                  
    .load("sample_data1.csv")
)

df.show()

+---+--------------+-----------+
| ID|       Country|    Capital|
+---+--------------+-----------+
|  1|        Canada|     Ottawa|
|  2|        Mexico|Mexico City|
|  3|        Brazil|   Brasilia|
|  4|United Kingdom|     London|
|  5|        France|      Paris|
|  6|       Germany|     Berlin|
|  7|         India|  New Delhi|
|  8|         China|    Beijing|
|  9|         Japan|      Tokyo|
| 10|     Australia|   Canberra|
+---+--------------+-----------+



## DataFrame: Datasource

Spark can read from many different data sources, including CSV, JSON, ORC, Parquet and plain-text files, as well as tables and JDBC connections.

# Two Major Operations

- All abstractions such as RDD and DataFrames offer two types of operation
    - **Transformation:** construct a new RDD/DataFrame from a previous one
    - **Action:** compute the result based on an RDD/DataFrame
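
The distinction matters because transformations are lazy: they only record what should be done, and nothing runs until an action asks for a result. The sketch below is a plain-Python analogy built on generators, not Spark code; the names `read_rows`, `keep_large` and `log` are made up for illustration.

```python
log = []  # records when work is actually performed


def read_rows():
    """Stand-in for a data source; yields rows one at a time."""
    for value in [5, 12, 7, 30]:
        log.append(f"read {value}")
        yield value


def keep_large(rows, threshold):
    """Stand-in for a transformation: describes a filter, runs nothing yet."""
    return (row for row in rows if row >= threshold)


# "Transformation": builds a lazy pipeline; no rows are read here
pipeline = keep_large(read_rows(), threshold=10)
work_before_action = len(log)

# "Action": consuming the pipeline forces the reads and the filter to run
result = list(pipeline)
work_after_action = len(log)

print(work_before_action, work_after_action, result)
```

One difference from this analogy: a Python generator can be consumed only once, whereas Spark can re-run the recorded steps every time an action is called.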

# Transformations

## Transformations: `printSchema()` and `describe()`

In [7]:
# Print the structure (data type of the columns) of the DataFrame
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- Capital: string (nullable = true)



In [8]:
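# Computes summary statistics (count, mean, stddev, min, max) for the columns
# Returns a new DataFrame; call .show() on it to display the values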
df.describe()

DataFrame[summary: string, ID: string, Country: string, Capital: string]

In [9]:
# Compute summary statistics for a specific column
df.select("Country").describe()

DataFrame[summary: string, Country: string]
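
The `summary` column in the output above holds the statistic names that `describe()` reports: count, mean, stddev, min and max. As a rough illustration of what those five values mean, the sketch below computes them in plain Python for a few population figures taken from this notebook. It assumes that `stddev` is the sample standard deviation, and it is not Spark code.

```python
import statistics

# A handful of population values, used purely as illustration
populations = [8982000, 17000, 148000, 47000]

summary = {
    "count": len(populations),
    "mean": statistics.mean(populations),
    "stddev": statistics.stdev(populations),  # sample standard deviation
    "min": min(populations),
    "max": max(populations),
}

for name, value in summary.items():
    print(f"{name:>6}: {value}")
```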

## Transformations: `where()` and `filter()`

In [10]:
df_population = ( 
                    spark.read
                    .option("header", True)         
                    .option("inferSchema", True) 
                    .format("csv")                  
                    .load("sample_data2.csv")
)

df_population.show()

hundredK_plus = df_population.filter("Population >= 100000")
hundredK_plus.show()

under_50K = df_population.where("Population <= 50000")
under_50K.show()

+---+-------------------+----------+
| ID|               Town|Population|
+---+-------------------+----------+
|  1|             London|   8982000|
|  2|         Birmingham|   1141000|
|  3|         Manchester|    553230|
|  4|              Leeds|    789200|
|  5|            Glasgow|    635640|
|  6|Newcastle upon Tyne|    300820|
|  7|            Bristol|    463400|
|  8|          Sheffield|    584853|
|  9|          Liverpool|    498042|
| 10|          Cambridge|    125000|
| 11|             Oxford|    154000|
| 12|               Bath|     89000|
| 13|               York|    209200|
| 14|         Canterbury|     55000|
| 15|          Lichfield|     34000|
| 16|         St Andrews|     17000|
| 17|          Inverness|     47000|
| 18|           Stirling|     37000|
| 19|           Aberdeen|    198000|
| 20|             Dundee|    148000|
+---+-------------------+----------+

+---+-------------------+----------+
| ID|               Town|Population|
+---+-------------------+----------+


## Transformation: `distinct()` and `limit()`

In [11]:
df_town_village = ( 
                    spark.read
                    .option("header", True)         
                    .option("inferSchema", True) 
                    .format("csv")                  
                    .load("sample_data3.csv")
)

df_town_village.show()

unique_county = df_town_village.select("County").distinct()
unique_county.show()

N = 5
shortN_list = df_town_village.limit(N)
shortN_list.show()

# Alternate usage
df_town_village.limit(N).show()

+--------------------+--------------+
|              County|  Town/Village|
+--------------------+--------------+
|       Aberdeenshire|      Aberdeen|
|       Aberdeenshire|         Banff|
|       Aberdeenshire|     Peterhead|
|       Aberdeenshire|      Banchory|
|                Fife|    St Andrews|
|                Fife|         Cupar|
|                Fife|    Anstruther|
|                Fife|     Kirkcaldy|
|            Highland|     Inverness|
|            Highland|  Fort William|
|            Highland|     Kingussie|
|            Highland|        Thurso|
|Dumfries and Gall...|      Dumfries|
|Dumfries and Gall...|Castle Douglas|
|Dumfries and Gall...|     Stranraer|
|   Perth and Kinross|         Perth|
|   Perth and Kinross|        Crieff|
|   Perth and Kinross|   Blairgowrie|
|            Stirling|      Stirling|
|            Stirling|     Callander|
+--------------------+--------------+

+--------------------+
|              County|
+--------------------+
|            Highl

## Transformation: Sorting using `sort()` or `orderBy()`

### Basic sorting

In [12]:
# Sort by a single column
# The result is named sorted_df to avoid shadowing Python's built-in sorted()
sorted_df = df_town_village.sort("County")
sorted_df.show()

# Sort by multiple columns
sorted_df = df_town_village.sort("County", "Town/Village")
sorted_df.show()

+--------------------+--------------+
|              County|  Town/Village|
+--------------------+--------------+
|       Aberdeenshire|      Aberdeen|
|       Aberdeenshire|         Banff|
|       Aberdeenshire|     Peterhead|
|       Aberdeenshire|      Banchory|
|Dumfries and Gall...|      Dumfries|
|Dumfries and Gall...|Castle Douglas|
|Dumfries and Gall...|     Stranraer|
|                Fife|    St Andrews|
|                Fife|         Cupar|
|                Fife|    Anstruther|
|                Fife|     Kirkcaldy|
|            Highland|     Inverness|
|            Highland|  Fort William|
|            Highland|     Kingussie|
|            Highland|        Thurso|
|   Perth and Kinross|         Perth|
|   Perth and Kinross|        Crieff|
|   Perth and Kinross|   Blairgowrie|
|            Stirling|      Stirling|
|            Stirling|     Callander|
+--------------------+--------------+

+--------------------+--------------+
|              County|  Town/Village|
+----------

In [13]:
# Order by a single column
sorted_df = df_town_village.orderBy("Town/Village")
sorted_df.show()

# Order by multiple columns
sorted_df = df_town_village.orderBy("County", "Town/Village")
sorted_df.show()

+--------------------+--------------+
|              County|  Town/Village|
+--------------------+--------------+
|       Aberdeenshire|      Aberdeen|
|                Fife|    Anstruther|
|       Aberdeenshire|      Banchory|
|       Aberdeenshire|         Banff|
|   Perth and Kinross|   Blairgowrie|
|            Stirling|     Callander|
|Dumfries and Gall...|Castle Douglas|
|   Perth and Kinross|        Crieff|
|                Fife|         Cupar|
|Dumfries and Gall...|      Dumfries|
|            Highland|  Fort William|
|            Highland|     Inverness|
|            Highland|     Kingussie|
|                Fife|     Kirkcaldy|
|   Perth and Kinross|         Perth|
|       Aberdeenshire|     Peterhead|
|                Fife|    St Andrews|
|            Stirling|      Stirling|
|Dumfries and Gall...|     Stranraer|
|            Highland|        Thurso|
+--------------------+--------------+

+--------------------+--------------+
|              County|  Town/Village|
+----------

### Specifying sort direction

In [14]:
from pyspark.sql.functions import desc, asc 

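# Descending order on a single column
sorted_df = df_town_village.orderBy(desc("Town/Village"))
sorted_df.show()

# Ascending by county, then descending by town/village within each county
sorted_df = df_town_village.orderBy(asc("County"), desc("Town/Village"))
sorted_df.show()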

+--------------------+--------------+
|              County|  Town/Village|
+--------------------+--------------+
|            Highland|        Thurso|
|Dumfries and Gall...|     Stranraer|
|            Stirling|      Stirling|
|                Fife|    St Andrews|
|       Aberdeenshire|     Peterhead|
|   Perth and Kinross|         Perth|
|                Fife|     Kirkcaldy|
|            Highland|     Kingussie|
|            Highland|     Inverness|
|            Highland|  Fort William|
|Dumfries and Gall...|      Dumfries|
|                Fife|         Cupar|
|   Perth and Kinross|        Crieff|
|Dumfries and Gall...|Castle Douglas|
|            Stirling|     Callander|
|   Perth and Kinross|   Blairgowrie|
|       Aberdeenshire|         Banff|
|       Aberdeenshire|      Banchory|
|                Fife|    Anstruther|
|       Aberdeenshire|      Aberdeen|
+--------------------+--------------+

+--------------------+--------------+
|              County|  Town/Village|
+----------

## Transformation: Sampling data using `sample()`

In [15]:
with_replacement = False    # Sample without replacement; each row can appear at most once 
fraction = 0.50             # Roughly 50% of the rows are selected
seed = None                 # Random seed; if set to an integer, the same sample is produced every time, which makes runs reproducible

sample = df_town_village.sample(with_replacement, fraction, seed)
sample.show()

+--------------------+--------------+
|              County|  Town/Village|
+--------------------+--------------+
|       Aberdeenshire|         Banff|
|                Fife|         Cupar|
|                Fife|    Anstruther|
|            Highland|     Inverness|
|            Highland|        Thurso|
|Dumfries and Gall...|Castle Douglas|
|Dumfries and Gall...|     Stranraer|
|   Perth and Kinross|   Blairgowrie|
|            Stirling|      Stirling|
+--------------------+--------------+
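
The sample above contains 9 of the 20 rows rather than exactly 10, which is what "roughly 50%" means in practice. A minimal plain-Python sketch of why, assuming that sampling without replacement behaves like an independent coin flip per row (Bernoulli sampling); `bernoulli_sample` is a hypothetical helper, not a Spark function.

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(20))

# The same seed gives the same sample
run_a = bernoulli_sample(rows, fraction=0.5, seed=42)
run_b = bernoulli_sample(rows, fraction=0.5, seed=42)
print(run_a == run_b)

# Different seeds give samples of different sizes, centred around 10
sizes = {len(bernoulli_sample(rows, 0.5, seed=s)) for s in range(200)}
print(sorted(sizes))
```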



## Transformation: Aggregation

In [16]:
from pyspark.sql.functions import count, countDistinct 

df_town_village.select(count("County")).show()

df_town_village.select(countDistinct("County")).show()

# Other aggregate functions such as min, max, avg, first and last are available in pyspark.sql.functions
# Use the DataFrame's groupBy() method to compute aggregates per group


+-------------+
|count(County)|
+-------------+
|           20|
+-------------+

+----------------------+
|count(DISTINCT County)|
+----------------------+
|                     6|
+----------------------+
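
Grouping is the least self-explanatory of these operations, so here is a small plain-Python model of what a per-county aggregation computes. It is an analogy rather than Spark code; in the notebook the counterpart would be a call such as `df_town_village.groupBy("County").count()`. The rows are a subset of `sample_data3.csv` as displayed earlier.

```python
from collections import defaultdict

rows = [
    ("Aberdeenshire", "Aberdeen"),
    ("Aberdeenshire", "Banff"),
    ("Fife", "St Andrews"),
    ("Fife", "Cupar"),
    ("Fife", "Anstruther"),
    ("Highland", "Inverness"),
]

# Step 1: collect the rows that share the same grouping key
groups = defaultdict(list)
for county, town in rows:
    groups[county].append(town)

# Step 2: reduce each group to one row of aggregate values
summary = {
    county: {
        "count": len(towns),
        "first": towns[0],
        "min": min(towns),
        "max": max(towns),
    }
    for county, towns in groups.items()
}

for county, stats in summary.items():
    print(county, stats)
```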



## DataFrame: Some Actions

In [17]:
# first()
row = df_town_village.first()
print(row)
print(row["Town/Village"])      #Access column of the first row

# show()
df_town_village.show()
N = 6
df_town_village.show(N)

# take(N)
N = 4
rows = df_town_village.take(N)  #Similar to first, but returns multiple rows
for row in rows:
    print(row)

# collect()
all_rows = df_town_village.collect()    # Returns all rows as a list of Row objects
# Note: collect() brings every row into the driver's memory, so it may fail on a large DataFrame
# Use this for small datasets or debugging
for row in all_rows:
    print(row)

#count()
print(f"Total rows: {df_town_village.count()}")

Row(County='Aberdeenshire', Town/Village='Aberdeen')
Aberdeen
+--------------------+--------------+
|              County|  Town/Village|
+--------------------+--------------+
|       Aberdeenshire|      Aberdeen|
|       Aberdeenshire|         Banff|
|       Aberdeenshire|     Peterhead|
|       Aberdeenshire|      Banchory|
|                Fife|    St Andrews|
|                Fife|         Cupar|
|                Fife|    Anstruther|
|                Fife|     Kirkcaldy|
|            Highland|     Inverness|
|            Highland|  Fort William|
|            Highland|     Kingussie|
|            Highland|        Thurso|
|Dumfries and Gall...|      Dumfries|
|Dumfries and Gall...|Castle Douglas|
|Dumfries and Gall...|     Stranraer|
|   Perth and Kinross|         Perth|
|   Perth and Kinross|        Crieff|
|   Perth and Kinross|   Blairgowrie|
|            Stirling|      Stirling|
|            Stirling|     Callander|
+--------------------+--------------+

+-------------+----------

# RDD

- Low level but still relevant in some cases
    - Raw data processing, e.g. a text file without structure
- Typical steps when working with RDDs
    - Creating new RDDs
    - Transforming existing RDDs
    - Computing results from RDDs


## Create RDD

In [18]:
# From an existing file
lines = spark.sparkContext.textFile("README_dummy.md")

# or
sc = spark.sparkContext
lines = sc.textFile("README_dummy.md")

# Collect all lines into a Python list
all_lines = lines.collect()

# Print each line
for line in all_lines:
    print(line)

# Spark Sample Project
**Note:** this file will be used in some of the examples shown in `spark-test.ipynb`

This is a **dummy project** to demonstrate working with **Apache Spark** using Python (`pyspark`).  
It contains some sample data, Spark DataFrame examples, and basic transformations.

---

## Overview

Apache Spark is a **distributed computing framework** that allows processing of large datasets across multiple machines.  
It provides APIs for Python, Scala, Java, and R, with built-in support for SQL, machine learning, and streaming.

In this project, we will use **Spark DataFrames** to handle tabular data efficiently.

---

## Sample Spark Operations

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create Spark session
spark = SparkSession.builder.appName("DummyProject").getOrCreate()

# Load sample CSV
df = spark.read.option("header", True).csv("sample_data.csv")

# Show first 5 rows
df.show(5)

# Filter rows using Spark
high_population

In [19]:
# From a list
numbers = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])
# numbers is an RDD containing numbers 1 to 8
# The data is split into partitions and can be processed in parallel

# Aggregate the partitions and print all numbers
print(numbers.collect())

[1, 2, 3, 4, 5, 6, 7, 8]


In [20]:
# Create an RDD from a file 
lines = sc.textFile("README_dummy.md")

# Create new RDD with lines containing Spark
lines = lines.filter(lambda x: 'Spark' in x)

# Count the number of items in this RDD
# Note: the two statements above are lazy and do not run any computation yet
# The statement below (an action) reads the file and performs the computation
print(lines.count())

# The statement below will read the file and do the computation again
print(lines.count())


10
10


## RDD - Persisting

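- Spark recomputes an RDD each time an action is performed on it
    - By default, an RDD is not kept in memory between actions
    - The `persist()` function asks Spark to keep the RDD (in memory by default) once it has been computed, so later actions can reuse it until `unpersist()` is called or the application ends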

In [21]:
lines = sc.textFile("README_dummy.md")
lines = lines.filter(lambda x: 'Spark' in x)

# Mark the RDD to be kept in memory; nothing is computed or stored yet
lines.persist()

# The first action computes the RDD and stores it; later actions reuse the stored copy
print(lines.count())

10
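
To make the effect of persisting concrete, the sketch below models a dataset that is recomputed on every action unless it has been marked for caching. `TinyDataset` is a hypothetical, heavily simplified stand-in written in plain Python; it is not how Spark is implemented, and it only shows the bookkeeping idea.

```python
class TinyDataset:
    """Hypothetical, heavily simplified model of an RDD with a cache."""

    def __init__(self, load):
        self._load = load       # function that recomputes the data
        self._cache = None      # filled by the first action after persist()
        self._persist = False
        self.loads = 0          # how many times the data was recomputed

    def persist(self):
        # Only marks the dataset for caching; nothing is computed here
        self._persist = True
        return self

    def unpersist(self):
        # Drops the cached copy; later actions recompute the data
        self._persist = False
        self._cache = None
        return self

    def _rows(self):
        if self._cache is not None:
            return self._cache
        self.loads += 1
        rows = self._load()
        if self._persist:
            self._cache = rows
        return rows

    def count(self):
        # An action: needs the actual rows
        return len(self._rows())


lines = ["# Spark Sample Project", "Overview", "Apache Spark is fast"]


def load_matching():
    return [line for line in lines if "Spark" in line]


uncached = TinyDataset(load_matching)
uncached.count()
uncached.count()

cached = TinyDataset(load_matching).persist()
cached.count()
cached.count()

print(uncached.loads, cached.loads)
```

In this model the uncached dataset is recomputed once per action, while the persisted one is computed only by its first action.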


## Basic RDD Transformation Functions

- Construct an RDD from a previous one
    - Performed on one or more RDDs
    - Return a new RDD

### `filter()`
- Takes a function and returns an RDD containing only the elements for which the function returns true

In [22]:
lines = sc.textFile("README_dummy.md")

# Create a new RDD consisting of lines that contain 'Spark'
lines = lines.filter(lambda x: 'Spark' in x)

all_lines = lines.collect()

for line in all_lines:
    print(line)

# Spark Sample Project
This is a **dummy project** to demonstrate working with **Apache Spark** using Python (`pyspark`).  
It contains some sample data, Spark DataFrame examples, and basic transformations.
Apache Spark is a **distributed computing framework** that allows processing of large datasets across multiple machines.  
In this project, we will use **Spark DataFrames** to handle tabular data efficiently.
## Sample Spark Operations
from pyspark.sql import SparkSession
# Create Spark session
spark = SparkSession.builder.appName("DummyProject").getOrCreate()
# Filter rows using Spark


### `map()`
- Takes in a function and applies it to each element in the RDD

In [23]:
lines = sc.textFile("README_dummy.md")

# Create a new RDD in which all strings are in uppercase
lines = lines.map(lambda x: x.upper())

all_lines = lines.collect()

for line in all_lines:
    print(line)

# SPARK SAMPLE PROJECT
**NOTE:** THIS FILE WILL BE USED IN SOME OF THE EXAMPLES SHOWN IN `SPARK-TEST.IPYNB`

THIS IS A **DUMMY PROJECT** TO DEMONSTRATE WORKING WITH **APACHE SPARK** USING PYTHON (`PYSPARK`).  
IT CONTAINS SOME SAMPLE DATA, SPARK DATAFRAME EXAMPLES, AND BASIC TRANSFORMATIONS.

---

## OVERVIEW

APACHE SPARK IS A **DISTRIBUTED COMPUTING FRAMEWORK** THAT ALLOWS PROCESSING OF LARGE DATASETS ACROSS MULTIPLE MACHINES.  
IT PROVIDES APIS FOR PYTHON, SCALA, JAVA, AND R, WITH BUILT-IN SUPPORT FOR SQL, MACHINE LEARNING, AND STREAMING.

IN THIS PROJECT, WE WILL USE **SPARK DATAFRAMES** TO HANDLE TABULAR DATA EFFICIENTLY.

---

## SAMPLE SPARK OPERATIONS

```PYTHON
FROM PYSPARK.SQL IMPORT SPARKSESSION
FROM PYSPARK.SQL.FUNCTIONS IMPORT COL

# CREATE SPARK SESSION
SPARK = SPARKSESSION.BUILDER.APPNAME("DUMMYPROJECT").GETORCREATE()

# LOAD SAMPLE CSV
DF = SPARK.READ.OPTION("HEADER", TRUE).CSV("SAMPLE_DATA.CSV")

# SHOW FIRST 5 ROWS
DF.SHOW(5)

# FILTER ROWS USING SPARK
HIGH_POPULATION

### `flatMap()`
- Applies a function to each element in an RDD
- The function returns a sequence (e.g. a list) for each element
- The sequences are flattened, so the final RDD contains the individual items

In [24]:
lines = sc.parallelize([
    "I love Spark",
    "Spark is awesome",
    "Big data rocks"
])

words_using_map = lines.map(lambda line: line.split(" "))
print(words_using_map.collect())

[['I', 'love', 'Spark'], ['Spark', 'is', 'awesome'], ['Big', 'data', 'rocks']]


In [25]:
words_using_flatmap = lines.flatMap(lambda line: line.split(" "))
print(words_using_flatmap.collect())

['I', 'love', 'Spark', 'Spark', 'is', 'awesome', 'Big', 'data', 'rocks']


### `distinct()`
- Returns a new RDD with only distinct items

In [26]:
numbers = sc.parallelize([0, 1, 2, 4, 7, 5, 4, 3, 2, 1, 1, 0])
numbers = numbers.distinct()
print(numbers.collect())

[0, 1, 2, 3, 4, 5, 7]


### `union(other)`
- Returns a new RDD consisting of the items from both sources (duplicates are kept)

In [27]:
numbers = sc.parallelize([0, 1, 2, 3, 4])
characters = sc.parallelize(['A', 'B', 'C', 'D', 'E'])
result = numbers.union(characters)
print(result.collect())

[0, 1, 2, 3, 4, 'A', 'B', 'C', 'D', 'E']


### `intersection(other)`
- Returns a new RDD consisting of only the items that appear in both sources, with duplicates removed

In [28]:
number_list1 = sc.parallelize([0, 1, 2, 4, 6, 7, 8])
number_list2 = sc.parallelize([0, 1, 3, 4, 5, 7])
result = number_list1.intersection(number_list2)
print(result.collect())

[0, 1, 4, 7]


### `subtract(other)`
- Returns a new RDD consisting of only items in the first RDD but not in the other one

In [29]:
number_list1 = sc.parallelize([0, 1, 2, 4, 6, 7, 8])
number_list2 = sc.parallelize([0, 1, 3, 4, 5, 7])
result = number_list1.subtract(number_list2)
print(result.collect())

[2, 6, 8]
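
The three set-like operations differ in how they treat duplicates, which the examples above do not show because their inputs contain none. The plain-Python model below illustrates the difference on lists. The helper functions are hypothetical stand-ins, and the sorted output of `intersection` is a convenience of this sketch; Spark does not guarantee any particular order.

```python
def union(a, b):
    """All items from both inputs; duplicates are kept."""
    return a + b


def intersection(a, b):
    """Items present in both inputs; duplicates are removed."""
    other = set(b)
    return sorted({x for x in a if x in other})


def subtract(a, b):
    """Items of the first input that do not occur in the second."""
    other = set(b)
    return [x for x in a if x not in other]


first = [0, 1, 1, 2, 4]
second = [1, 1, 4, 5]

union_result = union(first, second)
intersection_result = intersection(first, second)
subtract_result = subtract(first, second)

print(union_result)
print(intersection_result)
print(subtract_result)
```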


## Basic RDD Action Functions

- Compute result based on RDD(s)
    - Performed on one or more RDD(s)
    - Return a result, which is not an RDD

### `first()`
- Returns the first item in an RDD

In [30]:
numbers = sc.parallelize([0, 1, 2, 3, 4, 5, 6, 7, 8]) 
print(numbers.first())

0


### `collect()`
- Returns a list containing the entire RDD's content

In [31]:
numbers = sc.parallelize([0, 1, 2, 3, 4, 5, 6, 7, 8]) 
print(numbers.collect())

[0, 1, 2, 3, 4, 5, 6, 7, 8]


### `count()`
- Returns the number of items in an RDD

In [32]:
numbers = sc.parallelize([0, 1, 2, 3, 4, 5, 6, 7, 8]) 
print(numbers.count())

9


### `reduce(function)`
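- Takes a function that combines two elements into one and applies it repeatedly until a single result remains
- The function should be commutative and associative, because elements are combined partition by partition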

In [33]:
numbers = sc.parallelize([1, 2, 3, 4, 5]) 
result = numbers.reduce(lambda x, y: x * y)
print(result)

120
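
Because the data is split into partitions, a reduction is carried out within each partition first and the partial results are combined afterwards. The plain-Python sketch below models that two-stage process to show why the function should be commutative and associative. The partition splits and the helper `partitioned_reduce` are illustrative assumptions, not Spark internals.

```python
from functools import reduce


def partitioned_reduce(partitions, func):
    """Reduce each partition separately, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


numbers = [1, 2, 3, 4, 5]
two_parts = [[1, 2], [3, 4, 5]]
three_parts = [[1], [2, 3], [4, 5]]

# Multiplication is commutative and associative: the split does not matter
product_a = partitioned_reduce(two_parts, lambda x, y: x * y)
product_b = partitioned_reduce(three_parts, lambda x, y: x * y)

# Subtraction is neither: the answer depends on how the data is split
sequential_diff = reduce(lambda x, y: x - y, numbers)
diff_a = partitioned_reduce(two_parts, lambda x, y: x - y)
diff_b = partitioned_reduce(three_parts, lambda x, y: x - y)

print(product_a, product_b)
print(sequential_diff, diff_a, diff_b)
```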


### `takeOrdered(num, key)`
- Returns the first `num` items in ascending order, or in the order defined by an optional key function

In [34]:
numbers = sc.parallelize([8, 0, 4, 6, 9, 7, 2, 1, 5, 3])

# Return five smallest numbers from the list
print(numbers.takeOrdered(5, lambda x: x))

# Return five largest numbers from the list
print(numbers.takeOrdered(5, lambda x: -x))
# Note: how the key function works:
# Original numbers: 8, 0, 4, 6, 9, 7, 2, 1, 5, 3
# Keys (negated numbers): -8, 0, -4, -6, -9, -7, -2, -1, -5, -3
# 5 smallest keys: -9, -8, -7, -6, -5
# Items with those keys: 9, 8, 7, 6, 5

[0, 1, 2, 3, 4]
[9, 8, 7, 6, 5]
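
One way such a query can be answered without sorting the whole dataset is to keep only the best `num` candidates from each partition and then merge those candidates. The sketch below models this with Python's `heapq` module. The partition split and the helper `take_ordered` are assumptions made for illustration, not a description of Spark's internals.

```python
import heapq


def take_ordered(partitions, num, key=None):
    """Keep the num smallest items of each partition, then merge the candidates."""
    candidates = []
    for part in partitions:
        candidates.extend(heapq.nsmallest(num, part, key=key))
    return heapq.nsmallest(num, candidates, key=key)


# The same ten numbers as above, split into three illustrative partitions
partitions = [[8, 0, 4, 6], [9, 7, 2], [1, 5, 3]]

smallest = take_ordered(partitions, 5)
largest = take_ordered(partitions, 5, key=lambda x: -x)

print(smallest)
print(largest)
```

The key function is used only to compare items; the values returned are the original items, not their keys.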
