In [1]:
import json
from typing import Dict, Set

import pandas as pd
import yaml
from IPython.display import display

pd.set_option("display.max_rows", 50)
pd.set_option("display.max_columns", 50)

psg_directory = "../resources/"
psg_data_file = "psgc_2025-08-07.csv"

df = pd.read_csv(psg_directory + psg_data_file)

df["psgc_id"] = df["psgc_id"].astype(str).str.zfill(10)
df = df.map(lambda x: x.strip() if isinstance(x, str) else x)

geographic_level_map = {
    "Reg": "region",
    "City": "city",
    "Mun": "municipality",
    "Prov": "province",
    "SubMun": "submunicipality",
    "Bgy": "barangay",
}
df["geographic_level"] = df["geographic_level"].replace(geographic_level_map)

df["barangay_code"] = df["psgc_id"].str[-3:]
df["municipality_or_city_code"] = df["psgc_id"].str[-5:-3]
df["province_or_highly_urbanized_city_code"] = df["psgc_id"].str[-8:-5]
df["region_code"] = df["psgc_id"].str[-10:-8]

df["barangay_mapper"] = df["psgc_id"].str[-10:]
df["municipality_or_city_mapper"] = df["psgc_id"].str[-10:-3]
df["province_or_highly_urbanized_city_mapper"] = df["psgc_id"].str[-10:-5]
df["region_mapper"] = df["psgc_id"].str[-10:-8]

regions_filter = (
    (df["province_or_highly_urbanized_city_code"] == "000")
    & (df["municipality_or_city_code"] == "00")
    & (df["barangay_code"] == "000")
)
regions_mapper = (
    df.loc[regions_filter, ["region_mapper", "name"]]
    .sort_values("region_mapper")
    .set_index("region_mapper", drop=True)
    .to_dict()["name"]
)

province_or_huc_filter = (
    ~(df["province_or_highly_urbanized_city_code"] == "000")
    & (df["municipality_or_city_code"] == "00")
    & (df["barangay_code"] == "000")
)

province_or_huc_mapper = (
    df.loc[province_or_huc_filter, ["province_or_highly_urbanized_city_mapper", "name"]]
    .sort_values("province_or_highly_urbanized_city_mapper")
    .set_index("province_or_highly_urbanized_city_mapper")
    .to_dict()["name"]
)

municipal_or_city_filter = (
    ~(df["province_or_highly_urbanized_city_code"] == "000")
    & ~(df["municipality_or_city_code"] == "00")
    & (df["barangay_code"] == "000")
)

municipal_or_city_mapper = (
    df.loc[municipal_or_city_filter, ["municipality_or_city_mapper", "name"]]
    .sort_values("municipality_or_city_mapper")
    .set_index("municipality_or_city_mapper")
    .to_dict()["name"]
)

df["region"] = df["region_mapper"].map(regions_mapper)
df["province_or_highly_urbanized_city"] = df["province_or_highly_urbanized_city_mapper"].map(province_or_huc_mapper)
df["municipality_or_city"] = df["municipality_or_city_mapper"].map(
    municipal_or_city_mapper
)

barangay_df = df[df["geographic_level"] == "barangay"].reset_index(drop=True)

# Forging SQLite dump

In [2]:
import numpy as np

In [3]:
barangay_df["correspondence_code"] = barangay_df["correspondence_code"].astype("Int64")
not_empty = (barangay_df["correspondence_code"].notna())
barangay_df.loc[not_empty, "correspondence_code"] = barangay_df[not_empty]["correspondence_code"].astype(str).str.zfill(9).replace("<NA>", np.nan)
barangay_df["population"] = pd.to_numeric(barangay_df["population"].str.replace(",","").str.replace('-','')).astype("Int64")
barangay_df["settlement_type"] = barangay_df["settlement_type"].replace("U","urban").replace("R","rural")
barangay_df = barangay_df.rename({"Unnamed: 9":"psgc_extras","old_names":"legacy_name","correspondence_code":"legacy_psgc_id"}, axis=1)

In [4]:
barangay_df[barangay_df["geographic_level"]=="barangay"].head(3)

