# **PySpark**: The Apache Spark Python API

## 1. Introduction

This notebook shows how to connect a Jupyter notebook to a Spark cluster and process data with the Spark Python API (PySpark).

## 2. The Spark Cluster

### 2.1. Connection

To connect to the Spark cluster, create a SparkSession object with the following parameters:

+ **appName:** application name displayed in the [Spark Master Web UI](http://localhost:8080/);
+ **master:** Spark Master URL, the same one the Spark Workers use;
+ **spark.executor.memory:** must be less than or equal to the docker compose `SPARK_WORKER_MEMORY` setting.
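
The memory constraint in the last bullet can be checked before the session is created. The following is a minimal sketch in plain Python; the helper names `to_mib` and `executor_fits` are hypothetical, the worker value `"1g"` is an assumed example rather than the value from this cluster's docker compose file, and only a subset of the size suffixes Spark accepts is handled.

```python
import re

# Multipliers to MiB for a subset of the suffixes Spark accepts (k, m, g, t).
_UNITS_TO_MIB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def to_mib(size):
    """Convert a memory string such as '512m' or '1g' to MiB."""
    match = re.fullmatch(r"(\d+)([kmgt])b?", size.strip().lower())
    if match is None:
        raise ValueError(f"unrecognized memory size: {size!r}")
    number, unit = match.groups()
    return int(number) * _UNITS_TO_MIB[unit]


def executor_fits(executor_memory, worker_memory):
    """Return True if the executor request does not exceed one worker's memory."""
    return to_mib(executor_memory) <= to_mib(worker_memory)


# Hypothetical worker setting; read the real value from your docker compose file.
print(executor_fits("512m", "1g"))  # → True
```

If the check returns `False`, either lower `spark.executor.memory` or raise `SPARK_WORKER_MEMORY`, since a worker cannot host an executor larger than its own memory allowance.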

In [118]:
from pyspark.sql import SparkSession

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


More settings for a SparkSession in standalone mode can be added with the **config** method. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SparkSession).
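
As a minimal sketch of adding several settings through `config`, the helper below chains one `config(key, value)` call per dictionary entry. The helper name `apply_confs` and the values in `EXTRA_CONFS` are assumptions for illustration; the keys are standard Spark properties, but suitable values depend on the cluster.

```python
# Hypothetical extra settings: the keys are standard Spark properties,
# the values are illustrative assumptions for a small standalone cluster.
EXTRA_CONFS = {
    "spark.executor.cores": "1",
    "spark.cores.max": "2",
    "spark.sql.shuffle.partitions": "8",
}


def apply_confs(builder, confs):
    """Call builder.config(key, value) for every entry and return the builder."""
    for key, value in confs.items():
        # Each config() call returns the builder, so the calls can be chained.
        builder = builder.config(key, value)
    return builder
```

In this notebook it could be used as `apply_confs(SparkSession.builder.appName("pyspark-notebook").master("spark://spark-master:7077"), EXTRA_CONFS).getOrCreate()`. Settings that size executors generally need a fresh session to take effect, so stop any running session first.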

## 3. The Data

### 3.1. Introduction

We will use the Spark Python API to read, process, and write data. Check out the API docs [here](https://spark.apache.org/docs/latest/api/python/index.html).

### 3.2. Read


In [55]:
data = spark.read.csv(path="crude/oil_crude_vol_022021.csv", sep=",", header=True)

Next, display some DataFrame metadata: the number of rows, the number of columns, and the schema (column names and types).

In [22]:
data.count()

30

In [23]:
len(data.columns)

4

In [26]:
data.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Province: string (nullable = true)
 |-- City: string (nullable = true)



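In [25]:
from pyspark.sql.types import IntegerType
# Without inferSchema=True, spark.read.csv loads every column as a string,
# so cast Volume to integer before sorting or aggregating on it.
# The execution counters show this cell ran before printSchema() above, hence the integer type there.
data = data.withColumn("Volume", data["Volume"].cast(IntegerType()))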

### 3.3. Process

In [27]:
# Sort by Volume in ascending order (the orderBy default); show() prints the first 20 rows
data.orderBy("Volume").show()

+----------+------+----------------+----------+
| timestamp|Volume|        Province|      City|
+----------+------+----------------+----------+
|16/02/2021| 30375|      Jawa Timur|     Tuban|
|16/02/2021| 30375|Kalimantan Barat|Balikpapan|
|16/02/2021| 30375|      Jawa Barat|  Balongan|
|18/02/2021| 40500|Kalimantan Barat|Balikpapan|
|24/02/2021| 49000|Kalimantan Barat|Balikpapan|
|19/02/2021| 65333|Kalimantan Barat|Balikpapan|
|25/02/2021| 65866|Kalimantan Barat|Balikpapan|
|12/02/2021| 80000|      Jawa Barat|  Balongan|
|12/02/2021| 80000|      Jawa Timur|     Tuban|
|12/02/2021| 80000|Kalimantan Barat|Balikpapan|
|17/02/2021| 80666|      Jawa Timur|     Tuban|
|17/02/2021| 80666|      Jawa Barat|  Balongan|
|17/02/2021| 80666|Kalimantan Barat|Balikpapan|
|18/02/2021| 81000|      Jawa Barat|  Balongan|
|19/02/2021| 81666|      Jawa Barat|  Balongan|
|24/02/2021| 81666|      Jawa Barat|  Balongan|
|26/02/2021| 82000|      Jawa Timur|     Tuban|
|26/02/2021| 82000|      Jawa Barat|  Ba

In [29]:
# Maximum Volume per city (max() with no arguments covers every numeric column)
data.groupBy('City').max().show()

+----------+-----------+
|      City|max(Volume)|
+----------+-----------+
|  Balongan|      82666|
|     Tuban|     121500|
|Balikpapan|      82666|
+----------+-----------+

In [31]:
# Total Volume per province
data.groupBy('Province').sum().show()

+----------------+-----------+
|        Province|sum(Volume)|
+----------------+-----------+
|Kalimantan Barat|     659072|
|      Jawa Barat|     765038|
|      Jawa Timur|     871006|
+----------------+-----------+



In [33]:
# Total Volume across the whole dataset
data.agg({'Volume':'sum'}).show()

+-----------+
|sum(Volume)|
+-----------+
|    2295116|
+-----------+



### 3.4. Write

Lastly, we persist the aggregated crude oil volume data to the cluster's simulated **HDFS**.

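In [34]:
# Maximum Volume per city
city = data.groupBy('City').max()

In [35]:
# repartition(1) yields a single part file; Spark creates a directory at the given path.
# Note: the aggregation above is max(), although the output name says "sum".
city.repartition(1).write.csv(path="crude-output/sum-by-city.csv", sep=",", header=True, mode="overwrite")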

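
Spark writes CSV output as a directory of part files rather than a single file, so `crude-output/sum-by-city.csv` ends up being a directory. The sketch below locates the data file or files inside it. It assumes the output directory is reachable as an ordinary filesystem path from the notebook (for example through a shared volume), which may not hold for a real HDFS deployment; the helper name `find_part_files` is hypothetical.

```python
from pathlib import Path


def find_part_files(output_dir):
    """Return the CSV part files Spark wrote inside output_dir, sorted by name."""
    # Spark names data files part-<number>-<id>.csv; marker files such as
    # _SUCCESS and hidden .crc checksum files do not match this pattern.
    return sorted(Path(output_dir).glob("part-*.csv"))
```

With `repartition(1)` as used above, `find_part_files("crude-output/sum-by-city.csv")` would be expected to return a single path once the write has succeeded.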