In [2]:
import os
import logging
import multiprocessing
from datetime import datetime
# import traceback

import pandas as pd
import polars as pl
import geopandas as gpd

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count_distinct, broadcast, expr

from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

In [3]:
logging.captureWarnings(True)
num_cores = multiprocessing.cpu_count()
os.environ["POLARS_NUM_THREADS"] = str(num_cores//2)

In [4]:
spark = (
    SparkSession
    .builder
    .master("local[*]")
    .appName("Sedona App")
    .config("spark.serializer", KryoSerializer.getName)
    .config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)

In [5]:
SedonaRegistrator.registerAll(spark)

True

In [5]:
# PREDEFINED VARIABLES
ORIGINAL_CRS = 4326
PROJECTED_CRS = 3857  # WGS84 Web Mercator (Auxiliary Sphere)
BUFFER_RADIUS = 10  # in meter

# People movement
START_DATE = datetime(2023, 1, 24, 7, 0, 0)  # year, month, date, GMT+7
END_DATE = datetime(2023, 2, 23, 7, 0, 0)

# Kantor Cabang

In [5]:
kc = spark.\
    read.\
    option("header", "true").\
    csv("kantorcabang.csv")

kc.createOrReplaceTempView("kc")
kc.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- No_: string (nullable = true)
 |-- Nama: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- geometry: string (nullable = true)



In [6]:
kc_buffer = spark.sql(
    f"""
    SELECT
        CAST(No_ AS INT) AS no,
        Nama,
        ST_Transform(
            ST_Buffer(
                ST_Transform(
                    ST_POINT(Latitude, Longitude)
                    , 'EPSG:{ORIGINAL_CRS}', 'EPSG:{PROJECTED_CRS}'
                ), 
                5000
            )
            , 'EPSG:{PROJECTED_CRS}', 'EPSG:{ORIGINAL_CRS}'
        ) AS kc_buffer_geom
    FROM
        kc
    """
)
kc_buffer.createOrReplaceTempView("kc_buffer")
kc_buffer.show()

+---+--------------------+--------------------+
| no|                Nama|      kc_buffer_geom|
+---+--------------------+--------------------+
|  1|Pasar Induk Krama...|POLYGON ((-6.2952...|
|  2|Bursa Efek Jakart...|POLYGON ((-6.2233...|
+---+--------------------+--------------------+



# POI

In [6]:
poi = (
    spark
    .read
    .option("header", "true")
    .parquet(r"path\to\*poi.parquet")
)

poi.createOrReplaceTempView("poi")
poi.printSchema()
# poi.show(5)

root
 |-- PlaceID: string (nullable = true)
 |-- Official_Name: string (nullable = true)
 |-- Primary_Category_ID: string (nullable = true)
 |-- Primary_Category_Name: string (nullable = true)
 |-- Primary_Food_Type_Name: string (nullable = true)
 |-- Chain_Name: string (nullable = true)
 |-- House_Number: string (nullable = true)
 |-- Full_Street_Name: string (nullable = true)
 |-- Admin_Level_2: string (nullable = true)
 |-- Admin_Level_3: string (nullable = true)
 |-- Admin_Level_4: string (nullable = true)
 |-- Postal_Code: double (nullable = true)
 |-- Display_Latitude: double (nullable = true)
 |-- Display_Longitude: double (nullable = true)
 |-- Phone: double (nullable = true)
 |-- TollFree_Phone: string (nullable = true)
 |-- Mobile_Phone: double (nullable = true)
 |-- URL: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Credit_CardAccepted: string (nullable = true)
 |-- Credit_Card_Type: string (nullable = true)
 |-- Place_Description_Text: string (nullable 

In [7]:
poi_buffer = spark.sql(f"""
SELECT
    PlaceID as place_id,
    Official_Name as official_name,
    Display_Longitude as longitude, 
    Display_Latitude as latitude, 
    ST_FlipCoordinates(ST_Transform(
        ST_Buffer(
            ST_Transform(
                ST_FlipCoordinates(ST_POINT(Display_Longitude, Display_Latitude))
                , 'EPSG:{ORIGINAL_CRS}', 'EPSG:{PROJECTED_CRS}'
            ), 
            {BUFFER_RADIUS}
        )
        , 'EPSG:{PROJECTED_CRS}', 'EPSG:{ORIGINAL_CRS}'
    )) AS poi_buffer_geom
FROM
    poi
""")
poi_buffer.createOrReplaceTempView("poi_buffer")
poi_buffer.printSchema()
# poi_buffer.show(5)

root
 |-- place_id: string (nullable = true)
 |-- official_name: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- poi_buffer_geom: geometry (nullable = true)



# People Movement

In [8]:
pm = (
    spark
    .read
    .option("header", "true")
    .parquet(r"path\to\*_pm_raw.parquet")
)

pm.createOrReplaceTempView("pm")
pm.printSchema()
# pm.show(5)

root
 |-- OID_: long (nullable = true)
 |-- Hashed_Device_ID: string (nullable = true)
 |-- Lat_of_Observation_Point: double (nullable = true)
 |-- Local_Date_of_Observation_Point: string (nullable = true)
 |-- Local_Day_of_Week_of_Observation_Point: string (nullable = true)
 |-- Local_Time_of_Day_of_Observation_Point: string (nullable = true)
 |-- Local_Timezone_of_Observation_Point: string (nullable = true)
 |-- Lon_of_Observation_Point: double (nullable = true)
 |-- Polygon_ID: string (nullable = true)
 |-- Unix_Timestamp_of_Observation_Point: long (nullable = true)



In [9]:
pm_g = spark.sql(f"""--sql
SELECT
    Hashed_Device_ID AS device_id,
    CAST(Local_Date_of_Observation_Point AS Date) AS obs_date,
    ST_POINT(Lon_of_Observation_Point, Lat_of_Observation_Point, {ORIGINAL_CRS}) AS pm_coordinate
FROM
    pm
WHERE
    TRUE
    AND Unix_Timestamp_of_Observation_Point BETWEEN {int(START_DATE.timestamp())} AND {int(END_DATE.timestamp())};
""")
pm_g.createOrReplaceTempView("pm_g")
# pm_g.show(5)

# Counting Footfall

## Straigforward

In [26]:
# Spatial join between poi and people movement data
spatial_join = spark.sql("""
SELECT
    poi_buffer.place_id,
    poi_buffer.official_name,
    poi_buffer.longitude,
    poi_buffer.latitude,
    pm_g.device_id,
    pm_g.obs_date
FROM
    poi_buffer, pm_g
WHERE
    TRUE
    AND ST_Contains(poi_buffer.poi_buffer_geom, pm_g.pm_coordinate)
""")
spatial_join.createOrReplaceTempView("spatial_join")
spatial_join.printSchema()
# spatial_join.show()

root
 |-- place_id: string (nullable = true)
 |-- official_name: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- device_id: string (nullable = true)
 |-- obs_date: date (nullable = true)



In [29]:
# last run: 5 mins
footfall_count = spark.sql(
    """
    SELECT
        DISTINCT place_id,
        official_name,
        COUNT(DISTINCT device_id) as num_of_visit
    FROM 
        spatial_join
    GROUP BY
        1,2
    ORDER BY
        place_id
    """
)
footfall_count.printSchema()
# footfall_count.show(5)

root
 |-- place_id: string (nullable = true)
 |-- official_name: string (nullable = true)
 |-- num_of_visit: long (nullable = false)



In [31]:
# export to parquet, can also convert to pandas first
# footfall_count.repartition(1).write.format("parquet").mode("append").save("footfall.parquet")
footfall_count_df = footfall_count.toPandas()#.to_parquet("footfall.parquet")
footfall_count_df.to_parquet('result/temp.parquet')
footfall_count_df.head(3)

## With GeoParquet

In [6]:
if not os.path.exists("result/poi_buffer_geohashed.parquet"):
    poi_buffer_geohashed = spark.sql(f"""
    SELECT
        *
        , ST_GeoHash(poi_buffer_geom, 33) as geohash
    FROM
        poi_buffer
    ORDER BY
    geohash
    """)
    poi_buffer_geohashed.write.format("geoparquet").save("result" + "/poi_buffer_geohashed.parquet")
else:
    poi_buffer_geohashed = (
        spark
        .read
        .option("header", "true")
        .parquet("result/poi_buffer_geohashed.parquet")
    )

poi_buffer_geohashed.createOrReplaceTempView("poi_buffer_geohashed")
poi_buffer_geohashed.printSchema()
# poi_buffer.show(5)

root
 |-- place_id: string (nullable = true)
 |-- official_name: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- poi_buffer_geom: geometry (nullable = true)
 |-- geohash: string (nullable = true)



In [7]:
if not os.path.exists("result/pm_geohashed.parquet"):
    pm_geohashed = spark.sql(f"""
    SELECT
        *
        , ST_GeoHash(pm_coordinate, 33) as geohash
    FROM
        pm_g
    ORDER BY
        geohash
    """)
    pm_geohashed.write.format("geoparquet").save("result" + "/pm_geohashed.parquet")
else:
    pm_geohashed = (
        spark
        .read
        .option("header", "true")
        .parquet("result/pm_geohashed.parquet")
    )

pm_geohashed.createOrReplaceTempView("pm_geohashed")
pm_geohashed.printSchema()

root
 |-- device_id: string (nullable = true)
 |-- obs_date: date (nullable = true)
 |-- pm_coordinate: geometry (nullable = true)
 |-- geohash: string (nullable = true)



In [9]:
if not os.path.exists("result/spatial_join.parquet"):
    spatial_join = (
        poi_buffer_geohashed.alias("poi_buffer")
        .join(
            pm_geohashed.alias("pm_g"),
            expr("ST_Contains(poi_buffer.poi_buffer_geom, pm_g.pm_coordinate)")
        ).select(
            "poi_buffer.place_id",
            "poi_buffer.official_name",
            "poi_buffer.longitude",
            "poi_buffer.latitude",
            "poi_buffer.geohash",
            "pm_g.device_id",
            "pm_g.obs_date"
        ).orderBy(
            "poi_buffer.geohash"
        )
    )
else:
    spatial_join = (
        spark
        .read
        .option("header", "true")
        .parquet("result/spatial_join.parquet")
    )

# spatial_join.write.format("geoparquet").save("result" + "/spatial_join.parquet")
spatial_join.printSchema()
# spatial_join.show()

root
 |-- place_id: string (nullable = true)
 |-- official_name: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- geohash: string (nullable = true)
 |-- device_id: string (nullable = true)
 |-- obs_date: date (nullable = true)



In [11]:
# Perform the equivalent PySpark operations
footfall_count = (
    spatial_join.select("place_id", "official_name", "device_id")
    .groupBy("place_id", "official_name")
    .agg(count_distinct("device_id").alias("num_of_visit"))
    .orderBy("num_of_visit")
)

footfall_count.show(5)

## With Polars

In [None]:
spatial_join = (
    spark
    .read
    .option("header", "true")
    .parquet("result/spatial_join.parquet")
)

In [6]:
polars_spatial_join_df = pl.read_parquet("result/spatial_join.parquet/*.parquet")

In [None]:
footfall_count = polars_spatial_join_df.groupby(["place_id"]).agg(pl.col("device_id").n_unique().alias("unique_device_count"))
footfall_count.head()

In [None]:
# # Spatial join between poi and people movement data
# spatial_join = spark.sql("""
# SELECT
#     poi_buffer.place_id,
#     poi_buffer.official_name,
#     poi_buffer.longitude,
#     poi_buffer.latitude,
#     pm_g.device_id,
#     pm_g.obs_date
# FROM
#     poi_buffer, pm_g
# WHERE
#     TRUE
#     AND ST_Contains(poi_buffer.poi_buffer_geom, pm_g.pm_coordinate)
# """)
# spatial_join.createOrReplaceTempView("spatial_join")
# spatial_join.printSchema()

In [27]:
# spatial_join.write.format("parquet").mode("overwrite").save("result/spatial_join.parquet")

# Placeholder

In [30]:
# # Perform the equivalent PySpark operations
# footfall_count = (
#     spatial_join.select("place_id", "official_name", "device_id")
#     .groupBy("place_id", "official_name")
#     .agg(count_distinct("device_id").alias("num_of_visit"))
#     .orderBy("place_id")
# )

# # footfall_count.show(5)

In [13]:
# poi_buffer.alias("poi_buffer").join(broadcast(pm_g).alias("pm_g"), expr("ST_Contains(poi_buffer.poi_buffer_geom, pm_g.pm_coordinate)")).show()

In [78]:
# pm_g.write.format("geoparquet").save("result" + "/pm_geohashed.parquet")

In [21]:
# # Export to csv
# footfall_count.write.mode("overwrite").csv("footfall.csv", header=True)

In [None]:
# export to shp
