In [1]:
from pyspark.sql.functions import *
from pyspark.sql import *
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

In [2]:
def load_data(raw_path='data/realtor-data.csv'):
    """Read the realtor listings CSV into a cached Spark DataFrame.

    A local SparkSession is created (or reused, since ``getOrCreate`` is
    used), the CSV is parsed with an explicit schema instead of schema
    inference, and the resulting DataFrame is marked for caching.

    Parameters
    ----------
    raw_path : str, optional
        Location of the CSV file to read. Defaults to the path this notebook
        has always used, so calling ``load_data()`` with no arguments behaves
        as before.

    Returns
    -------
    DataFrame
        The raw listings with the twelve columns declared in the schema below.
    """
    spark = SparkSession.builder \
        .appName("LocalSparkSession") \
        .master("local[*]") \
        .config("spark.driver.memory", "24g") \
        .config("spark.driver.extraJavaOptions", "-XX:ReservedCodeCacheSize=256m") \
        .getOrCreate()

    # Silence Spark's INFO/WARN chatter in the notebook output.
    spark.sparkContext.setLogLevel("ERROR")

    # Identifier-like fields (brokered_by, street, zip_code) and the sale date
    # are read as plain strings; only the measurements are numeric.
    schema = StructType([
        StructField("brokered_by", StringType(), True),
        StructField("status", StringType(), True),
        StructField("price", DoubleType(), True),
        StructField("bed", IntegerType(), True),
        StructField("bath", IntegerType(), True),
        StructField("acre_lot", DoubleType(), True),
        StructField("street", StringType(), True),
        StructField("city", StringType(), True),
        StructField("state", StringType(), True),
        StructField("zip_code", StringType(), True),
        StructField("house_size", DoubleType(), True),
        StructField("prev_sold_date", StringType(), True)
    ])

    raw_df = spark.read.csv(raw_path, header=True, schema=schema)
    # Several later cells run actions against this frame, so keep it cached.
    raw_df.cache()

    return raw_df

In [3]:
# Load the raw listings and preview the first five rows.
raw_df = load_data()
raw_df.show(n=5)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/10 01:14:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-----------+--------+--------+---+----+--------+---------+----------+-----------+--------+----------+--------------+
|brokered_by|  status|   price|bed|bath|acre_lot|   street|      city|      state|zip_code|house_size|prev_sold_date|
+-----------+--------+--------+---+----+--------+---------+----------+-----------+--------+----------+--------------+
|   103378.0|for_sale|105000.0|  3|   2|    0.12|1962661.0|  Adjuntas|Puerto Rico|   00601|     920.0|          NULL|
|    52707.0|for_sale| 80000.0|  4|   2|    0.08|1902874.0|  Adjuntas|Puerto Rico|   00601|    1527.0|          NULL|
|   103379.0|for_sale| 67000.0|  2|   1|    0.15|1404990.0|Juana Diaz|Puerto Rico|   00795|     748.0|          NULL|
|    31239.0|for_sale|145000.0|  4|   2|     0.1|1947675.0|     Ponce|Puerto Rico|   00731|    1800.0|          NULL|
|    34632.0|for_sale| 65000.0|  6|   2|    0.05| 331151.0|  Mayaguez|Puerto Rico|   00680|      NULL|          NULL|
+-----------+--------+--------+---+----+--------+-------

In [4]:
# Number of raw listings per value of `state`; print up to 54 groups.
raw_by_state = raw_df.groupBy("state").agg(F.count("*").alias("count"))

raw_by_state.show(n=54)


+--------------------+------+
|               state| count|
+--------------------+------+
|                Ohio| 59207|
|        Pennsylvania| 78373|
|         Connecticut| 14008|
|             Vermont|  2600|
|         Puerto Rico|  3126|
|      Virgin Islands|   895|
|District of Columbia|  6625|
|            Delaware|  8628|
|       West Virginia| 12309|
|            Missouri| 45145|
|        Rhode Island|  8157|
|             Georgia| 80977|
|            Virginia| 68763|
|             Wyoming|  4039|
|          New Jersey| 48199|
|            Maryland| 46052|
|       Massachusetts| 38041|
|           Louisiana| 25815|
|       New Hampshire|  3642|
|           Tennessee| 40964|
|      South Carolina| 42367|
|          California|227215|
|            New York|103159|
|               Maine|  5065|
|            Colorado| 32293|
|       New Brunswick|     1|
|            Michigan| 42429|
|      North Carolina| 85745|
|             Alabama| 34053|
|            Kentucky| 26316|
|         