Unnamed: 0,psgc_id,name,legacy_psgc_id,geographic_level,legacy_name,city_class,income_classification,settlement_type,population,psgc_extras,barangay_status,barangay_code,municipality_or_city_code,province_or_highly_urbanized_city_code,region_code,barangay_mapper,municipality_or_city_mapper,province_or_highly_urbanized_city_mapper,region_mapper,region,province_or_highly_urbanized_city,municipality_or_city
0,1380100001,Barangay 1,137501001,barangay,,,,urban,2319,,,1,0,801,13,1380100001,1380100,13801,13,National Capital Region (NCR),City of Caloocan,
1,1380100002,Barangay 2,137501002,barangay,,,,urban,5156,,,2,0,801,13,1380100002,1380100,13801,13,National Capital Region (NCR),City of Caloocan,
2,1380100003,Barangay 3,137501003,barangay,,,,urban,2497,,,3,0,801,13,1380100003,1380100,13801,13,National Capital Region (NCR),City of Caloocan,


In [5]:
barangay_df[barangay_df["geographic_level"]=="barangay"]["income_classification"].value_counts(dropna=False)

income_classification
NaN    42011
Name: count, dtype: int64

In [6]:
barangay_df["region_mapper"] = barangay_df["region_mapper"].str.ljust(10, "0")
barangay_df["province_or_highly_urbanized_city_mapper"] = barangay_df["province_or_highly_urbanized_city_mapper"].str.ljust(10, "0")
barangay_df["municipality_or_city_mapper"] = barangay_df["municipality_or_city_mapper"].str.ljust(10, "0")


In [7]:
col_ord = [
    "psgc_id",
    "name",
    "geographic_level",
    "settlement_type",
    "population",
    "psgc_extras",
    "barangay_status",
    "barangay_code",
    "barangay_mapper",
    "municipality_or_city_code",
    "municipality_or_city_mapper",
    "province_or_highly_urbanized_city_code",
    "province_or_highly_urbanized_city_mapper",
    "region_code",
    "region_mapper",
    "legacy_psgc_id",
    "legacy_name",

]
barangay_df[col_ord].sample(10)

Unnamed: 0,psgc_id,name,geographic_level,settlement_type,population,psgc_extras,barangay_status,barangay_code,barangay_mapper,municipality_or_city_code,municipality_or_city_mapper,province_or_highly_urbanized_city_code,province_or_highly_urbanized_city_mapper,region_code,region_mapper,legacy_psgc_id,legacy_name
38933,1606801017,San Jose,barangay,rural,1154,,,17,1606801017,1,1606801000,68,1606800000,16,1600000000,166801017,
26063,701235024,San Vicente,barangay,rural,910,,,24,701235024,35,701235000,12,701200000,7,700000000,71235024,
15757,1704005052,Tagum,barangay,rural,994,,,52,1704005052,5,1704005000,40,1704000000,17,1700000000,174005052,
6598,201513056,Bolos Point,barangay,rural,919,,,56,201513056,13,201513000,15,201500000,2,200000000,21513056,
24223,1804517009,Masulog,barangay,rural,3813,,,9,1804517009,17,1804517000,45,1804500000,18,1800000000,64517009,
34829,1004201023,Monterico,barangay,rural,556,,,23,1004201023,1,1004201000,42,1004200000,10,1000000000,104201023,
25875,701229005,Bonbon Lower,barangay,rural,222,,,5,701229005,29,701229000,12,701200000,7,700000000,71229005,
32786,907308022,Mahayahay,barangay,rural,1292,,,22,907308022,8,907308000,73,907300000,9,900000000,97308022,
3078,102809018,Medina,barangay,rural,1298,,,18,102809018,9,102809000,28,102800000,1,100000000,12809018,
1936,1400120001,Bazar,barangay,rural,514,,,1,1400120001,20,1400120000,1,1400100000,14,1400000000,140120001,


In [8]:
barangay_table = barangay_df[col_ord]

In [9]:
barangay_table.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 42011 entries, 0 to 42010
Data columns (total 17 columns):
 #   Column                                    Non-Null Count  Dtype 
