# Transform and cleanse data for **Silver** table


In [31]:
# Folder holding the bronze-layer CSV extracts; the load cell reads every *.csv under it.
BRONZE_DATA_PATH: str = "Files/bronze/nishiodens/japan-real-estate-transaction-prices/trade_prices"

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 33, Finished, Available, Finished)

In [32]:
# Echo the configured bronze path so it is visible in the run output.
print(BRONZE_DATA_PATH)

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 34, Finished, Available, Finished)

Files/bronze/nishiodens/japan-real-estate-transaction-prices/trade_prices


In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 35, Finished, Available, Finished)

In [34]:
from pyspark.sql.types import *

# (column name, Spark type class) pairs in declaration order; every field is nullable.
_BRONZE_COLUMNS = [
    ("No", IntegerType),
    ("Type", StringType),
    ("Region", StringType),
    ("MunicipalityCode", StringType),
    ("Prefecture", StringType),
    ("Municipality", StringType),
    ("DistrictName", StringType),
    ("NearestStation", StringType),
    ("TimeToNearestStation", StringType),
    ("MinTimeToNearestStation", IntegerType),
    ("MaxTimeToNearestStation", IntegerType),
    ("TradePrice", LongType),
    ("FloorPlan", StringType),
    ("Area", DoubleType),
    ("AreaIsGreaterFlag", BooleanType),
    ("UnitPrice", LongType),
    ("PricePerTsubo", DoubleType),
    ("LandShape", StringType),
    ("Frontage", DoubleType),
    ("FrontageIsGreaterFlag", BooleanType),
    ("TotalFloorArea", DoubleType),
    ("TotalFloorAreaIsGreaterFlag", BooleanType),
    ("BuildingYear", IntegerType),
    ("PrewarBuilding", BooleanType),
    ("Structure", StringType),
    ("Use", StringType),
    ("Purpose", StringType),
    ("Direction", StringType),
    ("Classification", StringType),
    ("Breadth", DoubleType),
    ("CityPlanning", StringType),
    ("CoverageRatio", DoubleType),
    ("FloorAreaRatio", DoubleType),
    ("Period", StringType),
    ("Year", IntegerType),
    ("Quarter", StringType),
    ("Renovation", StringType),
    ("Remarks", StringType),
]

# Declared schema of the bronze trade-price files, built from the table above.
bronze_schema = StructType(
    [StructField(field_name, spark_type(), True) for field_name, spark_type in _BRONZE_COLUMNS]
)


StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 36, Finished, Available, Finished)

In [35]:
# Every column is read as a string. Passing the schema through
# `.options(schema=...)` has no effect on the CSV reader ("schema" is not a CSV
# option), which is why the printSchema output below lists only string columns;
# the no-op option is therefore dropped here. The cleansing cells further down
# strip and cast the string values themselves.
# To enforce types at read time, use `.schema(bronze_schema)` instead -- after
# confirming that the raw values parse under the declared types, because values
# that do not parse are read as NULL.
df_bronze = (
    spark.read.format("csv")
    .option("header", True)
    .load(f"{BRONZE_DATA_PATH}/*.csv")
)

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 37, Finished, Available, Finished)

In [36]:
# Preview the first three bronze rows.
display(df_bronze.limit(3))


StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 38, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 5fb7c226-6c73-45f4-9689-95d5d90d38b9)

In [37]:
# Show the column types Spark assigned on load.
df_bronze.printSchema()

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 39, Finished, Available, Finished)

