In [1]:
import openmeteo_requests

import pandas as pd
import requests_cache
from retry_requests import retry

import numpy as np
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, isnan, when, substring, to_timestamp, expr, year, count

## Weather Data Api

In [2]:
# Setup the Open-Meteo API client with cache and retry on error
cache_session = requests_cache.CachedSession('.cache', expire_after = 1)
retry_session = retry(cache_session, retries = 5, backoff_factor = 0.2)
openmeteo = openmeteo_requests.Client(session = retry_session)

# Make sure all required weather variables are listed here
# The order of variables in hourly or daily is important to assign them correctly below
url = "https://historical-forecast-api.open-meteo.com/v1/forecast"
params = {
	"latitude": 13.74,
	"longitude": 100.52,
	"start_date": "2021-03-23",
	"end_date": "2025-05-02",
	"hourly": ["temperature_2m", "relative_humidity_2m", "apparent_temperature", "precipitation", "dew_point_2m", "rain"],
	"timezone": "Asia/Bangkok"
}
responses = openmeteo.weather_api(url, params=params)

# Process first location. Add a for-loop for multiple locations or weather models
response = responses[0]
print(f"Coordinates {response.Latitude()}°N {response.Longitude()}°E")
print(f"Elevation {response.Elevation()} m asl")
print(f"Timezone {response.Timezone()}{response.TimezoneAbbreviation()}")
print(f"Timezone difference to GMT+0 {response.UtcOffsetSeconds()} s")

# Process hourly data. The order of variables needs to be the same as requested.
hourly = response.Hourly()
hourly_temperature_2m = hourly.Variables(0).ValuesAsNumpy()
hourly_relative_humidity_2m = hourly.Variables(1).ValuesAsNumpy()
hourly_apparent_temperature = hourly.Variables(2).ValuesAsNumpy()
hourly_precipitation = hourly.Variables(3).ValuesAsNumpy()
hourly_dew_point_2m = hourly.Variables(4).ValuesAsNumpy()
hourly_rain = hourly.Variables(5).ValuesAsNumpy()

hourly_data = {"date": pd.date_range(
	start = pd.to_datetime(hourly.Time(), unit = "s", utc = True),
	end = pd.to_datetime(hourly.TimeEnd(), unit = "s", utc = True),
	freq = pd.Timedelta(seconds = hourly.Interval()),
	inclusive = "left"
)}

hourly_data["temperature_2m"] = hourly_temperature_2m
hourly_data["relative_humidity_2m"] = hourly_relative_humidity_2m
hourly_data["apparent_temperature"] = hourly_apparent_temperature
hourly_data["precipitation"] = hourly_precipitation
hourly_data["dew_point_2m"] = hourly_dew_point_2m
hourly_data["rain"] = hourly_rain

hourly_dataframe = pd.DataFrame(data = hourly_data)
hourly_dataframe['date'] = pd.to_datetime(hourly_dataframe['date']) + pd.Timedelta(hours=7)
hourly_dataframe = hourly_dataframe.iloc[7:]
hourly_dataframe = hourly_dataframe.rename(columns={'date': 'timestamp'})
hourly_dataframe.to_csv("data/hourly_data.csv", index = False)

Coordinates 13.75°N 100.5°E
Elevation 9.0 m asl
Timezone b'Asia/Bangkok'b'GMT+7'
Timezone difference to GMT+0 25200 s


## Start Spark Session

In [3]:
spark_url = 'local'
spark = SparkSession.builder\
        .master(spark_url)\
        .appName('Spark SQL')\
        .getOrCreate()

25/05/08 15:12:06 WARN Utils: Your hostname, Veerapats-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.201.140.123 instead (on interface en0)
25/05/08 15:12:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/08 15:12:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# spark.stop()

## Read Traffy Fondue data

In [5]:
traffy_df = spark.read.option("header", "true") \
                      .option("multiline", "true") \
                      .option("quote", '"') \
                      .option("escape", '"') \
                      .option("mode", "PERMISSIVE") \
                      .csv("data/bangkok_traffy.csv")
traffy_df.printSchema()

root
 |-- ticket_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- organization: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- photo: string (nullable = true)
 |-- photo_after: string (nullable = true)
 |-- coords: string (nullable = true)
 |-- address: string (nullable = true)
 |-- subdistrict: string (nullable = true)
 |-- district: string (nullable = true)
 |-- province: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- state: string (nullable = true)
 |-- star: string (nullable = true)
 |-- count_reopen: string (nullable = true)
 |-- last_activity: string (nullable = true)



