# **PySpark**: The Apache Spark Python API

## 1. Introduction

This notebook shows how to connect Jupyter notebooks to a Spark cluster and process data with the Spark Python API.

## 2. The Spark Cluster

### 2.1. Connection

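To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, the same one the Spark Workers use;
+ **spark.executor.memory:** must be less than or equal to the `SPARK_WORKER_MEMORY` setting in the Docker Compose file;
+ **hive.metastore.uris:** Thrift address of the Hive metastore service;
+ **spark.sql.warehouse.dir:** default location for managed databases and tables.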
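The `spark.executor.memory` constraint above can be checked before the session is created. Below is a minimal sketch, assuming JVM-style size strings with a single-letter suffix such as `512m` or `1g`. The helper names and the `1g` worker value are hypothetical; they are not part of PySpark or of the Docker Compose setup.

```python
# Hypothetical helpers, not part of PySpark: compare two JVM-style memory sizes.
UNITS = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3, "t": 1024 ** 4}


def parse_jvm_memory(size: str) -> int:
    """Convert a size string such as '512m' or '1g' to bytes."""
    size = size.strip().lower()
    unit = size[-1:]
    # This sketch only accepts an integer followed by one of k/m/g/t.
    if unit not in UNITS or not size[:-1].isdigit():
        raise ValueError(f"Expected a size like '512m' or '1g', got {size!r}")
    return int(size[:-1]) * UNITS[unit]


def executor_fits_worker(executor_memory: str, worker_memory: str) -> bool:
    """True if spark.executor.memory <= SPARK_WORKER_MEMORY."""
    return parse_jvm_memory(executor_memory) <= parse_jvm_memory(worker_memory)


# "512m" matches the notebook; "1g" is an assumed worker setting.
print(executor_fits_worker("512m", "1g"))  # True
```

If the check fails, the standalone Master cannot place an executor on that worker, so lowering `spark.executor.memory` (or raising the worker memory) is the fix.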

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, col
from pyspark.sql.types import *
from os.path import abspath

# Connect to the standalone Spark Master and to the Hive metastore,
# so that tables written with saveAsTable are registered in Hive
spark = SparkSession\
        .builder\
        .appName("pyspark-notebook")\
        .master("spark://spark-master:7077")\
        .config("spark.executor.memory", "512m")\
        .config("hive.metastore.uris", "thrift://hive-metastore:9083")\
        .config("spark.sql.warehouse.dir", "/user/hive/warehouse")\
        .enableHiveSupport()\
        .getOrCreate()

# Only print errors in the notebook output
spark.sparkContext.setLogLevel("ERROR")

In [3]:
warehouse_location = abspath('spark-warehouse')
print(warehouse_location)


/opt/workspace/spark-warehouse


In [4]:
spark.sql("SET -v").select("key", "value").show(200, truncate=False)

+-----------------------------------------------------------+----------------------------------------------------------------+
|key                                                        |value                                                           |
+-----------------------------------------------------------+----------------------------------------------------------------+
|spark.sql.adaptive.advisoryPartitionSizeInBytes            |<value of spark.sql.adaptive.shuffle.targetPostShuffleInputSize>|
|spark.sql.adaptive.coalescePartitions.enabled              |true                                                            |
|spark.sql.adaptive.coalescePartitions.initialPartitionNum  |<undefined>                                                     |
|spark.sql.adaptive.coalescePartitions.minPartitionNum      |<undefined>                                                     |
|spark.sql.adaptive.enabled                                 |false                                                           |

Additional configuration options for the SparkSession in standalone mode can be set with the **config** method. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).

## 3. The Data

### 3.1. Introduction

We will use the Spark Python API to read, process, and write data. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/index.html).

### 3.2. Read

Let's read some UK macroeconomic data ([source](https://www.kaggle.com/bank-of-england/a-millennium-of-macroeconomic-data)) from the cluster's simulated **Hadoop distributed file system (HDFS)** into a Spark DataFrame.

In [5]:
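# header=True turns the first line into column names; without inferSchema=True,
# every column is read as a string
data = spark.read.csv(path="datasets/uk-macroeconomic-data.csv", sep=",", header=True)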


In [6]:
data.count()


841

In [7]:
unemployment = data.select(["Description", "Population (GB+NI)", "Unemployment rate"])

In [8]:
unemployment.show(n=10)

+-----------+------------------+-----------------+
|Description|Population (GB+NI)|Unemployment rate|
+-----------+------------------+-----------------+
|      Units|              000s|                %|
|       1209|              null|             null|
|       1210|              null|             null|
|       1211|              null|             null|
|       1212|              null|             null|
|       1213|              null|             null|
|       1214|              null|             null|
|       1215|              null|             null|
|       1216|              null|             null|
|       1217|              null|             null|
+-----------+------------------+-----------------+
only showing top 10 rows
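The output shows that the first row after the header holds units (`000s`, `%`) rather than a year: `header=True` only consumes the first line of the file. The sketch below reproduces that layout with Python's standard `csv` module, using a simplified subset of columns and the rows copied from the output above; it assumes the raw file places the units line directly under the header.

```python
import csv
import io

# Simplified subset of the file: header line, units line, then yearly rows.
sample = io.StringIO(
    "Description,Population (GB+NI),Unemployment rate\n"
    "Units,000s,%\n"
    "1209,,\n"
    "1210,,\n"
)

# Like header=True, DictReader uses only the first line as field names.
rows = list(csv.DictReader(sample))

units = [r for r in rows if r["Description"] == "Units"]
data_rows = [r for r in rows if r["Description"] != "Units"]

print(units[0]["Unemployment rate"])          # %
print([r["Description"] for r in data_rows])  # ['1209', '1210']
```

This is why the next cells isolate the `Units` row and remove it before renaming the columns.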



In [9]:
unemployment = data.select(["Description", "Unemployment rate"])

In [10]:
cols_description = unemployment.filter(unemployment['Description'] == 'Units')

In [11]:
unemployment = unemployment.join(other=cols_description, on=['Description'], how='left_anti')
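A `left_anti` join keeps only the rows of the left DataFrame whose join key has no match in the right one, so joining against the single `Units` row removes it. The plain-Python sketch below models those semantics on rows taken from the output above; it is an illustration, not how Spark executes the join, and `left_anti_join` is a hypothetical helper.

```python
def left_anti_join(left, right, key):
    """Keep rows of `left` whose `key` value does not appear in `right`.

    As with an equality join in Spark, a None key never matches,
    so left rows with a None key are kept.
    """
    right_keys = {row[key] for row in right if row[key] is not None}
    return [row for row in left if row[key] not in right_keys]


unemployment = [
    {"Description": "Units", "Unemployment rate": "%"},
    {"Description": "1209", "Unemployment rate": None},
    {"Description": "1210", "Unemployment rate": None},
]
cols_description = [r for r in unemployment if r["Description"] == "Units"]

result = left_anti_join(unemployment, cols_description, "Description")
print([r["Description"] for r in result])  # ['1209', '1210']
```

In PySpark, `unemployment.filter(col("Description") != "Units")` gives the same result here in a single pass, with one difference: the filter also drops rows whose `Description` is null, which the anti join keeps.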

In [12]:
unemployment = unemployment.\
               withColumnRenamed("Description", "year").\
               withColumnRenamed("Unemployment rate", "unemployment_rate")

In [13]:
unemployment.show(n=10)

+----+-----------------+
|year|unemployment_rate|
+----+-----------------+
|1209|             null|
|1210|             null|
|1211|             null|
|1212|             null|
|1213|             null|
|1214|             null|
|1215|             null|
|1216|             null|
|1217|             null|
|1218|             null|
+----+-----------------+
only showing top 10 rows



In [None]:
unemployment.write.saveAsTable('managed_table10')
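`saveAsTable` uses the default save mode `errorifexists`, so re-running this cell once `managed_table10` exists raises an error. The writer's `mode` method also accepts `append`, `overwrite` and `ignore`, e.g. `unemployment.write.mode("overwrite").saveAsTable("managed_table10")`. The toy model below, with a dictionary standing in for the Hive metastore, is only a sketch of how the modes treat an existing table; `save_as_table` and `TableExistsError` are hypothetical names, not Spark's implementation.

```python
class TableExistsError(Exception):
    """Stands in for the error Spark raises when the table already exists."""


def save_as_table(catalog, name, rows, mode="errorifexists"):
    """Toy model of DataFrameWriter save modes against a dict 'catalog'."""
    exists = name in catalog
    if mode in ("error", "errorifexists"):
        if exists:
            raise TableExistsError(f"Table {name} already exists")
        catalog[name] = list(rows)
    elif mode == "append":
        catalog.setdefault(name, []).extend(rows)
    elif mode == "overwrite":
        catalog[name] = list(rows)
    elif mode == "ignore":
        # Leave an existing table untouched.
        if not exists:
            catalog[name] = list(rows)
    else:
        raise ValueError(f"Unknown save mode: {mode}")


catalog = {}
save_as_table(catalog, "managed_table10", [("1209", None)])
try:
    save_as_table(catalog, "managed_table10", [("1209", None)])  # second run
except TableExistsError as exc:
    print(exc)  # Table managed_table10 already exists

save_as_table(catalog, "managed_table10", [("1210", None)], mode="overwrite")
print(catalog["managed_table10"])  # [('1210', None)]
```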

In [None]:
ndf = spark.sql("SELECT * FROM managed_table10")


In [None]:
ndf.show(10)