root
 |-- No: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- MunicipalityCode: string (nullable = true)
 |-- Prefecture: string (nullable = true)
 |-- Municipality: string (nullable = true)
 |-- DistrictName: string (nullable = true)
 |-- NearestStation: string (nullable = true)
 |-- TimeToNearestStation: string (nullable = true)
 |-- MinTimeToNearestStation: string (nullable = true)
 |-- MaxTimeToNearestStation: string (nullable = true)
 |-- TradePrice: string (nullable = true)
 |-- FloorPlan: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- AreaIsGreaterFlag: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- PricePerTsubo: string (nullable = true)
 |-- LandShape: string (nullable = true)
 |-- Frontage: string (nullable = true)
 |-- FrontageIsGreaterFlag: string (nullable = true)
 |-- TotalFloorArea: string (nullable = true)
 |-- TotalFloorAreaIsGreaterFlag: string (nullable = true)
 |-- Bui

## Create stats table for EDA

In [38]:
from pyspark.sql.functions import col

# Total number of rows read from the bronze CSV files.
total_rows = df_bronze.count()
print(f"The bronze df has {total_rows} rows in total.")


StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 40, Finished, Available, Finished)

The bronze df has 3906518 rows in total.


In [39]:
from pyspark.sql.functions import col, when, isnan, isnull

def calc_stats(df):
    """
    Build a per-column data-quality profile of a Spark DataFrame.

    For every column the profile records its Spark data type, how many values
    are NULL, how many equal the empty string, the combined "missing" count and
    its share of the rows of ``df`` (in percent), and the number of distinct
    values (NULL counts as one distinct value).

    The percentage is computed against the row count of ``df`` itself, so the
    function does not depend on any notebook-level variable.

    Args:
        df: The Spark DataFrame to profile.

    Returns:
        A Spark DataFrame with one row per column of ``df`` and the columns
        ``col_name``, ``data_type``, ``missing_count``, ``missing_percentage``,
        ``null_count``, ``empty_string_count`` and ``unique_count``.
    """
    # Imported locally under an alias so the helper does not depend on which
    # function names happen to be bound at notebook level when it is called.
    from pyspark.sql import functions as F

    column_names = df.columns
    dtype_by_column = dict(df.dtypes)  # built once instead of once per column
    row_count = df.count()

    # One aggregation pass yields the NULL and empty-string counts of all
    # columns. `when` without `otherwise` gives NULL for non-matching rows and
    # `count` skips NULLs, so each expression counts only the matching rows.
    count_exprs = []
    for idx, name in enumerate(column_names):
        column = F.col(name)
        count_exprs.append(F.count(F.when(column.isNull(), 1)).alias(f"null_{idx}"))
        count_exprs.append(F.count(F.when(column == "", 1)).alias(f"empty_{idx}"))
    counts = df.agg(*count_exprs).first()

    stats_list = []
    for idx, name in enumerate(column_names):
        null_count = counts[f"null_{idx}"]
        empty_string_count = counts[f"empty_{idx}"]
        missing_count = null_count + empty_string_count
        # Guard against an empty frame instead of dividing by zero.
        missing_percentage = (missing_count / row_count) * 100 if row_count else 0.0

        # distinct() keeps NULL as a value, so a column containing NULLs
        # reports one more distinct value than its non-NULL distinct count.
        unique_count = df.select(name).distinct().count()

        stats_list.append((
            name,
            dtype_by_column[name],
            missing_count,
            missing_percentage,
            null_count,
            empty_string_count,
            unique_count
        ))

    # Create DataFrame with comprehensive statistics
    stats_df = spark.createDataFrame(stats_list, [
            "col_name",
            "data_type",
            "missing_count",
            "missing_percentage",
            "null_count",
            "empty_string_count",
            "unique_count"
    ])
    return stats_df

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 41, Finished, Available, Finished)

In [40]:
# Profile every bronze column (missing, null, empty-string and distinct counts).
bronze_stats_df = calc_stats(df_bronze)

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 42, Finished, Available, Finished)

In [41]:
from pyspark.sql.functions import desc
# Show the profile with the columns that have the most missing data first.
display(bronze_stats_df.orderBy(desc("missing_percentage")))

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 43, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1c3f783f-158d-4024-b4b0-7e10d59fe83b)

