# Data Processing with Spark

This lab class introduces basic PySpark operations for processing data stored in CSV or
JSON files.



## Task A: Data ingestion

### 1.
**Datasets**

The two files (IoT devices and fire calls, respectively) can be downloaded from:

https://bigdata.iscte-iul.eu/datasets/iot-devices.json

https://bigdata.iscte-iul.eu/datasets/fire-calls.csv

### 2.

In [None]:
# Basic imports
import pyspark
from pyspark.sql import SparkSession, Row
import pyspark.sql.functions as F

In [None]:
# Build SparkSession
spark = SparkSession.builder.appName("DataProcessing").getOrCreate()

In [None]:
# Reading data (adjust the paths if your downloaded files have different names)
data_dir = '../datasets/'
file_iot = data_dir + 'iot_devices.json'
file_calls = data_dir + 'sf-fire-calls.csv'

data_iot = spark.read.json(file_iot)
data_calls = spark.read.csv(file_calls, header=True, inferSchema=True)


### 3. 
**Checking basic structures and data types**


In [None]:
# Checking iot datatypes: use printSchema()
data_iot.


In [None]:
# Checking iot rows and columns: use show()
data_iot.

In [None]:
# Checking iot number of rows: use count()
data_iot.

In [None]:
# Checking calls data datatypes: use printSchema()
data_calls.

In [None]:
# Checking calls rows and columns: use show()
data_calls.

In [None]:
# Checking calls number of rows: use count()
data_calls.

### 4. 

In [None]:
# store the columns of iot and calls: use columns attribute
col_iot = data_iot.
col_calls = data_calls.

In [None]:
# print the number of columns and their names
print(len(col_iot), col_iot)
print(len(col_calls), col_calls)

**Expected result**
```raw
15 ['battery_level', 'c02_level', 'cca2', 'cca3', 'cn', 'device_id', 'device_name', 'humidity', 'ip', 'latitude', 'lcd', 'longitude', 'scale', 'temp', 'timestamp']
28 ['CallNumber', 'UnitID', 'IncidentNumber', 'CallType', 'CallDate', 'WatchDate', 'CallFinalDisposition', 'AvailableDtTm', 'Address', 'City', 'Zipcode', 'Battalion', 'StationArea', 'Box', 'OriginalPriority', 'Priority', 'FinalPriority', 'ALSUnit', 'CallTypeGroup', 'NumAlarms', 'UnitType', 'UnitSequenceInCallDispatch', 'FirePreventionDistrict', 'SupervisorDistrict', 'Neighborhood', 'Location', 'RowID', 'Delay']
```

In [None]:
# get the first row: use first()
first_row = data_iot.
first_row

### 5.

In [None]:
# get the first five rows
five_rows = data_iot.

## Task B: Data cleaning

### 1. Removing duplicates if any

In [None]:
# Check first: use dropDuplicates() and count()

print(f'  data_iot: number of rows: {data_iot. }, after dropduplicates: {data_iot. }')
print(f'data_calls: number of rows: {data_calls. }, after dropduplicates: {data_calls. }')



In [None]:
# Remove data_iot duplicates
data_iot = data_iot.
data_iot.count()

In [None]:
# Remove data_calls duplicates
data_calls = data_calls.
data_calls.count()

### 2. Handling missing values

In [None]:
# use dropna(how='any')
# how='any' removes a row if any value is NULL; how='all' removes it only if all values are NULL
print(f"After dropna: data_iot = {data_iot.  } rows")
print(f"After dropna: data_calls = {data_calls.  } rows")

In [None]:
# Let's see where the NULL values are in data_calls
dict_nulls_calls = {col: data_calls.filter(data_calls[col].isNull()).count() for col in data_calls.columns}
dict_nulls_calls

In [None]:
# Let's see where the NULL values are in data_iot
dict_nulls_iot = {col: data_iot.filter(data_iot[col].isNull()).count() for col in data_iot.columns}
dict_nulls_iot

### 3. Removal of columns 'AvailableDtTm', 'OriginalPriority', 'CallTypeGroup'

