# Spark Configuration

In [1]:
import numpy
import pandas
import pyproj
import shapely
import sklearn

In [2]:
# Report the installed version of every scientific dependency, one per line,
# in the same order the modules were imported.
for _dependency in (numpy, pandas, pyproj, shapely, sklearn):
    print(_dependency.__version__)

1.26.4
1.5.3
3.7.0
2.0.6
1.5.2


In [3]:
# Import libraries
import h3
import h3_pyspark
import pandas as pd
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lag, lead
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, TimestampType, StringType, DateType
from datetime import datetime, timedelta



# Raise pandas' display limits so wide/long sanity-check tables are not truncated.
for _option_name, _option_value in (
    ('display.max_rows', 100),
    ('display.max_columns', 50),
):
    pd.set_option(_option_name, _option_value)

In [4]:
import math
import os
from functools import reduce
from math import asin, atan2, cos, degrees, floor, radians, sin, sqrt

import geopandas as gpd
import geopy
import h3
import numpy as np
import pandas as pd
import pyspark
import sedona
# import rasterio
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window

from shapely.wkt import loads

# Print "<library> :  <version>" for the geo/Spark stack used by this notebook.
_stack_versions = (
    ("pandas", pd.__version__),
    ("geopandas", gpd.__version__),
    ("pyspark", pyspark.__version__),
    ("sedona", sedona.version),
)
for _library, _version in _stack_versions:
    print(f"{_library} : ", _version)

pandas :  1.5.3
geopandas :  1.0.1
pyspark :  3.3.4
sedona :  1.2.1


In [5]:
# Point PySpark at the cluster's Hadoop/Spark installation and at the Python
# interpreter shipped inside the distributed archive (unpacked as ./env).
os.environ.update(
    {
        "PYSPARK_PYTHON": "./env/bin/python",
        "HADOOP_CONF_DIR": "/etc/spark3/conf.cloudera.spark3_on_yarn/yarn-conf",
        "HADOOP_HOME": "/opt/cloudera/parcels/CDH/lib/hadoop",
        "SPARK_HOME": "/opt/cloudera/parcels/SPARK3-3.3.2.3.3.7190.4-1-1.p0.51021169/lib/spark3",
        "SPARK_CONF_DIR": "/etc/spark3/conf.cloudera.spark3_on_yarn",
    }
)

