# Housing Sales Count
This notebook imports Housing Sales Count data from [Zillow](https://files.zillowstatic.com/research/public_csvs/sales_count_now/Metro_sales_count_now_uc_sfrcondo_month.csv?t=1746553689) and performs the following operations:

- **Data Transformation:** Converts date columns into rows for improved accessibility.
- **Data Cleaning:** Removes rows whose region type is `country`, then fills any remaining missing StateName values with `USA`.
- **Data Storage:** Saves the processed data as a table in an existing Unity Catalog catalog and schema, overwriting any previous version of the table.

In [0]:
# Importing all functions from pyspark.sql.functions module
from pyspark.sql.functions import *

In [0]:
# Source CSV (Zillow metro-level sales counts) uploaded to a Unity Catalog volume
data_path = "/Volumes/generaldata/dataanalysis/upload/Metro_sales_count_now_uc_sfrcondo_month.csv"

# Target catalog, schema, and table name for the transformed DataFrame
u_catalog = "generaldata"
u_schema = "dataanalysis"
u_table = "Metro_Sales_Count"

# Read the CSV file into a DataFrame with header and inferred schema
df = spark.read.csv(data_path, header=True, inferSchema=True)

# Remove rows where RegionType is "country"
df = df.filter(df.RegionType != "country")

# Collect the columns to unpivot: every column whose name contains "20".
# NOTE(review): presumably these are the monthly date columns (e.g. "20xx-..."),
# confirm against the CSV header.
# The iteration variable is intentionally not named `col`: a module-level `col`
# would rebind pyspark.sql.functions.col (star-imported in this notebook) to a
# plain string for every later cell.
col_to_row_list = [column_name for column_name in df.columns if "20" in column_name]

# melt() rejects an empty value list with an opaque Spark error; fail early
# with a message that points at the real cause instead.
if not col_to_row_list:
    raise ValueError(
        f"No columns containing '20' were found in {data_path}; nothing to unpivot."
    )

# Transform the DataFrame from wide to long format: one row per
# (region, date column), with the former column name in "Date" and its cell
# value in "Count". The result contains only the id columns plus "Date" and
# "Count", so no follow-up drop of the melted columns is needed.
sales_df = df.melt(
    ids=["RegionID", "SizeRank", "RegionName", "RegionType", "StateName"],
    values=col_to_row_list,
    variableColumnName="Date",
    valueColumnName="Count",
)

# Fill missing values in 'StateName' with 'USA'.
# NOTE(review): rows with RegionType "country" were already filtered out above,
# so this only affects any other rows whose StateName is null. Confirm whether
# the country-level row was meant to be kept and labelled 'USA' instead.
sales_df = sales_df.fillna({'StateName': 'USA'})

# Save the transformed DataFrame as a table in the specified catalog and
# schema, replacing both the data and the schema of any existing table.
(
    sales_df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{u_catalog}.{u_schema}.{u_table}")
)