In [1]:
try:
  import google.colab
  IN_COLAB = True
except:
  IN_COLAB = False
if IN_COLAB:
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
    !tar xf spark-3.3.2-bin-hadoop3.tgz
    !mv spark-3.3.2-bin-hadoop3 spark
    !pip install -q findspark
    import os
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark"

In [2]:
# Core imports and Spark bootstrap used by the rest of the notebook.
import pandas as pd  # NOTE(review): not used in the visible cells

import findspark
# Make pyspark importable from the Spark installation (on Colab, presumably the
# SPARK_HOME exported by the setup cell).
findspark.init()
# Master URL passed to SparkSession.builder.master() in the next cell.
# NOTE(review): 'local' runs a single worker thread; 'local[*]' would use all cores.
spark_url = 'local'
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext  # NOTE(review): not used in the visible cells
from pyspark.sql.functions import split, col
from pyspark.sql import functions as F

In [3]:
# Create (or reuse) the SparkSession with the settings below, then expose the
# SparkContext and the path of the raw ticket CSV.
spark_conf = {
    'spark.ui.port': '4040',
    'spark.sql.execution.pyspark.udf.faulthandler.enabled': 'true',
    'spark.driver.memory': '16g',
    'spark.executor.memory': '16g',
}
builder = SparkSession.builder.master(spark_url).appName('Spark Data Prep')
for conf_key, conf_value in spark_conf.items():
    builder = builder.config(conf_key, conf_value)
spark = builder.getOrCreate()

sc = spark.sparkContext
path = 'bangkok_traffy.csv'

In [4]:
# Load the raw ticket CSV. multiLine lets quoted fields span several lines, and
# '"' is used both as the quote and the escape character (doubled quotes).
df = spark.read.csv(
    path,
    header=True,
    inferSchema=True,
    multiLine=True,
    quote='"',
    escape='"',
)

# Data preprocessing

In [5]:
# Remove rows that are exact duplicates across every column (rows sharing a
# ticket_id but differing in any other field are kept).
df = df.distinct()

In [6]:
# Drop rows missing any of the fields the later preprocessing steps read.
required_cols = ['ticket_id', 'organization', 'timestamp', 'last_activity', 'state', 'coords', 'type']
df = df.na.drop(how='any', subset=required_cols)

## Preprocessing: state filter and completion-time calculation

In [7]:
# Keep only tickets in state "เสร็จสิ้น" ("completed"); the completion time
# computed next is only meaningful for finished tickets.
df = df.where(F.col('state') == "เสร็จสิ้น")

In [8]:
# Hours from the ticket's 'timestamp' (presumably its creation time) to its
# 'last_activity'; both are converted to whole Unix seconds before subtracting.
opened_seconds = F.unix_timestamp('timestamp')
last_activity_seconds = F.unix_timestamp('last_activity')
df = df.withColumn('completion_time_hours', (last_activity_seconds - opened_seconds) / 3600)

## Preprocessing coordinates

In [9]:
# 'coords' is a "longitude,latitude" string (longitude first); parse each part
# into its own double column.
coord_parts = split(df['coords'], ',')
df = df.withColumn('longitude', coord_parts.getItem(0).cast('double')) \
       .withColumn('latitude', coord_parts.getItem(1).cast('double'))


## Remove out-of-range coordinates (outliers)

In [10]:
# Bounding box roughly around Bangkok; tickets whose coordinates fall outside it
# (inclusive bounds) are treated as outliers and dropped.
latitude_min, latitude_max = 13.4, 14.3
longitude_min, longitude_max = 99.8, 101

lat_in_box = F.col('latitude').between(latitude_min, latitude_max)
lon_in_box = F.col('longitude').between(longitude_min, longitude_max)
df = df.filter(lat_in_box & lon_in_box)

## preprocessing province

