# **PySpark**: The Apache Spark Python API

## 1. Introduction

This notebook shows how to connect Jupyter notebooks to a Spark cluster and process data using the Spark Python API.

### Cluster overview

| Application     | URL                                      | Description                                                |
| --------------- | ---------------------------------------- | ---------------------------------------------------------- |
| JupyterLab      | [localhost:8888](http://localhost:8888/) | Cluster interface with built-in Jupyter notebooks          |
| Spark Driver    | [localhost:4040](http://localhost:4040/) | Spark Driver web UI                                        |
| Spark Master    | [localhost:8080](http://localhost:8080/) | Spark Master node                                          |
| Spark Worker I  | [localhost:8081](http://localhost:8081/) | Spark Worker node with 1 core and 512m of memory (default) |
| Spark Worker II | [localhost:8082](http://localhost:8082/) | Spark Worker node with 1 core and 512m of memory (default) |


## 2. The Spark Cluster

### 2.1. Connection

To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, the same one used by the Spark Workers;
+ **spark.executor.memory:** must be less than or equal to the `SPARK_WORKER_MEMORY` setting in the Docker Compose configuration.
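
The memory constraint in the last bullet can be checked before the session is created. The sketch below is a simplified illustration, not part of PySpark: `to_mib` and `executor_fits_worker` are hypothetical helpers, and the parser only handles sizes written as an integer followed by `k`, `m`, `g` or `t`.

```python
import re

# Hypothetical helper, not part of PySpark: converts a JVM-style size
# string such as "512m" or "1g" into a number of mebibytes.
_UNITS_IN_MIB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def to_mib(size: str) -> float:
    match = re.fullmatch(r"(\d+)([kmgt])b?", size.strip().lower())
    if match is None:
        raise ValueError(f"unsupported memory size: {size!r}")
    value, unit = match.groups()
    return int(value) * _UNITS_IN_MIB[unit]


def executor_fits_worker(executor_memory: str, worker_memory: str) -> bool:
    """Return True when the executor request does not exceed the worker's memory."""
    return to_mib(executor_memory) <= to_mib(worker_memory)


# Assumed values: 512m matches the worker default listed in the cluster overview.
print(executor_fits_worker("512m", "512m"))  # True
print(executor_fits_worker("1g", "512m"))    # False
```

If the check fails, either lower `spark.executor.memory` or raise the worker memory in the Docker Compose configuration.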

In [12]:
# !pip install networkx matplotlib graphframes

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark test demo").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

22/11/19 19:54:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/19 20:22:47 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
22/11/19 20:22:47 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:716)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:152)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:258)
	at org.apache.spark.deploy.client.StandaloneAppClien

Additional configuration options for the SparkSession object in standalone mode can be set with the **config** method. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).
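
As a minimal sketch of passing several options through **config**, the helper below applies each entry of a dictionary to the builder. The function `build_session` and the option values are assumptions made for illustration; only the `appName`, `master`, `config` and `getOrCreate` calls come from the cell above.

```python
# Illustrative settings only; the values are assumptions, not recommendations.
extra_confs = {
    "spark.executor.memory": "512m",
    "spark.executor.cores": "1",
    "spark.sql.shuffle.partitions": "4",
}


def build_session(app_name: str, master_url: str, confs: dict):
    """Create (or reuse) a SparkSession, applying each entry with config()."""
    # Imported lazily so this cell can be defined without a running cluster.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name).master(master_url)
    for key, value in confs.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Usage on the cluster described in this notebook:
# spark = build_session("pyspark test demo", "spark://spark-master:7077", extra_confs)
```

Note that `getOrCreate()` returns an already running session if one exists, so some options may only take effect when they are set before the first session is created.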

## 3. The Data

### 3.1. Introduction

We will use the Spark Python API to read, process and write data. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/index.html).

### 3.2. Read

Let's read some UK macroeconomic data ([source](https://www.kaggle.com/bank-of-england/a-millennium-of-macroeconomic-data)) from the cluster's simulated **Hadoop distributed file system (HDFS)** into a Spark dataframe.

In [2]:
data = spark.read.csv(path="data/uk-macroeconomic-data.csv", sep=",", header=True)

                                                                                

Let's then display some dataframe metadata, such as the number of rows and columns and its schema (column names and types).

In [3]:
data.count()

22/11/19 19:36:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


841

In [4]:
len(data.columns)

77

In [5]:
data.printSchema()

root
 |-- Description: string (nullable = true)
 |-- Real GDP of England at market prices: string (nullable = true)
 |-- Real GDP of England at factor cost : string (nullable = true)
 |-- Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Index of real UK GDP at factor cost - based on changing political boundaries, : string (nullable = true)
 |-- Composite estimate of English and (geographically-consistent) UK real GDP at factor cost: string (nullable = true)
 |-- HP-filter of log of real composite estimate of English and UK real GDP at factor cost: string (nullable = true)
 |-- Real UK gross disposable national income at market prices, constant border estimate: string (nullable = true)
 |-- Real consumption: string (nullable = true)
 |-- Real investment: string (nullable = true)
 |-- Stockbuildi

### 3.3. Process

In this example, we will get the UK's population and unemployment rate throughout the years. Let's start by selecting the relevant columns.

In [6]:
unemployment = data.select(["Description", "Population (GB+NI)", "Unemployment rate"])

In [7]:
unemployment.show(n=10)

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|      Units|              000s|                %|
|       1209|              null|             null|
|       1210|              null|             null|
|       1211|              null|             null|
|       1212|              null|             null|
|       1213|              null|             null|
|       1214|              null|             null|
|       1215|              null|             null|
|       1216|              null|             null|
|       1217|              null|             null|
+-----------+------------------+-----------------+
only showing top 10 rows



We successfully selected the desired columns, but two problems were found:
+ The first line contains no data, only the unit of measurement of each column;
+ There are many years with missing population and unemployment data.

Let's then remove the first line.

In [8]:
cols_description = unemployment.filter(unemployment['Description'] == 'Units')

In [9]:
cols_description.show()

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|      Units|              000s|                %|
+-----------+------------------+-----------------+



                                                                                

In [10]:
unemployment = unemployment.join(other=cols_description, on=['Description'], how='left_anti')

In [11]:
unemployment.show(n=10)

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|       1209|              null|             null|
|       1210|              null|             null|
|       1211|              null|             null|
|       1212|              null|             null|
|       1213|              null|             null|
|       1214|              null|             null|
|       1215|              null|             null|
|       1216|              null|             null|
|       1217|              null|             null|
|       1218|              null|             null|
+-----------+------------------+-----------------+
only showing top 10 rows
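
The `left_anti` join used above keeps only the rows of the left dataframe whose key has no match in the right one. The plain-Python sketch below illustrates that semantics on a few toy rows; `left_anti_join` is a hypothetical helper written for this explanation, not a Spark function.

```python
def left_anti_join(left_rows, right_rows, key):
    """Keep the rows of left_rows whose key value never appears in right_rows."""
    right_keys = {row[key] for row in right_rows}
    return [row for row in left_rows if row[key] not in right_keys]


# Toy rows mirroring the first lines of the dataframe shown above.
rows = [
    {"Description": "Units", "Population (GB+NI)": "000s", "Unemployment rate": "%"},
    {"Description": "1209", "Population (GB+NI)": None, "Unemployment rate": None},
    {"Description": "1210", "Population (GB+NI)": None, "Unemployment rate": None},
]
units_rows = [row for row in rows if row["Description"] == "Units"]

remaining = left_anti_join(rows, units_rows, key="Description")
print([row["Description"] for row in remaining])  # ['1209', '1210']
```

For a single known value, a filter with the opposite condition, such as `unemployment.filter(unemployment['Description'] != 'Units')`, would likely give the same result with less work.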



Nice! Now, let's drop the dataframe rows with missing data and rename its columns.

In [12]:
unemployment = unemployment.dropna()

In [13]:
unemployment = unemployment.\
               withColumnRenamed("Description", 'year').\
               withColumnRenamed("Population (GB+NI)", "population").\
               withColumnRenamed("Unemployment rate", "unemployment_rate")

In [14]:
unemployment.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1855|     23241|             3.73|
|1856|     23466|             3.52|
|1857|     23689|             3.95|
|1858|     23914|             5.23|
|1859|     24138|             3.27|
|1860|     24360|             2.94|
|1861|     24585|             3.72|
|1862|     24862|             4.68|
|1863|     25142|             4.15|
|1864|     25425|             2.99|
+----+----------+-----------------+
only showing top 10 rows
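
The CSV was read without a schema, so the renamed columns are still strings, as the schema printed earlier shows. A possible next step, sketched below under the assumption that the values are plain numerals like the rows shown above, is to cast them to numeric types. The function name `cast_numeric_columns` is hypothetical; `withColumn` and `cast` are standard dataframe and column methods.

```python
def cast_numeric_columns(df):
    """Return a copy of df with population as int and unemployment_rate as double."""
    # Column names are the ones produced by the renaming step above.
    return (
        df.withColumn("population", df["population"].cast("int"))
          .withColumn("unemployment_rate", df["unemployment_rate"].cast("double"))
    )


# Usage on the cluster described in this notebook:
# unemployment_typed = cast_numeric_columns(unemployment)
# unemployment_typed.printSchema()
```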



### 3.4. Write

Lastly, we persist the unemployment data into the cluster's simulated **HDFS**.

In [15]:
unemployment.repartition(1).write.csv(path="hdfs://uk-macroeconomic-unemployment-data.csv", sep=",", header=True, mode="overwrite")

In [16]:
unemployment.count()

162

## 4. Comparing Spark and plain Python, just for fun

In [17]:
from pyspark.context import SparkContext
from time import time

sc = SparkContext.getOrCreate()

big_list = range(100000000)

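rdd5 = sc.parallelize(big_list, 5)

start = time()
# filter() is a lazy transformation: this line only records the step and
# processes no data, so the timing below excludes the actual filtering.
# An action such as sparkodds.count() inside the timed region would trigger it.
sparkodds = rdd5.filter(lambda x: x % 2 != 0)
print("spark:", time() - start)

start = time()
# Built-in filter(), materialized into a list.
gen_odds = list(filter(lambda x: x % 2 != 0, big_list))
print("python generator:", time() - start)

start = time()
# List comprehension over the same range.
pyodds = [x for x in big_list if x % 2 != 0]
print("python loops:", time() - start)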

spark: 0.00017762184143066406
python generator: 9.838774681091309
python loops: 7.480478525161743
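
The Spark timing above is tiny because `filter` on an RDD is a lazy transformation: nothing is computed until an action such as `count()` or `collect()` runs. The plain-Python analogy below, a sketch with a smaller range chosen for illustration, shows the same effect with the built-in `filter`: building the filter object is nearly instant, while consuming it does the real work.

```python
from time import perf_counter

numbers = range(1_000_000)

# Step 1: build a lazy filter object. No element is inspected yet,
# much like calling rdd.filter(...) without a following action.
start = perf_counter()
lazy_odds = filter(lambda x: x % 2 != 0, numbers)
build_seconds = perf_counter() - start

# Step 2: consume the filter. This is where the work actually happens,
# comparable to calling an action such as count() on the RDD.
start = perf_counter()
odd_count = sum(1 for _ in lazy_odds)
consume_seconds = perf_counter() - start

print("build:", build_seconds)
print("consume:", consume_seconds)
print("odd numbers:", odd_count)  # 500000
```

For a fairer comparison in the cell above, the timed region for Spark would need to include an action, for example `sparkodds.count()`.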


In [18]:
text_path = '/usr/share/doc/python3/copyright'

start = time()
with open(text_path, "r") as f:
    n_lines, n_python_lines = [],[]
    for line in f:
        if 'python' in line.lower():
            n_python_lines.append(line)
        n_lines.append(line)
    with open('py_results.txt', 'w') as file_obj:
        file_obj.write(f'Number of lines: {len(n_lines)}\n')
        file_obj.write(f'Number of lines with python: {len(n_python_lines)}\n')
print("python:", time() - start)


spark_txt = sc.textFile("file:///"+text_path, minPartitions=5)
start = time()
python_lines = spark_txt.filter(lambda line: 'python' in line.lower())
with open('results.txt', 'w') as file_obj:
    file_obj.write(f'Number of lines: {spark_txt.count()}\n')
    file_obj.write(f'Number of lines with python: {python_lines.count()}\n')
print("spark:", time() - start)

python: 0.0007357597351074219


                                                                                

spark: 1.139918565750122


In [19]:
!cat results.txt

Number of lines: 319
Number of lines with python: 52


In [20]:
!cat py_results.txt

Number of lines: 319
Number of lines with python: 52


In [21]:
exit()