In [6]:
null_counts = traffy_df.select([
    _sum(when(col(c).isNull() | isnan(col(c)), 1).otherwise(0)).alias(c)
    for c in traffy_df.columns
])
null_counts.show()

25/05/08 15:12:21 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

+---------+----+------------+-------+-----+-----------+------+-------+-----------+--------+--------+---------+-----+------+------------+-------------+
|ticket_id|type|organization|comment|photo|photo_after|coords|address|subdistrict|district|province|timestamp|state|  star|count_reopen|last_activity|
+---------+----+------------+-------+-----+-----------+------+-------+-----------+--------+--------+---------+-----+------+------------+-------------+
|     8772|  97|         571|   8772|  115|     145717|     0|   8772|        566|     561|     195|        0|    0|512929|           0|            0|
+---------+----+------------+-------+-----+-----------+------+-------+-----------+--------+--------+---------+-----+------+------------+-------------+



In [7]:
print(f"Rows: {traffy_df.count()}, Columns: {len(traffy_df.columns)}")

[Stage 4:>                                                          (0 + 1) / 1]

Rows: 787026, Columns: 16


                                                                                

## Clean Data

In [8]:
drop_columns = ['photo', 'photo_after', 'star']
keep_types = ["{ถนน}", "{ทางเท้า}", "{แสงสว่าง}", "{ความสะอาด}", "{กีดขวาง}", "{น้ำท่วม}",
              "{ท่อระบายน้ำ}", "{จราจร}", "{ป้าย}", "{ความปลอดภัย}", "{ต้นไม้}"]

traffy_df = traffy_df.drop(*drop_columns)
traffy_df = traffy_df.na.drop(subset=["type"])
traffy_df = traffy_df.filter(col("type") != "{}")
traffy_df = traffy_df.filter(col("type").isin(keep_types))

print(f"Rows: {traffy_df.count()}, Columns: {len(traffy_df.columns)}")

[Stage 7:>                                                          (0 + 1) / 1]

Rows: 326927, Columns: 13


                                                                                

In [9]:
traffy_df.groupBy("type").count().orderBy("count", ascending=False).show()

[Stage 10:>                                                         (0 + 1) / 1]

+-------------+-----+
|         type|count|
+-------------+-----+
|        {ถนน}|85651|
|    {ทางเท้า}|54485|
|   {แสงสว่าง}|34163|
|  {ความสะอาด}|32223|
|    {กีดขวาง}|28404|
|    {น้ำท่วม}|17860|
|{ท่อระบายน้ำ}|17496|
|      {จราจร}|17003|
|       {ป้าย}|15237|
|{ความปลอดภัย}|14409|
|     {ต้นไม้}| 9996|
+-------------+-----+



                                                                                

In [10]:
keep_years = ["2023", "2024", "2025"]

traffy_df = traffy_df.filter(substring(col("timestamp"), 1, 4).isin(keep_years))

traffy_df.select("timestamp").show(10, truncate=False)

+-----------------------------+
|timestamp                    |
+-----------------------------+
|2023-01-01 00:03:43.789292+00|
|2023-01-01 00:10:26.940037+00|
|2023-01-01 00:21:48.772987+00|
|2023-01-01 00:33:33.072634+00|
|2023-01-01 00:52:57.351703+00|
|2023-01-01 00:53:14.138525+00|
|2023-01-01 01:45:46.369+00   |
|2023-01-01 01:47:23.336628+00|
|2023-01-01 01:56:38.184056+00|
|2023-01-01 02:09:54.197892+00|
+-----------------------------+
only showing top 10 rows



                                                                                

In [11]:
keep_districts = ['คลองสาน', 'คลองสามวา', 'คลองเตย', 'คันนายาว', 'จตุจักร', 'จอมทอง', 'ดอนเมือง', 'ดินแดง', 'ดุสิต', 'ตลิ่งชัน', 'ทวีวัฒนา', 'ทุ่งครุ', 'ธนบุรี', 'บางกอกน้อย', 'บางกอกใหญ่', 'บางกะปิ', 'บางขุนเทียน', 'บางคอแหลม', 'บางซื่อ', 'บางนา', 'บางบอน', 'บางพลัด', 'บางรัก', 'บางเขน', 'บางแค', 'บึงกุ่ม', 'ปทุมวัน', 'ประเวศ', 'ป้อมปราบศัตรูพ่าย', 'พญาไท', 'พระนคร', 'พระโขนง', 'ภาษีเจริญ', 'มีนบุรี', 'ยานนาวา', 'ราชเทวี', 'ราษฎร์บูรณะ', 'ลาดกระบัง', 'ลาดพร้าว', 'วังทองหลาง', 'วัฒนา', 'สวนหลวง', 'สะพานสูง', 'สัมพันธวงศ์', 'สาทร', 'สายไหม', 'หนองจอก', 'หนองแขม', 'หลักสี่', 'ห้วยขวาง']

