# **PySpark**: The Apache Spark Python API

## 1. Introduction

This notebook shows how to connect Jupyter notebooks to a Spark cluster to process data using the Spark Python API.

## 2. The Spark Cluster

### 2.1. Connection

To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, the same one used by the Spark Workers;
+ **spark.executor.memory:** must be less than or equal to the SPARK_WORKER_MEMORY value set in the Docker Compose configuration.
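The memory constraint in the last bullet compares two size strings such as `512m` and `1g`. As a rough sanity check you could run before starting the session, here is a small sketch. The helpers `to_mib` and `executor_fits_worker` are hypothetical, and the suffix table assumes binary units (`k`, `m`, `g`, `t`, optionally followed by `b`); values without a suffix are rejected rather than interpreted.

```python
import re

# Hypothetical helper table: assumes binary units, expressed in mebibytes.
_UNITS_IN_MIB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def to_mib(size: str) -> float:
    """Convert a memory string such as '512m' or '1g' to mebibytes."""
    match = re.fullmatch(r"(\d+)([kmgt])b?", size.strip().lower())
    if match is None:
        raise ValueError(f"unrecognised memory size: {size!r}")
    number, unit = match.groups()
    return int(number) * _UNITS_IN_MIB[unit]


def executor_fits_worker(executor_memory: str, worker_memory: str) -> bool:
    """True if spark.executor.memory does not exceed SPARK_WORKER_MEMORY."""
    return to_mib(executor_memory) <= to_mib(worker_memory)


print(executor_fits_worker("512m", "1g"))  # True
print(executor_fits_worker("2g", "1g"))    # False
```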

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

More configuration options for the SparkSession object in standalone mode can be added with the **config** method. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).

In [2]:
sc = spark.sparkContext

In [11]:
tickets = sc.textFile("/data/aircrafts_data.csv")

In [12]:
tickets.collect()

['aircraft_code,model,range',
 '773,"Boeing 777-300",11100',
 '763,"Boeing 767-300",7900',
 'SU9,"Sukhoi Superjet-100",3000',
 '320,"Airbus A320-200",5700',
 '321,"Airbus A321-200",5600',
 '319,"Airbus A319-100",6700',
 '733,"Boeing 737-300",4200',
 'CN1,"Cessna 208 Caravan",1200',
 'CR2,"Bombardier CRJ-200",2700']
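Each element above is a raw CSV line. A plain `split(",")` keeps the double quotes around the model name, so the second field starts with `"` rather than a letter, and it would also break if a quoted model ever contained a comma. A sketch of a more robust parser built on Python's standard `csv` module follows. The function name `parse_aircraft_line` is hypothetical.

```python
import csv


def parse_aircraft_line(line: str) -> tuple:
    """Split one line of aircrafts_data.csv, honouring double-quoted fields."""
    # csv.reader accepts any iterable of strings, so a one-element list works.
    code, model, flight_range = next(csv.reader([line]))
    return code, model, flight_range


print(parse_aircraft_line('773,"Boeing 777-300",11100'))
# ('773', 'Boeing 777-300', '11100')
```

In the notebook it could be applied as `tickets.map(parse_aircraft_line)`. The header line would be parsed as an ordinary row, so filter it out first, for example with `tickets.filter(lambda line: not line.startswith("aircraft_code"))`.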

In [16]:
names = tickets.map(lambda x: (x.split(",")[0], x.split(",")[1].strip('"')))  # drop the quotes around the model name

In [22]:
boeing = names.filter(lambda x: x[1].startswith('Boeing'))

In [24]:
boeing.collect()

[('773', 'Boeing 777-300'),
 ('763', 'Boeing 767-300'),
 ('733', 'Boeing 737-300')]

Let's then display some DataFrame metadata, such as its schema (column names and types).

In [None]:
data.printSchema()

### 3.3. Process

In this example, we will get the UK's population and unemployment rate throughout the years. Let's start by selecting the relevant columns.

In [6]:
unemployment = data.select(["Description", "Population (GB+NI)", "Unemployment rate"])

In [7]:
unemployment.show(n=10)

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|      Units|              000s|                %|
|       1209|              null|             null|
|       1210|              null|             null|
|       1211|              null|             null|
|       1212|              null|             null|
|       1213|              null|             null|
|       1214|              null|             null|
|       1215|              null|             null|
|       1216|              null|             null|
|       1217|              null|             null|
+-----------+------------------+-----------------+
only showing top 10 rows



We successfully selected the desired columns, but found two problems:
+ The first line contains no data, only the unit of measurement of each column;
+ Many years have missing population and unemployment data.

Let's then remove the first line.

In [8]:
cols_description = unemployment.filter(unemployment['Description'] == 'Units')

In [9]:
cols_description.show()

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|      Units|              000s|                %|
+-----------+------------------+-----------------+



In [10]:
unemployment = unemployment.join(other=cols_description, on=['Description'], how='left_anti')

In [11]:
unemployment.show(n=10)

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|       1209|              null|             null|
|       1210|              null|             null|
|       1211|              null|             null|
|       1212|              null|             null|
|       1213|              null|             null|
|       1214|              null|             null|
|       1215|              null|             null|
|       1216|              null|             null|
|       1217|              null|             null|
|       1218|              null|             null|
+-----------+------------------+-----------------+
only showing top 10 rows



Nice! Now, let's drop the DataFrame rows with missing data and rename its columns.

In [12]:
unemployment = unemployment.dropna()

In [13]:
unemployment = unemployment.\
               withColumnRenamed("Description", "year").\
               withColumnRenamed("Population (GB+NI)", "population").\
               withColumnRenamed("Unemployment rate", "unemployment_rate")

In [14]:
unemployment.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1855|     23241|             3.73|
|1856|     23466|             3.52|
|1857|     23689|             3.95|
|1858|     23914|             5.23|
|1859|     24138|             3.27|
|1860|     24360|             2.94|
|1861|     24585|             3.72|
|1862|     24862|             4.68|
|1863|     25142|             4.15|
|1864|     25425|             2.99|
+----+----------+-----------------+
only showing top 10 rows



### 3.4. Write

Lastly, we persist the unemployment data into the cluster's simulated **HDFS**.

In [15]:
unemployment.repartition(1).write.csv(path="data/uk-macroeconomic-unemployment-data.csv", sep=",", header=True, mode="overwrite")