In [5]:
def count_missing_values(df: DataFrame):
    """Count the null entries in every column of ``df``.

    All counts are produced by one aggregation, so the data is scanned a
    single time instead of launching a separate Spark job per column.

    Parameters
    ----------
    df : DataFrame
        Frame to inspect.

    Returns
    -------
    dict
        Maps each column name to the number of rows in which it is null,
        in the same order as ``df.columns``.
    """
    if not df.columns:
        # Nothing to aggregate; a per-column loop would also yield an empty dict.
        return {}

    # `when` without `otherwise` is null for non-matching rows and `count`
    # skips nulls, so each expression counts exactly the null cells.
    null_counts = df.select(
        [F.count(F.when(F.col(name).isNull(), 1)).alias(name) for name in df.columns]
    ).first()

    return null_counts.asDict()

In [6]:
# Null count for every column of the raw frame, printed as a plain dict.
raw_missing = count_missing_values(df=raw_df)
print(raw_missing)



CodeCache: size=262144Kb used=29076Kb max_used=29103Kb free=233067Kb
 bounds [0x0000000104558000, 0x00000001061f8000, 0x0000000114558000]
 total_blobs=11045 nmethods=10088 adapters=869
 compilation: disabled (not enough contiguous free space left)
{'brokered_by': 4533, 'status': 0, 'price': 1541, 'bed': 481317, 'bath': 511771, 'acre_lot': 325589, 'street': 10866, 'city': 1407, 'state': 8, 'zip_code': 299, 'house_size': 568484, 'prev_sold_date': 734297}


In [7]:
def drop_columns(df):
    """Return ``df`` without the columns that are not used downstream.

    The removed columns are ``prev_sold_date``, ``brokered_by`` and
    ``street``. The frame passed in is the one that is transformed; the
    function does not depend on any notebook-level variable.

    Parameters
    ----------
    df : DataFrame
        Frame to drop the columns from.

    Returns
    -------
    DataFrame
        ``df`` minus the three columns listed above.
    """
    columns_to_drop = ['prev_sold_date', 'brokered_by', 'street']

    # Operate on the argument, not on the global `raw_df`.
    return df.drop(*columns_to_drop)

In [8]:
def drop_na(df):
    """Return ``df`` with every row that contains a null value removed."""
    return df.dropna()

In [9]:
# IQR method
def drop_outliers_1(df):
    """Drop rows that fall outside the 1.5 * IQR fences of their zip code.

    For each of ``bed``, ``bath``, ``acre_lot`` and ``house_size`` the
    approximate 25th and 75th percentiles are computed per ``zip_code``. A
    row is kept only if all four values lie within
    ``[Q1 - 1.5 * IQR, Q3 + 1.5 * IQR]`` for its zip code.

    Parameters
    ----------
    df : DataFrame
        Listings containing ``zip_code`` and the four numeric columns.

    Returns
    -------
    DataFrame
        The surviving rows, without the helper quantile columns.
    """
    numeric_columns = ['bed', 'bath', 'acre_lot', 'house_size']

    # Per-zip-code quartiles: all Q1 columns first, then all Q3 columns.
    q1_exprs = [
        F.expr(f'percentile_approx({name}, 0.25)').alias(f'{name}_Q1')
        for name in numeric_columns
    ]
    q3_exprs = [
        F.expr(f'percentile_approx({name}, 0.75)').alias(f'{name}_Q3')
        for name in numeric_columns
    ]
    quantiles_df = df.groupBy('zip_code').agg(*q1_exprs, *q3_exprs)

    # Attach each row's zip-code quartiles to it.
    joined_df = df.join(quantiles_df, on='zip_code')

    # AND together the lower/upper fence checks for every numeric column.
    non_outlier_condition = None
    for name in numeric_columns:
        value = F.col(name)
        q1 = F.col(f'{name}_Q1')
        q3 = F.col(f'{name}_Q3')
        above_lower = value >= q1 - 1.5 * (q3 - q1)
        below_upper = value <= q3 + 1.5 * (q3 - q1)
        if non_outlier_condition is None:
            non_outlier_condition = above_lower & below_upper
        else:
            non_outlier_condition = non_outlier_condition & above_lower & below_upper

    helper_columns = (
        [f'{name}_Q1' for name in numeric_columns]
        + [f'{name}_Q3' for name in numeric_columns]
    )
    cleaned_df = joined_df.filter(non_outlier_condition).drop(*helper_columns)

    return cleaned_df

