## Import Libraries

In [1]:
# %%capture
# !pip install pyspark

In [13]:
# %%capture
# !pip install gcsfs

In [2]:
# from google.colab import drive
# drive.mount('/content/drive')

Mounted at /content/drive


In [63]:
import os
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import when

In [4]:
# root = "/content/drive/MyDrive/DS525_Project"

**Create Variables for Credentials**

In [2]:
# Service-account identity and the path to its JSON key file.
# The key path can be overridden with the DS525_KEYFILE_PATH environment
# variable so the key location is not baked into the notebook; when the
# variable is unset the original relative path is used.
# NOTE(review): make sure the key file itself is never committed.
SERVICE_ACCOUNT_EMAIL = "jarnjung-ds525-capstone@ds525-capstone.iam.gserviceaccount.com"
KEYFILE_PATH = os.environ.get(
    "DS525_KEYFILE_PATH",
    "credentials/ds525-capstone-f36f0f0e6744.json",
)
KEYFILE_PATH

'credentials/ds525-capstone-f36f0f0e6744.json'

**Create variables for Directories**

In [83]:
# Every storage zone is a sub-directory of the project bucket.
bucket = "gs://ds525-capstone-test-49"
(
    landing_zone,          # raw JSON as ingested
    cleaned_zone_csv,      # cleaned measurements, CSV
    cleaned_zone_parquet,  # cleaned measurements, Parquet
    cleaned_zone_station,  # station metadata
    data_dir,              # full data table
) = (
    f"{bucket}/{zone}"
    for zone in ("data_raw_air", "cleaned_csv", "cleaned_parquet", "cleaned_station", "data")
)

**Point the `GOOGLE_APPLICATION_CREDENTIALS` environment variable at the credentials key file**

In [4]:
# Exported before the Spark session is created below, so the process launched for it sees the key path.
os.environ.update(GOOGLE_APPLICATION_CREDENTIALS=KEYFILE_PATH)

## Spark Config

In [5]:
# Ship the GCS connector jar with the session so gs:// paths can be used.
# NOTE(review): the "-latest" jar is unpinned; pin a version for reproducible runs.
conf = SparkConf()
conf.set(
    key="spark.jars",
    value="https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
)

<pyspark.conf.SparkConf at 0x7ff2c269ff90>

In [6]:
# Enable Spark's REPL eager evaluation so a DataFrame left as a cell's last expression is rendered.
conf.set(key="spark.sql.repl.eagerEval.enabled", value=True)

<pyspark.conf.SparkConf at 0x7ff2c269ff90>

In [7]:
# Build (or reuse an already running) Spark session from the configuration above.
builder = SparkSession.builder.config(conf=conf)
spark = builder.getOrCreate()

## Spark for Data Processing

### Test Connection to Google Cloud Storage
by reading a JSON file from the bucket

In [8]:
# Show which bucket path the raw files will be read from.
print(landing_zone)

gs://ds525-capstone-test-49/data_raw_air


In [18]:
#Read JSON data from GCS
# df1 = pd.read_json(f"{landing_zone}/data_air_2024_04_03_0200.json")

In [None]:
# `df1` only exists when the pandas smoke-test read in the previous cell is
# uncommented (that read also needs gcsfs). Skip the preview instead of
# raising NameError, so the notebook survives Restart Kernel -> Run All.
if "df1" in globals():
    display(df1.head())
else:
    print("Skipping df1 preview: the pandas smoke-test read is disabled.")

### Use Spark to Read All Files in the `landing_zone` Directory

In [10]:
# Read every JSON file under the landing zone; the files are parsed as multi-line JSON documents.
data = spark.read.json(landing_zone, multiLine=True)

In [11]:
# One output row per element of the top-level `stations` array, in a struct column named `station`.
station_explode_expr = "explode(stations) as station"
exploded_data = data.selectExpr(station_explode_expr)

In [12]:
# Peek at the first rows (20 is Spark's default row count for show()).
exploded_data.show(n=20)

