### Silver Notebook
This notebook reads data from the bronze tables, transforms it, and stores the results in the silver tables.

### Let's define the schema of our JSON object so that Spark can parse it from a string

In [0]:
from pyspark.sql.functions import col, expr, from_json, explode, posexplode, cast, row_number
from pyspark.sql.types import StructType, StructField, MapType, StringType, ArrayType, IntegerType
from pyspark.sql import Window


def _category_dimension():
    """Build the struct used by every named dimension entry.

    Each dimension carries a ``category`` struct with two string-to-string
    maps: ``index`` (category id -> position) and ``label`` (category id ->
    display name).
    """
    return StructType([
        StructField("category", StructType([
            StructField("index", MapType(StringType(), StringType()), True),
            StructField("label", MapType(StringType(), StringType()), True),
        ]))
    ])


# Schema handed to from_json for the raw JSON response string.
# Field order under "dimension": id, size, then the named dimensions.
# NOTE(review): "measure" is declared here but is not read by transform() in
# this notebook - confirm whether it is still needed.
_DIMENSION_NAMES = ("measure", "alue", "ammatti", "palvelumuoto", "aika")

schema = StructType([
    StructField("dataset", StructType([
        StructField("dimension", StructType(
            [
                StructField("id", ArrayType(StringType()), True),
                StructField("size", ArrayType(IntegerType()), True),
            ]
            + [StructField(name, _category_dimension()) for name in _DIMENSION_NAMES]
        )),
        # Flat map of cell position (as a string key) -> cell value (as a string).
        StructField("value", MapType(StringType(), StringType())),
    ]))
])


### Defining the transformations

In [0]:
def _dimension_lookup(parsed, dimension, label_col, sort_col):
  """Return one row per category of a dimension: its label and integer position.

  Args:
    parsed: DataFrame with a ``data`` struct column produced by ``from_json``.
    dimension: Field name under ``data.dataset.dimension`` (e.g. ``"alue"``).
    label_col: Name of the output column holding the category label.
    sort_col: Name of the output column holding the category index, cast to int.

  Returns:
    DataFrame with exactly two columns, ``label_col`` and ``sort_col``.
  """
  category = f"data.dataset.dimension.{dimension}.category"
  labels = parsed.selectExpr(f"explode({category}.label) as (category_id, {label_col})")
  positions = parsed.selectExpr(f"explode({category}.index) as (category_id, {sort_col})")
  # No orderBy here: row order does not survive the cross joins in transform(),
  # and the window there sorts on the index columns itself.
  return (
      labels.join(positions, on="category_id", how="left")
      .withColumn(sort_col, col(sort_col).cast("int"))
      .drop("category_id")
  )


def transform(table_df):
  """Flatten a raw JSON response into one row per dimension combination.

  The ``json_response`` string column is parsed with the module-level
  ``schema``. Every combination of region, profession, service type and year
  is generated, numbered from 0, and matched against the keys of the
  ``dataset.value`` map. Aggregate "Kaikki ..." rows are removed.

  Args:
    table_df: DataFrame with a ``json_response`` string column. The callers in
      this notebook pass a single row (``limit(1)``); with several rows the
      exploded dimensions of all rows would be mixed together.

  Returns:
    DataFrame with columns ``region_name``, ``profession_name``,
    ``service_type_name``, ``year`` (int) and ``value`` (double). ``value`` is
    null where the value map has no entry for a combination.
  """
  # Step 2: Parse JSON
  parsed = table_df.withColumn("data", from_json(col("json_response"), schema)).drop("json_response")

  # Step 3: Extract the values as key-value pairs and each dimension as (label, sort index)
  values_df = parsed.selectExpr("explode(data.dataset.value) as (key, value)")

  regions = _dimension_lookup(parsed, "alue", "region_name", "sort_index_range")
  professions = _dimension_lookup(parsed, "ammatti", "profession_name", "sort_index_profession")
  service_types = _dimension_lookup(parsed, "palvelumuoto", "service_type_name", "sort_index_service_type")
  years = _dimension_lookup(parsed, "aika", "year", "sort_index_year")

  # Step 4: Cross join the dimensions and number every combination from 0.
  # NOTE(review): this assumes the value keys are positions in the order
  # (alue, ammatti, palvelumuoto, aika); dataset.dimension.id/size are parsed
  # but not checked here - confirm against the source data.
  sort_cols = ["sort_index_range", "sort_index_profession", "sort_index_service_type", "sort_index_year"]
  base_df = regions.crossJoin(professions).crossJoin(service_types).crossJoin(years)
  # A window without partitionBy is evaluated on a single partition.
  w = Window.orderBy(*sort_cols)
  base_df = base_df.withColumn("row_number", row_number().over(w) - 1).drop(*sort_cols)

  # Step 5: Join with values df to get one final
  # The map keys are strings; parse them explicitly so the join compares
  # int with int instead of relying on implicit string/int coercion.
  indexed_values = values_df.select(
      expr("try_cast(key as int)").alias("row_number"),
      col("value")
  )
  result_df = base_df.join(
      indexed_values,
      "row_number",
      "left"  # Use left join to keep all combinations, null for missing values
  ).select(
      "region_name",
      "profession_name",
      "service_type_name",
      "year",
      "value"
  )

  # Step 6: Removing the total aggregate data
  result_df = result_df.filter(col('region_name') != 'Kaikki kunnat')
  result_df = result_df.filter(col('profession_name') != 'Kaikki ammatit')
  result_df = result_df.filter(col('service_type_name') != 'Kaikki palvelumuodot')
  result_df = result_df.filter(col('year') != 'Kaikki vuodet')

  # Step 7: Changing the data type of certain columns (unparseable strings become null)
  result_df = result_df.withColumn("value", expr("try_cast(value as double)")).withColumn("year", expr("try_cast(year as int)"))

  return result_df

