In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [14]:
spark = SparkSession.builder\
         .appName("ExplorationAndPrepration")\
         .master("spark://spark-master:7077")\
         .getOrCreate()

In [15]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

schema = StructType([
    StructField("borough", StringType(), True),
    StructField("neighborhood", StringType(), True),
    StructField("building_class_category", StringType(), True),
    StructField("tax_class_at_present", StringType(), True),
    StructField("block", StringType(), True),
    StructField("lot", StringType(), True),
    StructField("ease_ment", StringType(), True),
    StructField("building_class_at_present", StringType(), True),
    StructField("address", StringType(), True),
    StructField("apartment_number", StringType(), True),
    StructField("zip_code", StringType(), True),
    StructField("residential_units", StringType(), True),
    StructField("commercial_units", StringType(), True),
    StructField("total_units", StringType(), True),
    StructField("land_square_feet", DoubleType(), True),  # will clean later
    StructField("gross_square_feet", DoubleType(), True),
    StructField("year_built", StringType(), True),
    StructField("tax_class_at_time_of_sale", StringType(), True),
    StructField("building_class_at_time_of_sale", StringType(), True),
    StructField("sale_price", DoubleType(), True),         # will convert to numeric
    StructField("sale_date", StringType(), True)           # will parse to date
])


In [16]:
df = (
    spark.read.option("header", True)
               .csv(["hdfs://namenode:8020/data/bronze/2022", "hdfs://namenode:8020/data/bronze/2023", "hdfs://namenode:8020/data/bronze/2024"],schema = schema)
               .drop("apartment_number", "easement")
)

In [17]:
columns_to_cast = ["zip_code", "block", "lot", "residential_units", "commercial_units","year_built","tax_class_at_time_of_sale"]

for col_name in columns_to_cast:
    df = df.withColumn(
        col_name,
        regexp_replace(col(col_name), r"\.0$", "").cast("int")
    )


In [18]:
df.count()

KeyboardInterrupt: 

In [None]:
null_count = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

In [None]:
null_count.show()

+-------+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+---------+
|borough|neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price|sale_date|
+-------+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+---------+
|      0|           0|                      0|                 112|    0|  0|                      112|      0|      22|    

`tax_class_category` & `tax_class_at_present` : Drop rows (They show in a small portion in the data)


In [None]:
df.filter(col("year_built").isNull()).show()

+-------+------------------+-----------------------+--------------------+-----+----+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+
|borough|      neighborhood|building_class_category|tax_class_at_present|block| lot|building_class_at_present|             address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price| sale_date|
+-------+------------------+-----------------------+--------------------+-----+----+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+
|    4.0|AIRPORT LA GUARDIA|   12 CONDOS - WALKU...|         

In [None]:
df = df.withColumn(
    "total_units",
    coalesce(col("residential_units"), lit(0)) + coalesce(col("commercial_units"), lit(0))
)

In [None]:
df.select(count(when(col("total_units").isNull(), "total_units")).alias("total_units")).show()

+-----------+
|total_units|
+-----------+
|          0|
+-----------+



In [None]:
df.filter(col("building_class_at_present").isNull()).show()

+-------+------------------+-----------------------+--------------------+-----+----+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+
|borough|      neighborhood|building_class_category|tax_class_at_present|block| lot|building_class_at_present|             address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price| sale_date|
+-------+------------------+-----------------------+--------------------+-----+----+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+
|    4.0|    FLUSHING-NORTH|   13 CONDOS - ELEVA...|         

In [None]:
test = df.filter(col("residential_units").isNull() & col("commercial_units").isNull())

In [None]:
test.select("building_class_category").distinct().show(truncate = False)

+--------------------------------+
|building_class_category         |
+--------------------------------+
|02 TWO FAMILY DWELLINGS         |
|17 CONDO COOPS                  |
|08 RENTALS - ELEVATOR APARTMENTS|
|09 COOPS - WALKUP APARTMENTS    |
|01 ONE FAMILY DWELLINGS         |
|21 OFFICE BUILDINGS             |
|10 COOPS - ELEVATOR APARTMENTS  |
+--------------------------------+