In [42]:
# Persist the column profile as the Delta table "bronze_profile", replacing any
# earlier version of it.
profile_writer = bronze_stats_df.write.format("delta").mode("overwrite")
profile_writer.saveAsTable("bronze_profile")

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 44, Finished, Available, Finished)

In [43]:
# Columns whose share of missing values (in percent) reaches this threshold are dropped.
MISSING_PERCENTAGE_THRESHOLD = 60.0

missing_data = bronze_stats_df.select(col("col_name"), col("missing_percentage"))

print("Column with high missing value percentage:")
# `columns_to_drop` is a Spark DataFrame of (col_name, missing_percentage) rows,
# not a Python list; the next cell collects the names from it.
columns_to_drop = missing_data.filter(col("missing_percentage") >= MISSING_PERCENTAGE_THRESHOLD)
display(columns_to_drop.orderBy(desc("missing_percentage")))

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 45, Finished, Available, Finished)

Column with high missing value percentage:


SynapseWidget(Synapse.DataFrame, a6df6c81-5ec9-44d8-91ce-b2df615f6bcc)

In [44]:

# Pull the flagged column names to the driver, then drop them from the bronze frame.
high_missing_names = [row["col_name"] for row in columns_to_drop.select("col_name").collect()]
df_dropped_missing = df_bronze.drop(*high_missing_names)
display(df_dropped_missing.limit(2))

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 46, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, e6441f54-13c8-4106-b0ad-27080774c4e4)

## Manually drop some unused columns

In [45]:
# List the columns that remain after the missing-value drop.
df_dropped_missing.printSchema()

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 47, Finished, Available, Finished)

root
 |-- No: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- MunicipalityCode: string (nullable = true)
 |-- Prefecture: string (nullable = true)
 |-- Municipality: string (nullable = true)
 |-- DistrictName: string (nullable = true)
 |-- NearestStation: string (nullable = true)
 |-- TimeToNearestStation: string (nullable = true)
 |-- MinTimeToNearestStation: string (nullable = true)
 |-- MaxTimeToNearestStation: string (nullable = true)
 |-- TradePrice: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- AreaIsGreaterFlag: string (nullable = true)
 |-- LandShape: string (nullable = true)
 |-- Frontage: string (nullable = true)
 |-- FrontageIsGreaterFlag: string (nullable = true)
 |-- TotalFloorAreaIsGreaterFlag: string (nullable = true)
 |-- BuildingYear: string (nullable = true)
 |-- PrewarBuilding: string (nullable = true)
 |-- Structure: string (nullable = true)
 |-- Use: string (nullable = true)
 |-- Direction:

In [46]:
# Columns removed by hand in the next cell, in addition to the ones already
# dropped for having too many missing values.
columns_not_included = [
    "No",
    "Region",
    "MunicipalityCode",
    "DistrictName",
    "NearestStation",
    "TimeToNearestStation",
    "MinTimeToNearestStation",
    "MaxTimeToNearestStation",
    "Area",
    "AreaIsGreaterFlag",
    "Frontage",
    "FrontageIsGreaterFlag",
    "TotalFloorAreaIsGreaterFlag",
    "PrewarBuilding",
    "Breadth",
    "CoverageRatio",
    "FloorAreaRatio",
    "Period"
]

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 48, Finished, Available, Finished)

In [47]:
# Remove the hand-picked columns, then show the result with the latest
# Year/Quarter values first.
df_distilled = df_dropped_missing.drop(*columns_not_included)
newest_first = df_distilled.orderBy(col("Year").desc(), col("Quarter").desc())
display(newest_first)

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 49, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 02ecf9e5-0c51-45b1-8e39-8470e6e7f2d8)

In [48]:
# Confirm which columns remain after the manual drop.
df_distilled.printSchema()

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 50, Finished, Available, Finished)