In [10]:
# Z score method
def drop_outliers_2(df, outlier_threshold=3):
    """Drop rows whose per-zip-code z-score exceeds ``outlier_threshold``.

    For each of ``bed``, ``bath``, ``acre_lot`` and ``house_size`` the mean
    and standard deviation are computed per ``zip_code``. A row is kept only
    if ``|value - mean| / stddev <= outlier_threshold`` for all four columns.

    When the standard deviation of a group is null, NaN or zero (for example
    a zip code with a single listing, or one where every listing shares the
    same value) the z-score is undefined. Such a value cannot be judged an
    outlier, so it is kept rather than silently filtered out by a null
    comparison.

    Parameters
    ----------
    df : DataFrame
        Listings containing ``zip_code`` and the four numeric columns.
    outlier_threshold : int or float, optional
        Largest absolute z-score still considered normal. Defaults to 3.

    Returns
    -------
    DataFrame
        The surviving rows, without the helper mean/stddev columns.
    """
    numeric_columns = ['bed', 'bath', 'acre_lot', 'house_size']

    # Calculate mean and standard deviation grouped by zip_code
    mean_stddev_df = df.groupBy('zip_code').agg(
        *[F.mean(name).alias(f'{name}_mean') for name in numeric_columns],
        *[F.stddev(name).alias(f'{name}_stddev') for name in numeric_columns],
    )

    # Join the mean and stddev with the original DataFrame
    joined_df = df.join(mean_stddev_df, on='zip_code')

    def _is_not_outlier(name):
        # Boolean column: True when `name` is within the threshold or when
        # its z-score cannot be computed for the row's zip code.
        stddev = F.col(f'{name}_stddev')
        z_undefined = stddev.isNull() | F.isnan(stddev) | (stddev == 0)
        z_score = F.abs((F.col(name) - F.col(f'{name}_mean')) / stddev)
        return F.when(z_undefined, F.lit(True)).otherwise(z_score <= outlier_threshold)

    non_outlier_condition = _is_not_outlier(numeric_columns[0])
    for name in numeric_columns[1:]:
        non_outlier_condition = non_outlier_condition & _is_not_outlier(name)

    helper_columns = (
        [f'{name}_mean' for name in numeric_columns]
        + [f'{name}_stddev' for name in numeric_columns]
    )
    cleaned_df = joined_df.filter(non_outlier_condition).drop(*helper_columns)

    return cleaned_df


In [11]:
# Cleaning pipeline: load -> drop unused columns -> drop rows with nulls ->
# remove per-zip-code IQR outliers, then report how many rows survive.
# NOTE(review): this reloads the CSV instead of reusing `raw_df` from the
# earlier cell - confirm whether the second load is intentional.
df = drop_outliers_1(drop_na(drop_columns(load_data())))
df.count()

                                                                                

1073005

In [12]:
# Number of cleaned listings per state; print up to 54 groups.
# NOTE(review): despite the variable name, rows are not de-duplicated here.
distinct_row_count_by_state = df.groupBy("state").agg(F.count("*").alias("count"))

distinct_row_count_by_state.show(n=54)


[Stage 54:>                                                       (0 + 10) / 11]

+--------------------+------+
|               state| count|
+--------------------+------+
|                Utah|  7817|
|              Hawaii|  3281|
|           Minnesota| 26362|
|                Ohio| 31226|
|            Arkansas|  8459|
|              Oregon| 16804|
|               Texas|115246|
|        North Dakota|  1953|
|        Pennsylvania| 40898|
|         Connecticut|  7706|
|             Vermont|   996|
|            Nebraska|  3414|
|              Nevada|  8197|
|         Puerto Rico|  1332|
|          Washington| 37485|
|            Illinois| 36852|
|            Oklahoma| 19853|
|District of Columbia|  2449|
|            Delaware|  4838|
|              Alaska|   648|
|          New Mexico|  9102|
|       West Virginia|  5332|
|            Missouri| 23957|
|        Rhode Island|  5295|
|             Georgia| 40231|
|             Montana|  3918|
|            Virginia| 35942|
|            Michigan| 16976|
|                Guam|    92|
|      North Carolina| 29267|
|         