In [None]:
df.select("building_class_category").orderBy("building_class_category").distinct().show(1000,truncate=False)

+------------------------------------------+
|building_class_category                   |
+------------------------------------------+
|02 TWO FAMILY DWELLINGS                   |
|17 CONDO COOPS                            |
|49 CONDO WAREHOUSES/FACTORY/INDUS         |
|40 SELECTED GOVERNMENTAL FACILITIES       |
|31 COMMERCIAL VACANT LAND                 |
|27 FACTORIES                              |
|30 WAREHOUSES                             |
|28 COMMERCIAL CONDOS                      |
|47 CONDO NON-BUSINESS STORAGE             |
|36 OUTDOOR RECREATIONAL FACILITIES        |
|34 THEATRES                               |
|13 CONDOS - ELEVATOR APARTMENTS           |
|38 ASYLUMS AND HOMES                      |
|08 RENTALS - ELEVATOR APARTMENTS          |
|29 COMMERCIAL GARAGES                     |
|09 COOPS - WALKUP APARTMENTS              |
|03 THREE FAMILY DWELLINGS                 |
|32 HOSPITAL AND HEALTH FACILITIES         |
|48 CONDO TERRACES/GARDENS/CABANAS         |
|12 CONDOS

In [None]:
df.filter(col("residential_units").isNull()).select("building_class_category").distinct().show(truncate=False,n = 1000)

+-----------------------------------------+
|building_class_category                  |
+-----------------------------------------+
|02 TWO FAMILY DWELLINGS                  |
|17 CONDO COOPS                           |
|49 CONDO WAREHOUSES/FACTORY/INDUS        |
|28 COMMERCIAL CONDOS                     |
|47 CONDO NON-BUSINESS STORAGE            |
|08 RENTALS - ELEVATOR APARTMENTS         |
|09 COOPS - WALKUP APARTMENTS             |
|48 CONDO TERRACES/GARDENS/CABANAS        |
|44 CONDO PARKING                         |
|43 CONDO OFFICE BUILDINGS                |
|45 CONDO HOTELS                          |
|41 TAX CLASS 4 - OTHER                   |
|01 ONE FAMILY DWELLINGS                  |
|46 CONDO STORE BUILDINGS                 |
|21 OFFICE BUILDINGS                      |
|10 COOPS - ELEVATOR APARTMENTS           |
|42 CONDO CULTURAL/MEDICAL/EDUCATIONAL/ETC|
+-----------------------------------------+



In [None]:
df.filter(col("residential_units").isNull() & col("building_class_category").isin(["02 TWO FAMILY DWELLINGS","08 RENTALS - ELEVATOR APARTMENTS","COOPS - WALKUP APARTMENTS","01 ONE FAMILY DWELLINGS","10 COOPS - ELEVATOR APARTMENTS"])).count()

41129

In [None]:
df.filter(col("building_class_category").isin(["02 TWO FAMILY DWELLINGS","08 RENTALS - ELEVATOR APARTMENTS","COOPS - WALKUP APARTMENTS","01 ONE FAMILY DWELLINGS","10 COOPS - ELEVATOR APARTMENTS"])).show()

+-------+------------------+-----------------------+--------------------+-----+---+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+
|borough|      neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|             address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price| sale_date|
+-------+------------------+-----------------------+--------------------+-----+---+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+
|    4.0|AIRPORT LA GUARDIA|   01 ONE FAMILY DWE...|            

In [None]:
df.filter(col("building_class_category").isin(["08 RENTALS - ELEVATOR APARTMENTS","COOPS - WALKUP APARTMENTS","10 COOPS - ELEVATOR APARTMENTS"])).show()