root
 |-- Type: string (nullable = true)
 |-- Prefecture: string (nullable = true)
 |-- Municipality: string (nullable = true)
 |-- TradePrice: string (nullable = true)
 |-- LandShape: string (nullable = true)
 |-- BuildingYear: string (nullable = true)
 |-- Structure: string (nullable = true)
 |-- Use: string (nullable = true)
 |-- Direction: string (nullable = true)
 |-- Classification: string (nullable = true)
 |-- CityPlanning: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Quarter: string (nullable = true)



In [50]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.sql.functions import col, when, regexp_replace, trim


# Keeps only digits and the decimal point of a numeric string. The same pattern
# is used for the integer columns: stripping the decimal point as well would
# glue the fraction digits onto the number (e.g. "1984.0" -> "19840").
NUMERIC_CHARS_ONLY = "[^0-9.]"


def _typed_or_null(clean_name, out_name, as_integer):
    """
    Convert a cleaned numeric string column into a typed column.

    A blank cleaned value becomes NULL. Other values are cast to double and,
    when ``as_integer`` is true, further to integer, so "1984" and "1984.0"
    both yield 1984. The result is aliased to ``out_name``.
    """
    typed = col(clean_name).cast(DoubleType())
    if as_integer:
        typed = typed.cast(IntegerType())
    return when(col(clean_name) == "", None).otherwise(typed).alias(out_name)


# Step 1: trim the text columns and strip non-numeric characters from the
# numeric ones (kept as strings under a "_clean" suffix).
df_selected = df_distilled.select(
    trim(col("Type")).alias("Type"),
    trim(col("Prefecture")).alias("Prefecture"),
    trim(col("Municipality")).alias("Municipality"),
    regexp_replace(col("TradePrice"), NUMERIC_CHARS_ONLY, "").alias("TradePrice_clean"),
    trim(col("LandShape")).alias("LandShape"),
    regexp_replace(col("BuildingYear"), NUMERIC_CHARS_ONLY, "").alias("BuildingYear_clean"),
    trim(col("Structure")).alias("Structure"),
    trim(col("Use")).alias("Use"),
    trim(col("Direction")).alias("Direction"),
    trim(col("Classification")).alias("Classification"),
    trim(col("CityPlanning")).alias("CityPlanning"),
    regexp_replace(col("Year"), NUMERIC_CHARS_ONLY, "").alias("Year_clean"),
    regexp_replace(col("Quarter"), NUMERIC_CHARS_ONLY, "").alias("Quarter_clean")
)

# Step 2: cast the cleaned strings to their target types; the column order
# matches the original frame.
df_type_casted = df_selected.select(
    col("Type"),
    col("Prefecture"),
    col("Municipality"),
    _typed_or_null("TradePrice_clean", "TradePrice", as_integer=False),
    col("LandShape"),
    _typed_or_null("BuildingYear_clean", "BuildingYear", as_integer=True),
    col("Structure"),
    col("Use"),
    col("Direction"),
    col("Classification"),
    col("CityPlanning"),
    _typed_or_null("Year_clean", "Year", as_integer=True),
    _typed_or_null("Quarter_clean", "Quarter", as_integer=True)
)

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 52, Finished, Available, Finished)

In [51]:
# Preview five rows of the typed frame.
display(df_type_casted.limit(5))

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 53, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, d462eba9-a35c-469f-8ab5-61051ba0ee44)

In [52]:
# Profile the typed frame with the same statistics used for the bronze profile.
display(calc_stats(df_type_casted))

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 54, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 897ceadf-7600-470d-95ab-d52a81909541)

## Imputation - Making sense of nulls

**Columns that shouldn't allow nulls**

- TradePrice
- Prefecture
- Year
- Quarter

In [53]:
# df_type_casted.printSchema()


StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 55, Finished, Available, Finished)

In [54]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import Imputer
from pyspark.ml import Pipeline



# Categorical columns grouped by the placeholder that replaces their NULLs.
unknown_label_columns = ("LandShape", "Structure", "Direction")
not_available_columns = ("Use", "Classification", "CityPlanning")

