## Data Loading and Transformation
##### Pull data from the Google Cloud Storage and process the with a Spark Cluster

In [3]:
## Import pyspark and sql functions
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType

In [4]:
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.master("local[4]").appName("air_quality_pyspark").getOrCreate()

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('air_quality_pyspark') \
  .getOrCreate()


#Read all files once off and concatenate them
df = spark.read.option("delimiter", ",").option("header", True).csv("gs://air_quality_cyprus/Data_Hourly_*.csv")
df.show()

+----------------+------------+------------+------------------+
|       date_time|station_code|pollutant_id|   pollutant_value|
+----------------+------------+------------+------------------+
| 01/01/2021 0:00|           1|          45| 8.652142999999999|
| 01/01/2021 1:00|           1|          45| 7.464646000000001|
| 01/01/2021 2:00|           1|          45|          6.127521|
| 01/01/2021 3:00|           1|          45|          4.535807|
| 01/01/2021 4:00|           1|          45|          2.779923|
| 01/01/2021 5:00|           1|          45|1.7017259999999998|
| 01/01/2021 6:00|           1|          45|          1.010888|
| 01/01/2021 7:00|           1|          45|          1.096671|
| 01/01/2021 8:00|           1|          45|          1.938319|
| 01/01/2021 9:00|           1|          45|1.8207490000000002|
|01/01/2021 10:00|           1|          45|          1.624287|
|01/01/2021 11:00|           1|          45|           1.51736|
|01/01/2021 12:00|           1|         

In [6]:
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType

df = df.withColumn("date_time",to_timestamp(df.date_time,'dd/MM/yyyy HH:mm'))
df = df.withColumn("date",date_format(df.date_time,'yyyy-MM-dd'))
df = df.withColumn("pollutant_value",df.pollutant_value.cast(DoubleType()))
df_aggr_daily=  df.groupby(['date','station_code','pollutant_id']).mean("pollutant_value").withColumnRenamed("avg(pollutant_value)","mean_pollutant_value")

df_aggr_daily.show(truncate=False) 

+----------+------------+------------+--------------------+
|date      |station_code|pollutant_id|mean_pollutant_value|
+----------+------------+------------+--------------------+
|2016-12-30|1           |45          |1.7874999999999999  |
|2016-04-11|5           |45          |1.0142857142857145  |
|2016-09-15|5           |45          |1.1458333333333333  |
|2016-10-17|5           |45          |1.2291666666666667  |
|2016-02-04|9           |45          |0.30000000000000004 |
|2016-04-07|9           |45          |0.375               |
|2016-07-17|9           |45          |0.5416666666666664  |
|2016-04-25|1           |6           |313.5629166666667   |
|2016-05-13|1           |6           |435.95583333333326  |
|2016-08-10|2           |6           |103.41526315789476  |
|2016-05-12|3           |6           |465.4741666666666   |
|2016-08-27|3           |6           |404.93875           |
|2016-11-20|5           |6           |405.17347826086956  |
|2016-01-06|15          |6           |34

In [7]:
#Check if we have all data 
df_aggr_daily = df_aggr_daily.withColumn("year",date_format(df.date,'yyyy'))
df_aggr_daily.groupby(['year']).count().orderBy(['year']).show()

+----+-----+
|year|count|
+----+-----+
|2010|16790|
|2011|18615|
|2012|18666|
|2013|18615|
|2014|18615|
|2015|18615|
|2016|21228|
|2017|21535|
|2018|21900|
|2019|21535|
|2020|21228|
|2021|21535|
+----+-----+



In [8]:
df_stations = spark.read.option("delimiter", ",").option("header", True).csv("gs://air_quality_cyprus/AirQualityMonitoringStations.csv")
df_stations = df_stations.select(col("station_code"),col("station_name_en")) #,col("latitude"),col("longitude")
df_stations = df_stations.withColumnRenamed('station_code','station_code_pk')
df_stations.show()

+---------------+--------------------+
|station_code_pk|     station_name_en|
+---------------+--------------------+
|              1|Nicosia -Traffic ...|
|              2|Nicosia - Residen...|
|              3|Limassol -Traffic...|
|              5|Larnaka - Traffic...|
|              8|Zygi -Industrial ...|
|              9|EMEP- Ayia Marina...|
|             14|Mari - Industrial...|
|             15|Pafos - Traffic S...|
|             16|Paralimni - Traff...|
+---------------+--------------------+