+-------+------------+-----------------------+--------------------+-----+---+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+
|borough|neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|             address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price| sale_date|
+-------+------------+-----------------------+--------------------+-----+---+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+
|    4.0|     ARVERNE|   08 RENTALS - ELEV...|                   2|15890| 97|     

In [None]:
df = df.withColumn(
    "residential_units",
    when(
        ((col("residential_units").isNull()) | (col("residential_units") == 0)) &
        (col("building_class_category") == "01 ONE FAMILY DWELLINGS"),
        1
    ).when(
        ((col("residential_units").isNull()) | (col("residential_units") == 0)) &
        (col("building_class_category") == "02 TWO FAMILY DWELLINGS"),
        2
    ).when(
        ((col("residential_units").isNull()) | (col("residential_units") == 0)) &
        (col("building_class_category").isin(
            "08 RENTALS - ELEVATOR APARTMENTS",
            "09 COOPS - WALKUP APARTMENTS",
            "10 COOPS - ELEVATOR APARTMENTS"
        )) &
        (col("total_units").isNotNull()),
        col("total_units")
    ).when(
        ((col("residential_units").isNull()) | (col("residential_units") == 0)) &
        (col("building_class_category").isin(
            "17 CONDO COOPS",
            "49 CONDO WAREHOUSES/FACTORY/INDUS",
            "28 COMMERCIAL CONDOS",
            "47 CONDO NON-BUSINESS STORAGE",
            "48 CONDO TERRACES/GARDENS/CABANAS",
            "44 CONDO PARKING",
            "43 CONDO OFFICE BUILDINGS",
            "45 CONDO HOTELS",
            "41 TAX CLASS 4 - OTHER",
            "46 CONDO STORE BUILDINGS",
            "21 OFFICE BUILDINGS",
            "42 CONDO CULTURAL/MEDICAL/EDUCATIONAL/ETC"
        )), 
        0
    ).otherwise(col("residential_units"))
)

In [None]:
null_count = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

In [None]:
null_count.show()

+-------+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+---------+
|borough|neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price|sale_date|
+-------+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+---------+
|      0|           0|                      0|                 112|    0|  0|                      112|      0|      22|    

In [None]:
df.filter(col("building_class_category").isin(["08 RENTALS - ELEVATOR APARTMENTS","COOPS - WALKUP APARTMENTS","10 COOPS - ELEVATOR APARTMENTS"])).show()

+-------+------------+-----------------------+--------------------+-----+---+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+
|borough|neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|             address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price| sale_date|
+-------+------------+-----------------------+--------------------+-----+---+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+
|    4.0|     ARVERNE|   08 RENTALS - ELEV...|                   2|15890| 97|     

In [None]:
df.printSchema()

