# 0. Initialization

## Spark Preparation (Google Colab Only)

In [17]:
try:
  import google.colab
  IN_COLAB = True
except:
  IN_COLAB = False

In [18]:
if IN_COLAB:
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
    !tar xf spark-3.3.2-bin-hadoop3.tgz
    !mv spark-3.3.2-bin-hadoop3 spark
    !pip install -q findspark

In [19]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark"

## Start a Local Cluster

In [20]:
import findspark
findspark.init()
spark_url = 'local'

In [21]:
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master(spark_url)\
        .appName('Spark ML')\
        .getOrCreate()

# 1. Data Engineering (DE) Spark

## Spark SQL Data Preparation

In [22]:
from pyspark import SparkFiles

# Data from https://www.traffy.in.th/?page_id=27351
# Update every 3hr

# url = 'https://publicapi.traffy.in.th/dump-csv-chadchart/bangkok_traffy.csv'
# spark.sparkContext.addFile(url)

In [23]:
# download data from google drive
# https://drive.google.com/file/d/1Fd1R_ZUBlRGuYwx4AH52u1VKWEbJpV1L/view?usp=share_link
# data at 12-May-2023 21:46
!gdown 1Fd1R_ZUBlRGuYwx4AH52u1VKWEbJpV1L

Downloading...
From: https://drive.google.com/uc?id=1Fd1R_ZUBlRGuYwx4AH52u1VKWEbJpV1L
To: /content/bangkok_traffy.csv
100% 357M/357M [00:01<00:00, 239MB/s]


In [24]:
path = '/content/bangkok_traffy.csv'

df = spark.read.option("delimiter", ",").option("multiline", "true").option("quote", '"').option("header", "true").option("escape", "\\").option("escape", '"').csv(path)
df.show()

# df = spark.read.option("delimiter", ",").option("multiline", "true").option("quote", '"').option("header", "true").option("escape", "\\").option("escape", '"').csv("file://" + SparkFiles.get("bangkok_traffy.csv"))
# df.show()

+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+----------------+-----------------+--------------------+--------------------+--------------+----+------------+--------------------+
|  ticket_id|                type|        organization|             comment|               photo|         photo_after|            coords|             address|     subdistrict|         district|            province|           timestamp|         state|star|count_reopen|       last_activity|
+-----------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+--------------------+----------------+-----------------+--------------------+--------------------+--------------+----+------------+--------------------+
|2021-9LHDM6|                  {}|                null|            ไม่มีภาพ|https://storage.g...|                null|100.48661,13

In [25]:
df.count()

270632

In [26]:
df.printSchema()

root
 |-- ticket_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- organization: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- photo: string (nullable = true)
 |-- photo_after: string (nullable = true)
 |-- coords: string (nullable = true)
 |-- address: string (nullable = true)
 |-- subdistrict: string (nullable = true)
 |-- district: string (nullable = true)
 |-- province: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- state: string (nullable = true)
 |-- star: string (nullable = true)
 |-- count_reopen: string (nullable = true)
 |-- last_activity: string (nullable = true)



In [27]:
# drop unused column
cols = ['ticket_id','photo', 'photo_after']
df = df.drop(*cols)

 ## Convert to proper data type

In [28]:
from pyspark.sql.functions import col
cols = ['star', 'count_reopen']
for c in cols:
    df = df.withColumn(c, col(c).cast('int'))

In [29]:
cols = ['timestamp', 'last_activity']
for c in cols:
    df = df.withColumn(c, col(c).cast('timestamp'))

In [30]:
from pyspark.sql.functions import split, regexp_replace
cols = ['type']
for c in cols:
    df = df.withColumn(c, split(regexp_replace(col(c), "[{}]", ""), ","))

In [31]:
cols = ['organization', 'coords']
for c in cols:
  df = df.withColumn(c, split(col(c), ","))

In [33]:
from pyspark.sql.functions import col, round

# Split the coords array into separate latitude and longitude columns
df = df.withColumn("latitude", col("coords").getItem(0).cast("double")) \
             .withColumn("latitude", round(col("latitude"), 2)) \
             .withColumn("longitude", col("coords").getItem(1).cast("double")) \
             .withColumn("longitude", round(col("longitude"), 2)) \
             .drop("coords")

