In [2]:
from pyspark.sql import Row
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.functions import col, split,hour, when,udf,array_contains,dayofweek
from pyspark.sql.types import ArrayType, StringType
import ast
import os
import shutil
from pyspark.sql import SparkSession
import requests
import shutil, glob

In [29]:
from pyspark.sql import SparkSession
import requests
import glob
import shutil

spark = SparkSession.builder.appName("711_busstop").getOrCreate()

query = """
[out:json][timeout:100];
(
    node["shop"="convenience"]["name"="7-Eleven"](13,100.3,14.5,101.5);
    node["amenity"="bus_station"](13,100.3,14.5,101.5);
    node["amenity"="place_of_worship"](13,100.3,14.5,101.5);
    node["amenity"="police"](13,100.3,14.5,101.5);
    node["amenity"="waste_disposal"](13,100.3,14.5,101.5);
    node["amenity"="marketplace"](13,100.3,14.5,101.5);
    node["amenity"="toilets"](13,100.3,14.5,101.5);
    node["amenity"="cafe"](13,100.3,14.5,101.5);
    node["amenity"="restaurant"](13,100.3,14.5,101.5);
    node["amenity"="fuel"](13,100.3,14.5,101.5);
    node["amenity"="taxi"](13,100.3,14.5,101.5);
    node["amenity"="hospital"](13,100.3,14.5,101.5);
);
out body;
"""
url = 'https://overpass-api.de/api/interpreter'
res = requests.get(url, params={'data': query})

kind_mapping = {
    'shop': {'convenience': '7-Eleven'},
    'amenity': {
        'bus_station': 'bus_station',
        'place_of_worship': 'PlaceOfWorship',
        'police': 'PoliceStation',
        'marketplace': 'marketplace',
        'toilets': 'toilets',
        'cafe': 'cafe',
        'fuel': 'gas_station',
        'taxi': 'taxi_terminal',
        'hospital': 'hospital',
        'waste_disposal': 'waste_disposal'
    }
}

if res.status_code == 200:
    data = res.json()
    if 'elements' in data:
        records = []
        for elem in data['elements']:
            tags = elem.get('tags', {})
            name = tags.get('name', 'N/A')
            lat = elem['lat']
            lon = elem['lon']

            kind = 'Other'
            
            if tags.get('shop') == 'convenience' and name == '7-Eleven':
                kind = '7-Eleven'
            elif 'amenity' in tags:
                kind = kind_mapping['amenity'].get(tags['amenity'], kind)
    
            records.append({'name': name, 'lat': lat, 'lon': lon, 'type': kind})

        # To Spark DataFrame
        df = spark.createDataFrame(records)
        df.orderBy("lon").select("name", "lat", "lon", "type").show(1)
        df.show()

        # Save single CSV
        df.coalesce(1).write.csv('combined', header=True, mode='overwrite')
        part_file = glob.glob('combined/part-*.csv')[0]
        shutil.move(part_file, 'location.csv')
        shutil.rmtree('combined')
    else:
        print("No results found.")
else:
    print(f"Error: {res.status_code}")

+-----------------+----------+-----------+--------+
|             name|       lat|        lon|    type|
+-----------------+----------+-----------+--------+
|โรงพยาบาลมหาชัย 2|13.7072628|100.3009969|hospital|
+-----------------+----------+-----------+--------+
only showing top 1 row

+----------+-----------+--------------+-----------+
|       lat|        lon|          name|       type|
+----------+-----------+--------------+-----------+
|13.7477023|100.5345542|         Manna|      Other|
|13.6512018|100.4876955|      7-Eleven|   7-Eleven|
| 13.650673|100.4882352|      7-Eleven|   7-Eleven|
|13.7185734|100.5673621|         Shell|gas_station|
|13.7282777|100.5330447|      7-Eleven|   7-Eleven|
|13.7281063|100.5331414|           Zen|      Other|
|13.7272399|100.5335418|     Pola Pola|       cafe|
|13.7284508|100.5358805|      7-Eleven|   7-Eleven|
|13.7289158|100.5355833|  Cafe Bangrak|       cafe|
|13.7288589|100.5356641|      7-Eleven|   7-Eleven|
|13.7272956|100.5364037|        Mimy's| 

In [None]:
#spark data transformation
spark = SparkSession.builder.appName("fondue").getOrCreate()
df = spark.read.csv(
    "bangkok_traffy.csv",  # Replace with your actual file path
    header=True,
    inferSchema=True,
    multiLine=True,
    quote='"',
    escape='"',
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
    timestampFormat="yyyy-MM-dd HH:mm:ss.SSSSSS+00"
)
sample_data = df.head(3)
sampledf = spark.createDataFrame(sample_data, df.schema)


def parse_set(value):
    if value is not None:
        return value.strip('{}').split(',')
    else:
        return [None]
parse_set_udf = udf(parse_set, ArrayType(StringType()))
df = df.withColumn("type", parse_set_udf(col("type")))

type_list = ['ห้องน้ำ','คนจรจัด','การเดินทาง','จราจร','สอบถาม','ป้ายจราจร','ทางเท้า','ท่อระบายน้ำ','ถนน','กีดขวาง','ความสะอาด',
             'สะพาน','ต้นไม้','ร้องเรียน','เสนอแนะ','เสียงรบกวน','สัตว์จรจัด','ความปลอดภัย','สายไฟ','แสงสว่าง','คลอง','น้ำท่วม','PM2.5','ป้าย']
for category in type_list:
    df = df.withColumn(category, when(array_contains(col("type"),category), 1).otherwise(0))
df = df.drop('type')

#change เสร็จสิ้น to 0,1
df = df.withColumn(
    "state",
    F.when(F.col("state") == "เสร็จสิ้น", 1)
    .when(F.col("state") == "กำลังดำเนินการ", 0)
      # keep the original value for other states
)




#timestamp
#df = df.withColumn("type", explode(df["type"]))
df = df.withColumn(
    "timestamp_category",
    when((hour(col("timestamp")) >= 5) & (hour(col("timestamp")) < 12), "Morning")
    .when((hour(col("timestamp")) >= 12) & (hour(col("timestamp")) < 17), "Afternoon")
    .when((hour(col("timestamp")) >= 17) & (hour(col("timestamp")) < 20), "Evening")
    .when((hour(col("timestamp")) >= 20) & (hour(col("timestamp")) < 24), "Night")
    .otherwise("Late Night")  #noted that it will gmt+7 as thai timezone
)
df = df.withColumn(
    "last_activity_category",
    when((hour(col("last_activity")) >= 5) & (hour(col("last_activity")) < 12), "Morning")
    .when((hour(col("last_activity")) >= 12) & (hour(col("last_activity")) < 17), "Afternoon")
    .when((hour(col("last_activity")) >= 17) & (hour(col("last_activity")) < 20), "Evening")
    .when((hour(col("last_activity")) >= 20) & (hour(col("last_activity")) < 24), "Night")
    .otherwise("Late Night")  #noted that it will gmt+7 as thai timezone
)
df = df.withColumn("timestamp_is_weekend", dayofweek(col("timestamp")).isin([1, 7]))
df = df.withColumn("last_activity_is_weekend", dayofweek(col("last_activity")).isin([1, 7]))


df.select("state").show(3)
df.filter(df.timestamp_is_weekend == True).limit(1).show()
df.groupBy('organization').count().orderBy(F.desc('count')).show(10)
df.printSchema()
print(df.count())
#how can we detect photo before and after

#df.groupBy("district").count().orderBy(F.desc("count")).show(5)
#df.groupBy("district").count().orderBy(F.asc("count")).show(5)