In [9]:
df_Pollutants = spark.read.option("delimiter", ",").option("header", True).csv("gs://air_quality_cyprus/PollutantsId.csv")
df_Pollutants = df_Pollutants.select(col("pollutant_id"),col("pollutant_code"),col("pollutant_name_en"),col("Unit_of_measurement_en"))
df_Pollutants = df_Pollutants.withColumnRenamed('pollutant_id','pollutant_id_pk')
# Unit_of_measurement_en:μg/m³
df_Pollutants.show()

+---------------+--------------+--------------------+----------------------+
|pollutant_id_pk|pollutant_code|   pollutant_name_en|Unit_of_measurement_en|
+---------------+--------------+--------------------+----------------------+
|              1|            NO|      Nitrogen Oxide|                 μg/m³|
|              2|           NO2|    Nitrogen Dioxide|                 μg/m³|
|              3|           NOX|     Nitrogen Oxides|                 μg/m³|
|              4|           SO2|      Sulfur Dioxide|                 μg/m³|
|              5|            O3|              Ozone |                 μg/m³|
|              6|            CO|     Carbon Monoxide|                 μg/m³|
|             25|          PM10|Particulate Matte...|                 μg/m³|
|             26|         PM2.5|Particulate Matte...|                 μg/m³|
|             45|          C6H6|             Benzene|                 μg/m³|
+---------------+--------------+--------------------+----------------------+

In [10]:
#Join on aggr data
df_aggr_daily = df_aggr_daily.join(df_stations,df_aggr_daily.station_code == df_stations.station_code_pk, how="inner")
df_aggr_daily = df_aggr_daily.join(df_Pollutants,df_aggr_daily.pollutant_id == df_Pollutants.pollutant_id_pk, how="inner")
df_aggr_daily = df_aggr_daily.drop("station_code_pk").drop("pollutant_id_pk")
df_aggr_daily.show()

+----------+------------+------------+--------------------+----+--------------------+--------------+-----------------+----------------------+
|      date|station_code|pollutant_id|mean_pollutant_value|year|     station_name_en|pollutant_code|pollutant_name_en|Unit_of_measurement_en|
+----------+------------+------------+--------------------+----+--------------------+--------------+-----------------+----------------------+
|2021-08-10|           1|          45|  0.4428034583333334|2021|Nicosia -Traffic ...|          C6H6|          Benzene|                 μg/m³|
|2021-01-16|           5|          45|  0.6404166666666667|2021|Larnaka - Traffic...|          C6H6|          Benzene|                 μg/m³|
|2021-05-20|           5|          45|  0.5770833333333335|2021|Larnaka - Traffic...|          C6H6|          Benzene|                 μg/m³|
|2021-05-28|           8|          45| 0.22624999999999995|2021|Zygi -Industrial ...|          C6H6|          Benzene|                 μg/m³|
|2021-

In [11]:
df_aggr_daily

DataFrame[date: string, station_code: string, pollutant_id: string, mean_pollutant_value: double, year: string, station_name_en: string, pollutant_code: string, pollutant_name_en: string, Unit_of_measurement_en: string]

In [12]:
df_aggr_daily.dtypes

[('date', 'string'),
 ('station_code', 'string'),
 ('pollutant_id', 'string'),
 ('mean_pollutant_value', 'double'),
 ('year', 'string'),
 ('station_name_en', 'string'),
 ('pollutant_code', 'string'),
 ('pollutant_name_en', 'string'),
 ('Unit_of_measurement_en', 'string')]

In [13]:
#StoreData
df_aggr_daily.write.option("delimiter", ",")\
 .option("quote", "\"").option("header", True)\
 .mode('overwrite')\
 .csv('gs://air_quality_cyprus/Data_Daily_All')

## Load the data automatically to the bigquery

In [None]:
# Saving the data to BigQuery
temp_bucket = 'staging.comp548dl-big-data.appspot.com'
table = "comp548dl-big-data.air_quality_cyprus.daily_data"
df_aggr_daily.write.format("bigquery").mode('overwrite').option("temporaryGcsBucket",temp_bucket).option("table",table).save()

In [15]:
%%bigquery
SELECT *
FROM comp548dl-big-data.air_quality_cyprus.daily_data
LIMIT 10