In [34]:
df.printSchema()

root
 |-- type: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- organization: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- comment: string (nullable = true)
 |-- address: string (nullable = true)
 |-- subdistrict: string (nullable = true)
 |-- district: string (nullable = true)
 |-- province: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- state: string (nullable = true)
 |-- star: integer (nullable = true)
 |-- count_reopen: integer (nullable = true)
 |-- last_activity: timestamp (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [35]:
df.show()

+--------------------+--------------------+--------------------+--------------------+----------------+-----------------+--------------------+--------------------+--------------+----+------------+--------------------+--------+---------+
|                type|        organization|             comment|             address|     subdistrict|         district|            province|           timestamp|         state|star|count_reopen|       last_activity|latitude|longitude|
+--------------------+--------------------+--------------------+--------------------+----------------+-----------------+--------------------+--------------------+--------------+----+------------+--------------------+--------+---------+
|                  []|                null|            ไม่มีภาพ|1867 จรัญสนิทวงศ์...|         บางพลัด|          บางพลัด|       กรุงเทพมหานคร|2021-09-01 10:44:...|กำลังดำเนินการ|null|        null|2022-02-22 04:59:...|  100.49|    13.79|
|         [ความสะอาด]|        [เขตบางซื่อ]|             

## Drop null

In [36]:
# count null
import pyspark.sql.functions as F

df_agg = df.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns])

In [37]:
df_agg.show()

+----+------------+-------+-------+-----------+--------+--------+---------+-----+------+------------+-------------+--------+---------+
|type|organization|comment|address|subdistrict|district|province|timestamp|state|  star|count_reopen|last_activity|latitude|longitude|
+----+------------+-------+-------+-----------+--------+--------+---------+-----+------+------------+-------------+--------+---------+
|  97|        1002|   2648|   2648|         72|      74|      25|        0|    0|171624|      117695|            0|       0|        0|
+----+------------+-------+-------+-----------+--------+--------+---------+-----+------+------------+-------------+--------+---------+



In [38]:
df.filter("last_activity is NULL").show()

+----+------------+-------+-------+-----------+--------+--------+---------+-----+----+------------+-------------+--------+---------+
|type|organization|comment|address|subdistrict|district|province|timestamp|state|star|count_reopen|last_activity|latitude|longitude|
+----+------------+-------+-------+-----------+--------+--------+---------+-----+----+------------+-------------+--------+---------+
+----+------------+-------+-------+-----------+--------+--------+---------+-----+----+------------+-------------+--------+---------+



In [39]:
# drop rows where last_activity = null
df = df.na.drop(subset=["last_activity"])

In [40]:
df.count()

270632

## filter latitude and longitude

In [41]:
# Select only latitude and longitude inside Bangkok area
df = df.filter((col("latitude") >= 100.32) & (col("latitude") <= 100.96) & \
               (col("longitude") >= 13.40) & (col("longitude") <= 13.96))

df.count()

270299

## calculate date_to_state

In [42]:
from pyspark.sql.functions import datediff

# calculate date_to_state
df = df.withColumn("date_to_state", datediff(col("last_activity"), col("timestamp")))
df.show()

+--------------------+--------------------+--------------------+--------------------+----------------+-----------------+--------------------+--------------------+--------------+----+------------+--------------------+--------+---------+-------------+
|                type|        organization|             comment|             address|     subdistrict|         district|            province|           timestamp|         state|star|count_reopen|       last_activity|latitude|longitude|date_to_state|
+--------------------+--------------------+--------------------+--------------------+----------------+-----------------+--------------------+--------------------+--------------+----+------------+--------------------+--------+---------+-------------+
|                  []|                null|            ไม่มีภาพ|1867 จรัญสนิทวงศ์...|         บางพลัด|          บางพลัด|       กรุงเทพมหานคร|2021-09-01 10:44:...|กำลังดำเนินการ|null|        null|2022-02-22 04:59:...|  100.49|    13.79|          174|


## (Run everything before this, after this is optional)

## Visualization Start here

In [43]:
df.printSchema()