In [11]:
# Keep only tickets whose province mentions Bangkok or one of its surrounding
# provinces (matched case-insensitively); rows with a null province are dropped.
pattern = "(?i)กรุงเทพ|Bangkok|นนทบุรี|นครปฐม|ปทุมธานี|สมุทรปราการ|สมุทรสาคร"
has_province = col("province").isNotNull()
in_target_region = col("province").rlike(pattern)
df = df.filter(has_province & in_target_region)

In [12]:
# Normalise province spellings: the first rule whose regex matches rewrites the
# value to the canonical Thai name; values matching no rule are kept unchanged.
province_rules = [
    ("(?i).*กรุงเทพ.*|.*Bangkok.*", "กรุงเทพมหานคร"),
    ("(?i).*นนทบุรี.*", "นนทบุรี"),
    ("(?i).*สมุทรปราการ.*", "สมุทรปราการ"),
    ("(?i).*ปทุมธานี.*", "ปทุมธานี"),
    ("(?i).*สมุทรสาคร.*", "สมุทรสาคร"),
    ("(?i).*นครปฐม.*", "นครปฐม"),
]

province_expr = None
for rule_regex, canonical_name in province_rules:
    rule_matches = F.col("province").rlike(rule_regex)
    if province_expr is None:
        province_expr = F.when(rule_matches, canonical_name)
    else:
        province_expr = province_expr.when(rule_matches, canonical_name)

df = df.withColumn("province", province_expr.otherwise(F.col("province")))


## preprocessing organization

In [13]:
# Turn the comma-separated 'organization' string into an array, first collapsing
# any whitespace surrounding the commas.
commas_normalised = F.regexp_replace(F.col('organization'), '\\s*,\\s*', ',')
df = df.withColumn('organization_array', split(commas_normalised, ','))

In [14]:
# Drop array entries containing a person's name in parentheses: "(" followed by
# optional spaces and a Thai title (นาย = Mr., นาง = Mrs., น.ส./นางสาว = Miss).
person_pattern = r'\( *?(นาย|นาง|น\.ส\.|นางสาว)[^\)]*\)'


def _keep_org_entry(entry):
    # True for non-empty entries that do not match the person-name pattern
    is_non_empty = entry != ""
    looks_like_person = F.lower(entry).rlike(person_pattern)
    return is_non_empty & (~looks_like_person)


trimmed_entries = F.transform(F.col("organization_array"), lambda entry: F.trim(entry))
df = df.withColumn("organization_array", F.filter(trimmed_entries, _keep_org_entry))

In [16]:
# Load the organization -> location table. Per the printed schema, latitude and
# longitude are read as strings (presumably because of the "Not Found"
# placeholder that the later cells check for).
pathOrgsLoc = 'org_with_loc_v2.csv'
dfOrg = spark.read.csv(pathOrgsLoc, header=True, inferSchema=True)
dfOrg.printSchema()

# Driver-side lookup used by the UDFs: organization_raw -> (latitude, longitude),
# values kept exactly as read. Later duplicates of a name overwrite earlier ones.
org_loc = {}
for org_row in dfOrg.select('organization_raw', 'latitude', 'longitude').collect():
    org_loc[org_row['organization_raw']] = (org_row['latitude'], org_row['longitude'])

root
 |-- organization_raw: string (nullable = true)
 |-- displayName: string (nullable = true)
 |-- formattedAddress: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)



In [17]:
from pyspark.sql.types import ArrayType, StringType
# Keep only organizations whose location in `org_loc` is a usable pair of numbers.
def filter_orgs(orgs, locations=None):
    """Return the organizations in `orgs` that have a valid location.

    An organization is kept only if `locations` maps it to a
    (latitude, longitude) pair whose values both convert to finite floats.
    This rejects the "Not Found" placeholder and also null, empty or other
    non-numeric coordinates. `calc_distances` skips such coordinates anyway,
    so keeping them here would leave organization_array and
    organization_distances out of step.

    Args:
        orgs: iterable of organization names, or None.
        locations: mapping of organization name -> (latitude, longitude).
            Defaults to the module-level `org_loc`.

    Returns:
        A new list preserving the input order (empty when `orgs` is None).
    """
    import math

    if orgs is None:
        return []
    if locations is None:
        locations = org_loc

    filtered = []
    for org in orgs:
        loc = locations.get(org)
        if not loc:
            continue
        try:
            lat, lon = float(loc[0]), float(loc[1])
        except (TypeError, ValueError):
            # "Not Found", None, "" and any other non-numeric value
            continue
        if math.isfinite(lat) and math.isfinite(lon):
            filtered.append(org)
    return filtered