In [None]:
# removal of columns 'AvailableDtTm', 'OriginalPriority', 'CallTypeGroup'
cols_to_dismiss = ['AvailableDtTm', 'OriginalPriority', 'CallTypeGroup']

# use drop() and reference the list with *
data_calls = data_calls.

### 4.

In [None]:
# check the number of rows before and after removing NULLs
print(f"after dropping columns: data_calls = {data_calls.} rows, after removing NULLs: {data_calls.} rows")


### 5.

In [None]:
data_calls = data_calls.
data_calls.count()

**Note** We could have used `na.fill()` to fill in the missing values instead of dropping the rows:

`data_calls.na.fill({'columnA': valueA})`

### 6.

In [None]:
# Show one record in vertical mode for better reading
data_calls.show(1,vertical=True)

In [None]:
data_calls.printSchema()

In [None]:
# transforming datatypes:
# create two new columns 'CallDateTS' and 'WatchDateTS' with withColumn and F.to_timestamp()
data_calls = ( data_calls
              .
              .
)

In [None]:
data_calls.show()

## Task C: Data transformation

### 1. 

In [None]:
# add two new columns 'CallDateMonth' and 'CallDateWeekDay' using F.month and F.dayofweek on 'CallDateTS'

data_calls = ( data_calls
              .
              .
)

In [None]:
data_calls.show()

### 2.

In [None]:
# change name of column NumAlarms to NumberAlarms with withColumnRenamed
data_calls = data_calls.

In [None]:
data_calls.show()

### 3.

In [None]:
# use sort() with F.col().desc() on 'CallNumber'
data_calls = data_calls.

In [None]:
data_calls.show()

## Task D: Data aggregation

### 1. Basic aggregation

Find the min, max, count, sum and mean.

In [None]:
# use agg() either with {'column name': 'function'} or with F.avg(), F.min(), F.max(), F.sum() and F.count()
data_calls.
data_calls.
data_calls.
data_calls.
data_calls.

### 2.

In [None]:
# use describe()
data_calls.describe().show()

### 3. Grouped aggregation

In [None]:
# use groupBy() on 'CallType' and agg() on 'CallDateMonth', 'CallDateWeekDay' and 'Delay'
# show at most 100 rows with .show(100, truncate=False)
data_calls.

**Expected result**
```raw
+--------------------------------------------+------------------+------------------+--------------------+
|CallType                                    |avg(CallDateMonth)|avg(Delay)        |avg(CallDateWeekDay)|
+--------------------------------------------+------------------+------------------+--------------------+
|Elevator / Escalator Rescue                 |6.208121827411167 |4.435448395939086 |4.355329949238579   |
|Alarms                                      |6.555663587749149 |3.8584001800838594|4.0523821098687405  |
|Odor (Strange / Unknown)                    |6.303571428571429 |5.943005935714285 |4.151785714285714   |
|Citizen Assist / Service Call               |6.599781897491821 |5.74209378691385  |3.911668484187568   |
|HazMat                                      |6.023255813953488 |6.879844916279069 |4.6976744186046515  |
|Watercraft in Distress                      |8.5               |7.8857144         |4.928571428571429   |
|Explosion                                   |7.0               |4.646111146666666 |3.966666666666667   |
|Vehicle Fire                                |6.264285714285714 |3.8897619183928587|4.2178571428571425  |
|Other                                       |6.496359223300971 |6.528357611383495 |3.936893203883495   |
|Outside Fire                                |6.282632146709816 |4.508881697572816 |3.9773462783171523  |
|Traffic Collision                           |6.532761788120024 |4.007960809130436 |4.063380281690141   |
|Assist Police                               |4.363636363636363 |10.64545460909091 |4.181818181818182   |
|Gas Leak (Natural and LP Gases)             |6.343832020997375 |4.363604546456693 |3.979002624671916   |
|Water Rescue                                |6.106024096385542 |5.973895580722891 |4.125301204819277   |
|Electrical Hazard                           |6.377777777777778 |4.3134814795555565|3.8266666666666667  |
|Structure Fire                              |6.566578316294333 |3.5555240030039124|4.0411460305439855  |
|Industrial Accidents                        |6.076923076923077 |4.028205084615385 |3.0                 |
|Medical Incident                            |6.451672010994045 |3.9335766504404885|4.025931605887391   |
|Fuel Spill                                  |6.585714285714285 |6.202619058571427 |4.4                 |
|Smoke Investigation (Outside)               |6.694267515923567 |4.766348174267515 |3.751592356687898   |
|Train / Rail Incident                       |4.7368421052631575|32.2991227368421  |2.526315789473684   |
|Marine Fire                                 |8.6               |7.9033333400000005|4.4                 |
|Aircraft Emergency                          |11.0              |7.7               |1.0                 |
|Confined Space / Structure Collapse         |7.875             |6.08541665        |4.0                 |
|Suspicious Package                          |5.75              |11.045833499999999|3.25                |
|Extrication / Entrapped (Machinery, Vehicle)|6.7               |5.03500002        |4.0                 |
|High Angle Rescue                           |6.714285714285714 |7.1488095285714275|4.142857142857143   |
|Mutual Aid / Assist Outside Agency          |8.0               |7.35              |5.0                 |
+--------------------------------------------+------------------+------------------+--------------------+
```