traffy_df = traffy_df.filter(col("district").isin(keep_districts))

null_counts = traffy_df.select([
    _sum(when(col(c).isNull() | isnan(col(c)), 1).otherwise(0)).alias(c)
    for c in traffy_df.columns
])
null_counts.show()

print(f"Rows: {traffy_df.count()}, Columns: {len(traffy_df.columns)}")

                                                                                

+---------+----+------------+-------+------+-------+-----------+--------+--------+---------+-----+------------+-------------+
|ticket_id|type|organization|comment|coords|address|subdistrict|district|province|timestamp|state|count_reopen|last_activity|
+---------+----+------------+-------+------+-------+-----------+--------+--------+---------+-----+------------+-------------+
|     2293|   0|          44|   2293|     0|   2293|          0|       0|       0|        0|    0|           0|            0|
+---------+----+------------+-------+------+-------+-----------+--------+--------+---------+-----+------------+-------------+



[Stage 17:>                                                         (0 + 1) / 1]

Rows: 242989, Columns: 13


                                                                                

In [12]:
# Ensure timestamp is properly parsed
traffy_df = traffy_df.withColumn("timestamp", to_timestamp(col("timestamp")))

# Round to the nearest hour
traffy_df = traffy_df.withColumn(
    "rounded_hour",
    expr("""
        from_unixtime(
            round(unix_timestamp(timestamp) / 3600.0) * 3600
        )
    """)
)

# Extract year

traffy_df = traffy_df.withColumn("year", year(col("rounded_hour")))

# Show result
traffy_df.select("timestamp", "rounded_hour", "year").show(5, truncate=False)

+--------------------------+-------------------+----+
|timestamp                 |rounded_hour       |year|
+--------------------------+-------------------+----+
|2023-01-01 07:03:43.789292|2023-01-01 07:00:00|2023|
|2023-01-01 07:10:26.940037|2023-01-01 07:00:00|2023|
|2023-01-01 07:21:48.772987|2023-01-01 07:00:00|2023|
|2023-01-01 07:33:33.072634|2023-01-01 08:00:00|2023|
|2023-01-01 07:52:57.351703|2023-01-01 08:00:00|2023|
+--------------------------+-------------------+----+
only showing top 5 rows



                                                                                

## Read Weather Data

In [13]:
weather_df = spark.read.option("header", True) \
                       .option("inferSchema", True) \
                       .option("timestampFormat", "yyyy-MM-dd HH:mm:ssXXX") \
                       .csv("data/hourly_data.csv")

weather_df.show(5, truncate=False)
weather_df.printSchema()

+-------------------+--------------+--------------------+--------------------+-------------+------------+----+
|timestamp          |temperature_2m|relative_humidity_2m|apparent_temperature|precipitation|dew_point_2m|rain|
+-------------------+--------------+--------------------+--------------------+-------------+------------+----+
|2021-03-23 14:00:00|27.5805       |46.0                |27.061237           |NULL         |14.9252     |NULL|
|2021-03-23 15:00:00|28.630499     |43.0                |28.3903             |0.0          |14.8279295  |0.0 |
|2021-03-23 16:00:00|30.4805       |39.0                |30.377312           |0.0          |14.96784    |0.0 |
|2021-03-23 17:00:00|32.2305       |36.0                |33.20875            |0.0          |15.271539   |0.0 |
|2021-03-23 18:00:00|33.830498     |34.0                |36.10794            |0.0          |15.782984   |0.0 |
+-------------------+--------------+--------------------+--------------------+-------------+------------+----+
o

## Read Budget Data

In [14]:
budget_df = spark.read.option("header", True) \
                      .option("inferSchema", True) \
                      .csv("data/budget.csv")

budget_df.show(5, truncate=False)
budget_df.printSchema()

+---------+----+---------+
|district |year|budget   |
+---------+----+---------+
|คลองสาน  |2023|297660600|
|คลองสามวา|2023|436119000|
|คลองเตย  |2023|348985200|
|คันนายาว |2023|272905200|
|จตุจักร  |2023|495825600|
+---------+----+---------+
only showing top 5 rows

root
 |-- district: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- budget: integer (nullable = true)



## Read Population Data

In [15]:
population_df = spark.read.option("header", True) \
                          .option("inferSchema", True) \
                          .csv("data/bangkok_population.csv")