# Run filter_orgs row by row as a Python UDF (array<string> result) and
# overwrite organization_array with its output.
filter_orgs_udf = F.udf(filter_orgs, ArrayType(StringType()))
filtered_orgs = filter_orgs_udf('organization_array')
df = df.withColumn('organization_array', filtered_orgs)

## Calculate distances from each ticket to its organizations

In [18]:
from pyspark.sql.types import ArrayType, DoubleType
import math
from pyspark.sql.functions import udf, col

# Great-circle distance helpers relating each ticket to its organizations.
def haversine(lat1, lon1, lat2, lon2):
    """Return the haversine (great-circle) distance in km between two points.

    Arguments are latitude/longitude in decimal degrees and may be anything
    `float()` accepts (numbers or numeric strings).
    """
    R = 6371  # Earth radius in km
    lat1, lon1, lat2, lon2 = map(float, [lat1, lon1, lat2, lon2])
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    # Rounding can push `a` marginally outside [0, 1], which would make
    # math.sqrt(1 - a) raise; clamp it.
    a = min(1.0, max(0.0, a))
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


def calc_distances(orgs, lat, lon, locations=None):
    """Return the distance (km) from a ticket to each of its organizations.

    Produces one distance per entry of `orgs` that has a usable location, in
    the same order as `orgs`. The result therefore lines up with the
    organization array (a set would drop equal distances and reorder them).

    Args:
        orgs: iterable of organization names, or None.
        lat, lon: the ticket's latitude/longitude in decimal degrees, or None.
        locations: mapping of organization name -> (latitude, longitude).
            Defaults to the module-level `org_loc`.

    Returns:
        A list of floats, empty if any of `orgs`, `lat`, `lon` is None.
        Organizations absent from `locations`, or whose coordinates are not
        finite numbers (e.g. "Not Found", None), are skipped.
    """
    if orgs is None or lat is None or lon is None:
        return []
    if locations is None:
        locations = org_loc

    distances = []
    for org in orgs:
        loc = locations.get(org)
        if not loc:
            continue
        try:
            org_lat, org_lon = float(loc[0]), float(loc[1])
        except (TypeError, ValueError):
            # "Not Found", None, "" and any other non-numeric value
            continue
        if not (math.isfinite(org_lat) and math.isfinite(org_lon)):
            continue
        distances.append(haversine(lat, lon, org_lat, org_lon))
    return distances

# Python UDF (array<double>): distances in km from each ticket's coordinates to
# the locations of the organizations in its organization_array.
calc_distances_udf = udf(calc_distances, ArrayType(DoubleType()))
distance_inputs = [col('organization_array'), col('latitude'), col('longitude')]
df = df.withColumn('organization_distances', calc_distances_udf(*distance_inputs))



## preprocessing type

In [19]:
# Drop tickets whose type set is empty ("{}").
df = df.filter(df['type'] != '{}')
# Strip the curly braces, then split the comma-separated types into an array.
# Raw string: in a plain literal '\{' is an invalid escape sequence
# (DeprecationWarning, SyntaxWarning on Python 3.12+); the regex is unchanged.
df = df.withColumn('type_array', F.split(F.regexp_replace(df['type'], r'[\{\}]', ''), ','))

In [20]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import explode
from pyspark.ml.feature import OneHotEncoder

# Explode type_array so each ticket yields one row per type, then index the type
# strings and one-hot encode each exploded row's index.
# NOTE(review): both models are fitted on all rows before the train/test split,
# and OneHotEncoder is left at its defaults (dropLast) -- confirm both are intended.
df_exploded = df.withColumn("type_array", explode("type_array"))