Unnamed: 0,date,station_code,pollutant_id,mean_pollutant_value,station_name_en,latitude,longitude,pollutant_code,pollutant_name_en,Unit_of_measurement_en
0,2021-02-05,3,1,22.84375,Limassol -Traffic Station,"""34°41'10.0""""N""","""33°02'08.0""""E""",NO,Nitrogen Oxide,μg/m³
1,2020-07-04,3,1,5.679167,Limassol -Traffic Station,"""34°41'10.0""""N""","""33°02'08.0""""E""",NO,Nitrogen Oxide,μg/m³
2,2019-12-02,3,1,27.652174,Limassol -Traffic Station,"""34°41'10.0""""N""","""33°02'08.0""""E""",NO,Nitrogen Oxide,μg/m³
3,2020-10-03,3,1,5.141667,Limassol -Traffic Station,"""34°41'10.0""""N""","""33°02'08.0""""E""",NO,Nitrogen Oxide,μg/m³
4,2019-04-22,3,1,7.8125,Limassol -Traffic Station,"""34°41'10.0""""N""","""33°02'08.0""""E""",NO,Nitrogen Oxide,μg/m³
5,2019-10-29,3,1,14.391304,Limassol -Traffic Station,"""34°41'10.0""""N""","""33°02'08.0""""E""",NO,Nitrogen Oxide,μg/m³
6,2020-09-07,3,1,5.133333,Limassol -Traffic Station,"""34°41'10.0""""N""","""33°02'08.0""""E""",NO,Nitrogen Oxide,μg/m³
7,2019-05-06,3,1,5.730435,Limassol -Traffic Station,"""34°41'10.0""""N""","""33°02'08.0""""E""",NO,Nitrogen Oxide,μg/m³
8,2021-08-11,3,1,3.443043,Limassol -Traffic Station,"""34°41'10.0""""N""","""33°02'08.0""""E""",NO,Nitrogen Oxide,μg/m³
9,2019-11-03,3,1,8.486957,Limassol -Traffic Station,"""34°41'10.0""""N""","""33°02'08.0""""E""",NO,Nitrogen Oxide,μg/m³


In [None]:
df_readback = spark.read.option("delimiter", ",").option("header", True).csv("gs://air_quality_cyprus/Data_Daily_All")
df_readback 

## Store DataFrame as Parquet files and do some analysis

In [17]:
# DataFrames can be saved as Parquet files, maintaining the schema information.
df_aggr_daily.write.mode('overwrite').parquet("data.parquet")

In [18]:
# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("data.parquet")

In [19]:
parquetFile.printSchema()

root
 |-- date: string (nullable = true)
 |-- station_code: string (nullable = true)
 |-- pollutant_id: string (nullable = true)
 |-- mean_pollutant_value: double (nullable = true)
 |-- year: string (nullable = true)
 |-- station_name_en: string (nullable = true)
 |-- pollutant_code: string (nullable = true)
 |-- pollutant_name_en: string (nullable = true)
 |-- Unit_of_measurement_en: string (nullable = true)



In [23]:
# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
station_values = spark.sql("SELECT year,station_code, avg(mean_pollutant_value)  FROM parquetFile Group by year,station_code order by year")
station_values.show()

+----+------------+-------------------------+
|year|station_code|avg(mean_pollutant_value)|
+----+------------+-------------------------+
|2010|           5|       57.098510283437506|
|2010|           8|       20.818438514503118|
|2010|           2|        72.64821297683149|
|2010|           1|        111.6998372319318|
|2010|          15|        84.55288676179796|
|2010|           9|        34.73504179480461|
|2010|           3|       107.90771037529214|
|2011|           2|        72.57172604414717|
|2011|           8|       19.004024489935126|
|2011|           9|        37.91321579552225|
|2011|           3|        110.6013316437241|
|2011|           5|        71.79668140528639|
|2011|          14|        25.19243591992164|
|2011|           1|        120.8479958273369|
|2011|          15|        58.94961482733682|
|2012|           2|        76.64388511848807|
|2012|           1|       111.21431963708024|
|2012|           8|       19.890689973680825|
|2012|          14|        27.3121

### Load to Pandas for flexility and play around

In [24]:
pd_df_aggr_daily  = df_aggr_daily.toPandas()

In [25]:
pd_df_aggr_daily[pd_df_aggr_daily.isna()==True].count()

date                      0
station_code              0
pollutant_id              0
mean_pollutant_value      0
year                      0
station_name_en           0
pollutant_code            0
pollutant_name_en         0
Unit_of_measurement_en    0
dtype: int64

In [27]:
pd_df_aggr_daily.to_csv("Data_Daily_All.csv", header=True)

In [None]:
#https://spark.apache.org/docs/latest/sql-data-sources-csv.html
#https://sparkbyexamples.com/pyspark/pyspark-convert-array-column-to-string-column/