categorical_fillna_map = {
    **dict.fromkeys(unknown_label_columns, "Unknown"),
    **dict.fromkeys(not_available_columns, "Not Available"),
}

# Only NULLs are replaced; every other value is left untouched.
df_fillna_cat = df_type_casted.na.fill(categorical_fillna_map)


StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 56, Finished, Available, Finished)

In [55]:
# Preview five rows after filling the categorical NULLs.
display(df_fillna_cat.limit(5))

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 57, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1d95d70f-1eba-4210-9f9c-c24506d08d49)

## Generate hash key for each row
hash(row_content + row index)

In [63]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, row_number, sha2, concat, lit
from pyspark.sql.window import Window

from pyspark.sql.functions import coalesce

# Columns whose values make up the hashed row content, in a fixed order.
hash_columns = [
    "Type",
    "Prefecture",
    "Municipality",
    "TradePrice",
    "LandShape",
    "BuildingYear",
    "Structure",
    "Use",
    "Direction",
    "Classification",
    "CityPlanning",
    "Year",
    "Quarter",
]

# concat_ws() skips NULL inputs, which would let rows that differ only in
# *which* column is NULL produce the same content string. A placeholder keeps
# every column position present.
NULL_TOKEN = "<NULL>"

# row_num is the occurrence index of a row among rows with identical content.
# Rows inside one partition are identical in every hashed column, so the
# resulting set of (row_content, row_num) pairs -- and therefore the IDs -- is
# the same on every run. A single global window ordered only by Year/Quarter
# numbers tied rows arbitrarily, so IDs changed between runs and the upsert
# could not match existing rows; it also moved all rows into one partition.
# NOTE(review): IDs produced by this scheme differ from IDs already stored by
# earlier runs; rebuild an existing silver table before relying on the upsert.
windowSpec = Window.partitionBy("row_content").orderBy("row_content")

df_with_id = df_fillna_cat.withColumn(
    "row_content",
    concat_ws(
        "|",
        *[coalesce(col(name).cast("string"), lit(NULL_TOKEN)) for name in hash_columns]
    )
).withColumn(
    "row_num",
    row_number().over(windowSpec)
).withColumn(
    "ID",
    sha2(concat(col("row_content"), lit("_"), col("row_num").cast("string")), 256)
)


StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 65, Finished, Available, Finished)

In [65]:
# Preview two rows, including the helper columns and the generated ID.
display(df_with_id.limit(2))

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 67, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2a0aa798-6bd8-41f0-a0bd-fb2d4c40d789)

## Write silver table through upsert operation

In [66]:
# The helper columns were only needed to derive the ID; drop them before writing.
helper_columns = ("row_content", "row_num")
df_silver = df_with_id.drop(*helper_columns)
df_silver.printSchema()

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 68, Finished, Available, Finished)

root
 |-- Type: string (nullable = true)
 |-- Prefecture: string (nullable = true)
 |-- Municipality: string (nullable = true)
 |-- TradePrice: double (nullable = true)
 |-- LandShape: string (nullable = false)
 |-- BuildingYear: integer (nullable = true)
 |-- Structure: string (nullable = false)
 |-- Use: string (nullable = false)
 |-- Direction: string (nullable = false)
 |-- Classification: string (nullable = false)
 |-- CityPlanning: string (nullable = false)
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- ID: string (nullable = true)



In [69]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# (column name, Spark type class, nullable) triples describing the silver table.
_SILVER_COLUMNS = [
    ("Type", StringType, True),
    ("Prefecture", StringType, False),
    ("Municipality", StringType, True),
    ("TradePrice", DoubleType, False),
    ("LandShape", StringType, True),
    ("BuildingYear", IntegerType, True),
    ("Structure", StringType, True),
    ("Use", StringType, True),
    ("Direction", StringType, True),
    ("Classification", StringType, True),
    ("CityPlanning", StringType, True),
    ("Year", IntegerType, True),
    ("Quarter", IntegerType, True),
    ("ID", StringType, False),
]

