In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, DateType, TimestampType
from pyspark.sql.functions import *
import csv

# Build the SparkSession first and take the SparkContext from it.
# Previously a bare SparkContext("local", ...) was created first, so getOrCreate()
# reused that already-running context and the builder's memory setting may not have
# taken effect; "local" (without [N]) also runs Spark with a single worker thread.
# - local[*]: one worker thread per available core.
# - spark.driver.memory: in local mode the executor runs inside the driver JVM, so
#   this (not spark.executor.memory) sizes the heap. It only applies if set before
#   the JVM starts, i.e. on a fresh kernel. NOTE(review): when launched through
#   spark-submit, pass --driver-memory on the command line instead.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("etl")
    .config("spark.driver.memory", "12g")
    .getOrCreate()
)
sc = spark.sparkContext

22/04/07 16:32:35 WARN Utils: Your hostname, gosroth resolves to a loopback address: 127.0.1.1; using 192.168.1.242 instead (on interface enp4s0)
22/04/07 16:32:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/07 16:32:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Some basic preprocessing to fix some odd issues in the CSV file.
# Mainly there are quote marks inside quoted text in the long marketing description fields.
# Instead of an escape character like \, the file uses two quotation marks in a row ("")
# to represent a literal quote.  pyspark (whose CSV escape character defaults to \) does
# not process that well when reading in a dataframe, and since these marketing description
# fields are multiline, quotes that aren't escaped properly can confuse the parsing of the
# following lines too.
#
# What the substitution actually does: every "" pair is deleted outright, so the literal
# quote characters are dropped from the text rather than escaped
# (e.g. "a ""great"" view" becomes "a great view").  A quoted empty field ("") likewise
# becomes an unquoted empty field.  Every run of consecutive quote characters is reduced to
# its length mod 2, so the output never contains "" and re-running the command is a no-op.
#
# NOTE(review): Spark's CSV reader can parse doubled quotes natively with
# .option("escape", '"') (quote and escape both set to "), which keeps the quote
# characters in the text and would make this lossy pre-pass unnecessary.
#
# Doing this in standard python code would be very slow given the file size.  I probably could use spark if I
# read the file in as an RDD first, applied the regex there, then converted it to a dataframe.  But in the interest
# of keeping things simple I just used sed, which can process large files like this pretty efficiently.
#
# Note I did this on a copy because sed -i modifies the file in place and I wanted to keep the original in case I
# needed it for something, but of course this would work on the original too.
# Run this once on a fresh copy before reading the file.

!sed -i 's/""//g' renthub_data_copy.csv

In [2]:
# Just read in the file as a dataframe here with a fixed schema (no inference pass over the file).
#
# Reader options:
# - multiLine: marketing_desc values span several physical lines.
# - escape='"': the file writes a literal quote inside a quoted field as two quote
#   characters ("") -- the RFC 4180 convention -- so the quote character doubles as the
#   escape character.  The previous escape of "\\" (Spark's default) does not understand
#   doubled quotes and would treat a literal backslash before a quote in the text as an
#   escape.  On a copy already cleaned by the sed step there are no "" pairs left, so this
#   setting is harmless there and correct on the raw file.
# Fields that fail to parse against the schema come back as null (Spark's default
# PERMISSIVE mode).

schema = StructType([
    StructField("id", IntegerType()),
    StructField("state", StringType()),
    StructField("city", StringType()),
    StructField("zip", StringType()),          # string, so leading zeros (e.g. 02134) survive
    StructField("address", StringType()),
    StructField("property_type", StringType()),
    StructField("beds", IntegerType()),
    StructField("baths", DoubleType()),         # double: half baths (1.5) occur
    StructField("sqft", IntegerType()),
    StructField("price", IntegerType()),
    StructField("lat", DoubleType()),
    StructField("long", DoubleType()),
    StructField("posted_at", TimestampType()),
    StructField("marketing_desc", StringType()),
    StructField("company", StringType()),
    StructField("scraped_at", TimestampType())
])

df = (
    spark.read
    .schema(schema)
    .option("multiLine", "true")
    .option("escape", '"')
    .csv("renthub_data_copy.csv", header=True)
)
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip: string (nullable = true)
 |-- address: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- beds: integer (nullable = true)
 |-- baths: double (nullable = true)
 |-- sqft: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- posted_at: timestamp (nullable = true)
 |-- marketing_desc: string (nullable = true)
 |-- company: string (nullable = true)
 |-- scraped_at: timestamp (nullable = true)



In [3]:
# Quick visual check of the first 10 parsed rows (long text columns truncated to 20 chars).
df.show(n=10, truncate=True)

+---------+-----+---------+-----+-------------------+-------------+----+-----+----+-----+----------+-----------+--------------------+--------------------+--------------------+--------------------+
|       id|state|     city|  zip|            address|property_type|beds|baths|sqft|price|       lat|       long|           posted_at|      marketing_desc|             company|          scraped_at|
+---------+-----+---------+-----+-------------------+-------------+----+-----+----+-----+----------+-----------+--------------------+--------------------+--------------------+--------------------+
|316859180|   IL|  Chicago|60610|      805 N Laselle| Multi-family|   0|  1.0| 584| 1681|  41.89687| -87.632466|2019-06-07 19:04:...|This rental prope...|                null|2019-06-07 19:04:...|
|316859617|   MA|   Boston|02134|1 President Terrace| Multi-family|   1|  1.0|null| 1825|42.3534811|-71.1290716|2019-06-07 19:05:...|1 Bed 1 Bath Spli...|Boardwalk Properties|2019-06-07 19:05:...|
|316859618|   M