root
 |-- type: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- organization: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- comment: string (nullable = true)
 |-- address: string (nullable = true)
 |-- subdistrict: string (nullable = true)
 |-- district: string (nullable = true)
 |-- province: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- state: string (nullable = true)
 |-- star: integer (nullable = true)
 |-- count_reopen: integer (nullable = true)
 |-- last_activity: timestamp (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- date_to_state: integer (nullable = true)



In [44]:
from pyspark.sql.functions import explode

df_vis = df.select(explode('type').alias('type_exploded'), "*").drop('type','organization','comment')
df_vis.show()

df_vis.write.option("header", "true").csv(path="/content/spark_output/vis")

+-------------+--------------------+--------------+---------+--------------------+--------------------+--------------+----+------------+--------------------+--------+---------+-------------+
|type_exploded|             address|   subdistrict| district|            province|           timestamp|         state|star|count_reopen|       last_activity|latitude|longitude|date_to_state|
+-------------+--------------------+--------------+---------+--------------------+--------------------+--------------+----+------------+--------------------+--------+---------+-------------+
|             |1867 จรัญสนิทวงศ์...|       บางพลัด|  บางพลัด|       กรุงเทพมหานคร|2021-09-01 10:44:...|กำลังดำเนินการ|null|        null|2022-02-22 04:59:...|  100.49|    13.79|          174|
|    ความสะอาด|12/14 ถนน กรุงเทพ...|          null|     null|       กรุงเทพมหานคร|2021-09-03 12:51:...|     เสร็จสิ้น|null|        null|2022-06-04 15:34:...|  100.53|    13.82|          274|
|        สายไฟ|335/31 ลาดพร้าว แ...|     สามเ

## Focus on state

In [45]:
df_state_1 = df.groupby('state').count()
df_state_1.show()

+--------------+------+
|         state| count|
+--------------+------+
|   รอรับเรื่อง| 18501|
|กำลังดำเนินการ| 49431|
|     เสร็จสิ้น|202367|
+--------------+------+



In [46]:
df_state_1.write.option("header", "true").csv(path="/content/spark_output/state_1")

In [47]:
from pyspark.sql.functions import min, avg, max

df_state_2 = df.groupby('state').agg(min('date_to_state').alias('min_date_to_state'),
                         avg('date_to_state').alias('avg_date_to_state'),
                         max('date_to_state').alias('max_date_to_state'))
df_state_2.show(20, False)

+--------------+-----------------+------------------+-----------------+
|state         |min_date_to_state|avg_date_to_state |max_date_to_state|
+--------------+-----------------+------------------+-----------------+
|รอรับเรื่อง   |0                |11.590508621155614|336              |
|กำลังดำเนินการ|0                |85.57648034634137 |491              |
|เสร็จสิ้น     |0                |43.693438159383696|464              |
+--------------+-----------------+------------------+-----------------+



In [48]:
df_state_2.write.option("header", "true").csv(path="/content/spark_output/state_2")

In [49]:
from pyspark.sql.functions import year, month

df_state_3 = df.groupBy(year('timestamp').alias('year'), month('timestamp').alias('month')).count().orderBy('year', 'month')
df_state_3.show()

+----+-----+-----+
|year|month|count|
+----+-----+-----+
|2021|    9|   12|
|2021|   10|    1|
|2021|   11|    7|
|2021|   12|  162|
|2022|    1|  160|
|2022|    2|  172|
|2022|    3|  108|
|2022|    4|   24|
|2022|    5| 2214|
|2022|    6|61602|
|2022|    7|40821|
|2022|    8|26959|
|2022|    9|22306|
|2022|   10|13504|
|2022|   11|11471|
|2022|   12|11371|
|2023|    1|18572|
|2023|    2|18559|
|2023|    3|18193|
|2023|    4|17588|
+----+-----+-----+
only showing top 20 rows



In [50]:
df_state_3.write.option("header", "true").csv(path="/content/spark_output/state_3")

In [51]:
from pyspark.sql.functions import year, month, count

df_state_4 = df.groupBy(year('timestamp').alias('year'), month('timestamp').alias('month'), 'state')\
  .agg(count('date_to_state').alias('row_count'),
       min('date_to_state').alias('min_date_to_state'),
       avg('date_to_state').alias('avg_date_to_state'),
       max('date_to_state').alias('max_date_to_state'))\
  .orderBy('state','year', 'month')
df_state_4.show(100, False)

+----+-----+--------------+---------+-----------------+------------------+-----------------+
|year|month|state         |row_count|min_date_to_state|avg_date_to_state |max_date_to_state|
+----+-----+--------------+---------+-----------------+------------------+-----------------+
|2021|9    |กำลังดำเนินการ|9        |147              |155.33333333333334|174              |
|2021|11   |กำลังดำเนินการ|7        |89               |96.57142857142857 |107              |
|2021|12   |กำลังดำเนินการ|154      |54               |72.57792207792208 |491              |
|2022|1    |กำลังดำเนินการ|141      |1                |40.94326241134752 |436              |
|2022|2    |กำลังดำเนินการ|135      |0                |19.637037037037036|34               |
|2022|3    |กำลังดำเนินการ|56       |0                |7.910714285714286 |14               |
|2022|4    |กำลังดำเนินการ|3        |0                |0.0               |0                |
|2022|5    |กำลังดำเนินการ|300      |0                |189.08666666666

In [52]:
df_state_4.write.option("header", "true").csv(path="/content/spark_output/state_4")

In [53]:
from pyspark.sql.functions import year, month, count, sum, col

df_count = df.groupBy(year('timestamp').alias('year'), month('timestamp').alias('month'), 'state') \
             .agg(count('date_to_state').alias('row_count'))

df_total_count = df_count.groupBy('year', 'month') \
                         .agg(sum('row_count').alias('total_count'))

df_ratio = df_count.join(df_total_count, ['year', 'month']) \
                   .withColumn('state_ratio', col('row_count') / col('total_count')) \
                   

df_state_5 = df_ratio.orderBy('year', 'month','state')
df_state_5.show(100, False)

+----+-----+--------------+---------+-----------+--------------------+
|year|month|state         |row_count|total_count|state_ratio         |
+----+-----+--------------+---------+-----------+--------------------+
|2021|9    |กำลังดำเนินการ|9        |12         |0.75                |
|2021|9    |เสร็จสิ้น     |3        |12         |0.25                |
|2021|10   |เสร็จสิ้น     |1        |1          |1.0                 |
|2021|11   |กำลังดำเนินการ|7        |7          |1.0                 |
|2021|12   |กำลังดำเนินการ|154      |162        |0.9506172839506173  |
|2021|12   |เสร็จสิ้น     |8        |162        |0.04938271604938271 |
|2022|1    |กำลังดำเนินการ|141      |160        |0.88125             |
|2022|1    |เสร็จสิ้น     |19       |160        |0.11875             |
|2022|2    |กำลังดำเนินการ|135      |172        |0.7848837209302325  |
|2022|2    |รอรับเรื่อง   |8        |172        |0.046511627906976744|
|2022|2    |เสร็จสิ้น     |29       |172        |0.1686046511627907  |
|2022|

In [54]:
df_state_5.write.option("header", "true").csv(path="/content/spark_output/state_5")

In [55]:
df_state_6 = df_ratio.orderBy('state','year', 'month')
df_state_6.show(100, False)

+----+-----+--------------+---------+-----------+--------------------+
|year|month|state         |row_count|total_count|state_ratio         |
+----+-----+--------------+---------+-----------+--------------------+
|2021|9    |กำลังดำเนินการ|9        |12         |0.75                |
|2021|11   |กำลังดำเนินการ|7        |7          |1.0                 |
|2021|12   |กำลังดำเนินการ|154      |162        |0.9506172839506173  |
|2022|1    |กำลังดำเนินการ|141      |160        |0.88125             |
|2022|2    |กำลังดำเนินการ|135      |172        |0.7848837209302325  |
|2022|3    |กำลังดำเนินการ|56       |108        |0.5185185185185185  |
|2022|4    |กำลังดำเนินการ|3        |24         |0.125               |
|2022|5    |กำลังดำเนินการ|300      |2214       |0.13550135501355012 |
|2022|6    |กำลังดำเนินการ|10050    |61602      |0.16314405376448818 |
|2022|7    |กำลังดำเนินการ|6378     |40821      |0.15624311016388623 |
|2022|8    |กำลังดำเนินการ|3721     |26959      |0.13802440743351013 |
|2022|

In [56]:
df_state_6.write.option("header", "true").csv(path="/content/spark_output/state_6")

## Focus on type

In [57]:
from pyspark.sql.functions import explode, size

df_exploded = df.select(explode('type').alias('type_exploded'), 'state', 'date_to_state')

df_grouped = df_exploded.groupBy('type_exploded', 'state') \
                        .agg(count('date_to_state').alias('row_count'),
                             min('date_to_state').alias('min_date_to_state'),
                             avg('date_to_state').alias('avg_date_to_state'),
                             max('date_to_state').alias('max_date_to_state')) \
                        .orderBy('type_exploded')

df_type_1 = df_grouped
df_type_1.show(100, False)

+-------------+--------------+---------+-----------------+------------------+-----------------+
|type_exploded|state         |row_count|min_date_to_state|avg_date_to_state |max_date_to_state|
+-------------+--------------+---------+-----------------+------------------+-----------------+
|             |รอรับเรื่อง   |4414     |0                |12.421159945627549|330              |
|             |กำลังดำเนินการ|8608     |0                |109.98013475836431|491              |
|             |เสร็จสิ้น     |49221    |0                |47.833485707320044|456              |
|PM2.5        |เสร็จสิ้น     |848      |0                |9.441037735849056 |96               |
|PM2.5        |กำลังดำเนินการ|227      |0                |7.488986784140969 |81               |
|PM2.5        |รอรับเรื่อง   |56       |0                |2.125             |27               |
|การเดินทาง   |รอรับเรื่อง   |84       |0                |3.6785714285714284|156              |
|การเดินทาง   |กำลังดำเนินการ|154      |

In [58]:
df_type_1.write.option("header", "true").csv(path="/content/spark_output/type_1")

## Focus on organization

In [59]:
from pyspark.sql.functions import explode, size

df_exploded = df.select(explode('organization').alias('organization_exploded'), 'state', 'date_to_state')

df_grouped = df_exploded.groupBy('organization_exploded', 'state') \
                        .agg(count('date_to_state').alias('row_count'),
                             min('date_to_state').alias('min_date_to_state'),
                             avg('date_to_state').alias('avg_date_to_state'),
                             max('date_to_state').alias('max_date_to_state')) \
                        .filter(col('state') == 'เสร็จสิ้น') \
                        .orderBy('row_count', ascending=False)

df_org_1 = df_grouped
df_org_1.show(100, False)

+-----------------------------------------------------+---------+---------+-----------------+------------------+-----------------+
|organization_exploded                                |state    |row_count|min_date_to_state|avg_date_to_state |max_date_to_state|
+-----------------------------------------------------+---------+---------+-----------------+------------------+-----------------+
|กลุ่มกรุงเทพใต้ (นายชาตรี วัฒนเขจร)                  |เสร็จสิ้น|32475    |0                |68.27729022324866 |340              |
|กลุ่มกรุงเทพตะวันออก (นายณรงค์ เรืองศรี)             |เสร็จสิ้น|32027    |0                |41.602772660567645|343              |
|กลุ่มกรุงเทพเหนือ (นางวันทนีย์ วัฒนะ)                |เสร็จสิ้น|27202    |0                |56.427505330490405|335              |
|กลุ่มกรุงเทพกลาง (นายสุขสันต์ กิตติศุภกร)            |เสร็จสิ้น|21876    |0                |38.814774181751694|346              |
|สำนักการโยธา กทม.                                    |เสร็จสิ้น|19246    |0       

In [60]:
df_org_1.write.option("header", "true").csv(path="/content/spark_output/org_1")

## Focus on district

In [61]:
df_grouped = df.groupBy('district', 'state') \
                        .agg(count('date_to_state').alias('row_count'),
                             min('date_to_state').alias('min_date_to_state'),
                             avg('date_to_state').alias('avg_date_to_state'),
                             max('date_to_state').alias('max_date_to_state')) \
                        .orderBy('district')

df_dist_1 = df_grouped
df_dist_1.show(100, False)

+-------------+--------------+---------+-----------------+------------------+-----------------+
|district     |state         |row_count|min_date_to_state|avg_date_to_state |max_date_to_state|
+-------------+--------------+---------+-----------------+------------------+-----------------+
|null         |กำลังดำเนินการ|4        |147              |195.75            |275              |
|null         |รอรับเรื่อง   |4        |0                |0.75              |2                |
|null         |เสร็จสิ้น     |35       |0                |85.62857142857143 |329              |
|กระทุ่มแบน   |รอรับเรื่อง   |3        |0                |0.0               |0                |
|คลองสาน      |เสร็จสิ้น     |2228     |0                |46.864901256732495|341              |
|คลองสาน      |รอรับเรื่อง   |260      |0                |14.457692307692307|321              |
|คลองสาน      |กำลังดำเนินการ|433      |0                |101.64665127020785|329              |
|คลองสามวา    |กำลังดำเนินการ|1214     |

In [62]:
df_dist_1.write.option("header", "true").csv(path="/content/spark_output/dist_1")

In [63]:
# rank by avg_date_to_state

df_grouped = df.groupBy('district', 'state') \
                        .agg(count('date_to_state').alias('row_count'),
                             min('date_to_state').alias('min_date_to_state'),
                             avg('date_to_state').alias('avg_date_to_state'),
                             max('date_to_state').alias('max_date_to_state')) \
                        .filter((col('state') == 'เสร็จสิ้น') & (col('row_count') > 50)) \
                        .orderBy('avg_date_to_state')

df_dist_2 = df_grouped
df_dist_2.show(100, False)

+-----------------+---------+---------+-----------------+------------------+-----------------+
|district         |state    |row_count|min_date_to_state|avg_date_to_state |max_date_to_state|
+-----------------+---------+---------+-----------------+------------------+-----------------+
|ราษฎร์บูรณะ      |เสร็จสิ้น|2075     |0                |15.88867469879518 |326              |
|บางคอแหลม        |เสร็จสิ้น|2466     |0                |16.714517437145176|328              |
|บางแค            |เสร็จสิ้น|7803     |0                |20.16403947199795 |341              |
|สัมพันธวงศ์      |เสร็จสิ้น|1074     |0                |20.459962756052143|346              |
|บางขุนเทียน      |เสร็จสิ้น|5134     |0                |21.030775223996883|329              |
|ทุ่งครุ          |เสร็จสิ้น|2723     |0                |21.808666911494676|341              |
|คันนายาว         |เสร็จสิ้น|1969     |0                |23.39207719654647 |306              |
|บางเขน           |เสร็จสิ้น|6863     |0          

In [64]:
df_dist_2.write.option("header", "true").csv(path="/content/spark_output/dist_2")

## Focus on star

In [65]:
df_grouped = df.groupBy('star', 'state') \
                        .agg(count('date_to_state').alias('row_count'),
                             min('date_to_state').alias('min_date_to_state'),
                             avg('date_to_state').alias('avg_date_to_state'),
                             max('date_to_state').alias('max_date_to_state')) \
                        .filter((col('state') == "เสร็จสิ้น")) \
                        .orderBy('star')

df_star_1 = df_grouped
df_star_1.show(100, False)

+----+---------+---------+-----------------+------------------+-----------------+
|star|state    |row_count|min_date_to_state|avg_date_to_state |max_date_to_state|
+----+---------+---------+-----------------+------------------+-----------------+
|null|เสร็จสิ้น|104732   |0                |45.65467096971317 |464              |
|1   |เสร็จสิ้น|13772    |0                |50.977708393842576|434              |
|2   |เสร็จสิ้น|4629     |0                |51.606826528407865|325              |
|3   |เสร็จสิ้น|10394    |0                |48.483740619588225|343              |
|4   |เสร็จสิ้น|21080    |0                |41.2600569259962  |342              |
|5   |เสร็จสิ้น|47760    |0                |36.55674204355109 |343              |
+----+---------+---------+-----------------+------------------+-----------------+



In [66]:
df_star_1.write.option("header", "true").csv(path="/content/spark_output/star_1")

## AI/ML Start predict time to เสร็จสิ้น here

In [67]:
df_ml = df.filter(df.state == "เสร็จสิ้น")
df_ml.show()

+--------------------+--------------------+--------------------+--------------------+-----------+--------+-------------+--------------------+---------+----+------------+--------------------+--------+---------+-------------+
|                type|        organization|             comment|             address|subdistrict|district|     province|           timestamp|    state|star|count_reopen|       last_activity|latitude|longitude|date_to_state|
+--------------------+--------------------+--------------------+--------------------+-----------+--------+-------------+--------------------+---------+----+------------+--------------------+--------+---------+-------------+
|         [ความสะอาด]|        [เขตบางซื่อ]|             ขยะเยอะ|12/14 ถนน กรุงเทพ...|       null|    null|กรุงเทพมหานคร|2021-09-03 12:51:...|เสร็จสิ้น|null|        null|2022-06-04 15:34:...|  100.53|    13.82|          274|
|[น้ำท่วม, ร้องเรียน]|[เขตประเวศ, ฝ่ายโ...|น้ำท่วมเวลาฝนตกแล...|189 เฉลิมพระเกียร...|    หนองบอน|  ประเว

In [68]:
df_ml.count()

202367

# 2. AI/ML

In [70]:
# find correlation between comment length and date_to_state
from pyspark.sql.functions import length, expr, corr
time_format = "yyyy-MM-dd HH:mm:ss"
df = df.withColumn("comment_len", length("comment"))
df = df.withColumn("date_to_state_sec", expr("cast(date_to_state as long)"))
correlation = df.select(corr("comment_len", "date_to_state_sec").alias("correlation")).collect()[0]["correlation"]
print(correlation)

-0.06673141980196019


In [98]:
df = df_ml.sample(False, 0.001, seed=0)

In [99]:
from pyspark.sql.functions import col, explode, when, length

df = df.drop('star', 'state', 'address', 'last_activity', 'subdistrict', 'district', 'province', 'organization', 'timestamp', 'count_reopen')

df = df.withColumn('comment_len', length(col('comment')))
df = df.drop('comment')

# Drop rows with null 'latitude', 'longitude', 'timestamp' and 'date_to_state'
df = df.dropna(subset=['date_to_state', 'type', 'latitude', 'longitude'])

# Explode 'type' and 'organization' columns
df = df.dropna(subset=['type'])
df = df.select('*', explode('type').alias('type_exploded'))
df = df.drop('type')
df = df.withColumn("type_exploded", when(col("type_exploded") == "", "unknown").otherwise(col("type_exploded")))

In [100]:
df.show()

+--------+---------+-------------+-----------+-------------+
|latitude|longitude|date_to_state|comment_len|type_exploded|
+--------+---------+-------------+-----------+-------------+
|  100.59|    13.93|           12|         65|      unknown|
|  100.59|    13.82|            2|         83|   เสียงรบกวน|
|  100.44|    13.67|            4|         75|      เสนอแนะ|
|  100.34|    13.69|           14|          9|      unknown|
|  100.68|    13.67|           11|        118|      กีดขวาง|
|  100.51|    13.72|           76|         53|      unknown|
|  100.64|    13.67|           11|         81|          ถนน|
|  100.62|    13.67|          134|         15|          ถนน|
|  100.61|    13.69|            4|        133|      unknown|
|  100.53|    13.81|          215|         33|      ทางเท้า|
|  100.53|    13.81|          215|         33|  ท่อระบายน้ำ|
|   100.6|    13.81|            0|         39|      น้ำท่วม|
|   100.7|    13.83|            6|        123|      กีดขวาง|
|   100.7|    13.83|    

In [101]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Update the list of input columns for StringIndexer
input_columns = ['type_exploded']

# Create StringIndexer for categorical columns
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep").fit(df) for column in input_columns]

# OneHotEncoder for categorical columns
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=indexer.getOutputCol()+"_encoded") for indexer in indexers]

# VectorAssembler to combine all features
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + ['latitude', 'longitude', 'comment_len'], outputCol="features")

# Regression model
rf = RandomForestRegressor(labelCol='date_to_state', featuresCol='features')

# Define a pipeline model
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

# Split the data
(train_data, test_data) = df.randomSplit([0.8, 0.2], seed = 2023)

# Train the model
model = pipeline.fit(train_data)

# Make predictions
predictions = model.transform(test_data)

# Evaluate the model
evaluator = RegressionEvaluator(labelCol="date_to_state", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data: ", rmse)

Root Mean Squared Error (RMSE) on test data:  57.27392046445446


In [75]:
5698797/60/60/24

65.95829861111112

In [76]:
15562591/60/60/24

180.12258101851853