silver_schema = StructType(
    [
        StructField(field_name, spark_type(), is_nullable)
        for field_name, spark_type, is_nullable in _SILVER_COLUMNS
    ]
)

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 71, Finished, Available, Finished)

In [70]:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import col


delta_table_name = "silver_data"

print(f"Processing data for Delta table: {delta_table_name}")

# Check for the table explicitly. Catching every exception from
# DeltaTable.forName() would treat any failure as "table is missing" and then
# overwrite a table that may in fact exist.
if spark.catalog.tableExists(delta_table_name):
    print(f"Delta table '{delta_table_name}' already exists. Proceeding with upsert.")
else:
    print(f"Delta table '{delta_table_name}' does not exist. Creating it now using df_silver.")
    df_silver.write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable(delta_table_name)
    print(f"Delta table '{delta_table_name}' created successfully.")

# Any error raised here (for example the table exists but is not a Delta table)
# is surfaced to the caller instead of being swallowed.
deltaTable = DeltaTable.forName(spark, delta_table_name)


StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 72, Finished, Available, Finished)

Processing data for Delta table: silver_data
Delta table 'silver_data' does not exist. Creating it now using df_silver.
Delta table 'silver_data' created successfully.


## Run the upsert (merge) into the silver table

In [71]:

print(f"Starting upsert operation into '{delta_table_name}'...")

# Every column except the ID is refreshed on a match; the ID is the merge key
# and is only written when a new row is inserted.
payload_columns = [
    "Type",
    "Prefecture",
    "Municipality",
    "TradePrice",
    "LandShape",
    "BuildingYear",
    "Structure",
    "Use",
    "Direction",
    "Classification",
    "CityPlanning",
    "Year",
    "Quarter",
]
update_assignments = {name: col(f"source.{name}") for name in payload_columns}
insert_assignments = {**update_assignments, "ID": col("source.ID")}

merge_builder = deltaTable.alias("target").merge(
    source=df_silver.alias("source"),
    condition="target.ID = source.ID"
)
merge_builder = merge_builder.whenMatchedUpdate(set=update_assignments)
merge_builder = merge_builder.whenNotMatchedInsert(values=insert_assignments)
merge_builder.execute()

print(f"Upsert operation completed for table '{delta_table_name}'.")

StatementMeta(, 35adddfb-cc47-4d53-8cd1-066fc37271c6, 73, Finished, Available, Finished)

Starting upsert operation into 'silver_data'...
Upsert operation completed for table 'silver_data'.


## exit values

In [1]:
# json was never imported anywhere in the notebook, so json.dumps() below
# would raise NameError; import it next to its only use.
import json

# Requires `df_silver` from the cells above in the same Spark session; the
# recorded output of this cell shows a NameError from a session where those
# cells had not been run.
num_rows = df_silver.count()
num_columns = len(df_silver.columns)
exit_status = "SUCCESS"


# Summary handed back to whatever invoked the notebook.
exit_values = {
    "status": exit_status,
    "num_rows": num_rows,
    "num_columns": num_columns,
    "table_name": "silver_data"
}


json_exit_values = json.dumps(exit_values, indent=4)

print("\n--- Notebook Exit Values (JSON) ---")
print(json_exit_values)

StatementMeta(, b2dce12c-36dc-4f23-b140-91b2f7ccd451, 3, Finished, Available, Finished)

NameError: name 'df_silver' is not defined

In [None]:

# The JSON string built in the previous cell is named `json_exit_values`;
# `exit_value_json` is not defined anywhere and would raise NameError.
print(f"Exiting notebook with value: {json_exit_values}")
mssparkutils.notebook.exit(json_exit_values)

print("This line will not be printed after exit.")