In [0]:
# Setting up the catalog and database
# Point the session at silver.avohilmo so that the unqualified table names in
# the CREATE TABLE cell below resolve there. The catalog is switched first,
# then the database - keep this order.
spark.catalog.setCurrentCatalog("silver")
spark.catalog.setCurrentDatabase("avohilmo")


In [0]:
%sql
-- Create the empty silver tables with explicit column types.
-- Names are unqualified, so they resolve against the current catalog/database.

-- Visits: the runner cell casts value to int before writing.
CREATE TABLE IF NOT EXISTS visits_processed (
  region_name        STRING,
  profession_name    STRING,
  service_type_name  STRING,
  year               INT,
  value              INT
);

-- Customers: the runner cell casts value to int before writing.
CREATE TABLE IF NOT EXISTS customers_processed (
  region_name        STRING,
  profession_name    STRING,
  service_type_name  STRING,
  year               INT,
  value              INT
);

-- Visits per customer: value is written as the double returned by transform().
CREATE TABLE IF NOT EXISTS visits_customers_processed (
  region_name        STRING,
  profession_name    STRING,
  service_type_name  STRING,
  year               INT,
  value              DOUBLE
);

In [0]:
# Main runner code
# Take the bronze row with the latest created_at, transform it, cast value to
# int and overwrite the silver visits table.
visits_raw_df = (
    transform(
        spark.table("bronze.avohilmo.visits_raw")
        .orderBy(col("created_at").desc())
        .limit(1)
    )
    .withColumn("value", col("value").cast("int"))
)
visits_raw_df.write.mode("overwrite").saveAsTable("silver.avohilmo.visits_processed")

In [0]:
# Take the bronze row with the latest created_at, transform it, cast value to
# int and overwrite the silver customers table.
customers_raw_df = (
    transform(
        spark.table("bronze.avohilmo.customers_raw")
        .orderBy(col("created_at").desc())
        .limit(1)
    )
    .withColumn("value", col("value").cast("int"))
)
customers_raw_df.write.mode("overwrite").saveAsTable("silver.avohilmo.customers_processed")

In [0]:
# Take the bronze row with the latest created_at, transform it and overwrite
# the silver visits-per-customer table; value stays a double here.
visits_customers_raw_df = transform(
    spark.table("bronze.avohilmo.visits_customers_raw")
    .orderBy(col("created_at").desc())
    .limit(1)
)
visits_customers_raw_df.write.mode("overwrite").saveAsTable("silver.avohilmo.visits_customers_processed")

In [None]:
%sql
-- Inspect the processed visits table.
SELECT * FROM silver.avohilmo.visits_processed;

In [None]:
%sql
-- Inspect the processed customers table.
SELECT * FROM silver.avohilmo.customers_processed;

In [None]:
%sql
-- Inspect the processed visits-per-customer table.
SELECT * FROM silver.avohilmo.visits_customers_processed;