### 4.

In [None]:
# use orderBy() on column 'avg(Delay)' and with option ascending=False
data_calls.

**Expected result**
```raw
+--------------------------------------------+------------------+------------------+--------------------+
|CallType                                    |avg(CallDateMonth)|avg(Delay)        |avg(CallDateWeekDay)|
+--------------------------------------------+------------------+------------------+--------------------+
|Assist Police                               |6.314285714285714 |26.981903994285716|4.0285714285714285  |
|Train / Rail Incident                       |5.589285714285714 |16.71249997857143 |3.25                |
|Administrative                              |5.0               |12.261111333333332|5.333333333333333   |
|HazMat                                      |7.32258064516129  |7.527016126612903 |4.266129032258065   |
|Marine Fire                                 |8.333333333333334 |7.131944325       |4.916666666666667   |
|Confined Space / Structure Collapse         |7.538461538461538 |6.915384576923078 |3.923076923076923   |
|Watercraft in Distress                      |7.142857142857143 |6.886904817857143 |4.464285714285714   |
|Suspicious Package                          |6.0               |6.57666672        |4.133333333333334   |
|High Angle Rescue                           |6.387096774193548 |5.76075270967742  |4.225806451612903   |
|Aircraft Emergency                          |10.0              |5.688888933333334 |4.0                 |
|Other                                       |6.682915506035283 |5.517548746420615 |3.9600742804085423  |
|Water Rescue                                |6.313513513513514 |5.50810810354054  |4.132432432432433   |
|Fuel Spill                                  |6.357512953367876 |5.492227982383419 |4.2642487046632125  |
|Citizen Assist / Service Call               |6.678174603174603 |5.470502643908732 |3.940873015873016   |
|Mutual Aid / Assist Outside Agency          |6.625             |5.3583333500000006|4.75                |
|Electrical Hazard                           |6.66390041493776  |5.178112038174276 |3.979253112033195   |
|Industrial Accidents                        |6.0638297872340425|5.0147163340425545|4.095744680851064   |
|Oil Spill                                   |6.428571428571429 |4.977777761904762 |4.761904761904762   |
|Odor (Strange / Unknown)                    |6.642857142857143 |4.947959182000001 |4.09795918367347    |
|Gas Leak (Natural and LP Gases)             |6.4057591623036645|4.583398778403142 |3.9960732984293195  |
|Smoke Investigation (Outside)               |6.859335038363171 |4.466069897851662 |3.959079283887468   |
|Extrication / Entrapped (Machinery, Vehicle)|6.464285714285714 |4.391666678571428 |4.107142857142857   |
|Elevator / Escalator Rescue                 |6.448123620309051 |4.3378219334878585|4.260485651214128   |
|Outside Fire                                |6.288396726047184 |4.173599744978333 |3.929224843524314   |
|Explosion                                   |6.863636363636363 |4.105681829545454 |3.8068181818181817  |
|Vehicle Fire                                |6.287735849056604 |3.9025550274174527|4.070754716981132   |
|Medical Incident                            |6.523214553010892 |3.875683200596809 |4.027577462556462   |
|Traffic Collision                           |6.627349691490888 |3.783885780595212 |4.116802984646291   |
|Structure Fire                              |6.584228620541469 |3.597781836202839 |4.061667382896434   |
|Alarms                                      |6.654925219185147 |3.542101600391443 |4.046518824136153   |
+--------------------------------------------+------------------+------------------+--------------------+
```

