In [0]:
%run ../residents/_udf_utils_residents

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col, regexp_extract, make_date, lit, when

In [0]:
# Reuse the active Spark session if one exists; otherwise create one
# registered under this job's application name.
spark = (
    SparkSession.builder
    .appName("silver_visitors_agegroup_nationality")
    .getOrCreate()
)

In [0]:
# Load the bronze source table and show its column names and types.
bronze_df = spark.read.table(
    "workspace.growth_poc.bronze_visitors_agegroup_nationality"
)

bronze_df.printSchema()

In [0]:
%sql
-- Preview a handful of rows from the bronze source table.
SELECT *
FROM workspace.growth_poc.bronze_visitors_agegroup_nationality
LIMIT 10

In [0]:
# Reshape the wide bronze table (one column per visit month) into long form:
# one row per (age group, nationality, visit month) with its amount.
# The table name is defined once so the schema lookup and the query cannot drift.
bronze_table = "workspace.growth_poc.bronze_visitors_agegroup_nationality"
bronze_df = spark.table(bronze_table)

# Identifier columns (age, nationality); every other column is unpivoted.
key_columns = ["연령", "국적"]

# Quote every value column as a SQL identifier for the dynamic statement.
# A backtick inside a column name is doubled so it cannot end the quoted
# identifier early and corrupt the generated SQL.
columns_to_be_unpivoted = [
    "`" + c.replace("`", "``") + "`"
    for c in bronze_df.columns
    if c not in key_columns
]
if not columns_to_be_unpivoted:
    # "IN ()" is not valid SQL; fail with an explicit message instead of an
    # opaque parser error.
    raise ValueError(
        f"{bronze_table} has no columns to unpivot besides {key_columns}"
    )

# build the dynamic column list
unpivot_columns = ", ".join(columns_to_be_unpivoted)

# String literals are written with single quotes: double quotes are parsed as
# identifiers when double-quoted identifiers are enabled, single quotes never are.
# REPLACE strips the spaces embedded in the age-group and nationality labels.
dynamic_sql = f"""
SELECT REPLACE(`연령`, ' ', '') AS age_group,
    REPLACE(`국적`, ' ', '') AS nationality,
    visit_month_kor,
    amount
FROM {bronze_table} v
UNPIVOT (
    amount for visit_month_kor in ({unpivot_columns})
)
"""

# Execute the dynamic SQL and drop the aggregate rows so that only detail
# rows remain: the "계" (total) month column, the "전체" (all) nationality,
# and the "승무원" (crew), "전체" (all) and "소계" (subtotal) age groups.
unpivoted_df = spark.sql(dynamic_sql).filter(
    (col("visit_month_kor") != "계")
    & (~col("nationality").isin(["전체"]))
    & (~col("age_group").isin(["승무원", "전체", "소계"]))
)


In [0]:
# Derive calendar fields and age bounds from the unpivoted text columns.

# Leading four digits of the month label -> year.
visit_year_col = regexp_extract(col("visit_month_kor"), r"^(\d{4})", 1).alias("visit_year")

# First two-digit run that follows at least one non-digit -> month.
# NOTE(review): the pattern needs exactly two digits; confirm that the source
# labels always zero-pad the month.
visit_month_col = regexp_extract(col("visit_month_kor"), r"\D+(\d{2})", 1).alias("visit_month")

# Leading one or two digits of the age-group label -> lower bound.
min_age_col = regexp_extract(col("age_group"), r"^(\d{2}|\d{1})", 1).alias("min_age")

# Open-ended groups (label ends with "~") get a fixed upper bound of 80;
# otherwise the trailing two digits of the label are used.
# NOTE(review): lit(80) is an integer while regexp_extract yields a string, so
# the resulting column type is decided by Spark's type coercion — confirm.
max_age_col = (
    when(col("age_group").endswith("~"), lit(80))
    .otherwise(regexp_extract(col("age_group"), r"(\d{2})$", 1))
    .alias("max_age")
)

clean_date_df = (
    unpivoted_df
    .select("*", visit_year_col, visit_month_col, min_age_col, max_age_col)
    # Every visit is dated to the first day of its month.
    .withColumn("visit_date", make_date(col("visit_year"), col("visit_month"), lit(1)))
)

In [0]:
# Clean the nationality values, then map them to country and language codes.
# clean_nationality / map_nationality are not defined in this notebook —
# presumably they come from the %run'd _udf_utils_residents notebook; verify.
clean_nationality_df = clean_nationality(clean_date_df)
mapping_df = map_nationality(clean_nationality_df)

# Region-level grouping labels (Americas, Africa, Asia, Europe, Middle East,
# total, Oceania) that must not be kept as individual nationalities.
nationality_to_remove = [
    "미주", "아프리카주", "아시아주", "구주", "중동", "전체", "대양주",
]

# Keep only the rows whose nationality is not one of the grouping labels.
is_grouping_row = col("Nationality").isin(nationality_to_remove)
remove_grouping_df = mapping_df.filter(~is_grouping_row)

In [0]:
# mid_age is the midpoint of the age band: (min_age + max_age) / 2.
clean_agegroup_df = remove_grouping_df.withColumn(
    "mid_age",
    (col("min_age") + col("max_age")) / 2,
)

In [0]:
# Stamp every row with the current timestamp of this run.
final_df = clean_agegroup_df.withColumn("TimeStamp", current_timestamp())

# Overwrite the silver Delta table with the freshly built data set.
(
    final_df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", True)
    .saveAsTable("workspace.growth_poc.silver_visitors_agegroup_nationality")
)