population_df.show(5, truncate=False)
population_df.printSchema()

+-----------+----------+
|district   |population|
+-----------+----------+
|คลองสามวา  |209120    |
|สายไหม     |208928    |
|บางแค      |192253    |
|บางเขน     |185369    |
|บางขุนเทียน|184805    |
+-----------+----------+
only showing top 5 rows

root
 |-- district: string (nullable = true)
 |-- population: integer (nullable = true)



## Merge The Dataframes into 1 Dataframe (merged_df)

In [16]:
# Ensure the timestamp columns are in the correct format (UTC)
weather_df = weather_df.withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ssXXX"))

# Assuming 'traffy_df' already has 'rounded_hour' in timestamp format
# If not, you can convert it as well (e.g., to_timestamp)
# traffy_df = traffy_df.withColumn("rounded_hour", to_timestamp("rounded_hour", "yyyy-MM-dd HH:mm:ss"))

# Join traffy_df with weather_df on rounded_hour and timestamp
merged_df = traffy_df.join(weather_df, 
                           traffy_df["rounded_hour"] == weather_df["timestamp"], 
                           how="left")

# Drop the redundant 'timestamp' column from weather_df
merged_df = merged_df.drop("timestamp")

# Show the merged DataFrame
merged_df.show(5, truncate=False)

+-----------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------+------------------+--------------------------------------------------------------------------------------------------------------+-----------+-----------+--------------------+---------+------------+-----------------------------+-------------------+----+--------------+--------------------+--------------------+-------------+------------+----+
|ticket_id  |type         |organization                                                                                                                                                              

                                                                                

In [17]:
# Perform the join on 'year' and 'district' columns
merged_df = merged_df.join(
    budget_df, 
    on=['year', 'district'], 
    how='left'
)

# Show the relevant columns (year, district, and budget)
merged_df.select('year', 'district', 'budget').show(5, truncate=False)

+----+-----------+---------+
|year|district   |budget   |
+----+-----------+---------+
|2023|ราษฎร์บูรณะ|293458600|
|2023|พระโขนง    |249406400|
|2023|จตุจักร    |495825600|
|2023|ตลิ่งชัน   |339581400|
|2023|มีนบุรี    |428982700|
+----+-----------+---------+
only showing top 5 rows



                                                                                

In [18]:
# Perform the join on 'district' column
merged_df = merged_df.join(
    population_df, 
    on='district', 
    how='left'
)

# Show the relevant columns (e.g., district and population)
merged_df.select('district', 'population').show(5, truncate=False)

+-----------+----------+
|district   |population|
+-----------+----------+
|ราษฎร์บูรณะ|76590     |
|พระโขนง    |86290     |
|จตุจักร    |153792    |
|ตลิ่งชัน   |101802    |
|มีนบุรี    |141030    |
+-----------+----------+
only showing top 5 rows



                                                                                

In [19]:
null_counts = merged_df.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in merged_df.columns]
)

null_counts.show(truncate=False)

[Stage 42:>                                                         (0 + 1) / 1]

+--------+----+---------+----+------------+-------+------+-------+-----------+--------+-----+------------+-------------+------------+--------------+--------------------+--------------------+-------------+------------+----+------+----------+
|district|year|ticket_id|type|organization|comment|coords|address|subdistrict|province|state|count_reopen|last_activity|rounded_hour|temperature_2m|relative_humidity_2m|apparent_temperature|precipitation|dew_point_2m|rain|budget|population|
+--------+----+---------+----+------------+-------+------+-------+-----------+--------+-----+------------+-------------+------------+--------------+--------------------+--------------------+-------------+------------+----+------+----------+
|0       |0   |2293     |0   |44          |2293   |0     |2293   |0          |0       |0    |0           |0            |0           |0             |0                   |0                   |0            |0           |0   |0     |0         |
+--------+----+---------+----+------

                                                                                

In [20]:
# Drop the 'rounded_hour' and 'year' columns
merged_df = merged_df.drop('rounded_hour', 'year')

# Show the result to confirm
merged_df.show(5, truncate=False)

+-----------+-----------+-------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------+------------------+--------------------------------------------------------------------------------------------------------------+-----------+--------------------+---------+------------+-----------------------------+--------------+--------------------+--------------------+-------------+------------+----+---------+----------+
|district   |ticket_id  |type         |organization                                                                                                                                                      

                                                                                

## Save to csv to preprocess further

In [21]:
merged_df.coalesce(1).write.option("header", "true").csv("data/spark_results")

                                                                                