# Transform

In [0]:
# Import Delta Lake dependencies
from delta.tables import *

# Load the source Delta table from its DBFS location into a DataFrame
df = spark.read.load(
    "dbfs:/user/hive/warehouse/delta-table",
    format="delta",
)

# # Verify the data
# df.limit(5).display()

## Drop unwanted columns

In [0]:
# Remark columns to remove from the DataFrame.
# NOTE(review): these names contain spaces, whereas every other column referenced in
# this notebook uses underscores (e.g. "Property_Type"). DataFrame.drop() is a no-op
# for names that are not in the schema, so a mismatch would go unnoticed - confirm
# the exact names against df.columns.
columns_to_drop = [
    "Assessor Remarks",
    "OPM remarks",
]
df_drop = df.drop(*columns_to_drop)

## Handle missing values

In [0]:
# Replace nulls in the listed columns with an explicit 'Unknown' label
null_replacements = {
    column_name: 'Unknown'
    for column_name in ('Property_Type', 'Residential_Type', 'Non_Use_Code', 'Location')
}
df_fill_null = df_drop.fillna(null_replacements)

## Drop duplicates

In [0]:
# Keep a single row per (Address, Location, Date_Recorded) combination
dedup_keys = ["Address", "Location", "Date_Recorded"]
df_rm_duplicate = df_fill_null.dropDuplicates(dedup_keys)

## Standardize the format

In [0]:
from pyspark.sql.functions import col, to_date

# Parse Date_Recorded with the MM/dd/yyyy pattern and replace the column
# with the resulting to_date() value
recorded_on = to_date(col("Date_Recorded"), "MM/dd/yyyy")
df_date = df_rm_duplicate.withColumn("Date_Recorded", recorded_on)

## Custom binning

In [0]:
from pyspark.sql.functions import when

# Add a Price_Band column derived from Sale_Amount:
#   Low    : Sale_Amount < 200000
#   Medium : 200000 <= Sale_Amount < 500000
#   High   : Sale_Amount >= 500000
# Every band has an explicit condition, so a null Sale_Amount produces a null
# Price_Band instead of falling through a catch-all otherwise() and being
# labelled "High".
sale_amount = col("Sale_Amount")
df_price_group = df_date.withColumn(
    "Price_Band",
    when(sale_amount < 200000, "Low")
    # Conditions are evaluated in order, so reaching this one implies >= 200000
    .when(sale_amount < 500000, "Medium")
    .when(sale_amount >= 500000, "High"),
)

In [0]:
# Add a Sale_Performance column derived from Sales_Ratio:
#   Sales_Ratio > 1  -> "Above Market Value"
#   Sales_Ratio == 1 -> "At Market Value"
#   Sales_Ratio < 1  -> "Below Market Value"
# Every label has an explicit condition, so a null Sales_Ratio produces a null
# Sale_Performance instead of being labelled "Below Market Value" by a
# catch-all otherwise().
sales_ratio = col("Sales_Ratio")
df_sales_perf = df_price_group.withColumn(
    "Sale_Performance",
    when(sales_ratio > 1, "Above Market Value")
    .when(sales_ratio == 1, "At Market Value")
    .when(sales_ratio < 1, "Below Market Value"),
)

## Check constraints

In [0]:
# Keep only rows where both Assessed_Value and Sale_Amount are non-negative
non_negative_assessed = col("Assessed_Value") >= 0
non_negative_sale = col("Sale_Amount") >= 0
df_no_neg = df_sales_perf.where(non_negative_assessed & non_negative_sale)

## Outlier Removal

In [0]:
from pyspark.sql.functions import mean, stddev

# Compute the mean and standard deviation of Sale_Amount in one aggregation
stats = df_no_neg.select(
    mean(col("Sale_Amount")).alias("mean"),
    stddev(col("Sale_Amount")).alias("stddev"),
).collect()
mean_value = stats[0]["mean"]
stddev_value = stats[0]["stddev"]

if mean_value is None or stddev_value is None:
    # The aggregates are None when there are too few rows to compute them
    # (e.g. an empty DataFrame). Arithmetic on None would raise a TypeError,
    # and no outlier bounds can be defined, so keep the rows unchanged.
    df_no_outlier = df_no_neg
else:
    # Keep rows within 3 standard deviations of the mean (bounds inclusive)
    lower_bound = mean_value - 3 * stddev_value
    upper_bound = mean_value + 3 * stddev_value
    df_no_outlier = df_no_neg.filter(
        (col("Sale_Amount") <= upper_bound) & (col("Sale_Amount") >= lower_bound)
    )

## Refine the data

In [0]:
# Discard every row that still contains a null in any column
df_cleaned = df_no_outlier.na.drop(how="any")

In [0]:
# Exclude rows whose Property_Type matches "Unknown"
is_unknown_type = col("Property_Type").like("Unknown")
df_cleaned = df_cleaned.filter(~is_unknown_type)

In [0]:
from pyspark.sql.functions import col

# Build the final frame from df_cleaned's own columns rather than the raw `df`:
# iterating df.columns would silently discard the derived Price_Band and
# Sale_Performance columns and would reference any column dropped earlier in
# the pipeline. Spaces in column names are replaced with underscores.
clean_df = df_cleaned.select(
    [col(c).alias(c.replace(" ", "_")) for c in df_cleaned.columns]
)

# Write to Delta Table.
# NOTE: this overwrites the same path the notebook loads its input from, so a
# re-run will read this cleaned output rather than the original data.
# overwriteSchema allows the overwrite to replace the stored table schema, which
# is required because the cleaned frame carries the added columns.
(
    clean_df.write.format("delta")
    .option("path", "dbfs:/user/hive/warehouse/delta-table")
    .option("overwriteSchema", "true")
    .mode("overwrite")
    .save()
)

## Store data for later use

In [0]:
# One-time mount of the output blob container (kept commented out).
# SECURITY: a storage account key was previously hard-coded on the extra_configs line
# and has been redacted. Treat that key as compromised (rotate it) and read the
# replacement from a secret scope instead of pasting it into the notebook.
# dbutils.fs.mount(
#   source = "wasbs://outputdata@realestatehexa.blob.core.windows.net",
#   mount_point = "/mnt/sample/output",
#   extra_configs = {"fs.azure.account.key.realestatehexa.blob.core.windows.net": dbutils.secrets.get(scope="<secret-scope>", key="<storage-account-key-name>")})


True

In [0]:
# Destination path under the /mnt/sample/output mount point
output_path = "/mnt/sample/output/trialoutput.csv"

# Save clean_df as CSV with a header row, using overwrite mode for the target path
csv_writer = clean_df.write.mode("overwrite").option("header", "true")
csv_writer.csv(output_path)