In [4]:
# I will filter on the state field to eliminate some stray rows with odd values.  I do allow for both abbreviations
# and full names though.
# Note that I am only using US states here but it would be easy enough to add Canadian provinces, etc. to the list.
#
# name-abbr.csv is read as two columns: full name, abbreviation.
#   valid_names   - every accepted value of the state column (names and abbreviations)
#   state_mapping - full name -> abbreviation
# Rows missing a zip or a price are dropped as well.

valid_names = []
state_mapping = {}
# newline='' is what the csv module documentation requires for files handed to csv.reader.
with open('name-abbr.csv', newline='') as csvfile:
    reader = csv.reader(csvfile, delimiter=',', quotechar='"')
    for row in reader:
        # Skip blank or short lines (e.g. a trailing empty line) instead of failing with IndexError.
        if len(row) < 2:
            continue
        # Strip stray spaces around cells so a row like "Illinois, IL" still matches "IL" in the data.
        name, abbr = row[0].strip(), row[1].strip()
        valid_names.extend([name, abbr])
        state_mapping[name] = abbr

df = df.filter(df.state.isin(valid_names) & df.zip.isNotNull() & df.price.isNotNull())

In [5]:
# Now make UDFs to transform state and zipcode fields
# Any states with a name instead of abbreviation will be converted to the abbreviation
# Any zip codes with a dash will be cut down to the part before the dash (the simple 5 digit zip)

# Note: I didn't end up using the state UDF because I realized I didn't really need it

def convert_state(state):
    """Return the abbreviation for a full state name.

    Looks the value up in state_mapping (full name -> abbreviation, built from
    name-abbr.csv). Anything that is not a key there -- a value that is already
    an abbreviation, an unknown value, or None -- is returned unchanged.
    """
    return state_mapping.get(state, state)

# Hand the function to udf() directly instead of wrapping it in a lambda: the results
# are the same, but the UDF (and any un-aliased column it produces) is named after
# convert_state rather than "<lambda>".
convert_state_udf = udf(convert_state, StringType())

def convert_zip(zip):
    """Return the part of a zip code before the first dash ("02134-1234" -> "02134").

    Falsy values (None, "") and zips without a dash are returned unchanged.
    """
    if zip and "-" in zip:
        return zip.split("-", 1)[0]
    return zip


def convert_zip_udf(zip_col):
    """Column version of convert_zip, built on Spark's native substring_index.

    substring_index(col, "-", 1) yields everything before the first "-": a value
    without a dash is returned whole, "" stays "", and null stays null -- the same
    results convert_zip gives. Unlike a Python udf, it runs inside the JVM instead
    of shipping every row to a Python worker, which matters at this file size.
    Accepts a Column or a column name, just like the udf it replaces.
    """
    return substring_index(zip_col, "-", 1)

In [6]:
# Split listings by month and eliminate duplicates.
# Zip and address are used to distinguish actually distinct listings: every posting of the
# same address in the same zip and month collapses into a single row, whose price is the
# mean of those postings' prices (Spark names that column "avg(price)").
reduced_df = (
    df.select(
        convert_zip_udf(col("zip")).alias("zip"),
        col("address"),
        col("price"),
        year(col("posted_at")).alias("year"),
        month(col("posted_at")).alias("month"),
    )
    .groupBy("zip", "address", "year", "month")
    .avg("price")
)

In [7]:
# Handle cases where there is insufficient data.
# "Sufficient" wasn't really defined, so I'll go with at least min_listings listings per zip per month.
# reduced_df already holds one row per (zip, address, year, month), so counting its rows per
# (zip, year, month) counts distinct addresses.
# This builds the set of qualifying (zip, year, month) groups; it is joined back onto the main df next.

min_listings = 10

count_df = (
    reduced_df
    .groupBy("zip", "year", "month")
    .count()
    .where(col("count") >= min_listings)
)

In [8]:
# Now join as a filter: keep only (zip, year, month) groups that met the listing threshold.
# Being an inner join, it also carries count_df's "count" column onto reduced_df, and rows
# with a null key component (e.g. a null year/month) never match, so they are dropped too.
reduced_df = reduced_df.join(count_df, on=["zip", "year", "month"], how="inner")

In [9]:
# Write output
#
# Output schema is just:
#   zip   - zip code with anything from the first "-" on removed
#   year  - year of posted_at
#   month - month of posted_at
#   price - mean price of one address's postings in that zip and month
#
# Each row is one de-duplicated listing (one address), not a zip-level aggregate, so a
# zip/month appears once per address in every group that met the listing threshold.
#
# More information certainly could have been included (for instance sqft, beds, and baths to know more about the
# particular property) but that's not used in the current analysis and the stated objective was to keep storage
# requirements low.
#
# mode("overwrite") makes the cell re-runnable: Spark's default save mode ("errorifexists")
# fails as soon as output_data.parquet already exists from a previous run.

output_df = reduced_df.select("zip", "year", "month", col("avg(price)").alias("price"))
output_df.write.mode("overwrite").parquet("output_data.parquet")

22/04/07 16:37:47 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                