# Housing Dataset (Using PySparks)

## Bronze -> Silver

In [1]:
from pyspark.sql import SparkSession
from pprint import pprint
import os
os.environ["JAVA_HOME"] = "/nix/store/pj3ni0dwh1j63wxac3aj9jn61047c8ia-openjdk-headless-11.0.23+9/lib/openjdk"

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("HoustonHousingData") \
    .config("spark.sql.debug.maxToStringFields", 4000) \
    .getOrCreate()



24/10/04 21:23:59 WARN Utils: Your hostname, morokei resolves to a loopback address: 127.0.0.2; using 192.168.68.64 instead (on interface wlo1)
24/10/04 21:23:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/04 21:23:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/04 21:24:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


The dataset contains nested fields. Part of the transformations applied to make the silver dataset we need to flatten these out.

There are various ways we can do this, but the most straightforward one is to use pandas to normalize the json file before using it with pyspark

In [2]:
# Load the data with pandas and normalize it
import pandas as pd
bronze_df_pandas = pd.read_json("bronze/houston_housing market 2024_light.json")

In [3]:
import pandas as pd

# Function to flatten a nested JSON object
def flatten_json(y):
    out = {}

    def flatten(x, name=''):
        if isinstance(x, dict):
            # If the value is a dictionary, recurse deeper
            for a in x:
                flatten(x[a], name + a + '_')
        elif isinstance(x, list):
            # If the value is a list, iterate and recurse
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            # If the value is a primitive (not a dict or list), add to output
            out[name[:-1]] = x

    flatten(y)
    return out

bronze_df_pandas_flattened = pd.DataFrame([flatten_json(row) for row in bronze_df_pandas.to_dict(orient="records")])

bronze_df_pandas_flattened.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 25948 entries, 0 to 25947
Columns: 344 entries, zpid to carouselPhotos_213_url
dtypes: bool(22), float64(45), int64(9), object(268)
memory usage: 64.3+ MB


In [7]:
for col in bronze_df_pandas_flattened.columns:
    bronze_df_pandas_flattened[col] = pd.to_numeric(bronze_df_pandas_flattened[col], errors='ignore')


  bronze_df_pandas_flattened[col] = pd.to_numeric(bronze_df_pandas_flattened[col], errors='ignore')


In [8]:
# Convert pandas DataFrame back to Spark DataFrame
silver_df_spark = spark.createDataFrame(bronze_df_pandas_flattened)


In [9]:
silver_df_spark.show(5)


24/10/04 21:37:30 WARN TaskSetManager: Stage 0 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+---------+---------+---------------+---------------------------+--------------------+--------+--------------------+----------+--------------+---------------+--------+----------------+--------------------+--------------------+-----------+------------+--------------+--------------------+----+-----+------+----------------+-----------------+-------------+-----------------+-----------------+---------------------+------------------------------+------------------------+---------------------+----------------------+-------------------------+--------------------------+----------------------+--------------------------+-------------------------+---------------------------+-------------------------+---------------------------+-----------------------------+---------------------------+--------------------------------+--------------------------+------------------------------+-----------------------------------------+-----------------------------+----------------------------------------+--------------