---  ------                                    --------------  ----- 
 0   psgc_id                                   42011 non-null  object
 1   name                                      42011 non-null  object
 2   geographic_level                          42011 non-null  object
 3   settlement_type                           42011 non-null  object
 4   population                                41988 non-null  Int64 
 5   psgc_extras                               33 non-null     object
 6   barangay_status                           2773 non-null   object
 7   barangay_code                             42011 non-null  object
 8   barangay_mapper                           42011 non-null  object
 9   municipality_or_city_code                 42011 non-null  object
 10  municipality_or_city_mapper               4201

In [10]:
import sqlite3

database_name: str = "psgc.db"
conn = sqlite3.connect(database_name)

In [11]:
table_name = "barangay"
barangay_df[col_ord].to_sql(name=table_name,con=conn, if_exists='replace', index=False)

42011

In [12]:
import os
from dotenv import load_dotenv

load_dotenv()


path_string = "abfss://{container}@{storage_account}.dfs.core.windows.net/{path}"


container = os.getenv("container")
storage_account = os.getenv("storage_account")
path = os.getenv("cloud_path")
sas_token = os.getenv("sas_token")
storage_key = os.getenv("storage_key")

azure_path = path_string.format(
    container=container, storage_account=storage_account, path=path
)

In [13]:
azure_path

'abfss://geography@papdwh.dfs.core.windows.net/gold/barangay/'

In [14]:
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("DeltaWriter")
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-azure:3.3.4,"
        "org.apache.hadoop:hadoop-azure-datalake:3.3.4,"
        "com.microsoft.azure:azure-storage:8.6.6,"
        "com.azure:azure-storage-common:12.24.0,"
        "io.delta:delta-spark_2.13:4.0.0",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = builder.getOrCreate()
# spark.sparkContext._jsc.hadoopConfiguration().set(
#     f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "SAS"
# )
# spark.sparkContext._jsc.hadoopConfiguration().set(
#     f"fs.azure.sas.token.provider.type.{storage_account}.dfs.core.windows.net",
#     "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider",
# )
# spark.sparkContext._jsc.hadoopConfiguration().set(
#     f"fs.azure.sas.fixed.token.{storage_account}.dfs.core.windows.net",
#     sas_token,
# )
spark.sparkContext._jsc.hadoopConfiguration().set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", f"{storage_key}"
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/08/18 19:45:32 WARN Utils: Your hostname, daisuke, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/08/18 19:45:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/hawitsu/.cache/pypoetry/virtualenvs/barangay-doYNjmMH-py3.12/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/hawitsu/.ivy2.5.2/cache
The jars for the packages stored in: /home/hawitsu/.ivy2.5.2/jars
org.apache.hadoop#hadoop-azure added as a dependency
org.apache.hadoop#hadoop-azure-datalake added as a dependency
com.microsoft.azure#azure-storage added as a dependency
com.azure#azure-storage-common added as a dependency
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-622f717a-f278-4002-8

In [15]:
spark_df = spark.createDataFrame(barangay_table)
spark_df.show()

+----------+-----------+----------------+---------------+----------+-----------+---------------+-------------+---------------+-------------------------+---------------------------+--------------------------------------+----------------------------------------+-----------+-------------+--------------+-----------+
|   psgc_id|       name|geographic_level|settlement_type|population|psgc_extras|barangay_status|barangay_code|barangay_mapper|municipality_or_city_code|municipality_or_city_mapper|province_or_highly_urbanized_city_code|province_or_highly_urbanized_city_mapper|region_code|region_mapper|legacy_psgc_id|legacy_name|
+----------+-----------+----------------+---------------+----------+-----------+---------------+-------------+---------------+-------------------------+---------------------------+--------------------------------------+----------------------------------------+-----------+-------------+--------------+-----------+
|1380100001| Barangay 1|        barangay|          urban| 

                                                                                

In [16]:
spark_df.write.save(azure_path, "delta","overwrite")

25/08/18 19:45:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/08/18 19:45:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/08/18 19:45:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/08/18 19:45:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/08/18 19:45:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
25/08/18 19:45:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
25/08/18 19:45:41 WARN MemoryManager: Total allocation exceeds 95.