In [13]:
# Persist the cleaned listings. coalesce(1) reduces the frame to a single
# partition before writing; any previous output at this path is overwritten.
(
    df.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("Data/cleaned_data.csv")
)

                                                                                

In [17]:
numeric_columns = ['price', 'bed', 'bath', 'acre_lot', 'house_size']

# Per-state summary: listing count plus the approximate median and the mean
# of every numeric column, each rounded to two decimals.
median_exprs = [
    F.round(F.expr(f'percentile_approx({name}, 0.5)'), 2).alias(f'{name}_median')
    for name in numeric_columns
]
mean_exprs = [
    F.round(F.mean(name), 2).alias(f'{name}_mean')
    for name in numeric_columns
]
states_df = df.groupBy("state").agg(
    F.count("*").alias("total_listings"),
    *median_exprs,
    *mean_exprs,
)

states_df.show()

# Export twice: once through Spark's CSV writer, once as a pandas-written CSV.
states_df.coalesce(1).write.csv("Data/aggregated_by_state", header=True, mode="overwrite")
states_df.toPandas().to_csv("Data/aggregated_by_state.csv", index=False)

                                                                                

+--------------------+--------------+------------+----------+-----------+---------------+-----------------+----------+--------+---------+-------------+---------------+
|               state|total_listings|price_median|bed_median|bath_median|acre_lot_median|house_size_median|price_mean|bed_mean|bath_mean|acre_lot_mean|house_size_mean|
+--------------------+--------------+------------+----------+-----------+---------------+-----------------+----------+--------+---------+-------------+---------------+
|                Utah|          7817|    570997.0|         4|          3|           0.17|           2402.0| 780533.29|    3.68|     2.94|         0.37|        2653.42|
|              Hawaii|          3281|    825000.0|         3|          2|           0.44|           1296.0|1246148.97|    2.78|     2.25|         2.26|        1547.74|
|           Minnesota|         26362|    329000.0|         3|          2|           0.22|           2068.0| 379074.34|    3.28|     2.33|          0.7|         

                                                                                

In [19]:
import os

numeric_columns = ['price', 'bed', 'bath', 'acre_lot', 'house_size']
base_dir = "Data/aggregated_by_zipcode"

# Group by state and zipcode and aggregate: listing count plus the
# approximate median and the mean of every numeric column (2 decimals).
zipcode_df = df.groupBy("state", "zip_code").agg(
    F.count("*").alias("total_listings"),
    *[F.round(F.expr(f'percentile_approx({name}, 0.5)'), 2).alias(f'{name}_median') for name in numeric_columns],
    *[F.round(F.mean(name), 2).alias(f'{name}_mean') for name in numeric_columns]
)

zipcode_df.show()

# Bring the aggregate to the driver once. Filtering and collecting the Spark
# frame separately for every state would re-run the whole aggregation each
# time.
zipcode_pdf = zipcode_df.toPandas()

# Get list of states
states = zipcode_pdf["state"].unique().tolist()

# Create base directory if it doesn't exist
os.makedirs(base_dir, exist_ok=True)

# Write separate files for each state with consistent names
for state, state_pdf in zipcode_pdf.groupby("state", sort=False):
    state_pdf.to_csv(f"{base_dir}/{state.lower()}_zipcodes.csv", index=False)

                                                                                

+-------+--------+--------------+------------+----------+-----------+---------------+-----------------+----------+--------+---------+-------------+---------------+
|  state|zip_code|total_listings|price_median|bed_median|bath_median|acre_lot_median|house_size_median|price_mean|bed_mean|bath_mean|acre_lot_mean|house_size_mean|
+-------+--------+--------------+------------+----------+-----------+---------------+-----------------+----------+--------+---------+-------------+---------------+
|Alabama|   35004|            93|    279900.0|         3|          2|            0.2|           1866.0| 291843.25|    3.35|     2.54|         0.25|        1949.15|
|Alabama|   35032|             1|    115000.0|         3|          1|           0.57|           1388.0|  115000.0|     3.0|      1.0|         0.57|         1388.0|
|Alabama|   35033|            12|    325000.0|         4|          3|            2.5|           2312.0| 769058.33|    3.58|      3.0|         2.38|        2970.25|
|Alabama|   3504

                                                                                