Check for missing values.
In case there are string values that write NaN, we replace them with NULL (since they won't be counted in either check we do)

In [19]:
from pyspark.sql.functions import when, col

for c in silver_df_spark.columns:
    if silver_df_spark.schema[c].dataType.simpleString() == 'string':
        silver_df_spark = silver_df_spark.withColumn(c, when(col(c) == "NaN", None).otherwise(col(c)))


In [20]:
from pyspark.sql.functions import col, sum

# Count NULL values for each column
null_counts = silver_df_spark.select([sum(col(c).isNull().cast("int")).alias(c) for c in silver_df_spark.columns])
null_counts.show()


24/10/04 21:46:49 WARN TaskSetManager: Stage 22 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.

+----+---+---------------+---------------------------+------+--------+---------+----------+----------+---------------+-----+----------------+-------+-------------+-----------+------------+--------------+--------------------+----+-----+----+----------------+-----------------+-------------+-----------------+-----------------+---------------------+------------------------------+------------------------+---------------------+----------------------+-------------------------+--------------------------+----------------------+--------------------------+-------------------------+---------------------------+-------------------------+---------------------------+-----------------------------+---------------------------+--------------------------------+--------------------------+------------------------------+-----------------------------------------+-----------------------------+----------------------------------------+---------------------------------+----------------------------+----------------

                                                                                

In [17]:

from pyspark.sql.functions import isnan

# Count NaN values for each column (only applicable to numeric columns)
nan_counts = silver_df_spark.select([sum(isnan(c).cast("int")).alias(c) for c in silver_df_spark.columns if silver_df_spark.schema[c].dataType.simpleString() in ['double', 'float']])
nan_counts.show()


24/10/04 21:43:38 WARN TaskSetManager: Stage 19 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
[Stage 19:====>                                                   (1 + 11) / 12]

+--------+----+-----+----+----------------+-----------------+-------------------------+--------------------------+--------------------------+-------------------------+---------------------------+--------------------------+------------------------------+-----------------------------------------+---------------------------------+-----------------------------+---------+------------+-----------------+-------------------+---------------------------------+----------------------------+------------------------+-------------------------+----------------------------------------------+----------------------------------------------------------------------+--------------------------------------------------------------------+--------------------------------------------+----------------------------------+----------------------------------------------------------------------+--------------------------------------------------------------------+-----------------------------------------------------------

                                                                                

In [21]:
total_rows = silver_df_spark.count()
null_counts_df = null_counts.toPandas().T.reset_index()  # Transpose to make columns as rows
null_counts_df.columns = ['Column', 'Null Count']  # Rename columns

nan_counts_df = nan_counts.toPandas().T.reset_index()  # Transpose to make columns as rows
nan_counts_df.columns = ['Column', 'NaN Count']  # Rename columns

summary_df = pd.merge(null_counts_df, nan_counts_df, on='Column', how='outer').fillna(0)

summary_df['Null Percentage'] = (summary_df['Null Count'] / total_rows) * 100
summary_df['NaN Percentage'] = (summary_df['NaN Count'] / total_rows) * 100


24/10/04 21:48:48 WARN TaskSetManager: Stage 25 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 21:48:51 WARN TaskSetManager: Stage 28 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 21:48:55 WARN TaskSetManager: Stage 31 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [25]:
summary_df.sort_values('Null Percentage', ascending=False, inplace=True)
summary_df

Unnamed: 0,Column,Null Count,NaN Count,Null Percentage,NaN Percentage
138,carouselPhotos_213_url,25947,0.0,99.996146,0.000000
136,carouselPhotos_211_url,25947,0.0,99.996146,0.000000
305,info2String,25947,0.0,99.996146,0.000000
137,carouselPhotos_212_url,25947,0.0,99.996146,0.000000
130,carouselPhotos_206_url,25946,0.0,99.992292,0.000000
...,...,...,...,...,...
337,variableData,0,25948.0,0.000000,100.000000
339,variableData_data_isRead,0,25948.0,0.000000,100.000000
338,variableData_data_isFresh,0,25749.0,0.000000,99.233082
342,zestimate,0,4427.0,0.000000,17.061045


In [26]:
summary_df.sort_values('NaN Percentage', ascending=False, inplace=True)
summary_df

Unnamed: 0,Column,Null Count,NaN Count,Null Percentage,NaN Percentage
339,variableData_data_isRead,0,25948.0,0.000000,100.000000
337,variableData,0,25948.0,0.000000,100.000000
11,carouselPhotos,0,25948.0,0.000000,100.000000
257,hdpData_homeInfo_listing_sub_type_is_forAuction,0,25945.0,0.000000,99.988438
250,hdpData_homeInfo_isRentalWithBasePrice,0,25945.0,0.000000,99.988438
...,...,...,...,...,...
25,carouselPhotos_111_url,25852,0.0,99.630029,0.000000
22,carouselPhotos_109_url,25849,0.0,99.618468,0.000000
21,carouselPhotos_108_url,25846,0.0,99.606906,0.000000
20,carouselPhotos_107_url,25844,0.0,99.599198,0.000000


A lot of columns are redundant due to very high numbers of NaN or NULL values. Dropping all columns that surpass 80% NULL/NaN rate.

In [27]:
# Write a list of the columns with more than 80% NULL values
columns_to_drop = summary_df[summary_df['Null Percentage'] > 80]['Column'].tolist()
# Add columns with more than 80% NaN values
columns_to_drop.extend(summary_df[summary_df['NaN Percentage'] > 80]['Column'].tolist())

# Drop the columns
silver_df_spark = silver_df_spark.drop(*columns_to_drop)


In [30]:
silver_df_spark.columns

['zpid',
 'id',
 'rawHomeStatusCd',
 'marketingStatusSimplifiedCd',
 'imgSrc',
 'hasImage',
 'detailUrl',
 'statusType',
 'statusText',
 'countryCurrency',
 'price',
 'unformattedPrice',
 'address',
 'addressStreet',
 'addressCity',
 'addressState',
 'addressZipcode',
 'isUndisclosedAddress',
 'beds',
 'baths',
 'area',
 'latLong_latitude',
 'latLong_longitude',
 'isZillowOwned',
 'variableData_type',
 'variableData_text',
 'hdpData_homeInfo_zpid',
 'hdpData_homeInfo_streetAddress',
 'hdpData_homeInfo_zipcode',
 'hdpData_homeInfo_city',
 'hdpData_homeInfo_state',
 'hdpData_homeInfo_latitude',
 'hdpData_homeInfo_longitude',
 'hdpData_homeInfo_price',
 'hdpData_homeInfo_bathrooms',
 'hdpData_homeInfo_bedrooms',
 'hdpData_homeInfo_livingArea',
 'hdpData_homeInfo_homeType',
 'hdpData_homeInfo_homeStatus',
 'hdpData_homeInfo_daysOnZillow',
 'hdpData_homeInfo_isFeatured',
 'hdpData_homeInfo_shouldHighlight',
 'hdpData_homeInfo_zestimate',
 'hdpData_homeInfo_rentZestimate',
 'hdpData_homeInfo

There's multiple possible indexes, zpid (the zillow property ID), id, providerListingId and hdpData_homeInfo_zpid (which may be a duplicate of zpid).
The first thing that needs to be confirmed is whether the hdpData_home_Info versions of some columns are duplicate data or not.

The most important data is the index (we're using zpid), unformattedPrice, beds, baths, area, address

In [33]:
silver_df_spark.count()

24/10/04 22:03:34 WARN TaskSetManager: Stage 41 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

25948

In [32]:
# Drop rows with nulls in critical columns
silver_df_clean_1 = silver_df_spark.dropna(subset=['zpid', 'unformattedPrice', 'beds', 'baths', 'area', 'address'])
silver_df_clean_1.count()

24/10/04 22:03:14 WARN TaskSetManager: Stage 38 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

22683

In [34]:
# Compare values
silver_df_clean_1.select('beds', 'hdpData_homeInfo_bedrooms').show(5)


24/10/04 22:06:46 WARN TaskSetManager: Stage 44 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.


+----+-------------------------+
|beds|hdpData_homeInfo_bedrooms|
+----+-------------------------+
| 4.0|                      4.0|
| 4.0|                      4.0|
| 5.0|                      5.0|
| 5.0|                      5.0|
| 4.0|                      4.0|
+----+-------------------------+
only showing top 5 rows



In [38]:
silver_df_clean_1.select('area', 'hdpData_homeInfo_livingArea').show(5)


24/10/04 22:08:46 WARN TaskSetManager: Stage 45 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.


+------+---------------------------+
|  area|hdpData_homeInfo_livingArea|
+------+---------------------------+
|1943.0|                     1943.0|
|2638.0|                     2638.0|
|3804.0|                     3804.0|
|3042.0|                     3042.0|
|3420.0|                     3420.0|
+------+---------------------------+
only showing top 5 rows



Since we're seeing duplicated data, it might be worth checking all the columns to see if more duplicated ones exist.

In [41]:
from pyspark.sql.functions import col

def find_similar_headers(df):
    columns = [col.lower() for col in df.columns]
    similar_columns = []
    for i in range(len(columns)):
        for j in range(i + 1, len(columns)):
            if columns[i] in columns[j] or columns[j] in columns[i]:
                similar_columns.append((df.columns[i], df.columns[j]))
    return similar_columns

def check_identical_columns(df, column_pairs):
    identical_columns = []
    for col1, col2 in column_pairs:
        if df.filter(col(col1) != col(col2)).count() == 0:
            identical_columns.append((col1, col2))
    return identical_columns

similar_headers = find_similar_headers(silver_df_clean_1)
identical_columns = check_identical_columns(silver_df_clean_1, similar_headers)

if identical_columns:
    print("Identical columns:")
    for col_pair in identical_columns:
        print(f"{col_pair[0]} is identical to {col_pair[1]}")
else:
    print("No identical columns found among similar headers.")


24/10/04 22:14:06 WARN TaskSetManager: Stage 190 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 22:14:09 WARN TaskSetManager: Stage 193 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 22:14:11 WARN TaskSetManager: Stage 196 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 22:14:14 WARN TaskSetManager: Stage 199 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 22:14:17 WARN TaskSetManager: Stage 202 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 22:14:20 WARN TaskSetManager: Stage 205 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 22:14:22 WARN TaskSetManager: Stage 208 contains a task of very large size (10590 KiB). The maximum recommended task size is 10

Identical columns:
zpid is identical to id
zpid is identical to hdpData_homeInfo_zpid
id is identical to hdpData_homeInfo_zpid
price is identical to unformattedPrice
price is identical to hdpData_homeInfo_price
price is identical to hdpData_homeInfo_priceForHDP
price is identical to shouldShowZestimateAsPrice
price is identical to hdpData_homeInfo_datePriceChanged
price is identical to hdpData_homeInfo_priceChange
address is identical to addressZipcode
address is identical to isUndisclosedAddress
area is identical to hdpData_homeInfo_livingArea
area is identical to hdpData_homeInfo_lotAreaUnit
isZillowOwned is identical to hdpData_homeInfo_isZillowOwned
hdpData_homeInfo_price is identical to hdpData_homeInfo_priceForHDP
hdpData_homeInfo_price is identical to hdpData_homeInfo_priceReduction
hdpData_homeInfo_homeStatus is identical to hdpData_homeInfo_homeStatusForHDP
hdpData_homeInfo_zestimate is identical to zestimate
hdpData_homeInfo_isShowcaseListing is identical to isShowcaseListing

                                                                                

A lot of columns are identical to each other. Dropping all but one.

In [63]:
# Drop columns that are identical 

columns_to_keep = ["unformattedPrice", "zpid","address","area","isZillowOwned","hdpData_homeInfo_homeStatus","zestimate","isShowcaseListing","providerListingId"]

# Drop the columns in the pairs that are not in the columns_to_keep list, and keep the rest in the DataFrame
columns_to_drop = [col_pair[1] for col_pair in similar_headers if col_pair[1] not in columns_to_keep]
silver_df_clean_2 = silver_df_clean_1.drop(*columns_to_drop)

silver_df_clean_2.columns


['zpid',
 'rawHomeStatusCd',
 'marketingStatusSimplifiedCd',
 'imgSrc',
 'hasImage',
 'detailUrl',
 'statusType',
 'statusText',
 'countryCurrency',
 'price',
 'unformattedPrice',
 'address',
 'beds',
 'baths',
 'area',
 'latLong_latitude',
 'latLong_longitude',
 'isZillowOwned',
 'variableData_type',
 'variableData_text',
 'hdpData_homeInfo_zipcode',
 'hdpData_homeInfo_city',
 'hdpData_homeInfo_state',
 'hdpData_homeInfo_latitude',
 'hdpData_homeInfo_longitude',
 'hdpData_homeInfo_bathrooms',
 'hdpData_homeInfo_bedrooms',
 'hdpData_homeInfo_homeType',
 'hdpData_homeInfo_homeStatus',
 'hdpData_homeInfo_daysOnZillow',
 'hdpData_homeInfo_isFeatured',
 'hdpData_homeInfo_shouldHighlight',
 'hdpData_homeInfo_zestimate',
 'hdpData_homeInfo_rentZestimate',
 'hdpData_homeInfo_listing_sub_type_is_FSBA',
 'hdpData_homeInfo_isUnmappable',
 'hdpData_homeInfo_isPreforeclosureAuction',
 'hdpData_homeInfo_timeOnZillow',
 'hdpData_homeInfo_isNonOwnerOccupied',
 'hdpData_homeInfo_isPremierBuilder',
 'h

Deal with other missing data by just filling with placeholders.

In [64]:
from pyspark.sql.functions import mean, col

# Fill missing values in string columns with 'Not Provided'
string_columns = [column for column in silver_df_clean_2.columns if silver_df_clean_2.schema[column].dataType.simpleString() == 'string']
silver_df_clean_2 = silver_df_clean_2.fillna('Not Provided', subset=string_columns)

# Fill missing values in numeric columns with the mean value
numeric_columns = [column for column in silver_df_clean_2.columns if silver_df_clean_2.schema[column].dataType.simpleString() in ['double', 'float']]
for column in numeric_columns:
    mean_value = silver_df_clean_2.select(mean(col(column))).collect()[0][0]
    silver_df_clean_2 = silver_df_clean_2.fillna({column: mean_value})

24/10/04 22:40:31 WARN TaskSetManager: Stage 322 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 22:40:34 WARN TaskSetManager: Stage 325 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 22:40:37 WARN TaskSetManager: Stage 328 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 22:40:40 WARN TaskSetManager: Stage 331 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 22:40:43 WARN TaskSetManager: Stage 334 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 22:40:46 WARN TaskSetManager: Stage 337 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.
24/10/04 22:40:49 WARN TaskSetManager: Stage 340 contains a task of very large size (10590 KiB). The maximum recommended task size is 10

In [72]:
# Drop carouselPhotos columns, hdpData_homeInfo bedrooms and bathrooms columns
columns_to_drop_1 = [column for column in silver_df_clean_2.columns if 'carouselPhotos' in column or column in ['hdpData_homeInfo_bedrooms', 'hdpData_homeInfo_bathrooms']]
silver_df_clean_3 = silver_df_clean_2.drop(*columns_to_drop_1)

# Rename hdpData_homeInfo_ columns to remove the prefix
for column in silver_df_clean_3.columns:
    if 'hdpData_homeInfo_' in column:
        silver_df_clean_3 = silver_df_clean_3.withColumnRenamed(column, column.replace('hdpData_homeInfo_', ''))

columns_to_drop_2 = [
    'rawHomeStatusCd',
    'marketingStatusSimplifiedCd',
    'imgSrc',
    'hasImage',
    'detailUrl',
    'statusType',
    'statusText',
    'countryCurrency',
    'variableData_type',
    'variableData_text',
    'isFeatured',
    'shouldHighlight',
    'isZillowOwned',
    'listing_sub_type_is_FSBA',
    'isUnmappable',
    'isPreforeclosureAuction',
    'isNonOwnerOccupied',
    'isPremierBuilder',
    'isSaved',
    'isUserClaimingOwner',
    'isUserConfirmedClaim',
    'pgapt',
    'sgapt',
    'has3DModel',
    'isHomeRec',
    'hasAdditionalAttributions',
    'isFeaturedListing',
    'isShowcaseListing',
    'relaxed',
    'providerListingId',
    'flexFieldText',
    'flexFieldType',
    'newConstructionType',
]

# Drop columns from the DataFrame
silver_df_clean_4 = silver_df_clean_3.drop(*columns_to_drop_2)

# Drop price and rename unformattedPrice to price
silver_df_clean_4 = silver_df_clean_4.drop('price').withColumnRenamed('unformattedPrice', 'price')

silver_df_clean_4.columns

['zpid',
 'price',
 'address',
 'beds',
 'baths',
 'area',
 'latLong_latitude',
 'latLong_longitude',
 'zipcode',
 'city',
 'state',
 'latitude',
 'longitude',
 'homeType',
 'homeStatus',
 'daysOnZillow',
 'zestimate',
 'rentZestimate',
 'timeOnZillow',
 'currency',
 'country',
 'taxAssessedValue',
 'zestimate',
 'brokerName',
 'builderName',
 'unit']

In [75]:
# Compare latLong_latitude and latLong_longitude with latitude and longitude
silver_df_clean_4.select('latLong_latitude', 'latitude', 'latLong_longitude', 'longitude').show(5)

+----------------+---------+-----------------+----------+
|latLong_latitude| latitude|latLong_longitude| longitude|
+----------------+---------+-----------------+----------+
|       29.865578|29.865578|        -95.19615| -95.19615|
|       29.894386|29.894386|        -95.21654| -95.21654|
|        29.96752| 29.96752|        -95.15717| -95.15717|
|        29.93202| 29.93202|       -95.185394|-95.185394|
|        29.92655| 29.92655|        -95.16345| -95.16345|
+----------------+---------+-----------------+----------+
only showing top 5 rows



24/10/04 22:51:04 WARN TaskSetManager: Stage 370 contains a task of very large size (10590 KiB). The maximum recommended task size is 1000 KiB.


In [76]:
# Drop latLong_latitude and latLong_longitude columns
silver_df_clean_5 = silver_df_clean_4.drop('latLong_latitude', 'latLong_longitude')

Add Price per Square Foot, as a common metric used in housing.

In [77]:
from pyspark.sql.types import DoubleType, FloatType
silver_df_clean_6 = silver_df_clean_5.withColumn('price_per_sqft', (col('price') / col('area')).cast(DoubleType()))
silver_df_clean_6.columns

['zpid',
 'price',
 'address',
 'beds',
 'baths',
 'area',
 'zipcode',
 'city',
 'state',
 'latitude',
 'longitude',
 'homeType',
 'homeStatus',
 'daysOnZillow',
 'zestimate',
 'rentZestimate',
 'timeOnZillow',
 'currency',
 'country',
 'taxAssessedValue',
 'zestimate',
 'brokerName',
 'builderName',
 'unit',
 'price_per_sqft']

In [86]:
# Find duplicate column names
duplicate_columns = [col for col in silver_df_clean_6.columns if silver_df_clean_6.columns.count(col) > 1]
print("Duplicate Columns:", duplicate_columns)

# Drop duplicate 'zestimate' if you don't need both
silver_df_clean_7 = silver_df_clean_6.drop('zestimate')


Duplicate Columns: ['zestimate', 'zestimate']


Standardize the zipcode

In [95]:
from pyspark.sql.functions import lpad
silver_df_clean_8 = silver_df_clean_7.withColumn('zipcode', lpad(col('zipcode'), 5, '0'))

Pysparks likes to save in partitions. Coalescing to one before writing.

In [96]:
silver_df_clean_8.coalesce(1).write.mode('ignore').option("header", "true").csv("silver/houston_housing_market_2024")
# Move the file in the folder to the silver folder with the correct name
import os
os.system("mv silver/houston_housing_market_2024/*.csv silver/houston_housing_market_2024.csv")
os.system("rm -r silver/houston_housing_market_2024")

24/10/04 22:57:32 WARN TaskSetManager: Stage 375 contains a task of very large size (130508 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

0

# Silver -> Gold: Pricing and listings by ZipCode

In [97]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder.appName('HoustonHousingGold').getOrCreate()

# Load the silver dataset (replace 'path_to_silver_dataset.csv' with your actual file path)
silver_df = spark.read.csv('silver/houston_housing_market_2024.csv', header=True, inferSchema=True)


In [98]:
from pyspark.sql.types import DoubleType, IntegerType

silver_df = silver_df.withColumn('price', silver_df['price'].cast(DoubleType()))
silver_df = silver_df.withColumn('area', silver_df['area'].cast(DoubleType()))
silver_df = silver_df.withColumn('price_per_sqft', silver_df['price_per_sqft'].cast(DoubleType()))
silver_df = silver_df.withColumn('days_on_zillow', silver_df['daysOnZillow'].cast(IntegerType()))

In [109]:
from pyspark.sql.functions import avg, count, round

# Group by zipcode and calculate the average price, price_per_sqft, days_on_zillow, and total listings
gold_df = silver_df.groupBy('zipcode').agg(
    round(avg('price'), 2).alias('average_price'),
    round(avg('price_per_sqft'), 2).alias('average_price_per_sqft'),
    round(avg('days_on_zillow'), 2).alias('average_days_on_market'),
    count('*').alias('total_listings')
)

# Add back the city and state columns to the DataFrame from the silver DataFrame
gold_df = gold_df.join(silver_df.select('zipcode','city', 'state').distinct(), 'zipcode')

In [110]:
# Show the first 5 rows of the gold DataFrame sorted by average_price in descending order
gold_df.orderBy('average_price', ascending=False).show(5)

+-------+-------------+----------------------+----------------------+--------------+--------+-----+
|zipcode|average_price|average_price_per_sqft|average_days_on_market|total_listings|    city|state|
+-------+-------------+----------------------+----------------------+--------------+--------+-----+
|  77024|   2706102.39|                419.75|                 64.96|           137| Houston|   TX|
|  77019|   2487279.77|                538.24|                 79.54|           148| Houston|   TX|
|  77005|   1986926.81|                511.86|                 59.12|            78| Houston|   TX|
|  77027|   1523645.22|                419.01|                  65.6|           114| Houston|   TX|
|  77401|   1338937.73|                360.35|                 48.89|            45|Bellaire|   TX|
+-------+-------------+----------------------+----------------------+--------------+--------+-----+
only showing top 5 rows



In [111]:
import os
# Save the gold DataFrame as a CSV file then move it to the gold folder with the correct name
gold_df.coalesce(1).write.mode('ignore').option("header", "true").csv("gold/houston_housing_market_2024")
os.system("mv gold/houston_housing_market_2024/*.csv gold/houston_housing_market_2024.csv")
os.system("rm -r gold/houston_housing_market_2024")

0