# Spark-on-YARN settings for the tourism job: executor sizing, shuffle
# parallelism, target queue and the packaged Python environment.
_spark_settings = [
    ("spark.dynamicAllocation.maxExecutors", "100"),
    ("spark.dynamicAllocation.minExecutors", "1"),
    ("spark.executor.cores", "10"),
    ("spark.executor.memory", "64g"),
    ("spark.sql.shuffle.partitions", "7000"),
    ("spark.yarn.queue", "root.pnt.hui_pnt_bpsint"),
    ("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "./env/bin/python"),
    (
        "spark.yarn.dist.archives",
        "hdfs://nsdiscovery/warehouse/tablespace/external/hive/pnt_bps_int.db/envs/mobility_310.tar.gz#env",
    ),
]

conf = SparkConf().setMaster("yarn").setAppName("data_cerdas_tourism").setAll(_spark_settings)

In [6]:
# Reuse the running SparkContext if the kernel already has one; otherwise start
# one with the configuration built above, then wrap it in a SparkSession.
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sparkContext=sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/25 06:55:17 WARN  util.Utils: [Thread-3]: Service 'sparkDriver' could not bind on port 30060. Attempting port 30061.
25/03/25 06:55:18 WARN  util.Utils: [Thread-3]: Service 'SparkUI' could not bind on port 30072. Attempting port 30073.
25/03/25 06:55:20 WARN  conf.HiveConf: [Thread-3]: HiveConf of name hive.metastore.runworker.in does not exist
25/03/25 06:55:20 WARN  conf.HiveConf: [Thread-3]: HiveConf of name hive.masking.algo does not exist
25/03/25 06:55:23 WARN  util.Utils: [Thread-3]: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
25/03/25 06:55:24 WARN  ipc.Client: [Thread-3]: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state st

In [7]:
# Echo the session object so the notebook shows its repr as this cell's output.
spark

In [8]:
def haversine_distance(lon1, lat1, lon2, lat2):
    """Great-circle distance in kilometres between two points, as a Spark column.

    Note the argument order: longitude first, then latitude, for each point.

    Args:
        lon1, lat1: Column expressions with the first point's coordinates in degrees.
        lon2, lat2: Column expressions with the second point's coordinates in degrees.

    Returns:
        A Column holding the haversine distance rounded to 2 decimals, using an
        Earth radius of 6371.0 km.
    """
    R = 6371.0  # Earth's radius in kilometers
    dlat = f.radians(lat2 - lat1)
    dlon = f.radians(lon2 - lon1)
    a = f.pow(f.sin(dlat/2), 2) + \
        f.cos(f.radians(lat1)) * f.cos(f.radians(lat2)) * f.pow(f.sin(dlon/2), 2)
    # Floating-point rounding can push `a` marginally above 1 for near-antipodal
    # points, which would make sqrt(1 - a) NaN. Clamp only the `1 - a` term:
    # null/NaN inputs still propagate through sqrt(a), so they are not masked.
    one_minus_a = f.greatest(1 - a, f.lit(0.0))
    c = 2 * f.atan2(f.sqrt(a), f.sqrt(one_minus_a))
    return f.round(R * c, 2)

def _to_h3_parent(h3_index, resolution):
    """Return the parent cell of `h3_index` at `resolution`, or None for a None input.

    The index is converted with str() before being handed to h3.cell_to_parent.
    """
    if h3_index is None:
        return None
    return h3.cell_to_parent(str(h3_index), resolution)


def to_parent_res9(h3_index):
    """Parent H3 cell at resolution 9 (None when `h3_index` is None)."""
    return _to_h3_parent(h3_index, 9)


def to_parent_res8(h3_index):
    """Parent H3 cell at resolution 8 (None when `h3_index` is None)."""
    return _to_h3_parent(h3_index, 8)


def to_parent_res7(h3_index):
    """Parent H3 cell at resolution 7 (None when `h3_index` is None)."""
    return _to_h3_parent(h3_index, 7)

# Spark UDF wrapper around h3.latlng_to_cell; the string return type is spelled
# out here (it is also what F.udf uses when no return type is given).
geo_to_h3 = F.udf(h3.latlng_to_cell, returnType=T.StringType())

In [9]:

# Register the H3 parent-cell helpers as string-returning Spark UDFs.
to_parent_res9_udf = f.udf(to_parent_res9, returnType=StringType())
to_parent_res8_udf = f.udf(to_parent_res8, returnType=StringType())
to_parent_res7_udf = f.udf(to_parent_res7, returnType=StringType())

# Operator code for this run.
operator_name = 'TSEL'

# Tourism

In [10]:
def generate_tourism(mobility_1_trip, jak_zone, execute_month, suffix,
                     visit_hour_mark=2.0, max_day_trip=365, mark_circular=4,
                     start_overstay_hours=22, end_overstay_hours=3):
    """Build visit-level tourism records for one execute month and msisdn suffix.

    All steps are lazy Spark transformations; nothing is executed until the
    caller triggers an action on the returned DataFrame.

    Steps:
      1. Keep rows of ``mobility_1_trip`` whose ``execute_month`` and ``suffix``
         columns equal the given values.
      2. Within each (msisdn, move_trip), split the stays ordered by
         ``start_datetime`` into consecutive POI visits and consecutive h3_7
         visits; keep h3_7 visits outside the usual environment
         (``flag_env != 1``) that last at least ``visit_hour_mark`` hours or
         span a calendar-day change, then re-segment the surviving rows.
      3. Keep only trips containing at least one row with ``activity_prov == "31"``.
      4. Keep ``flag_env == 0`` rows of trips shorter than ``max_day_trip`` days
         and label each visit 'main' / 'secondary' / 'overstay_night'.
      5. Attach the trip's main destination and flag destinations that are the
         main destination of at least ``mark_circular`` trips in a month.
      6. Per (msisdn, date) pick the h3_9 cell used as overstay location for the
         early hours (<= ``end_overstay_hours``) and late hours
         (>= ``start_overstay_hours``) and look up its zone in ``jak_zone``.
      7. Join the visit attributes to the daily overstay rows, rebuild
         ``event_month`` from the visit end time and keep only the rows whose
         event month matches ``execute_month``.

    Args:
        mobility_1_trip: Spark DataFrame of trip-level stays. The code reads,
            among others, the columns execute_month, suffix, event_month,
            msisdn, move_trip, start_datetime, end_datetime, poi_id, h3_7,
            h3_9, flag_env, activity_prov and interval_trip_day, plus the
            descriptive columns listed in the final projection.
        jak_zone: Spark DataFrame with columns h3_9, zone_id and zone_category.
        execute_month: Month string in 'YYYY-MM' form (e.g. '2024-11').
        suffix: Value matched against the ``suffix`` column of the input.
        visit_hour_mark: Minimum visit length in hours for a visit to qualify.
        max_day_trip: Trips with ``interval_trip_day`` >= this are dropped.
        mark_circular: Minimum number of trips to the same main destination
            for ``flag_circular_trip`` to be 1.
        start_overstay_hours: First hour of day counted as the "next night" window.
        end_overstay_hours: Last hour of day counted as the "previous night" window.

    Returns:
        Spark DataFrame with one row per joined visit/overstay-date record,
        including ``visit_overstay_night``, ``suffix`` (last character of
        msisdn) and ``execute_month``.
    """
    min_visit_seconds = 3600 * visit_hour_mark
    join_partitions = 1000

    # ---------------- Window specifications ----------------
    trip_w = Window.partitionBy('msisdn', 'move_trip')
    trip_ordered_w = trip_w.orderBy('start_datetime')
    trip_running_w = trip_ordered_w.rowsBetween(Window.unboundedPreceding, Window.currentRow)

    def whole_group_window(*keys):
        # first()/last() over a window without orderBy depend on incidental row
        # order. Ordering by start_datetime with a full-partition frame makes
        # "first start" / "last end" well defined.
        return (
            Window.partitionBy(*keys)
            .orderBy('start_datetime')
            .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
        )

    h3_7_visit_w = whole_group_window('msisdn', 'move_trip', 'move_visit')
    poi_visit_w = whole_group_window('msisdn', 'move_trip', 'move_visit_poi')
    h3_9_visit_w = whole_group_window('msisdn', 'move_trip', 'move_visit_h9')

    day_w = Window.partitionBy('msisdn', 'date')
    day_by_duration_w = day_w.orderBy(f.asc('interval_visit_h3_9'))
    day_by_prev_score_w = day_w.orderBy(f.desc('candidate_score_prev'))
    day_by_next_score_w = day_w.orderBy(f.desc('candidate_score_next'))

    def seconds_between(end_name, start_name):
        # Elapsed seconds between two timestamp-like columns.
        return (
            f.to_timestamp(col(end_name)).cast('double')
            - f.to_timestamp(col(start_name)).cast('double')
        )

    def with_h3_7_visits(df):
        # Number consecutive stays in the same h3_7 cell within a trip and
        # attach each visit's start, end and length.
        return (
            df
            .withColumn('prev_h3_7', f.lag('h3_7').over(trip_ordered_w))
            .withColumn('flag_move_h3_7', f.when(col('h3_7') != col('prev_h3_7'), 1).otherwise(0))
            .withColumn('move_visit', f.sum('flag_move_h3_7').over(trip_running_w))
            .withColumn('first_datetime_visit_h3_7', f.first('start_datetime').over(h3_7_visit_w))
            .withColumn('last_datetime_visit_h3_7', f.last('end_datetime').over(h3_7_visit_w))
            .withColumn('interval_visit_seconds',
                        seconds_between('last_datetime_visit_h3_7', 'first_datetime_visit_h3_7'))
            .withColumn('interval_visit_day',
                        f.datediff(col('last_datetime_visit_h3_7'), col('first_datetime_visit_h3_7')))
        )

    mobility_1_trip = (
        mobility_1_trip
        .filter(F.col('execute_month') == execute_month)
        .filter(F.col('suffix') == suffix)
    )

    ############ Mobility 2 ###############

    ## Define POI visit
    # poi_id is temporarily filled with '-' so that entering/leaving a POI
    # counts as a change, then restored to null.
    mobility_2_visit = (
        mobility_1_trip
        .drop('h3_10')
        .withColumn('poi_id', f.when(col('poi_id').isNotNull(), col('poi_id')).otherwise('-'))
        .withColumn('prev_poi_id', f.lag('poi_id').over(trip_ordered_w))
        .withColumn('flag_move_poi', f.when(col('poi_id') != col('prev_poi_id'), 1).otherwise(0))
        .withColumn('poi_id', f.when(col('poi_id') != '-', col('poi_id')).otherwise(None))
        .withColumn('move_visit_poi', f.sum('flag_move_poi').over(trip_running_w))
        .withColumn('first_datetime_visit_poi',
                    f.when(col('poi_id').isNotNull(), f.first('start_datetime').over(poi_visit_w)).otherwise(None))
        .withColumn('last_datetime_visit_poi',
                    f.when(col('poi_id').isNotNull(), f.last('end_datetime').over(poi_visit_w)).otherwise(None))
        .withColumn('interval_visit_seconds_poi',
                    seconds_between('last_datetime_visit_poi', 'first_datetime_visit_poi'))
        .withColumn('interval_visit_hour_poi', f.round(col('interval_visit_seconds_poi') / 3600, 2))
        .withColumn('interval_visit_day_poi',
                    f.datediff(col('last_datetime_visit_poi'), col('first_datetime_visit_poi')))
        .withColumn('flag_visit_poi',
                    f.when((col('flag_env') != 1)
                           & (col('interval_visit_seconds_poi') >= min_visit_seconds), 1).otherwise(0))
    )

    ## Define h3_7 visit and keep eligible visits only (>= visit_hour_mark hours
    ## or spanning a night); everything else is removed as a non-visit.
    mobility_2_visit = (
        with_h3_7_visits(mobility_2_visit)
        .withColumn('flag_visit',
                    f.when((col('flag_env') != 1)
                           & ((col('interval_visit_seconds') >= min_visit_seconds)
                              | (col('interval_visit_day') > 0)), 1).otherwise(0))
        .filter(col('flag_visit') == 1)
    )

    # Re-define h3_7 visits on the surviving rows: if a visit is interrupted by a
    # short transit to another h3_7 and the user returns to the previous cell,
    # the length of stay is accumulated into one visit.
    mobility_2_visit = with_h3_7_visits(mobility_2_visit)

    # Keep only trips that contain a visit in Jakarta (activity_prov == "31")
    mobility_2_visit = (
        mobility_2_visit
        .withColumn('flag_jkt', f.when(col('activity_prov') == "31", 1).otherwise(0))
        .withColumn('visit_jkt', f.sum('flag_jkt').over(trip_w))
        .filter(col('visit_jkt') > 0)
    )

    ############ Mobility 3 ###############

    # Remove usual-environment records and cap the trip duration
    mobility_3_tourism = (
        mobility_2_visit
        .filter(col('flag_env') == 0)
        .filter(col('interval_trip_day') < max_day_trip)
    )

    # Set visit category: the longest qualifying visit of a trip is 'main'
    mobility_3_tourism = (
        mobility_3_tourism
        .withColumn('max_los_visit', f.max('interval_visit_seconds').over(trip_w))
        .withColumn('destination_category',
                    f.when((col('interval_visit_seconds') >= min_visit_seconds)
                           & (col('interval_visit_seconds') == col('max_los_visit')), 'main')
                    .when(col('interval_visit_seconds') >= min_visit_seconds, 'secondary')
                    .otherwise('overstay_night'))
    )

    ############ Mobility 4 ###############

    # Main destination cell of each trip
    main_dest = (
        mobility_3_tourism
        .filter(col('destination_category') == 'main')
        .select('event_month', 'msisdn', 'move_trip', col('h3_7').alias('main_dest_h3_7'))
    )
    main_dest = main_dest.dropDuplicates(main_dest.columns)

    # Count trips per main destination and mark circular trips
    main_dest_count = (
        main_dest
        .groupBy('event_month', 'msisdn', 'main_dest_h3_7')
        .agg(f.countDistinct('move_trip').alias('main_dest_h3_7_n_trip'))
        .withColumn('flag_circular_trip',
                    f.when(col('main_dest_h3_7_n_trip') >= mark_circular, 1).otherwise(0))
    )

    # Inner join: trips without a main destination are removed.
    # Column lists keep the DataFrame's own order (a set() would shuffle it).
    trip_keys = ('event_month', 'msisdn', 'move_trip')
    m3_other_cols = [c for c in mobility_3_tourism.columns if c not in trip_keys]
    mobility_4_tourism = (
        mobility_3_tourism.alias('m3')
        .join(
            main_dest.alias('md'),
            (col('m3.event_month') == col('md.event_month'))
            & (col('m3.msisdn') == col('md.msisdn'))
            & (col('m3.move_trip') == col('md.move_trip')),
            how='inner',
        )
        .select(col('m3.event_month'), col('m3.msisdn'), *m3_other_cols,
                col('m3.move_trip'), 'main_dest_h3_7')
    )

    dest_keys = ('event_month', 'msisdn', 'main_dest_h3_7')
    m4_other_cols = [c for c in mobility_4_tourism.columns if c not in dest_keys]
    mobility_4_tourism = (
        mobility_4_tourism.alias('m4')
        .join(
            main_dest_count.alias('mdc'),
            (col('m4.msisdn') == col('mdc.msisdn'))
            & (col('m4.event_month') == col('mdc.event_month'))
            & (col('m4.main_dest_h3_7') == col('mdc.main_dest_h3_7')),
            how='left',
        )
        .select(col('m4.event_month'), col('m4.msisdn'), *m4_other_cols,
                col('m4.main_dest_h3_7'), 'main_dest_h3_7_n_trip', 'flag_circular_trip')
    )

    ############ Mobility 5 (Overstay) ###############

    # Segment each trip into consecutive h3_9 visits
    mobility_overstay = (
        mobility_4_tourism
        .select(['msisdn', 'move_trip', 'move_visit', 'h3_7', 'h3_9', 'start_datetime', 'end_datetime'])
        .withColumn('prev_h3_9', f.lag('h3_9').over(trip_ordered_w))
        .withColumn('flag_move_h3_9', f.when(col('h3_9') != col('prev_h3_9'), 1).otherwise(0))
        .withColumn('move_visit_h9', f.sum('flag_move_h3_9').over(trip_running_w))
        .withColumn('first_datetime_visit_h3_9', f.first('start_datetime').over(h3_9_visit_w))
        .withColumn('last_datetime_visit_h3_9', f.last('end_datetime').over(h3_9_visit_w))
    )

    # int() keeps the SQL lambda text well formed whatever numeric type is passed
    prev_night_filter = f"filter(hour_seq, x -> x <= {int(end_overstay_hours)})"
    next_night_filter = f"filter(hour_seq, x -> x >= {int(start_overstay_hours)})"

    h3_9_visit_cols = ['msisdn', 'move_trip', 'h3_7', 'move_visit', 'move_visit_h9', 'h3_9',
                       'first_datetime_visit_h3_9', 'last_datetime_visit_h3_9']
    mobility_overstay = (
        mobility_overstay
        .select(h3_9_visit_cols)
        .dropDuplicates(h3_9_visit_cols)
        .withColumn('interval_visit_h3_9',
                    seconds_between('last_datetime_visit_h3_9', 'first_datetime_visit_h3_9'))
        .withColumn('start_date', f.to_date(col('first_datetime_visit_h3_9')))
        .withColumn('end_date', f.to_date(col('last_datetime_visit_h3_9')))
        # One row per calendar date covered by the h3_9 visit
        .withColumn('date_seq', f.sequence(col('start_date'), col('end_date')))
        .withColumn('date', f.explode(col('date_seq')))
        .withColumn('start_hour', f.hour(col('first_datetime_visit_h3_9')))
        .withColumn('end_hour', f.hour(col('last_datetime_visit_h3_9')))
        # Hours of that date during which the visit was ongoing
        .withColumn('hour_seq',
                    f.when(col('start_date') == col('end_date'), f.sequence(col('start_hour'), col('end_hour')))
                    .when(col('date') == col('start_date'), f.sequence(col('start_hour'), f.lit(23)))
                    .when(col('date') == col('end_date'), f.sequence(f.lit(0), col('end_hour')))
                    .otherwise(f.sequence(f.lit(0), f.lit(23))))
        # Longer visits get a larger rank, used as weight in the candidate score
        .withColumn('rank_spent_time_x10', f.row_number().over(day_by_duration_w) * 10)
        .withColumn('prev_nhour_overstay', f.size(f.expr(prev_night_filter)))
        .withColumn('next_nhour_overstay', f.size(f.expr(next_night_filter)))
        # The original drop() named 'first_date_visit_h3_9'/'last_date_visit_h3_9',
        # which do not exist (silent no-op); the datetime columns are not read below.
        .drop('date_seq', 'first_datetime_visit_h3_9', 'last_datetime_visit_h3_9')
        .withColumn('candidate_score_prev', col('rank_spent_time_x10') * col('prev_nhour_overstay'))
        .withColumn('candidate_score_next', col('rank_spent_time_x10') * col('next_nhour_overstay'))
        .withColumn('max_candidate_prev', f.max('candidate_score_prev').over(day_w))
        .withColumn('max_candidate_next', f.max('candidate_score_next').over(day_w))
        .withColumn('rank_within_date_prev', f.rank().over(day_by_prev_score_w))
        .withColumn('overstay_h3_9_prev',
                    f.when((col('candidate_score_prev') == col('max_candidate_prev'))
                           & (col('max_candidate_prev') > 0)
                           & (col('rank_within_date_prev') == 1),
                           col('h3_9')))
        .withColumn('rank_within_date_next', f.rank().over(day_by_next_score_w))
        .withColumn('overstay_h3_9_next',
                    f.when((col('candidate_score_next') == col('max_candidate_next'))
                           & (col('max_candidate_next') > 0)
                           & (col('rank_within_date_next') == 1),
                           col('h3_9')))
    )

    zone_prev = jak_zone.select([
        col('h3_9').alias('overstay_h3_9_prev'),
        col('zone_id').alias('zone_id_prev'),
        col('zone_category').alias('zone_category_prev'),
    ])
    zone_next = jak_zone.select([
        col('h3_9').alias('overstay_h3_9_next'),
        col('zone_id').alias('zone_id_next'),
        col('zone_category').alias('zone_category_next'),
    ])
    mobility_overstay_daily = (
        mobility_overstay
        .groupBy('msisdn', 'date')
        .agg(f.max(col('overstay_h3_9_prev')).alias('overstay_h3_9_prev'),
             f.max(col('overstay_h3_9_next')).alias('overstay_h3_9_next'))
        .join(zone_prev, ['overstay_h3_9_prev'], how='left')
        .join(zone_next, ['overstay_h3_9_next'], how='left')
        .select(['msisdn', 'date',
                 'overstay_h3_9_prev', 'zone_id_prev', 'zone_category_prev',
                 'overstay_h3_9_next', 'zone_id_next', 'zone_category_next'])
    )

    # NOTE(review): mobility_overstay still has one row per h3_9 visit and date,
    # so this projection can contain repeated (msisdn, move_trip, move_visit,
    # date) rows that are carried into the output -- confirm whether a
    # dropDuplicates() is wanted here before changing the output grain.
    mobility_overstay = (
        mobility_overstay
        .select(['msisdn', 'move_trip', 'move_visit', 'date'])
        .join(mobility_overstay_daily, ['msisdn', 'date'], how='left')
    )

    ############ Mobility 5 ###############
    # Remove duplicates on the output attribute set
    output_cols = [
        'event_month', 'msisdn', 'country', 'home_kab', 'home_nmkab', 'home_kec', 'home_nmkec',
        'work_kab', 'home_h3_7', 'work_h3_7',
        'h3_7', 'activity_kab', 'activity_kec', 'nmkab', 'nmkec', 'flag_env',
        'move_trip', 'start_trip_datetime', 'end_trip_datetime',
        'interval_trip_seconds', 'interval_trip_hour', 'interval_trip_day',
        'move_visit', 'first_datetime_visit_h3_7', 'last_datetime_visit_h3_7',
        'interval_visit_seconds', 'interval_visit_day', 'flag_visit',
        'h3_7_poi', 'poi_id', 'poi',
        'move_visit_poi', 'first_datetime_visit_poi', 'last_datetime_visit_poi',
        'interval_visit_seconds_poi', 'interval_visit_hour_poi', 'interval_visit_day_poi', 'flag_visit_poi',
        'main_dest_h3_7', 'main_dest_h3_7_n_trip', 'destination_category', 'flag_circular_trip',
    ]

    mobility_5_tourism = (
        mobility_4_tourism
        .select(output_cols)
        .dropDuplicates(output_cols)
        .repartition(join_partitions)
        .join(mobility_overstay.repartition(join_partitions),
              ['msisdn', 'move_trip', 'move_visit'], how='right')
    )

    ############ Mobility 6 ###############
    # event_month is rebuilt from the visit end time, formatted like '2024-M11'
    mobility_6_tourism = (
        mobility_5_tourism
        .withColumn('visit_overstay_night', f.when(col('interval_visit_day') > 0, 1).otherwise(0))
        .withColumn('event_month',
                    f.format_string("%04d-M%02d",
                                    f.year(col('last_datetime_visit_h3_7')),
                                    f.month(col('last_datetime_visit_h3_7'))))
        .withColumn('suffix', F.substring('msisdn', -1, 1))
        .withColumn('execute_month', F.substring(F.lit(execute_month), 1, 7))
    )

    # Year/month are taken from fixed positions of 'YYYY-MM', consistent with
    # the substring(1, 7) used for the execute_month column above.
    target_event_month = execute_month[:4] + '-M' + execute_month[5:7]
    return mobility_6_tourism.filter(F.col('event_month') == target_event_month)

# Parameters - Change the Month Here (Ganti Bulan di Sini)

In [11]:
# Run parameters: the month to process and the msisdn suffix used for the
# initial (lazy) plan built at the end of this cell.
execute_month = '2024-11'
suffix = '0'

# Trip-level input table.
mobility_1_trip = spark.read.table('pnt_bps_int_stg.data_cerdas_mobility_trip_sample')

# Zone lookup: read locally with pandas, then hand it to Spark.
jak_zone_pdf = pd.read_csv('data/jak_zone.csv')
jak_zone = spark.createDataFrame(jak_zone_pdf)

tourism = generate_tourism(mobility_1_trip, jak_zone, execute_month, suffix)

25/03/25 06:55:54 WARN  conf.HiveConf: [Thread-3]: HiveConf of name hive.metastore.runworker.in does not exist
25/03/25 06:55:54 WARN  conf.HiveConf: [Thread-3]: HiveConf of name hive.masking.algo does not exist
25/03/25 06:55:54 WARN  client.HiveClientImpl: [Thread-3]: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = f17ef94d-c649-4cb1-9a3f-ace7ad3f6a07
25/03/25 06:55:56 WARN  conf.HiveConf: [Thread-3]: HiveConf of name hive.metastore.runworker.in does not exist
25/03/25 06:55:56 WARN  conf.HiveConf: [Thread-3]: HiveConf of name hive.masking.algo does not exist
  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


# Looping

In [12]:
import logging

# Set up logging. logging.basicConfig opens the log file right away and raises
# FileNotFoundError when the target directory is missing, so create it first.
log_file = 'logs/logging_mobility_tourism_sample.log'
os.makedirs(os.path.dirname(log_file), exist_ok=True)
logging.basicConfig(filename=log_file, level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Example usage
logging.info('This is an info message.')
logging.info('Another info message.')

In [13]:
import time

In [14]:
# Process the month one msisdn suffix (0-9) at a time and append each result to
# the partitioned output table.
# NOTE(review): mode('append') means re-running this cell for the same month
# adds the rows again -- confirm the target partitions are cleared beforehand.
for i in range(10):
    suffix = str(i)
    logging.info(f'PROCESS EVENT MONTH: {execute_month}, SUFFIX: {suffix}')
    tourism = generate_tourism(mobility_1_trip, jak_zone, execute_month, suffix)
    tourism_writer = (
        tourism
        .repartition(100)
        .write
        .partitionBy('execute_month', 'event_month', 'suffix')
        .mode('append')
    )
    tourism_writer.saveAsTable('pnt_bps_int_stg.data_cerdas_tourism_sample')

25/03/25 06:56:18 WARN  util.package: [Thread-3]: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/03/25 06:58:32 WARN  conf.HiveConf: [SparkCatalogEventProcessor-thread]: HiveConf of name hive.metastore.runworker.in does not exist
25/03/25 06:58:32 WARN  conf.HiveConf: [SparkCatalogEventProcessor-thread]: HiveConf of name hive.masking.algo does not exist
25/03/25 06:58:33 WARN  sql.CommandsHarvester$: [SparkExecutionPlanProcessor-thread]: Missing unknown leaf node: LogicalRDD [Unnamed: 0#96L, h3_8#97, h3_9#98, zone_id#99, zone_code#100, zone_category#101, h3_population_constrained#102L, n_accomadation#103L, unique_accomadation_category#104L], false

25/03/25 06:58:33 WARN  sql.CommandsHarvester$: [SparkExecutionPlanProcessor-thread]: Missing unknown leaf node: LogicalRDD [Unnamed: 0#6611L, h3_8#6612, h3_9#6613, zone_id#6614, zone_code#6615, zone_category#6616, h3_population_constraine

## Sanity Check -  Mobility Tourism

In [15]:
# List the output table's partitions and show the most recent 11 entries.
q = "SHOW PARTITIONS pnt_bps_int_stg.data_cerdas_tourism_sample"
partitions_pdf = spark.sql(q).toPandas()
partitions_pdf.tail(11)

25/03/25 07:16:00 WARN  sql.CommandsHarvester$: [SparkExecutionPlanProcessor-thread]: Missing unknown leaf node: LogicalRDD [Unnamed: 0#96L, h3_8#97, h3_9#98, zone_id#99, zone_code#100, zone_category#101, h3_population_constrained#102L, n_accomadation#103L, unique_accomadation_category#104L], false

25/03/25 07:16:00 WARN  sql.CommandsHarvester$: [SparkExecutionPlanProcessor-thread]: Missing unknown leaf node: LogicalRDD [Unnamed: 0#43225L, h3_8#43226, h3_9#43227, zone_id#43228, zone_code#43229, zone_category#43230, h3_population_constrained#43231L, n_accomadation#43232L, unique_accomadation_category#43233L], false



Unnamed: 0,partition
0,execute_month=2024-11/event_month=2024-M11/suf...
1,execute_month=2024-11/event_month=2024-M11/suf...
2,execute_month=2024-11/event_month=2024-M11/suf...
3,execute_month=2024-11/event_month=2024-M11/suf...
4,execute_month=2024-11/event_month=2024-M11/suf...
5,execute_month=2024-11/event_month=2024-M11/suf...
6,execute_month=2024-11/event_month=2024-M11/suf...
7,execute_month=2024-11/event_month=2024-M11/suf...
8,execute_month=2024-11/event_month=2024-M11/suf...
9,execute_month=2024-11/event_month=2024-M11/suf...


In [16]:
# Load the tourism output and keep only rows whose execute_month ('YYYY-MM')
# agrees with their event_month ('YYYY-Mmm').
event_year = F.substring('event_month', 1, 4)
event_month_number = F.substring('event_month', 7, 2)
tourism = (
    spark.read.table('pnt_bps_int_stg.data_cerdas_tourism_sample')
    .filter(F.col('execute_month') == F.concat(event_year, F.lit('-'), event_month_number))
)

In [17]:
%%time
# Per event month: distinct subscribers, trips and visits, plus the raw row count.
tourism_metrics = [
    F.countDistinct('msisdn').alias('num_msisdn'),
    F.countDistinct('msisdn', 'move_trip').alias('num_trip'),
    F.countDistinct('msisdn', 'move_trip', 'move_visit').alias('num_visit'),
    F.count('msisdn').alias('num_row'),
]
cek = tourism.groupBy('event_month').agg(*tourism_metrics).toPandas()



CPU times: user 24.4 ms, sys: 8.58 ms, total: 33 ms
Wall time: 8.73 s


                                                                                

In [18]:
# Show the tourism summary ordered by event month.
cek.sort_values(by='event_month')

Unnamed: 0,event_month,num_msisdn,num_trip,num_visit,num_row
0,2024-M11,347798,1438390,3163030,32237636


In [19]:
# Same month-consistency filter, applied to the trip-level INPUT table so its
# volumes can be compared with the tourism output above.
# (The variable keeps the name `tourism` because the next cell reads it.)
trip_event_year = F.substring('event_month', 1, 4)
trip_event_month_number = F.substring('event_month', 7, 2)
tourism = (
    spark.read.table('pnt_bps_int_stg.data_cerdas_mobility_trip_sample')
    .filter(F.col('execute_month') == F.concat(trip_event_year, F.lit('-'), trip_event_month_number))
)

In [20]:
%%time
# Per event month: distinct subscribers and trips, plus the raw row count.
trip_metrics = [
    F.countDistinct('msisdn').alias('num_msisdn'),
    F.countDistinct('msisdn', 'move_trip').alias('num_trip'),
    F.count('msisdn').alias('num_row'),
]
cek = tourism.groupBy('event_month').agg(*trip_metrics).toPandas()



CPU times: user 39.2 ms, sys: 5.42 ms, total: 44.6 ms
Wall time: 12.5 s


                                                                                

In [21]:
# Show the trip-input summary ordered by event month.
cek.sort_values(by='event_month')

Unnamed: 0,event_month,num_msisdn,num_trip,num_row
0,2024-M11,4436394,62418589,380470326
