In [1]:
%load_ext dotenv
%dotenv

In [2]:
import logging
from typing import Optional

from pyspark.sql import SparkSession

# Setup basic logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SimpleSparkSession:
    """Lazily build and cache one ``SparkSession`` for use in a notebook.

    The constructor only records settings; nothing touches Spark until
    ``build_session`` / ``get_session`` is called. The created session is
    cached on the instance and reused until ``stop_session`` is called.

    Args:
        app_name: Application name passed to the Spark builder.
        master: Spark master URL passed to the Spark builder.
        spark_config: Optional mapping of extra Spark config keys to values.
            A shallow copy is stored, so later changes to the caller's
            mapping do not affect this instance.
        enable_hive_support: When true, Hive support is enabled on the builder.
        s3_bucket_name: Stored on the instance; not read by any method here.
        s3_endpoint: Optional S3A endpoint; applied only when both keys are set.
        s3_access_key: S3A access key.
        s3_secret_key: S3A secret key.
        s3_region: Value for ``fs.s3a.endpoint.region`` when an endpoint is set.
        s3_path_style_access: When true (and credentials are set), enables
            path-style access plus the related S3A options listed in
            ``build_session``.
        postgres_config: Stored on the instance; not read by any method here.
        packages: Maven coordinates for ``spark.jars.packages``. Either an
            iterable of coordinates or a single coordinate string.

    Attributes:
        jdbc_driver_path: ``None`` by default. If set on the instance before
            the session is built, it is used as the driver and executor
            extra class path.
    """

    def __init__(
            self,
            app_name="Jupyter Spark Session",
            master="local[*]",
            spark_config=None,
            enable_hive_support=False,
            # S3 configuration
            s3_bucket_name=None,
            s3_endpoint=None,
            s3_access_key=None,
            s3_secret_key=None,
            s3_region="us-east-1",
            s3_path_style_access=True,
            # PostgreSQL configuration
            postgres_config=None,
            # Package configuration
            packages=None
    ):
        self.app_name = app_name
        self.master = master
        # Copy so that later edits to the caller's dict cannot leak into the session.
        self.spark_config = dict(spark_config) if spark_config else {}
        self.enable_hive_support = enable_hive_support

        # S3 config
        self.s3_bucket_name = s3_bucket_name
        self.s3_endpoint = s3_endpoint
        self.s3_access_key = s3_access_key
        self.s3_secret_key = s3_secret_key
        self.s3_region = s3_region
        self.s3_path_style_access = s3_path_style_access

        # PostgreSQL config
        self.postgres_config = postgres_config
        self.jdbc_driver_path: Optional[str] = None

        # Packages: a bare string is one coordinate, not an iterable of characters.
        if isinstance(packages, str):
            packages = [packages]
        self.packages = list(packages) if packages else []

        self._session = None

    def build_session(self):
        """Build the SparkSession on first call and return the cached one afterwards.

        Settings are applied in this order: built-in defaults, JDBC class path
        (if ``jdbc_driver_path`` is set), ``spark_config``, ``packages``, then
        the S3A options. A key that appears in more than one place is therefore
        set more than once (presumably the later value wins - confirm against
        the Spark builder documentation).

        Returns:
            The object returned by ``SparkSession.builder...getOrCreate()``.
        """
        if self._session is not None:
            return self._session

        # Parenthesised chain instead of backslash continuations.
        builder = (
            SparkSession.builder
            .appName(self.app_name)
            .master(self.master)
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")
            # NOTE(review): presumably required by newer JDKs - confirm the target Java version.
            .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow")
            .config("spark.executor.extraJavaOptions", "-Djava.security.manager=allow")
        )

        # Add Hive support if requested
        if self.enable_hive_support:
            builder = builder.enableHiveSupport()

        if self.jdbc_driver_path:
            builder = builder.config("spark.driver.extraClassPath", self.jdbc_driver_path)
            builder = builder.config("spark.executor.extraClassPath", self.jdbc_driver_path)

        # Add all caller-supplied configuration options
        for key, value in self.spark_config.items():
            builder = builder.config(key, value)

        # Configure packages
        if self.packages:
            builder = builder.config("spark.jars.packages", ",".join(self.packages))

        # Add S3 configuration only when a complete key pair is available
        has_access_key = bool(self.s3_access_key)
        has_secret_key = bool(self.s3_secret_key)
        if has_access_key and has_secret_key:
            builder = builder.config("spark.hadoop.fs.s3a.access.key", self.s3_access_key)
            builder = builder.config("spark.hadoop.fs.s3a.secret.key", self.s3_secret_key)
            builder = builder.config("spark.hadoop.fs.s3a.aws.credentials.provider",
                                     "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

            # Config for non-AWS S3
            if self.s3_endpoint:
                builder = builder.config("spark.hadoop.fs.s3a.endpoint", self.s3_endpoint)
                builder = builder.config("spark.hadoop.fs.s3a.endpoint.region", self.s3_region)

            # Path style access for non-AWS implementations
            # NOTE(review): SSL is switched off together with path-style access;
            # confirm this is intended when the endpoint is served over HTTPS.
            if self.s3_path_style_access:
                builder = builder.config("spark.hadoop.fs.s3a.path.style.access", "true")
                builder = builder.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
                builder = builder.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                builder = builder.config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false")
        elif has_access_key or has_secret_key or self.s3_endpoint:
            # Make the skipped S3 setup visible (e.g. an unset environment variable)
            # instead of failing later on the first s3a:// read. Secrets are never logged.
            logger.warning(
                "S3 settings were supplied without both an access key and a secret key; "
                "skipping S3A configuration"
            )

        # Build the session
        logger.info("Building Spark session with app name: %s, master: %s", self.app_name, self.master)
        self._session = builder.getOrCreate()

        return self._session

    def get_session(self):
        """Return the cached SparkSession, building it first if necessary."""
        return self.build_session()

    def stop_session(self):
        """Stop the cached Spark session, if any, and forget it.

        The cached reference is cleared even when ``stop()`` raises, so a later
        ``get_session`` call builds a fresh session instead of returning a
        handle whose shutdown failed part-way. The exception still propagates.
        """
        if self._session is None:
            return
        try:
            self._session.stop()
        finally:
            self._session = None
        logger.info("Spark session stopped")

In [3]:
import os

# Sanity check that the .env values were loaded: display the configured Postgres host.
os.environ.get("POSTGRES_HOST")

'localhost'

In [4]:
# Maven coordinates handed to the session wrapper: the PostgreSQL JDBC driver
# plus the hadoop-aws / AWS SDK bundle pair.
spark_packages = [
    "org.postgresql:postgresql:42.5.4",
    "org.apache.hadoop:hadoop-aws:3.3.4",
    "com.amazonaws:aws-java-sdk-bundle:1.12.426",
]

# PostgreSQL connection properties; the credentials are read from the environment.
postgres_options = dict(
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
    driver="org.postgresql.Driver",
    currentSchema="public",
)

# Configure the wrapper first, then ask it for the actual SparkSession.
session_factory = SimpleSparkSession(
    app_name="Data Analysis Notebook",
    enable_hive_support=False,
    packages=spark_packages,
    postgres_config=postgres_options,
    s3_bucket_name="traffy-troffi",
    s3_endpoint=os.getenv("S3_ENDPOINT"),
    s3_access_key=os.getenv("S3_ACCESS_KEY"),
    s3_secret_key=os.getenv("S3_SECRET_KEY"),
    s3_region="garage",
    s3_path_style_access=True,
)
spark = session_factory.get_session()

INFO:__main__:Building Spark session with app name: Data Analysis Notebook, master: local[*]
25/05/07 14:39:23 WARN Utils: Your hostname, PatrickChoDevMacbook.local resolves to a loopback address: 127.0.0.1; using 192.168.158.3 instead (on interface en0)
25/05/07 14:39:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/patrick/.ivy2/cache
The jars for the packages stored in: /Users/patrick/.ivy2/jars
org.postgresql#postgresql added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cf039ce8-a28c-4aa0-94f3-91d4f2c05b4a;1.0
	confs: [default]
	found org.postgresql#postgresql;42.5.4 in central
	found org.checkerframework#checker-qual;3.5.0 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central


:: loading settings :: url = jar:file:/Users/patrick/Desktop/Workspace/Projects/traffy-troffi/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.426 in central
:: resolution report :: resolve 119ms :: artifacts dl 4ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.426 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.checkerframework#checker-qual;3.5.0 from central in [default]
	org.postgresql#postgresql;42.5.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	:: evicted modules:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 by [com.amazonaws#aws-java-sdk-bundle;1.12.426] in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   6   |   0   |   0   |   1  

In [5]:
# Load the latest Traffy Fondue export from S3. The first line is the header,
# quoted fields may span several lines, and malformed records are dropped.
# NOTE(review): dropped records are not reported anywhere - confirm that silently
# discarding them is acceptable for this dataset.
df = (
    spark.read
    .option("header", True)
    .option("multiLine", True)
    .option("mode", "DROPMALFORMED")
    .csv("s3a://traffy-troffi/traffy/fondue/traffy_fondue_latest.csv")
)

25/05/07 14:39:25 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


In [6]:
# Print the column names and types of the raw CSV frame (every column is read as a string).
df.printSchema()

root
 |-- ticket_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- organization: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- photo: string (nullable = true)
 |-- photo_after: string (nullable = true)
 |-- coords: string (nullable = true)
 |-- address: string (nullable = true)
 |-- subdistrict: string (nullable = true)
 |-- district: string (nullable = true)
 |-- province: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- state: string (nullable = true)
 |-- star: string (nullable = true)
 |-- count_reopen: string (nullable = true)
 |-- last_activity: string (nullable = true)



In [7]:
# Preview the first 20 rows of the raw frame.
df.show(20)

+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+-------------+--------------------+--------------+----+------------+--------------------+
|  ticket_id|                type|        organization|             comment|               photo|         photo_after|            coords|             address|subdistrict|district|     province|           timestamp|         state|star|count_reopen|       last_activity|
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+-----------+--------+-------------+--------------------+--------------+----+------------+--------------------+
|2021-FYJTFP|         {ความสะอาด}|          เขตบางซื่อ|             ขยะเยอะ|https://storage.g...|                NULL|100.53084,13.81865|12/14 ถนน กรุงเทพ...|       NULL|    NULL|กรุงเทพมหานคร|

In [8]:
# spark.stop()

In [9]:
from pyspark.sql import functions as F

# Row filter: tickets whose state is 'เสร็จสิ้น' and that have an "after" photo.
resolved_with_after_photo = (F.col("state") == "เสร็จสิ้น") & F.col("photo_after").isNotNull()

# `last_activity` with a trailing "+NN" suffix removed, parsed as a timestamp.
# NOTE(review): the output column is named "timestamp" although it is derived from
# `last_activity`, not from the source `timestamp` column - confirm this is intended.
last_activity_ts = F.to_timestamp(
    F.regexp_replace(F.col("last_activity").cast("string"), "\\+\\d{2}$", "")
)

# `type` holds a brace-wrapped, comma-separated list such as "{a,b}":
# take the text between the braces, split on commas and trim every piece.
trimmed_categories = F.transform(
    F.split(F.regexp_extract(F.col("type"), "\\{(.+)\\}", 1), ","),
    lambda item: F.trim(item),
)

processed_df = (
    df.where(resolved_with_after_photo)
    .select(
        F.col("ticket_id"),
        F.col("comment").alias("complaint"),
        last_activity_ts.alias("timestamp"),
        F.col("photo").alias("image"),
        F.col("photo_after").alias("image_after"),
        F.col("type"),
        # coords is "<longitude>,<latitude>": index 1 is latitude, index 0 is longitude
        F.expr("cast(split(coords, ',')[1] as double)").alias("latitude"),
        F.expr("cast(split(coords, ',')[0] as double)").alias("longitude"),
        F.col("district"),
        F.col("subdistrict"),
    )
    .withColumn("categories", trimmed_categories)
    # Filter out empty strings
    .withColumn("categories", F.expr("filter(categories, x -> x != '')"))
    .drop("type")
)

In [10]:
# Spot-check a random sample (fraction 0.1) with untruncated cell values.
# No seed is passed, so the sampled rows are not pinned between runs.
processed_df.sample(0.1).show(truncate=False)

+-----------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------+-------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------+--------+---------+---------+-----------+--------------+
|ticket_id  |complaint                                                                                                                                                                                                                                                   

In [11]:
# Confirm the column names and types produced by the transformation above.
processed_df.printSchema()

root
 |-- ticket_id: string (nullable = true)
 |-- complaint: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- image: string (nullable = true)
 |-- image_after: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- district: string (nullable = true)
 |-- subdistrict: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [12]:
# Count the rows that passed the state / photo_after filter.
processed_df.count()

                                                                                

618757

In [44]:
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import monotonically_increasing_id

# Tag every row with a surrogate key so the exploded rows can be regrouped later.
df_with_id = processed_df.withColumn("row_id", monotonically_increasing_id())

# Skip rows with empty categories arrays
df_with_categories = df_with_id.filter(F.size("categories") > 0)

# One output row per (ticket, category). "*" already contains row_id, so it is
# not selected a second time - that would leave two columns named row_id.
exploded = df_with_categories.select("*", F.explode("categories").alias("category_item"))

# Filter out any remaining empty strings or nulls in category_item
exploded = exploded.filter(F.col("category_item").isNotNull() & (F.col("category_item") != ""))

# Fit the indexer on the distinct labels only.
# NOTE(review): every label then occurs exactly once in the fitting data, so the
# resulting indices presumably do not reflect label frequency - confirm the
# ordering you expect from StringIndexer.
indexer = StringIndexer().setInputCol("category_item").setOutputCol("category_idx").fit(
    exploded.select("category_item").distinct())

# Apply indexer
indexed = indexer.transform(exploded)

# Collapse back to one row per original record. All rows of a group share the
# same scalar values, so first() just picks that value; the labels and their
# indices are gathered into lists.
result_df = indexed.groupBy("row_id").agg(
    *[F.first(c).alias(c) for c in df_with_id.columns if c != "categories" and c != "row_id"],
    F.collect_list("category_item").alias("categories"),
    F.collect_list("category_idx").alias("categories_idx")
).drop("row_id")

# Rows with an empty categories array are excluded from result_df. They are kept
# here in a separate frame for inspection; they are NOT unioned back into result_df.
empty_categories = df_with_id.filter(F.size("categories") == 0).drop("row_id")

                                                                                

In [45]:
# List the distinct category labels present in result_df (prints at most 100).
result_df.select(F.explode("categories")).distinct().show(100)

[Stage 101:>                                                        (0 + 1) / 1]

+-----------+
|        col|
+-----------+
|    ห้องน้ำ|
|    คนจรจัด|
| การเดินทาง|
|      จราจร|
|     สอบถาม|
|  ป้ายจราจร|
|    ทางเท้า|
|ท่อระบายน้ำ|
|        ถนน|
|    กีดขวาง|
|  ความสะอาด|
|      สะพาน|
|     ต้นไม้|
|  ร้องเรียน|
|    เสนอแนะ|
| เสียงรบกวน|
| สัตว์จรจัด|
|ความปลอดภัย|
|      สายไฟ|
|   แสงสว่าง|
|       คลอง|
|    น้ำท่วม|
|      PM2.5|
|       ป้าย|
+-----------+



                                                                                

In [46]:
# Preview result_df, including the category labels and their numeric indices.
result_df.show()

[Stage 107:>                                                        (0 + 1) / 1]

+-----------+--------------------+--------------------+--------------------+--------------------+--------+---------+-----------+-----------+--------------------+-----------------+
|  ticket_id|           complaint|           timestamp|               image|         image_after|latitude|longitude|   district|subdistrict|          categories|   categories_idx|
+-----------+--------------------+--------------------+--------------------+--------------------+--------+---------+-----------+-----------+--------------------+-----------------+
|2021-CGPMUN|น้ำท่วมเวลาฝนตกแล...|2022-06-21 08:21:...|https://storage.g...|https://storage.g...|13.67891|100.66709|     ประเวศ|    หนองบอน|[น้ำท่วม, ร้องเรียน]|     [12.0, 15.0]|
|2022-FX8UWF|เเจ้งเรื่อง เสียง...|2022-12-01 09:39:...|https://storage.g...|https://storage.g...|13.82443|100.59393|   ลาดพร้าว|   ลาดพร้าว|        [เสียงรบกวน]|           [22.0]|
|2022-9PH3BZ|1. ซ่อมสายไฟ กรีด...|2022-06-29 08:47:...|https://storage.g...|https://storage.g...| 13

                                                                                

In [47]:
# Preview the rows whose categories array is empty.
empty_categories.show()

+-----------+--------------------+--------------------+--------------------+--------------------+--------+---------+-----------+-----------+----------+
|  ticket_id|           complaint|           timestamp|               image|         image_after|latitude|longitude|   district|subdistrict|categories|
+-----------+--------------------+--------------------+--------------------+--------------------+--------+---------+-----------+-----------+----------+
|2021-4D9Y98|หน้าปากซอย ลาดพร้...|2023-03-14 12:09:...|https://storage.g...|https://storage.g...| 13.8091|100.59131|   ลาดพร้าว|   ลาดพร้าว|        []|
|2021-7U9RED|ยังไม่มีหน่วยงานไ...|2023-05-17 06:11:...|https://storage.g...|https://storage.g...|13.77832|100.50848|      ดุสิต|      ดุสิต|        []|
|2021-7K6QA3|ระยะหลังๆ นี้ พบเ...|2022-06-24 06:32:...|https://storage.g...|https://storage.g...|13.72812|100.65617|     ประเวศ|     ประเวศ|        []|
|2021-AKJBCU|แจ้งเรื่องพื้นผิว...|2022-09-28 08:35:...|https://storage.g...|https://stor

In [62]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, ArrayType, DoubleType, FloatType

# Target schema for the cleaned complaints table. Every column is declared
# non-nullable, and each field carries a human-readable description in its metadata.
complaint_schema = (
    StructType()
    .add("ticket_id", StringType(), nullable=False, metadata={"description": "Ticket ID"})
    .add("complaint", StringType(), nullable=False, metadata={"description": "Complaint text"})
    .add("timestamp", TimestampType(), nullable=False, metadata={"description": "Timestamp of complaint"})
    .add("image", StringType(), nullable=False, metadata={"description": "Image URL"})
    .add("image_after", StringType(), nullable=False, metadata={"description": "Image URL after processing"})
    .add("latitude", DoubleType(), nullable=False, metadata={"description": "Latitude"})
    .add("longitude", DoubleType(), nullable=False, metadata={"description": "Longitude"})
    # district / subdistrict are additionally tagged as nominal attributes
    .add("district", StringType(), nullable=False,
         metadata={"description": "District", "ml_attr_type": "nominal"})
    .add("subdistrict", StringType(), nullable=False,
         metadata={"description": "Subdistrict", "ml_attr_type": "nominal"})
    .add("categories", ArrayType(StringType()), nullable=False, metadata={"description": "Categories"})
    .add("categories_idx", ArrayType(FloatType()), nullable=False,
         metadata={"description": "Categories indices"})
)

In [63]:
# One aggregate per column: count() skips nulls, and when() without otherwise()
# yields null for non-matching rows, so each expression counts the null cells.
null_count_columns = [
    F.count(F.when(F.col(name).isNull(), name)).alias(f"{name}_null_count")
    for name in result_df.columns
]
null_analysis = result_df.select(*null_count_columns)
null_analysis.show(truncate=False)

[Stage 153:>                                                      (0 + 10) / 11]

+--------------------+--------------------+--------------------+----------------+----------------------+-------------------+--------------------+-------------------+----------------------+---------------------+-------------------------+
|ticket_id_null_count|complaint_null_count|timestamp_null_count|image_null_count|image_after_null_count|latitude_null_count|longitude_null_count|district_null_count|subdistrict_null_count|categories_null_count|categories_idx_null_count|
+--------------------+--------------------+--------------------+----------------+----------------------+-------------------+--------------------+-------------------+----------------------+---------------------+-------------------------+
|4862                |4862                |0                   |58              |0                     |0                  |0                   |320                |318                   |0                    |0                        |
+--------------------+--------------------+---------

                                                                                

In [64]:
from pyspark.sql import functions as F

# Step 1: Collect the columns that complaint_schema declares as non-nullable
non_nullable_columns = [field.name for field in complaint_schema.fields if not field.nullable]

# Step 2: Print the non-nullable columns for verification
print("Non-nullable columns:", non_nullable_columns)

if non_nullable_columns:
    # Step 3: Build "col1 IS NOT NULL AND col2 IS NOT NULL AND ..." from Column
    # objects rather than a SQL string, so column names that would need quoting
    # in SQL text do not break the filter.
    all_required_present = F.lit(True)
    for column_name in non_nullable_columns:
        all_required_present = all_required_present & F.col(column_name).isNotNull()

    # Step 4: Apply the filter to drop rows with nulls in non-nullable columns
    cleaned_df = result_df.filter(all_required_present)

    # Step 5: Check how many rows were dropped (each count() is a separate Spark action)
    original_count = result_df.count()
    cleaned_count = cleaned_df.count()
    print(f"Original row count: {original_count}")
    print(f"Cleaned row count: {cleaned_count}")
    print(f"Dropped {original_count - cleaned_count} rows with nulls in non-nullable columns")
else:
    print("No non-nullable columns found in schema")
    cleaned_df = result_df

Non-nullable columns: ['ticket_id', 'complaint', 'timestamp', 'image', 'image_after', 'latitude', 'longitude', 'district', 'subdistrict', 'categories', 'categories_idx']


[Stage 163:>                                                        (0 + 1) / 1]

Original row count: 531791
Cleaned row count: 526552
Dropped 5239 rows with nulls in non-nullable columns


                                                                                

In [65]:
# Re-run the per-column null count on the cleaned frame; every count should now be zero.
# The columns are taken from cleaned_df itself so the check always matches the frame it inspects.
null_analysis = cleaned_df.select(
    *[F.count(F.when(F.col(c).isNull(), c)).alias(f"{c}_null_count") for c in cleaned_df.columns],
)
null_analysis.show(truncate=False)

[Stage 169:>                                                        (0 + 1) / 1]

+--------------------+--------------------+--------------------+----------------+----------------------+-------------------+--------------------+-------------------+----------------------+---------------------+-------------------------+
|ticket_id_null_count|complaint_null_count|timestamp_null_count|image_null_count|image_after_null_count|latitude_null_count|longitude_null_count|district_null_count|subdistrict_null_count|categories_null_count|categories_idx_null_count|
+--------------------+--------------------+--------------------+----------------+----------------------+-------------------+--------------------+-------------------+----------------------+---------------------+-------------------------+
|0                   |0                   |0                   |0               |0                     |0                  |0                   |0                  |0                     |0                    |0                        |
+--------------------+--------------------+---------

                                                                                

In [66]:
# Rebuild the frame from cleaned_df's RDD with complaint_schema applied, so the
# result carries the declared types, nullability and field metadata.
structure_df = spark.createDataFrame(cleaned_df.rdd, schema=complaint_schema)
structure_df.show()

[Stage 175:>                                                        (0 + 1) / 1]

+-----------+--------------------+--------------------+--------------------+--------------------+--------+---------+-----------+-----------+--------------------+-----------------+
|  ticket_id|           complaint|           timestamp|               image|         image_after|latitude|longitude|   district|subdistrict|          categories|   categories_idx|
+-----------+--------------------+--------------------+--------------------+--------------------+--------+---------+-----------+-----------+--------------------+-----------------+
|2021-CGPMUN|น้ำท่วมเวลาฝนตกแล...|2022-06-21 08:21:...|https://storage.g...|https://storage.g...|13.67891|100.66709|     ประเวศ|    หนองบอน|[น้ำท่วม, ร้องเรียน]|     [12.0, 15.0]|
|2022-FX8UWF|เเจ้งเรื่อง เสียง...|2022-12-01 09:39:...|https://storage.g...|https://storage.g...|13.82443|100.59393|   ลาดพร้าว|   ลาดพร้าว|        [เสียงรบกวน]|           [22.0]|
|2022-9PH3BZ|1. ซ่อมสายไฟ กรีด...|2022-06-29 08:47:...|https://storage.g...|https://storage.g...| 13

                                                                                

In [68]:
# Verify that the declared schema (non-nullable columns, float category indices) was applied.
structure_df.printSchema()

root
 |-- ticket_id: string (nullable = false)
 |-- complaint: string (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- image: string (nullable = false)
 |-- image_after: string (nullable = false)
 |-- latitude: double (nullable = false)
 |-- longitude: double (nullable = false)
 |-- district: string (nullable = false)
 |-- subdistrict: string (nullable = false)
 |-- categories: array (nullable = false)
 |    |-- element: string (containsNull = true)
 |-- categories_idx: array (nullable = false)
 |    |-- element: float (containsNull = true)



In [58]:
# Persist the structured frame as Parquet on S3; "overwrite" replaces any previous output at this path.
structure_df.write.mode("overwrite").parquet("s3a://traffy-troffi/spark/traffy/fondue")

25/05/07 14:54:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/05/07 14:54:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/05/07 14:54:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/05/07 14:55:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/05/07 14:55:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

In [70]:
# Connection settings are read from the environment instead of being hard-coded,
# matching how the session cell reads POSTGRES_USER / POSTGRES_PASSWORD.
# Host, port and database fall back to the values that were previously hard-coded.
postgres_host = os.getenv("POSTGRES_HOST", "localhost")
postgres_port = os.getenv("POSTGRES_PORT", "5432")
postgres_db = os.getenv("POSTGRES_DB", "traffy-troffi")

# NOTE: "append" adds the rows again on every run of this cell.
structure_df.write.mode("append").jdbc(
    table="traffy_fondue",
    url=f"jdbc:postgresql://{postgres_host}:{postgres_port}/{postgres_db}",
    properties={
        # os.environ[...] fails fast with a KeyError when a credential is missing,
        # instead of sending None to the JDBC driver.
        "user": os.environ["POSTGRES_USER"],
        "password": os.environ["POSTGRES_PASSWORD"],
        "driver": "org.postgresql.Driver",
        "currentSchema": "public",
    },
)

                                                                                