indexer = StringIndexer(inputCol="type_array", outputCol="type_index")
indexer_model = indexer.fit(df_exploded)
df_indexed = indexer_model.transform(df_exploded)

encoder = OneHotEncoder(inputCol="type_index", outputCol="type_onehot")
encoder_model = encoder.fit(df_indexed)
df_encoded = encoder_model.transform(df_indexed)

In [21]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import functions as F

# Gather the per-type one-hot vectors back into one list per ticket_id and attach
# that list to the main DataFrame (left join keeps every ticket).
onehots_per_ticket = F.collect_list("type_onehot").alias("type_onehots")
df_combined = df_encoded.groupBy("ticket_id").agg(onehots_per_ticket)

df = df.join(df_combined, on='ticket_id', how='left')


# Final data

In [22]:
# Sanity check: list any rows whose raw organization string is empty.
df.where(F.col("organization") == "").show()

+---------+----+------------+-------+-----+-----------+------+-------+-----------+--------+--------+---------+-----+----+------------+-------------+---------------------+---------+--------+------------------+----------------------+----------+------------+
|ticket_id|type|organization|comment|photo|photo_after|coords|address|subdistrict|district|province|timestamp|state|star|count_reopen|last_activity|completion_time_hours|longitude|latitude|organization_array|organization_distances|type_array|type_onehots|
+---------+----+------------+-------+-----+-----------+------+-------+-----------+--------+--------+---------+-----+----+------------+-------------+---------------------+---------+--------+------------------+----------------------+----------+------------+
+---------+----+------------+-------+-----+-----------+------+-------+-----------+--------+--------+---------+-----+----+------------+-------------+---------------------+---------+--------+------------------+----------------------+-

In [23]:
# Number of rows left after all the preprocessing steps above.
df.count()

544599

In [24]:
# Final schema: the raw CSV columns plus the derived columns added above.
df.printSchema()

root
 |-- ticket_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- organization: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- photo: string (nullable = true)
 |-- photo_after: string (nullable = true)
 |-- coords: string (nullable = true)
 |-- address: string (nullable = true)
 |-- subdistrict: string (nullable = true)
 |-- district: string (nullable = true)
 |-- province: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- state: string (nullable = true)
 |-- star: integer (nullable = true)
 |-- count_reopen: integer (nullable = true)
 |-- last_activity: timestamp (nullable = true)
 |-- completion_time_hours: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- organization_array: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- organization_distances: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- type_ar

In [25]:
# Keep only the model inputs and the target (completion_time_hours), then split
# 80/20 with a fixed seed.
# cache() lets randomSplit and the three JSON writes that follow reuse a single
# materialization of this DataFrame. Without it, each action re-runs the whole
# lineage (Python UDFs, explode and join). Both splits are then also sampled
# from the same computed rows.
dfProcessed = df.select(
    df.latitude,
    df.longitude,
    df.organization_array,
    df.organization_distances,
    df.type_array,
    df.type_onehots,
    df.completion_time_hours,
).cache()
train_data, test_data = dfProcessed.randomSplit([0.8, 0.2], seed=1234)


In [None]:
# Write the full processed dataset as JSON lines; coalesce(1) yields a single
# part-file inside the output directory.
# NOTE(review): machine-specific absolute path -- consider a configurable output dir.
dfProcessed.coalesce(1).write.json('file:///C:/Users/Noon/Documents/DSDE/projectTraffy/raw_processed.json', mode="overwrite")

In [None]:
# Write the 80% training split as JSON lines (single part-file).
# NOTE(review): machine-specific absolute path -- consider a configurable output dir.
train_data.coalesce(1).write.json('file:///C:/Users/Noon/Documents/DSDE/projectTraffy/train_data.json', mode="overwrite")

In [None]:
# Write the 20% test split as JSON lines (single part-file).
# NOTE(review): machine-specific absolute path -- consider a configurable output dir.
test_data.coalesce(1).write.json('file:///C:/Users/Noon/Documents/DSDE/projectTraffy/test_data.json', mode="overwrite")