root
 |-- borough: double (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- building_class_category: string (nullable = true)
 |-- tax_class_at_present: string (nullable = true)
 |-- block: integer (nullable = true)
 |-- lot: integer (nullable = true)
 |-- building_class_at_present: string (nullable = true)
 |-- address: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- residential_units: integer (nullable = true)
 |-- commercial_units: integer (nullable = true)
 |-- total_units: integer (nullable = false)
 |-- land_square_feet: double (nullable = true)
 |-- gross_square_feet: double (nullable = true)
 |-- year_built: integer (nullable = true)
 |-- tax_class_at_time_of_sale: integer (nullable = true)
 |-- building_class_at_time_of_sale: string (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- sale_date: string (nullable = true)



## Now about 50K from the residential_units missing data solved

## The other records can't have residential units so the value will be 0

In [None]:
df.filter(col("residential_units").isNull()).select("building_class_category").distinct().show(truncate=False,n = 1000)

+-----------------------+
|building_class_category|
+-----------------------+
+-----------------------+



## Now the commercial units

In [None]:
df.filter(col("commercial_units").isNull()).select("building_class_category").distinct().show(truncate=False,n = 1000)

+------------------------------------------+
|building_class_category                   |
+------------------------------------------+
|02 TWO FAMILY DWELLINGS                   |
|17 CONDO COOPS                            |
|13 CONDOS - ELEVATOR APARTMENTS           |
|08 RENTALS - ELEVATOR APARTMENTS          |
|09 COOPS - WALKUP APARTMENTS              |
|12 CONDOS - WALKUP APARTMENTS             |
|15 CONDOS - 2-10 UNIT RESIDENTIAL         |
|16 CONDOS - 2-10 UNIT WITH COMMERCIAL UNIT|
|04 TAX CLASS 1 CONDOS                     |
|01 ONE FAMILY DWELLINGS                   |
|11 SPECIAL CONDO BILLING LOTS             |
|21 OFFICE BUILDINGS                       |
|10 COOPS - ELEVATOR APARTMENTS            |
+------------------------------------------+



In [None]:
df = df.withColumn(
    "commercial_units",
    when(
        (col("commercial_units").isNull()) &
        (col("building_class_category").isin(
            "01 ONE FAMILY DWELLINGS",
            "02 TWO FAMILY DWELLINGS",
            "03 THREE FAMILY DWELLINGS",
            "08 RENTALS - ELEVATOR APARTMENTS",
            "09 COOPS - WALKUP APARTMENTS",
            "10 COOPS - ELEVATOR APARTMENTS",
            "12 CONDOS - WALKUP APARTMENTS",
            "13 CONDOS - ELEVATOR APARTMENTS",
            "15 CONDOS - 2-10 UNIT RESIDENTIAL",
            "04 TAX CLASS 1 CONDOS",
            "11 SPECIAL CONDO BILLING LOTS",
            "17 CONDO COOPS"
        )),
        "0.0"
    ).when(
        (col("commercial_units").isNull()) &
        (col("building_class_category") == "16 CONDOS - 2-10 UNIT WITH COMMERCIAL UNIT"),
        "1.0"
    ).when(
        (col("commercial_units").isNull()) &
        (col("building_class_category") == "21 OFFICE BUILDINGS") &
        (col("total_units").isNotNull()),
        col("total_units")
    ).otherwise(col("commercial_units"))
)

In [None]:
df.select(count(when(col("commercial_units").isNull(), "commercial_units")).alias("commercial_units")).show()

+----------------+
|commercial_units|
+----------------+
|               0|
+----------------+



In [None]:
df.filter(col("commercial_units").isNull()).select("building_class_category").distinct().show(truncate=False,n = 1000)

+-----------------------+
|building_class_category|
+-----------------------+
+-----------------------+



## Land and gross Square feet imputation

In [None]:
null_count = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
null_count.show()

+-------+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+---------+
|borough|neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price|sale_date|
+-------+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+---------+
|      0|           0|                      0|                 112|    0|  0|                      112|      0|      22|    

In [None]:
df.filter(col("land_square_feet").isNull()).select("building_class_category").distinct().show(truncate=False , n=1000)

+------------------------------------------+
|building_class_category                   |
+------------------------------------------+
|02 TWO FAMILY DWELLINGS                   |
|17 CONDO COOPS                            |
|49 CONDO WAREHOUSES/FACTORY/INDUS         |
|28 COMMERCIAL CONDOS                      |
|47 CONDO NON-BUSINESS STORAGE             |
|13 CONDOS - ELEVATOR APARTMENTS           |
|08 RENTALS - ELEVATOR APARTMENTS          |
|09 COOPS - WALKUP APARTMENTS              |
|48 CONDO TERRACES/GARDENS/CABANAS         |
|12 CONDOS - WALKUP APARTMENTS             |
|15 CONDOS - 2-10 UNIT RESIDENTIAL         |
|44 CONDO PARKING                          |
|16 CONDOS - 2-10 UNIT WITH COMMERCIAL UNIT|
|43 CONDO OFFICE BUILDINGS                 |
|45 CONDO HOTELS                           |
|04 TAX CLASS 1 CONDOS                     |
|01 ONE FAMILY DWELLINGS                   |
|46 CONDO STORE BUILDINGS                  |
|11 SPECIAL CONDO BILLING LOTS             |
|21 OFFICE

In [None]:
df.filter(col("land_square_feet").isNotNull()).show()

+-------+------------------+-----------------------+--------------------+-----+---+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+
|borough|      neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|             address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price| sale_date|
+-------+------------------+-----------------------+--------------------+-----+---+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+
|    4.0|AIRPORT LA GUARDIA|   01 ONE FAMILY DWE...|            

## Smart Imputation

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, expr, lit

def smart_imputation(df):
    # List of building class categories where land_square_feet is typically irrelevant
    zero_land_categories = [
        "CONDO", "COOPS", "CONDO PARKING", "CONDO NON-BUSINESS STORAGE",
        "CONDO OFFICE BUILDINGS", "CONDO STORE BUILDINGS", "CONDO HOTELS",
        "CONDO TERRACES/GARDENS/CABANAS", "CONDO WAREHOUSES/FACTORY/INDUS",
        "CONDO CULTURAL/MEDICAL/EDUCATIONAL/ETC"
    ]

    # Create boolean column to identify condo-like properties
    df = df.withColumn(
        "is_condo_type",
        col("building_class_category").rlike("(?i)" + "|".join(zero_land_categories))
    )

    # Define windows with increasing generalization
    window_detailed = Window.partitionBy("building_class_category", "borough", "neighborhood")
    window_medium = Window.partitionBy("building_class_category", "borough")
    window_basic = Window.partitionBy("building_class_category")

    # Calculate medians
    df = (df
        .withColumn("median_gross_detailed", expr("percentile_approx(gross_square_feet, 0.5)").over(window_detailed))
        .withColumn("median_gross_medium", expr("percentile_approx(gross_square_feet, 0.5)").over(window_medium))
        .withColumn("median_gross_basic", expr("percentile_approx(gross_square_feet, 0.5)").over(window_basic))
        .withColumn("median_land_detailed", expr("percentile_approx(land_square_feet, 0.5)").over(window_detailed))
        .withColumn("median_land_medium", expr("percentile_approx(land_square_feet, 0.5)").over(window_medium))
        .withColumn("median_land_basic", expr("percentile_approx(land_square_feet, 0.5)").over(window_basic))
    )

    # Set flags BEFORE imputation (based on current nulls)
    df = df.withColumn("gross_sqft_imputed_flag", col("gross_square_feet").isNull())
    df = df.withColumn("land_sqft_imputed_flag", col("land_square_feet").isNull())

    # Impute gross_square_feet
    df = df.withColumn(
        "gross_square_feet",
        when(col("gross_square_feet").isNotNull(), col("gross_square_feet"))
        .when(col("median_gross_detailed").isNotNull(), col("median_gross_detailed"))
        .when(col("median_gross_medium").isNotNull(), col("median_gross_medium"))
        .otherwise(col("median_gross_basic"))
    )

    # Impute land_square_feet with conditional logic for condo-like categories
    df = df.withColumn(
        "land_square_feet",
        when(col("land_square_feet").isNotNull(), col("land_square_feet"))
        .when(col("is_condo_type"), lit(0))  # For condo/coops etc.
        .when(col("median_land_detailed").isNotNull(), col("median_land_detailed"))
        .when(col("median_land_medium").isNotNull(), col("median_land_medium"))
        .otherwise(col("median_land_basic"))
    )

    # Clean up helper columns
    df = df.drop(
        "median_gross_detailed", "median_gross_medium", "median_gross_basic",
        "median_land_detailed", "median_land_medium", "median_land_basic",
        "is_condo_type"
    )

    return df


In [None]:
df_processed = smart_imputation(df)

In [None]:
df_processed = df_processed.withColumn(
    "gross_square_feet",
    when(col("gross_square_feet").isNull() & col("building_class_category").contains("VACANT"), 0)
    .otherwise(col("gross_square_feet"))
).withColumn(
    "land_square_feet",
    when(col("land_square_feet").isNull() & col("building_class_category").contains("CONDO"), 0)
    .otherwise(col("land_square_feet"))
)


In [None]:
null_count = df_processed.select([count(when(col(c).isNull(), c)).alias(c) for c in df_processed.columns])
null_count.show()

+-------+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+---------+-----------------------+----------------------+
|borough|neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price|sale_date|gross_sqft_imputed_flag|land_sqft_imputed_flag|
+-------+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+---------+-----------------------+-------

In [None]:
df_processed.filter(
    (col("land_sqft_imputed_flag") == True) | (col("gross_sqft_imputed_flag") == True)
).show()

+-------+-------------+-----------------------+--------------------+-----+----+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+----------+-----------------------+----------------------+
|borough| neighborhood|building_class_category|tax_class_at_present|block| lot|building_class_at_present|             address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price| sale_date|gross_sqft_imputed_flag|land_sqft_imputed_flag|
+-------+-------------+-----------------------+--------------------+-----+----+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+-----

In [None]:
df_remaining_nulls = df_processed.filter(col("gross_square_feet").isNull() | col("land_square_feet").isNull())
df_remaining_nulls.groupBy("building_class_category").count().orderBy("count", ascending=False).show(50,truncate=False)


+------------------------------------------+-----+
|building_class_category                   |count|
+------------------------------------------+-----+
|10 COOPS - ELEVATOR APARTMENTS            |40672|
|13 CONDOS - ELEVATOR APARTMENTS           |40265|
|09 COOPS - WALKUP APARTMENTS              |7704 |
|04 TAX CLASS 1 CONDOS                     |4699 |
|15 CONDOS - 2-10 UNIT RESIDENTIAL         |4388 |
|44 CONDO PARKING                          |4238 |
|17 CONDO COOPS                            |3661 |
|12 CONDOS - WALKUP APARTMENTS             |2652 |
|47 CONDO NON-BUSINESS STORAGE             |1245 |
|43 CONDO OFFICE BUILDINGS                 |920  |
|28 COMMERCIAL CONDOS                      |700  |
|46 CONDO STORE BUILDINGS                  |518  |
|16 CONDOS - 2-10 UNIT WITH COMMERCIAL UNIT|331  |
|45 CONDO HOTELS                           |258  |
|48 CONDO TERRACES/GARDENS/CABANAS         |107  |
|49 CONDO WAREHOUSES/FACTORY/INDUS         |73   |
|42 CONDO CULTURAL/MEDICAL/EDUC

In [None]:
df_processed.filter(col("gross_square_feet").isNull()) \
  .groupBy("building_class_category") \
  .count() \
  .orderBy("count", ascending=False) \
  .show(50, truncate=False)


+------------------------------------------+-----+
|building_class_category                   |count|
+------------------------------------------+-----+
|10 COOPS - ELEVATOR APARTMENTS            |40672|
|13 CONDOS - ELEVATOR APARTMENTS           |40265|
|09 COOPS - WALKUP APARTMENTS              |7704 |
|04 TAX CLASS 1 CONDOS                     |4699 |
|15 CONDOS - 2-10 UNIT RESIDENTIAL         |4388 |
|44 CONDO PARKING                          |4238 |
|17 CONDO COOPS                            |3661 |
|12 CONDOS - WALKUP APARTMENTS             |2652 |
|47 CONDO NON-BUSINESS STORAGE             |1245 |
|43 CONDO OFFICE BUILDINGS                 |920  |
|28 COMMERCIAL CONDOS                      |700  |
|46 CONDO STORE BUILDINGS                  |518  |
|16 CONDOS - 2-10 UNIT WITH COMMERCIAL UNIT|331  |
|45 CONDO HOTELS                           |258  |
|48 CONDO TERRACES/GARDENS/CABANAS         |107  |
|49 CONDO WAREHOUSES/FACTORY/INDUS         |73   |
|42 CONDO CULTURAL/MEDICAL/EDUC

In [None]:
def impute_gross_square_feet(df):
    # 1️⃣ Special case: Impute zero for non-residential condo types
    df = df.withColumn(
        "gross_square_feet",
        when(
            col("building_class_category").isin(
                "44 CONDO PARKING",
                "47 CONDO NON-BUSINESS STORAGE",
                "48 CONDO TERRACES/GARDENS/CABANAS"
            ),
            lit(0)
        ).otherwise(col("gross_square_feet"))
    )

    # 2️⃣ Smart imputation by neighborhood + building class
    window_neigh_class = Window.partitionBy("neighborhood", "building_class_category")
    df = df.withColumn(
        "median_gross_local",
        expr("percentile_approx(gross_square_feet, 0.5)").over(window_neigh_class)
    )
    df = df.withColumn(
        "gross_square_feet",
        when(col("gross_square_feet").isNotNull(), col("gross_square_feet"))
        .otherwise(col("median_gross_local"))
    ).drop("median_gross_local")

    # 3️⃣ Fallback imputation by building_class_category only
    window_class = Window.partitionBy("building_class_category")
    df = df.withColumn(
        "median_gross_category",
        expr("percentile_approx(gross_square_feet, 0.5)").over(window_class)
    )
    df = df.withColumn(
        "gross_square_feet",
        when(col("gross_square_feet").isNotNull(), col("gross_square_feet"))
        .otherwise(col("median_gross_category"))
    ).drop("median_gross_category")

    return df


In [None]:
df_processed = impute_gross_square_feet(df_processed)

In [None]:
null_count = df_processed.select([count(when(col(c).isNull(), c)).alias(c) for c in df_processed.columns])
null_count.show()

+-------+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+---------+-----------------------+----------------------+
|borough|neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|year_built|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price|sale_date|gross_sqft_imputed_flag|land_sqft_imputed_flag|
+-------+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+----------+-------------------------+------------------------------+----------+---------+-----------------------+-------

In [None]:
# Step 1: Impute based on (borough, neighborhood, building_class_category)
w1 = Window.partitionBy("borough", "neighborhood", "building_class_category")
df_processed = df_processed.withColumn("year_built_lvl1", round(avg("year_built").over(w1)))

# Step 2: Impute based on (borough, building_class_category)
w2 = Window.partitionBy("borough", "building_class_category")
df_processed = df_processed.withColumn("year_built_lvl2", round(avg("year_built").over(w2)))

# Step 3: Impute based on (building_class_category)
w3 = Window.partitionBy("building_class_category")
df_processed = df_processed.withColumn("year_built_lvl3", round(avg("year_built").over(w3)))

# Step 4: Overall median as fallback
overall_median = df_processed.approxQuantile("year_built", [0.5], 0.01)[0]

# Step 5: Apply hierarchical imputation
df_processed = df_processed.withColumn(
    "year_built_imputed",
    when(col("year_built").isNotNull(), col("year_built"))
    .when(col("year_built_lvl1").isNotNull(), col("year_built_lvl1"))
    .when(col("year_built_lvl2").isNotNull(), col("year_built_lvl2"))
    .when(col("year_built_lvl3").isNotNull(), col("year_built_lvl3"))
)

# Optional: Replace original year_built column
df_processed = df_processed.drop("year_built")

df_processed = df_processed.withColumn(
    "year_built", 
    col("year_built_imputed").cast(IntegerType())
)

# Clean up helper columns
df_processed = df_processed.drop("year_built_lvl1", "year_built_lvl2", "year_built_lvl3")

In [None]:
df_cleaned = df_processed.na.drop(subset=["tax_class_at_present", "building_class_at_present", "zip_code","tax_class_at_time_of_sale", "building_class_at_time_of_sale", "sale_price", "sale_date"])


In [None]:
null_count = df_cleaned.select([count(when(col(c).isNull(), c)).alias(c) for c in df_cleaned.columns])
null_count.show()

+-------+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+-------------------------+------------------------------+----------+---------+-----------------------+----------------------+------------------+----------+
|borough|neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price|sale_date|gross_sqft_imputed_flag|land_sqft_imputed_flag|year_built_imputed|year_built|
+-------+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+-------------------------+------------------------------+----------+---------+----

In [None]:
df_cleaned.show()

+-------+-------------+-----------------------+--------------------+-----+---+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+-------------------------+------------------------------+----------+----------+-----------------------+----------------------+------------------+----------+
|borough| neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|             address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price| sale_date|gross_sqft_imputed_flag|land_sqft_imputed_flag|year_built_imputed|year_built|
+-------+-------------+-----------------------+--------------------+-----+---+-------------------------+--------------------+--------+-----------------+----------------+-----------+----------------+-----------------+-------------------------+------------

In [None]:
df_cleaned.printSchema()

root
 |-- borough: double (nullable = true)
 |-- neighborhood: string (nullable = true)
 |-- building_class_category: string (nullable = true)
 |-- tax_class_at_present: string (nullable = true)
 |-- block: integer (nullable = true)
 |-- lot: integer (nullable = true)
 |-- building_class_at_present: string (nullable = true)
 |-- address: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- residential_units: integer (nullable = true)
 |-- commercial_units: string (nullable = true)
 |-- total_units: integer (nullable = false)
 |-- land_square_feet: double (nullable = true)
 |-- gross_square_feet: double (nullable = true)
 |-- tax_class_at_time_of_sale: integer (nullable = true)
 |-- building_class_at_time_of_sale: string (nullable = true)
 |-- sale_price: double (nullable = true)
 |-- sale_date: string (nullable = true)
 |-- gross_sqft_imputed_flag: boolean (nullable = false)
 |-- land_sqft_imputed_flag: boolean (nullable = false)
 |-- year_built_imputed: double (nulla

## Fix Borough name

In [None]:
df_cleaned= df_cleaned.withColumn("borough_name",
    when(col("borough") == 1.0, "MANHATTAN")
    .when(col("borough") == 2.0, "BRONX")
    .when(col("borough") == 3.0, "BROOKLYN")
    .when(col("borough") == 4.0, "QUEENS")
    .when(col("borough") == 5.0, "STATEN ISLAND")
    .otherwise("UNKNOWN")
).drop("borough","year_built_imputed","land_sqft_imputed_flag","gross_sqft_imputed_flag")

In [None]:
df_cleaned.show(5)

+-------------+-----------------------+--------------------+-----+---+-------------------------+-------------+--------+-----------------+----------------+-----------+----------------+-----------------+-------------------------+------------------------------+----------+----------+----------+------------+
| neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|      address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price| sale_date|year_built|borough_name|
+-------------+-----------------------+--------------------+-----+---+-------------------------+-------------+--------+-----------------+----------------+-----------+----------------+-----------------+-------------------------+------------------------------+----------+----------+----------+------------+
|ALPHABET CITY|   02 TWO FAMILY DWE...|                   1|  377|  1|               

In [None]:
null_count = df_cleaned.select([count(when(col(c).isNull(), c)).alias(c) for c in df_cleaned.columns])
null_count.show()

+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+-------------------------+------------------------------+----------+---------+----------+------------+
|neighborhood|building_class_category|tax_class_at_present|block|lot|building_class_at_present|address|zip_code|residential_units|commercial_units|total_units|land_square_feet|gross_square_feet|tax_class_at_time_of_sale|building_class_at_time_of_sale|sale_price|sale_date|year_built|borough_name|
+------------+-----------------------+--------------------+-----+---+-------------------------+-------+--------+-----------------+----------------+-----------+----------------+-----------------+-------------------------+------------------------------+----------+---------+----------+------------+
|           0|                      0|                   0|    0|  0|                        0|      0|      

In [None]:
df_cleaned.write.mode("overwrite").csv("hdfs://namenode:8020/data/silver/nyc_silver.csv")


In [None]:
df_cleaned.printSchema()

: 