+--------------------+
|             station|
+--------------------+
|{{{122, 4, PM25},...|
|{{{113, 4, PM25},...|
|{{{125, 4, PM25},...|
|{{{110, 4, PM25},...|
|{{{104, 4, PM25},...|
|{{{123, 4, PM25},...|
|{{{119, 4, PM25},...|
|{{{109, 4, PM25},...|
|{{{137, 4, PM25},...|
|{{{96, 3, PM25}, ...|
|{{{92, 3, PM25}, ...|
|{{{-1, 0, PM25}, ...|
|{{{114, 4, PM25},...|
|{{{-1, 0, PM25}, ...|
|{{{-1, 0, PM25}, ...|
|{{{112, 4, PM25},...|
|{{{-1, 0, PM25}, ...|
|{{{-1, 0, PM25}, ...|
|{{{112, 4, PM25},...|
|{{{-1, 0, PM25}, ...|
+--------------------+
only showing top 20 rows



In [19]:
# Print the nested schema of the exploded `station` struct (the measurement fields are read as strings).
exploded_data.printSchema()

root
 |-- station: struct (nullable = true)
 |    |-- AQILast: struct (nullable = true)
 |    |    |-- AQI: struct (nullable = true)
 |    |    |    |-- aqi: string (nullable = true)
 |    |    |    |-- color_id: string (nullable = true)
 |    |    |    |-- param: string (nullable = true)
 |    |    |-- CO: struct (nullable = true)
 |    |    |    |-- aqi: string (nullable = true)
 |    |    |    |-- color_id: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |    |-- NO2: struct (nullable = true)
 |    |    |    |-- aqi: string (nullable = true)
 |    |    |    |-- color_id: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |    |-- O3: struct (nullable = true)
 |    |    |    |-- aqi: string (nullable = true)
 |    |    |    |-- color_id: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |    |-- PM10: struct (nullable = true)
 |    |    |    |-- aqi: string (nullable = true)
 |    |   

In [54]:
# Station id next to its `AQILast.AQI.aqi` value.
aqi_columns = ["station.stationID", "station.AQILast.AQI.aqi"]
exploded_data.select(*aqi_columns).show()

+---------+---+
|stationID|aqi|
+---------+---+
|      02t|122|
|      03t|113|
|      05t|125|
|      11t|110|
|      12t|104|
|      50t|123|
|      52t|119|
|      53t|109|
|      54t|137|
|      59t| 96|
|      61t| 92|
|  bkp100t| -1|
|  bkp101t|114|
|  bkp102t| -1|
|  bkp103t| -1|
|  bkp104t|112|
|  bkp105t| -1|
|  bkp110t| -1|
|  bkp112t|112|
|  bkp114t| -1|
+---------+---+
only showing top 20 rows



In [20]:
# Station identity, English name, coordinates and type.
station_info_columns = [
    "station.stationID",
    "station.nameEN",
    "station.lat",
    "station.long",
    "station.stationType",
]
exploded_data.select(*station_info_columns).show()

+---------+--------------------+-----------+-----------+-----------+
|stationID|              nameEN|        lat|       long|stationType|
+---------+--------------------+-----------+-----------+-----------+
|      02t|Bansomdejchaopray...|  13.732846| 100.487662|     GROUND|
|      03t|Highway NO.3902 k...|  13.636514| 100.414262|     GROUND|
|      05t|Thai Meteorologic...|  13.666183| 100.605742|     GROUND|
|      11t|National Housing ...|   13.77553| 100.569195|     GROUND|
|      12t|Nonsi Witthaya Sc...|13.70806667|100.5473333|     GROUND|
|      50t|Chulalongkorn Hos...|  13.729852| 100.536501|     GROUND|
|      52t|Thonburi Power Su...|  13.727622| 100.486568|     GROUND|
|      53t|Chokchai Police S...| 13.7954248|100.5930298|     GROUND|
|      54t|National Housing ...|13.76251667|   100.5502|     GROUND|
|      59t|The Government Pu...|  13.783185| 100.540489|     GROUND|
|      61t|Bodindecha Sing S...|  13.769634| 100.614567|     GROUND|
|  bkp100t|Bueng Kum Distric...|  

### Register the Exploded DataFrame as a Temporary View

In [79]:
# Expose the exploded rows to Spark SQL under the name `raw_data`.
exploded_data.createOrReplaceTempView(name="raw_data")

#### Build the Station Table from the Temporary View

In [80]:
# Station metadata: one row per distinct combination of the columns below.
# A station whose metadata differs between raw files will therefore appear more than once.
station_query = """
    select distinct
        station.stationID,
        station.nameTH,
        station.nameEN,
        station.areaTH,
        station.areaEN,
        station.stationType,
        station.lat,
        station.long
    from
        raw_data
"""
station_table = spark.sql(station_query)

In [81]:
# First 20 station rows (Spark's default for show()).
station_table.show(n=20)

+---------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+-----------+
|stationID|              nameTH|              nameEN|              areaTH|              areaEN|stationType|        lat|       long|
+---------+--------------------+--------------------+--------------------+--------------------+-----------+-----------+-----------+
|   bkp75t|   สำนักงานเขตสายไหม|Sai Mai District ...|ริมถนนสุขาภิบาล5 ...|Sukhaphiban 5 Roa...|        BKK|  13.896035| 100.660628|
|  bkp115t|สวนวชิรเบญจทัศ ถน...|Wacchirabenchatat...|สวนวชิรเบญจทัศ เข...|Wacchirabenchatat...|        BKK| 13.8118516|100.5535825|
|     101t|ศาลากลางจังหวัดบุ...|   Buriram City Hall|ต.เสม็ด อ.เมือง, ...|Samed Subdistrict...|     GROUND|  14.945452| 103.105896|
|      16t|  โรงไฟฟ้าพระนครใต้ |South Bangkok Pow...|ต.บางโปรง อ.เมือง...|Bang Prong, Meuan...|     GROUND|     13.618|   100.5562|
|   bkp71t|  สำนักงานเขตสวนหลวง|Suan Luang Distri...|ริมถนนพัฒนาการ เข...| S

In [28]:
# Write a local copy of the station table (relative path "station") as CSV with a header row.
file_name = "station"
station_table.write.mode("overwrite").option("header", True).csv(file_name)

In [85]:
# Save the station table to the cleaned zone as CSV, replacing previous output.
# Include a header row so the column names survive; without it the files hold
# bare values (the local copy of this table is also written with header=True).
station_table.write.mode("overwrite").option("header", True).csv(cleaned_zone_station)

In [32]:
# Alias for the station table; both names refer to the same DataFrame object (no copy is made).
station_df = station_table

In [33]:
# Display the station table; it renders as a last expression because spark.sql.repl.eagerEval.enabled is set on the conf.
station_df

stationID,nameTH,nameEN,areaTH,areaEN,stationType,lat,long
bkp75t,สำนักงานเขตสายไหม,Sai Mai District ...,ริมถนนสุขาภิบาล5 ...,Sukhaphiban 5 Roa...,BKK,13.896035,100.660628
bkp115t,สวนวชิรเบญจทัศ ถน...,Wacchirabenchatat...,สวนวชิรเบญจทัศ เข...,Wacchirabenchatat...,BKK,13.8118516,100.5535825
101t,ศาลากลางจังหวัดบุ...,Buriram City Hall,"ต.เสม็ด อ.เมือง, ...",Samed Subdistrict...,GROUND,14.945452,103.105896
16t,โรงไฟฟ้าพระนครใต้,South Bangkok Pow...,ต.บางโปรง อ.เมือง...,"Bang Prong, Meuan...",GROUND,13.618,100.5562
bkp71t,สำนักงานเขตสวนหลวง,Suan Luang Distri...,ริมถนนพัฒนาการ เข...,"Suan Luang, Bangkok",BKK,13.731051,100.651705
50t,โรงพยาบาลจุฬาลงกรณ์,Chulalongkorn Hos...,ริมถนนพระราม 4 เข...,Rama IV Rd. Khet ...,GROUND,13.729852,100.536501
bkp59t,สำนักงานเขตราชเทว...,Ratchathewi Distr...,ริมถนนพญาไท เขตรา...,Phayathai Roadsid...,BKK,13.7590851,100.5345788
107t,สวนสาธารณะแก่งดอน...,KAENG DON KLANG PARK,ต.กาฬสินธุ์ อ.เมื...,Kalasin Subdistri...,GROUND,16.4436,103.513
bkp100t,สำนักงานเขตบึงกุ่ม,Bueng Kum Distric...,แขวงคลองกุ่ม เขตบ...,"Khlong Kum, Bueng...",BKK,13.785212,100.669183
67t,สำนักงานเทศบาลเมื...,Municipality Offi...,ต.ในเวียง อ.เมือง...,"Nai Wiang, Mueang...",GROUND,18.7888833,100.7763166


#### Build the Measurement Data Table from the Temporary View

In [71]:
# Re-register `raw_data` (same DataFrame as before) so this section can be run on its own.
exploded_data.createOrReplaceTempView(name="raw_data")

In [73]:
# Flatten each station's AQILast struct into one row of measurements:
# station id, date/time, then value and color_id per pollutant (AQI uses its `aqi` field).
data_query = """
    select
        station.stationID as station_id,
        station.AQILast.date as date,
        station.AQILast.time as time,
        station.AQILast.AQI.aqi as aqi_val, 
        station.AQILast.AQI.color_id as aqi_color_id, 
        station.AQILast.CO.value as co_val,
        station.AQILast.CO.color_id as co_color_id,
        station.AQILast.NO2.value as no2_val, 
        station.AQILast.NO2.color_id as no2_color_id, 
        station.AQILast.O3.value as o3_val,
        station.AQILast.O3.color_id as o3_color_id,
        station.AQILast.PM10.value as pm10_val, 
        station.AQILast.PM10.color_id as pm10_color_id,
        station.AQILast.PM25.value as pm25_val,  
        station.AQILast.PM25.color_id as pm25_color_id,
        station.AQILast.SO2.value as so2_val,  
        station.AQILast.SO2.color_id as so2_color_id 
    from
        raw_data
"""
data_table = spark.sql(data_query)

In [74]:
# First 20 measurement rows before cleaning.
data_table.show(n=20)

+----------+----------+-----+-------+------------+------+-----------+-------+------------+------+-----------+--------+-------------+--------+-------------+-------+------------+
|station_id|      date| time|aqi_val|aqi_color_id|co_val|co_color_id|no2_val|no2_color_id|o3_val|o3_color_id|pm10_val|pm10_color_id|pm25_val|pm25_color_id|so2_val|so2_color_id|
+----------+----------+-----+-------+------------+------+-----------+-------+------------+------+-----------+--------+-------------+--------+-------------+-------+------------+
|       02t|2024-04-21|21:00|    122|           4|    -1|          0|     -1|           0|    -1|          0|      -1|            0|    45.6|            4|     -1|           0|
|       03t|2024-04-21|17:00|    113|           4|    -1|          0|     -1|           0|    -1|          0|      -1|            0|    42.0|            4|     -1|           0|
|       05t|2024-04-21|21:00|    125|           4|    -1|          0|     -1|           0|    -1|          0|      

In [75]:
# Replace negative readings with 0 in every measurement-value column.
# NOTE(review): -1 looks like a "no reading" sentinel (rows with -1 carry
# color_id 0 in the output above). Mapping it to 0 makes missing data
# indistinguishable from a genuine zero reading; consider null instead.
VALUE_COLUMNS = ("aqi_val", "co_val", "no2_val", "o3_val", "pm10_val", "pm25_val", "so2_val")

for value_col in VALUE_COLUMNS:
    # The raw values are read as strings (see the schema above). Cast explicitly
    # for the comparison rather than relying on Spark's implicit string/int
    # coercion; only the comparison is cast, the stored values are left as-is.
    is_negative = data_table[value_col].cast("double") < 0
    data_table = data_table.withColumn(
        value_col,
        when(is_negative, 0).otherwise(data_table[value_col]),
    )


In [76]:
# First 20 measurement rows after negative values were replaced.
data_table.show(n=20)

+----------+----------+-----+-------+------------+------+-----------+-------+------------+------+-----------+--------+-------------+--------+-------------+-------+------------+
|station_id|      date| time|aqi_val|aqi_color_id|co_val|co_color_id|no2_val|no2_color_id|o3_val|o3_color_id|pm10_val|pm10_color_id|pm25_val|pm25_color_id|so2_val|so2_color_id|
+----------+----------+-----+-------+------------+------+-----------+-------+------------+------+-----------+--------+-------------+--------+-------------+-------+------------+
|       02t|2024-04-21|21:00|    122|           4|     0|          0|      0|           0|     0|          0|       0|            0|    45.6|            4|      0|           0|
|       03t|2024-04-21|17:00|    113|           4|     0|          0|      0|           0|     0|          0|       0|            0|    42.0|            4|      0|           0|
|       05t|2024-04-21|21:00|    125|           4|     0|          0|      0|           0|     0|          0|      

#### Save Structured Data in the Cleaned Zone

In [77]:
# Write the cleaned measurements as CSV, one sub-directory per date, replacing
# previous output. Include a header row so the column names are preserved in
# the files; without it downstream readers only see positional values.
data_table.write.partitionBy("date").mode("overwrite").option("header", True).csv(cleaned_zone_csv)

In [78]:
# Same cleaned data as Parquet, partitioned by date, replacing any previous output.
data_table.write.parquet(cleaned_zone_parquet, mode="overwrite", partitionBy="date")

### Write the Data Table to the Data Directory

In [84]:
# Write the full (unpartitioned) data table to the data directory as CSV,
# replacing previous output, with a header row so column names are kept.
data_table.write.mode("overwrite").option("header", True).csv(data_dir)