<a href="https://colab.research.google.com/github/MKastek/pyspark-notes/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Resource  
https://jacobcelestine.com/knowledge_repo/colab_and_pyspark/#prerequisite  

### Installing PySpark


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz
!pip install -q findspark


In [None]:
!ls

sample_data  spark-3.5.1-bin-hadoop3  spark-3.5.1-bin-hadoop3.tgz


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

In [None]:
import findspark
findspark.init()


import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = (
    SparkSession.builder
    .appName("PySparkHelloWorld")
    .getOrCreate()
)
# Eager evaluation: notebooks render a DataFrame as a formatted table when it is a cell's last expression
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [None]:
sc = spark.sparkContext

**Example data: a list of 1000 numbers**

In [None]:
numbers = list(range(0, 10 ** 3))

**Default Parallelism**

In [None]:
sc.defaultParallelism

2

**Partitions**

In [None]:
nums_rdd = sc.parallelize(numbers)

In [None]:
nums_rdd.getNumPartitions()

2

In [None]:
nums_rdd_repartition = nums_rdd.repartition(16)

In [None]:
nums_rdd_repartition.getNumPartitions()

16

In [None]:
%timeit num_rdd_squared = nums_rdd_repartition.map(lambda x: x**2)

375 µs ± 70.2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [None]:
%timeit nums_rdd_repartition.collect()

347 ms ± 117 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


# Import Data from an API into a DataFrame

In [None]:
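import requests
response = requests.get("https://public-esa.ose.gov.pl/api/v1/smog", timeout=30)
response.raise_for_status()  # raise on HTTP errors instead of failing later in .json()
# Flatten each record: merge the 'school' and 'data' dicts and add the timestamp (dict `|` needs Python 3.9+)
list_json = [school['school'] | school['data'] | {'timestamp': school['timestamp']}
             for school in response.json()['smog_data']]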
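The list comprehension flattens each nested API record into one flat dictionary per school. The record below is hypothetical. Its shape (`school`, `data` and `timestamp` keys) is inferred from the code above and the columns shown later, not taken from the API documentation. With the dict union operator `|`, the right-hand value wins on duplicate keys:

```python
# Hypothetical record shaped like one entry of response.json()['smog_data']
sample_smog_data = [
    {
        "school": {"name": "SZKOŁA PODSTAWOWA IM. MARIANA FALSKIEGO W KRASZEWICACH",
                   "city": "KRASZEWICE", "post_code": "63-522"},
        "data": {"pm10_avg": 3.99, "pm25_avg": 2.3166666666666664},
        "timestamp": "2024-10-13 14:41:48",
    }
]

# Same flattening as above: later dicts in the union override earlier ones on key clashes
flat_records = [
    record["school"] | record["data"] | {"timestamp": record["timestamp"]}
    for record in sample_smog_data
]
print(flat_records[0])
```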

**Create a DataFrame from a parallelized list of JSON records**

In [None]:
smog_df = spark.createDataFrame(sc.parallelize(list_json))

**Viewing the DataFrame**  
There are several ways to view a DataFrame (DF) in PySpark:  
- `df.take(5)` returns a list of the first five `Row` objects.  
- `df.collect()` returns every row of the DataFrame to the driver as a list of `Row` objects. Be careful with it: on a large dataset it can easily exhaust the driver's memory and crash the driver node.  
- `df.show()` prints the DataFrame as a text table. Its main parameters are the number of rows (default 20) and truncation (by default, strings longer than 20 characters are cut). Example: `df.show(5, truncate=False)`.  
- `df.limit(5)` returns a new DataFrame with at most the first n rows. Because Spark is distributed, `df.limit()` is not guaranteed to return the same rows each time unless the data is explicitly ordered.


In [None]:
smog_df.show(5, truncate=False)

+-----------------+-----------------+---------+----------+------------------------------------------------------------------------------------+------------------+------------------+---------+------------------+------------------------+------------------+-------------------+
|city             |humidity_avg     |latitude |longitude |name                                                                                |pm10_avg          |pm25_avg          |post_code|pressure_avg      |street                  |temperature_avg   |timestamp          |
+-----------------+-----------------+---------+----------+------------------------------------------------------------------------------------+------------------+------------------+---------+------------------+------------------------+------------------+-------------------+
|KRASZEWICE       |91.98333333333333|51.51563 |18.22403  |SZKOŁA PODSTAWOWA IM. MARIANA FALSKIEGO W KRASZEWICACH                              |3.99              |2.31666666666

In [None]:
smog_df.show(5)

+-----------------+-----------------+---------+----------+--------------------+------------------+------------------+---------+------------------+--------------------+------------------+-------------------+
|             city|     humidity_avg| latitude| longitude|                name|          pm10_avg|          pm25_avg|post_code|      pressure_avg|              street|   temperature_avg|          timestamp|
+-----------------+-----------------+---------+----------+--------------------+------------------+------------------+---------+------------------+--------------------+------------------+-------------------+
|       KRASZEWICE|91.98333333333333| 51.51563|  18.22403|SZKOŁA PODSTAWOWA...|              3.99|2.3166666666666664|   63-522| 992.9533333333334|         UL. SZKOLNA|11.676666666666668|2024-10-13 14:41:48|
|       WRZĄSOWICE|             69.7| 49.96103|  19.94282|SZKOŁA PODSTAWOWA...|2.1999999999999997|2.1999999999999997|   32-040| 970.1666666666666|         UL. SZKOLNA|     

In [None]:
smog_df.limit(5)

city,humidity_avg,latitude,longitude,name,pm10_avg,pm25_avg,post_code,pressure_avg,street,temperature_avg,timestamp
KRASZEWICE,91.98333333333332,51.51563,18.22403,SZKOŁA PODSTAWOWA...,3.99,2.3166666666666664,63-522,992.9533333333334,UL. SZKOLNA,11.676666666666668,2024-10-13 14:41:48
WRZĄSOWICE,69.7,49.96103,19.94282,SZKOŁA PODSTAWOWA...,2.2,2.2,32-040,970.1666666666666,UL. SZKOLNA,14.775,2024-10-13 14:41:48
STRZELCE OPOLSKIE,99.90000000000002,50.503431,18.314889,PUBLICZNA SZKOŁA ...,1.0,0.5366666666666666,47-100,1023.9333333333334,UL. WAWRZYŃCA ŚWI...,6.9,2024-10-13 14:41:48
PSZCZYNA,91.67,49.965883,18.9457058,ZESPÓŁ SZKÓŁ NR 1...,5.66,3.513333333333333,43-200,978.3033333333332,UL. KAZIMIERZA WI...,12.57,2024-10-13 14:41:48
JANKÓW PRZYGODZKI,99.7,51.596172,17.7889069,ZESPÓŁ SZKÓŁ IM. ...,13.8,7.68,63-421,991.3333333333334,SZKOLNA,10.1,2024-10-13 14:41:48


In [None]:
smog_df.take(5)

[Row(city='KRASZEWICE', humidity_avg=91.98333333333333, latitude='51.51563', longitude='18.22403', name='SZKOŁA PODSTAWOWA IM. MARIANA FALSKIEGO W KRASZEWICACH', pm10_avg=3.99, pm25_avg=2.3166666666666664, post_code='63-522', pressure_avg=992.9533333333334, street='UL. SZKOLNA', temperature_avg=11.676666666666668, timestamp='2024-10-13 14:41:48'),
 Row(city='WRZĄSOWICE', humidity_avg=69.7, latitude='49.96103', longitude='19.94282', name='SZKOŁA PODSTAWOWA WE WRZĄSOWICACH', pm10_avg=2.1999999999999997, pm25_avg=2.1999999999999997, post_code='32-040', pressure_avg=970.1666666666666, street='UL. SZKOLNA', temperature_avg=14.775, timestamp='2024-10-13 14:41:48'),
 Row(city='STRZELCE OPOLSKIE', humidity_avg=99.90000000000002, latitude='50.503431', longitude='18.314889', name='PUBLICZNA SZKOŁA PODSTAWOWA NR 2 IM. KAZIMIERZA MALCZEWSKIEGO W STRZELCACH OPOLSKICH', pm10_avg=1.0, pm25_avg=0.5366666666666666, post_code='47-100', pressure_avg=1023.9333333333334, street='UL. WAWRZYŃCA ŚWIERZEGO', t

In [None]:
smog_df.collect()

[Row(city='KRASZEWICE', humidity_avg=91.98333333333333, latitude='51.51563', longitude='18.22403', name='SZKOŁA PODSTAWOWA IM. MARIANA FALSKIEGO W KRASZEWICACH', pm10_avg=3.99, pm25_avg=2.3166666666666664, post_code='63-522', pressure_avg=992.9533333333334, street='UL. SZKOLNA', temperature_avg=11.676666666666668, timestamp='2024-10-13 14:41:48'),
 Row(city='WRZĄSOWICE', humidity_avg=69.7, latitude='49.96103', longitude='19.94282', name='SZKOŁA PODSTAWOWA WE WRZĄSOWICACH', pm10_avg=2.1999999999999997, pm25_avg=2.1999999999999997, post_code='32-040', pressure_avg=970.1666666666666, street='UL. SZKOLNA', temperature_avg=14.775, timestamp='2024-10-13 14:41:48'),
 Row(city='STRZELCE OPOLSKIE', humidity_avg=99.90000000000002, latitude='50.503431', longitude='18.314889', name='PUBLICZNA SZKOŁA PODSTAWOWA NR 2 IM. KAZIMIERZA MALCZEWSKIEGO W STRZELCACH OPOLSKICH', pm10_avg=1.0, pm25_avg=0.5366666666666666, post_code='47-100', pressure_avg=1023.9333333333334, street='UL. WAWRZYŃCA ŚWIERZEGO', t

**Viewing DataFrame Columns**

In [None]:
smog_df.columns

['city',
 'humidity_avg',
 'latitude',
 'longitude',
 'name',
 'pm10_avg',
 'pm25_avg',
 'post_code',
 'pressure_avg',
 'street',
 'temperature_avg',
 'timestamp']

**DataFrame schema**  
There are two methods commonly used to view the data types of a DataFrame:

In [None]:
smog_df.dtypes

[('city', 'string'),
 ('humidity_avg', 'double'),
 ('latitude', 'string'),
 ('longitude', 'string'),
 ('name', 'string'),
 ('pm10_avg', 'double'),
 ('pm25_avg', 'double'),
 ('post_code', 'string'),
 ('pressure_avg', 'double'),
 ('street', 'string'),
 ('temperature_avg', 'double'),
 ('timestamp', 'string')]

In [None]:
smog_df.printSchema()

root
 |-- city: string (nullable = true)
 |-- humidity_avg: double (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- name: string (nullable = true)
 |-- pm10_avg: double (nullable = true)
 |-- pm25_avg: double (nullable = true)
 |-- post_code: string (nullable = true)
 |-- pressure_avg: double (nullable = true)
 |-- street: string (nullable = true)
 |-- temperature_avg: double (nullable = true)
 |-- timestamp: string (nullable = true)



**Selecting data**

In [None]:
smog_df.select("*")

city,humidity_avg,latitude,longitude,name,pm10_avg,pm25_avg,post_code,pressure_avg,street,temperature_avg,timestamp
KRASZEWICE,91.98333333333332,51.51563,18.22403,SZKOŁA PODSTAWOWA...,3.99,2.3166666666666664,63-522,992.9533333333334,UL. SZKOLNA,11.676666666666668,2024-10-13 14:41:48
WRZĄSOWICE,69.7,49.96103,19.94282,SZKOŁA PODSTAWOWA...,2.2,2.2,32-040,970.1666666666666,UL. SZKOLNA,14.775,2024-10-13 14:41:48
STRZELCE OPOLSKIE,99.90000000000002,50.503431,18.314889,PUBLICZNA SZKOŁA ...,1.0,0.5366666666666666,47-100,1023.9333333333334,UL. WAWRZYŃCA ŚWI...,6.9,2024-10-13 14:41:48
PSZCZYNA,91.67,49.965883,18.9457058,ZESPÓŁ SZKÓŁ NR 1...,5.66,3.513333333333333,43-200,978.3033333333332,UL. KAZIMIERZA WI...,12.57,2024-10-13 14:41:48
JANKÓW PRZYGODZKI,99.7,51.596172,17.7889069,ZESPÓŁ SZKÓŁ IM. ...,13.8,7.68,63-421,991.3333333333334,SZKOLNA,10.1,2024-10-13 14:41:48
ĆWIKLICE,99.7,49.971937,18.989839,SZKOŁA PODSTAWOWA...,6.766666666666667,3.7066666666666666,43-229,978.9,UL. MĘCZENNIKÓW O...,12.5,2024-10-13 14:41:48
STUDZIONKA,99.20333333333332,49.9603559,18.774985,SZKOŁA PODSTAWOWA...,8.209999999999999,4.57,43-245,981.5533333333332,UL. JORDANA,11.973333333333334,2024-10-13 14:41:48
PIASEK,99.90000000000002,50.00955,18.94634,ZESPÓŁ SZKOLNO-PR...,0.0333333333333333,0.0166666666666666,43-211,986.2666666666668,SZKOLNA,12.433333333333332,2024-10-13 14:41:48
ŁĄKA,91.89,49.9582444,18.906757,ZESPÓŁ SZKOLNO-PR...,5.6000000000000005,3.303333333333333,43-241,983.7366666666668,FITELBERGA,12.366666666666667,2024-10-13 14:41:48
LUBOŃ,99.13333333333333,52.3481,16.8968,SZKOŁA PODSTAWOWA...,1.5666666666666669,0.8700000000000001,62-030,989.1333333333332,ARMII POZNAŃ,11.666666666666666,2024-10-13 14:41:48


In [None]:
df_of_cities = smog_df.select("city")

**Count of cities**

In [None]:
df_of_cities.count()

1320

In [None]:
df_of_cities.distinct().count()

989

In [None]:
df_of_cities.groupBy("city").count().orderBy("count", ascending=False)

city,count
ZABRZE,19
RYBNIK,18
CZĘSTOCHOWA,12
GLIWICE,11
WAŁBRZYCH,10
RZESZÓW,8
CHEŁM,8
ZAWIERCIE,8
KRAKÓW-PODGÓRZE,7
BIELSKO-BIAŁA,7


In [None]:
df_cities_count_5 = df_of_cities.groupBy("city").count().orderBy("count", ascending=False).limit(5)

### UDF (User-Defined Functions)
- PySpark user-defined functions (UDFs) let you run your own Python code on DataFrame columns in a scalable way. However, UDFs are slower than PySpark's built-in functions.  

- When a Python UDF runs, each executor launches Python worker processes. Data is serialized in the JVM, sent to the Python worker, processed there, and the results are serialized back to the executor.

- This round trip adds significant overhead to Spark jobs and makes them less efficient. Python workers also consume off-heap memory, which can push an executor past its `spark.executor.memoryOverhead` allowance.


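**Register a function as a UDF**

```python
def squared(x):
  return x * x

# The SQL name must be a string; without an explicit return type the UDF returns StringType
spark.udf.register("squaredWithPython", squared)
```  

```python
from pyspark.sql.types import LongType

def squared_typed(x):
  return x * x

# Same function with an explicit LongType return type
spark.udf.register("squaredWithPython", squared_typed, LongType())
```  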

```python
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(x):
  return x * x
```


In [None]:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(x):
  return x * x

df_cities_count_5.select("city", "count", squared_udf("count").alias("count squared"))

city,count,count squared
ZABRZE,19,361
RYBNIK,18,324
CZĘSTOCHOWA,12,144
GLIWICE,11,121
WAŁBRZYCH,10,100


In [None]:
from pyspark.sql.types import LongType

def identity(x):
  return x

# register() returns a UDF for the DataFrame API; "squared_typed" is the name used from SQL
identity_udf = spark.udf.register("squared_typed", identity, LongType())

In [None]:
df_cities_count_5.select("city", "count", identity_udf("count").alias("count_identity"))

city,count,count_identity
ZABRZE,19,19
RYBNIK,18,18
CZĘSTOCHOWA,12,12
GLIWICE,11,11
WAŁBRZYCH,10,10


**List all registered UDFs**  
Once a UDF is registered, PySpark has no built-in way to retrieve or display its code from the Spark catalog. The Python function is serialized and shipped to the executors, while the catalog keeps only the function's name and a generated JVM class name, not the original Python source.



In [None]:
functions = spark.catalog.listFunctions()
for fun in functions:
  if str(fun.className).startswith("org.apache.spark.sql.UDFRegistration"):
    print(fun)

Function(name='sqrtpy', catalog=None, namespace=None, description='N/A.', className='org.apache.spark.sql.UDFRegistration$$Lambda$3563/1236521595', isTemporary=True)
Function(name='squared_typed', catalog=None, namespace=None, description='N/A.', className='org.apache.spark.sql.UDFRegistration$$Lambda$3563/1236521595', isTemporary=True)


**Round float UDF**

In [None]:
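from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

def round_float_down(x: float) -> float:
  # Despite the name, round() rounds to the nearest value (exact ties go to the even digit);
  # NULLs arrive as None and are passed through instead of raising TypeError
  return None if x is None else round(x, 2)

# DoubleType matches the source columns; FloatType would narrow them to 32-bit floats
round_float_down_udf = udf(round_float_down, DoubleType())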

In [None]:
smog_df = smog_df \
.withColumn("humidity_avg", round_float_down_udf("humidity_avg")) \
.withColumn("pm10_avg", round_float_down_udf("pm10_avg")) \
.withColumn("pm25_avg", round_float_down_udf("pm25_avg")) \
.withColumn("pressure_avg", round_float_down_udf("pressure_avg")) \
.withColumn("temperature_avg", round_float_down_udf("temperature_avg"))

smog_df

city,humidity_avg,latitude,longitude,name,pm10_avg,pm25_avg,post_code,pressure_avg,street,temperature_avg,timestamp
KRASZEWICE,91.98,51.51563,18.22403,SZKOŁA PODSTAWOWA...,3.99,2.32,63-522,992.95,UL. SZKOLNA,11.68,2024-10-13 15:06:28
WRZĄSOWICE,69.7,49.96103,19.94282,SZKOŁA PODSTAWOWA...,2.2,2.2,32-040,970.17,UL. SZKOLNA,14.78,2024-10-13 15:06:28
STRZELCE OPOLSKIE,99.9,50.503431,18.314889,PUBLICZNA SZKOŁA ...,1.0,0.54,47-100,1023.93,UL. WAWRZYŃCA ŚWI...,6.9,2024-10-13 15:06:28
PSZCZYNA,91.67,49.965883,18.9457058,ZESPÓŁ SZKÓŁ NR 1...,5.66,3.51,43-200,978.3,UL. KAZIMIERZA WI...,12.57,2024-10-13 15:06:28
JANKÓW PRZYGODZKI,99.7,51.596172,17.7889069,ZESPÓŁ SZKÓŁ IM. ...,13.8,7.68,63-421,991.33,SZKOLNA,10.1,2024-10-13 15:06:28
ĆWIKLICE,99.7,49.971937,18.989839,SZKOŁA PODSTAWOWA...,6.77,3.71,43-229,978.9,UL. MĘCZENNIKÓW O...,12.5,2024-10-13 15:06:28
STUDZIONKA,99.2,49.9603559,18.774985,SZKOŁA PODSTAWOWA...,8.21,4.57,43-245,981.55,UL. JORDANA,11.97,2024-10-13 15:06:28
PIASEK,99.9,50.00955,18.94634,ZESPÓŁ SZKOLNO-PR...,0.03,0.02,43-211,986.27,SZKOLNA,12.43,2024-10-13 15:06:28
ŁĄKA,91.89,49.9582444,18.906757,ZESPÓŁ SZKOLNO-PR...,5.6,3.3,43-241,983.74,FITELBERGA,12.37,2024-10-13 15:06:28
LUBOŃ,99.13,52.3481,16.8968,SZKOŁA PODSTAWOWA...,1.57,0.87,62-030,989.13,ARMII POZNAŃ,11.67,2024-10-13 15:06:28


In [None]:
# Schools whose average PM10 exceeds 45
smog_df.where(smog_df.pm10_avg > 45)

In [None]:
smog_df.agg({"pm10_avg": "avg"})

In [None]:
df_agg = smog_df.groupBy("city").agg({"pm10_avg": "avg", "city": "count"}).orderBy("count(city)", ascending=False)

In [None]:
smog_df.filter(smog_df.city == "POZNAŃ").agg({"city": "count"})

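**Temporary view from a DataFrame**

In [None]:
# Round the average PM10 to two decimals before exposing the result to SQL
df_agg_round = df_agg.withColumn("avg(pm10_avg)", F.round("avg(pm10_avg)", 2))
df_agg_round.createOrReplaceTempView("df_agg_round")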

In [None]:
spark.sql("SELECT * FROM df_agg_round")