### 5.

In [None]:
# groupby on 'CallType' and 'Neighborhood'
# aggregate on count(CallNumber) with alias CountCalls, avg(Delay) with alias AvgDelay,
# min(Delay) with alias MinDelay, and max(Delay) with alias MaxDelay
# in descending order by CallType and then Neighborhood.
( data_calls
    .groupBy()
    .agg(
        ,
        ,
        ,
        ,
         )
    .orderBy()
    .show(truncate=False)
)


**Expected result**
```raw
+----------------------+------------------------------+----------+------------------+----------+---------+
|CallType              |Neighborhood                  |CountCalls|AvgDelay          |MinDelay  |MaxDelay |
+----------------------+------------------------------+----------+------------------+----------+---------+
|Watercraft in Distress|Treasure Island               |2         |7.441666850000001 |6.2166667 |8.666667 |
|Watercraft in Distress|Tenderloin                    |2         |6.3416666500000005|6.2833333 |6.4      |
|Watercraft in Distress|Sunset/Parkside               |1         |5.95              |5.95      |5.95     |
|Watercraft in Distress|Russian Hill                  |2         |6.325             |3.9       |8.75     |
|Watercraft in Distress|Presidio                      |1         |7.9               |7.9       |7.9      |
|Watercraft in Distress|North Beach                   |5         |6.613333259999999 |4.116667  |9.933333 |
|Watercraft in Distress|None                          |2         |6.9833335000000005|6.2       |7.766667 |
|Watercraft in Distress|Marina                        |4         |4.7708333750000005|1.95      |5.8333335|
|Watercraft in Distress|Lakeshore                     |1         |5.1666665         |5.1666665 |5.1666665|
|Watercraft in Distress|Financial District/South Beach|3         |5.1222222         |3.9       |6.8166666|
|Watercraft in Distress|Bayview Hunters Point         |5         |10.423333600000001|2.7       |16.333334|
|Water Rescue          |Treasure Island               |23        |7.3963767521739125|1.8833333 |24.666666|
|Water Rescue          |Tenderloin                    |71        |4.550704191830987 |0.48333332|14.866667|
|Water Rescue          |Sunset/Parkside               |49        |4.9874149367346945|1.8666667 |10.966666|
|Water Rescue          |Seacliff                      |10        |8.97166663        |1.1333333 |35.483334|
|Water Rescue          |Russian Hill                  |28        |5.245238189285714 |1.1833333 |11.783334|
|Water Rescue          |Presidio                      |98        |6.172278876530611 |1.3       |38.05    |
|Water Rescue          |Potrero Hill                  |6         |5.3500000666666665|3.7166667 |7.2166667|
|Water Rescue          |Outer Richmond                |100       |4.689333335       |1.45      |14.6     |
|Water Rescue          |North Beach                   |39        |5.96367531025641  |1.2166667 |20.45    |
+----------------------+------------------------------+----------+------------------+----------+---------+
only showing top 20 rows
```

### 6. Plotting the aggregation results for easier interpretation

In [None]:
# Install plotly (documentation: https://plotly.com/python/)
!pip install plotly

In [None]:
import plotly
plotly.__version__

In [None]:

import pandas as pd
import plotly.express as px


In [None]:
data_plot = ( data_calls
    .groupBy('CallType')
    .count()
    .orderBy('count', ascending=False)
)

data_plot.show(truncate=False)

In [None]:
fig = px.bar(data_plot.toPandas(),x='CallType',y='count')
fig.show()