# Part 1. Spark Setup
Create a spark session and the UDFs required to parse the data into the income/population proxies

**Libraries Required**
- [PySpark](https://spark.apache.org/docs/latest/api/python/)
- [uszipcode](https://pypi.org/project/uszipcode/)

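Download the `simple_db.sqlite` database ([available here](https://github.com/MacHu-GWU/uszipcode-project/releases/download/1.0.1.db/simple_db.sqlite)) and place it in a `data` folder one level above the notebook's working directory, along with the datasets and the SQLite JDBC driver jar (`sqlite-jdbc-3.39.4.0.jar`).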

In [1]:
import os
# The data folder lives next to the current working directory's parent
# (i.e. <parent of cwd>/data/) and is not tracked by git.
project_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
datapath = os.path.join(project_root, 'data/')
# File name of the SQLite JDBC driver jar that is expected inside the data folder.
dbjar = 'sqlite-jdbc-3.39.4.0.jar'

In [2]:
from pyspark.sql import SparkSession
# Build (or reuse) the local Spark session used for all processing below.
# "spark.jars" points at the SQLite JDBC driver in the data folder; change it
# if your copy of the jar lives somewhere else.
session_builder = (
    SparkSession.builder
    .config("spark.driver.memory", "8g")
    .config("spark.jars", os.path.join(datapath, dbjar))
    .master("local[*]")
)
spark = session_builder.getOrCreate()  # running at :4040


In [3]:
# Load the zip-code lookup table (simple_zipcode) from the SQLite file via JDBC.
sqlite_db = os.path.join(datapath, 'simple_db.sqlite')
zip_reader = (
    spark.read.format("JDBC")
    .option("url", f"jdbc:sqlite:{sqlite_db}")
    .option("dbtable", "simple_zipcode")
    .option("driver", "org.sqlite.JDBC")
)
location_df = zip_reader.load()

In [4]:
# Register the zip-code lookup as the SQL view 'zip_data_tbl' so the queries
# in Parts 2 and 3 can join against it.
location_df.createOrReplaceTempView("zip_data_tbl")

In [5]:
def _zip_from_address(address):
    """Return the last whitespace-separated token of ``address``.

    The sales query applies this to ``full_address`` and joins the result
    against the zip-code table, so the last token is treated as the zip code.

    Returns ``None`` when ``address`` is ``None``, empty, or only whitespace,
    instead of raising and failing the whole Spark job; a NULL zip simply
    never matches in the join.
    """
    if not address:
        return None
    tokens = address.split()
    return tokens[-1] if tokens else None


# Expose the parser to Spark SQL as zip_converter(...) (string return type by default).
spark.udf.register('zip_converter', _zip_from_address)

<function __main__.<lambda>(address)>

### Part 2. Read data from the rental housing dataset

Source: [Kaggle(austinreese)](https://www.kaggle.com/datasets/austinreese/usa-housing-listings)

In [6]:
# Read the rental listings from the shared data folder (the same `datapath`
# the SQLite database is loaded from) instead of a second hard-coded relative
# path. No schema is supplied, so every column is read as a string.
df = spark.read.csv(os.path.join(datapath, "housing.csv"), header=True)

In [7]:
# Inspect the raw columns; all of them are strings at this point and are cast
# to numbers in the SQL query below.
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- url: string (nullable = true)
 |-- region: string (nullable = true)
 |-- region_url: string (nullable = true)
 |-- price: string (nullable = true)
 |-- type: string (nullable = true)
 |-- sqfeet: string (nullable = true)
 |-- beds: string (nullable = true)
 |-- baths: string (nullable = true)
 |-- cats_allowed: string (nullable = true)
 |-- dogs_allowed: string (nullable = true)
 |-- smoking_allowed: string (nullable = true)
 |-- wheelchair_access: string (nullable = true)
 |-- electric_vehicle_charge: string (nullable = true)
 |-- comes_furnished: string (nullable = true)
 |-- laundry_options: string (nullable = true)
 |-- parking_options: string (nullable = true)
 |-- image_url: string (nullable = true)
 |-- description: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- long: string (nullable = true)
 |-- state: string (nullable = true)



In [8]:
df.createOrReplaceTempView("rentals") # expose the raw rental listings to Spark SQL under the name 'rentals'

In [9]:
# Attach zip-level population density / median income to every rental listing.
#
# 1. rental_data: cast the numeric columns and give each listing a row number.
#    The NULL checks are applied to the *cast* values so that non-numeric
#    strings (which DOUBLE() turns into NULL under Spark's default, non-ANSI
#    casting) are dropped instead of flowing through as NULL features.
# 2. Candidate zips are those whose bounding box contains the listing.
# 3. Exactly one candidate is kept per listing: the one whose bounding-box
#    centre is closest to the listing. ROW_NUMBER with a zipcode tie-breaker is
#    used instead of RANK, because RANK gives every equidistant candidate
#    rank 1 and would duplicate the listing in the output.
new_rent_df = spark.sql("""
WITH rental_data AS (
    SELECT
        ROW_NUMBER() OVER (ORDER BY (SELECT 1)) AS number,
        DOUBLE(sqfeet) AS sqfeet,
        DOUBLE(beds) AS beds,
        DOUBLE(baths) AS baths,
        DOUBLE(price) AS price,
        DOUBLE(lat) AS lat,
        DOUBLE(long) AS long
    FROM rentals
    WHERE
        DOUBLE(sqfeet) IS NOT NULL AND
        DOUBLE(beds) IS NOT NULL AND
        DOUBLE(baths) IS NOT NULL AND
        DOUBLE(price) IS NOT NULL AND
        DOUBLE(lat) IS NOT NULL AND
        DOUBLE(long) IS NOT NULL
)
SELECT
    sqfeet,
    beds,
    baths,
    price,
    population_density AS density,
    median_household_income AS income,
    post_office_city AS label
FROM(
    SELECT
        A.number,
        A.sqfeet,
        A.beds,
        A.baths,
        A.price,
        A.lat,
        A.long,
        B.population_density,
        B.median_household_income,
        B.post_office_city,
        B.state,
        ROW_NUMBER() OVER (
            PARTITION BY A.number
            ORDER BY
                SQRT(
                    POWER((B.bounds_north + B.bounds_south)/2-A.lat, 2) +
                    POWER((B.bounds_west + B.bounds_east)/2-A.long, 2)
                ) ASC,
                B.zipcode ASC
        ) AS nearest_rank
    FROM
        rental_data AS A,
        zip_data_tbl AS B
    WHERE
        A.lat BETWEEN B.bounds_south AND B.bounds_north
        AND A.long BETWEEN B.bounds_west AND B.bounds_east
        AND B.population_density IS NOT NULL
        AND B.median_household_income IS NOT NULL
)
WHERE nearest_rank = 1

""")

In [10]:
# Check the result schema: listing features are doubles, density/income/label
# come from the zip-code table.
new_rent_df.printSchema()

root
 |-- sqfeet: double (nullable = true)
 |-- beds: double (nullable = true)
 |-- baths: double (nullable = true)
 |-- price: double (nullable = true)
 |-- density: float (nullable = true)
 |-- income: integer (nullable = true)
 |-- label: string (nullable = true)



In [11]:
# Mark the joined result for caching so the later actions (show, count and the
# Part 4 queries) can reuse it rather than re-running the join.
new_rent_df.cache()

DataFrame[sqfeet: double, beds: double, baths: double, price: double, density: float, income: int, label: string]

In [12]:
new_rent_df.show(5) # preview five rows; this is an action, so it triggers the Spark job here (comment out if you only want computation at the end)

+------+----+-----+------+-------+------+-------------+
|sqfeet|beds|baths| price|density|income|        label|
+------+----+-----+------+-------+------+-------------+
| 739.0| 1.0|  1.0|1177.0| 3741.0| 37110|   Sparks, NV|
| 250.0| 0.0|  1.0| 309.0| 6709.0| 22345|     Reno, NV|
| 900.0| 2.0|  1.0|1195.0| 4058.0| 31081|     Reno, NV|
|1400.0| 3.0|  2.5| 988.0| 1938.0| 85479|Camarillo, CA|
| 270.0| 0.0|  1.0| 249.0| 6709.0| 22345|     Reno, NV|
+------+----+-----+------+-------+------+-------------+
only showing top 5 rows



In [13]:
# Number of rental listings that were matched to a zip code.
new_rent_df.count()

368690

In [14]:
# DataFrames are immutable: repartition() returns a new DataFrame, so the
# result must be kept or the call has no effect.
new_rent_df = new_rent_df.repartition(1)

DataFrame[sqfeet: double, beds: double, baths: double, price: double, density: float, income: int, label: string]

In [15]:
new_rent_df.createOrReplaceTempView('rental_values') # register the matched rentals as the SQL view 'rental_values' queried in Part 4

### Part 3. Read data from the housing listings dataset

Source: [Kaggle(ahmedshahriarsakib)](https://www.kaggle.com/datasets/ahmedshahriarsakib/usa-real-estate-dataset)

In [16]:
# Read the for-sale listings from the shared data folder (the same `datapath`
# the SQLite database is loaded from) instead of a second hard-coded relative
# path. No schema is supplied, so every column is read as a string.
df = spark.read.csv(os.path.join(datapath, "realtor-data.csv"), header=True)

In [17]:
# Inspect the raw sales columns; all are strings and are cast in the query below.
df.printSchema()

root
 |-- status: string (nullable = true)
 |-- price: string (nullable = true)
 |-- bed: string (nullable = true)
 |-- bath: string (nullable = true)
 |-- acre_lot: string (nullable = true)
 |-- full_address: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- house_size: string (nullable = true)
 |-- sold_date: string (nullable = true)



In [18]:
# Preview the raw sales rows (show() prints the first 20 rows by default).
df.show()

+--------+--------+---+----+--------+--------------------+--------------------+-------------+-----------+--------+----------+---------+
|  status|   price|bed|bath|acre_lot|        full_address|              street|         city|      state|zip_code|house_size|sold_date|
+--------+--------+---+----+--------+--------------------+--------------------+-------------+-----------+--------+----------+---------+
|for_sale|105000.0|3.0| 2.0|    0.12|Sector Yahuecas T...|Sector Yahuecas T...|     Adjuntas|Puerto Rico|   601.0|     920.0|     null|
|for_sale| 80000.0|4.0| 2.0|    0.08|Km 78 9 Carr # 13...|  Km 78 9 Carr # 135|     Adjuntas|Puerto Rico|   601.0|    1527.0|     null|
|for_sale| 67000.0|2.0| 1.0|    0.15|556G 556-G 16 St,...|    556G 556-G 16 St|   Juana Diaz|Puerto Rico|   795.0|     748.0|     null|
|for_sale|145000.0|4.0| 2.0|     0.1|R5 Comunidad El P...|R5 Comunidad El P...|        Ponce|Puerto Rico|   731.0|    1800.0|     null|
|for_sale| 65000.0|6.0| 2.0|    0.05|14 Navarro,

In [19]:
# Expose the raw sales listings to Spark SQL under the name 'sales'.
df.createOrReplaceTempView("sales")

In [20]:
# Attach zip-level population density / median income to every sale listing.
#
# The zip code is taken from the end of full_address via the zip_converter UDF
# and joined to the zip-code table on equality.
# The NULL checks are applied to the *cast* values so that non-numeric strings
# (which DOUBLE() turns into NULL under Spark's default, non-ANSI casting) are
# dropped instead of flowing through as NULL features.
# The query has no placeholders, so it is a plain string rather than an f-string.
new_sale_df = spark.sql("""
WITH sales_data AS (
    SELECT
        DOUBLE(house_size) AS sqfeet,
        DOUBLE(bed) AS beds,
        DOUBLE(bath) AS baths,
        DOUBLE(price) AS price,
        zip_converter(full_address) AS zip
    FROM sales
    WHERE
        DOUBLE(house_size) IS NOT NULL AND
        DOUBLE(bed) IS NOT NULL AND
        DOUBLE(bath) IS NOT NULL AND
        DOUBLE(price) IS NOT NULL AND
        full_address IS NOT NULL
)

SELECT
    A.sqfeet,
    A.beds,
    A.baths,
    A.price,
    A.zip,
    B.population_density AS density,
    B.median_household_income AS income,
    B.post_office_city AS label
FROM
    sales_data AS A
    JOIN
    zip_data_tbl AS B
    ON
    A.zip = B.zipcode
WHERE
    B.population_density IS NOT NULL
    AND B.median_household_income IS NOT NULL
""")

In [21]:
# Check the result schema: listing features are doubles, zip is the parsed
# string, density/income/label come from the zip-code table.
new_sale_df.printSchema()

root
 |-- sqfeet: double (nullable = true)
 |-- beds: double (nullable = true)
 |-- baths: double (nullable = true)
 |-- price: double (nullable = true)
 |-- zip: string (nullable = true)
 |-- density: float (nullable = true)
 |-- income: integer (nullable = true)
 |-- label: string (nullable = true)



In [22]:
# Mark the joined result for caching so the later actions (show, count and the
# Part 4 queries) can reuse it rather than re-running the join.
new_sale_df.cache()

DataFrame[sqfeet: double, beds: double, baths: double, price: double, zip: string, density: float, income: int, label: string]

In [23]:
# Preview five joined sale rows (this is an action and triggers the Spark job).
new_sale_df.show(5)

+------+----+-----+---------+-----+-------+------+----------+
|sqfeet|beds|baths|    price|  zip|density|income|     label|
+------+----+-----+---------+-----+-------+------+----------+
|4000.0| 4.0|  4.0|1200000.0|02053| 1105.0|106132|Medway, MA|
|3024.0| 3.0|  3.0| 979900.0|02053| 1105.0|106132|Medway, MA|
|1400.0| 3.0|  3.0| 539900.0|02053| 1105.0|106132|Medway, MA|
|2284.0| 4.0|  1.0| 599900.0|02053| 1105.0|106132|Medway, MA|
|3697.0| 4.0|  4.0| 929900.0|02053| 1105.0|106132|Medway, MA|
+------+----+-----+---------+-----+-------+------+----------+
only showing top 5 rows



In [24]:
# Number of sale listings whose parsed zip code matched the zip-code table.
new_sale_df.count()

579006

In [25]:
# DataFrames are immutable: repartition() returns a new DataFrame, so the
# result must be kept or the call has no effect.
new_sale_df = new_sale_df.repartition(1)

DataFrame[sqfeet: double, beds: double, baths: double, price: double, zip: string, density: float, income: int, label: string]

In [26]:
new_sale_df.createOrReplaceTempView('sale_values') # register the matched sales as the SQL view 'sale_values' queried in Part 4

### Part 4. Saving the data as Parquet files to be read by pandas
We use Parquet instead of CSV because the files are smaller and faster to read, at the cost of human readability.

In [27]:
# Pick the model columns from the matched rentals.
# The range filter drops outliers that a previous run showed to be present
# in this dataset.
rent_query = """
SELECT 
    sqfeet,
    beds, 
    baths,
    income,
    density,
    price
FROM rental_values
WHERE 
    sqfeet <= 5000 AND
    beds <= 8 AND
    baths <= 8 AND
    price BETWEEN 100 AND 4000
"""
# Collapse to a single partition so the later write emits a single part.
rent_df = spark.sql(rent_query).repartition(1)
rent_df.cache()
rent_df.show(10)

+------+----+-----+------+-------+------+
|sqfeet|beds|baths|income|density| price|
+------+----+-----+------+-------+------+
| 739.0| 1.0|  1.0| 37110| 3741.0|1177.0|
| 250.0| 0.0|  1.0| 22345| 6709.0| 309.0|
| 900.0| 2.0|  1.0| 31081| 4058.0|1195.0|
|1400.0| 3.0|  2.5| 85479| 1938.0| 988.0|
| 270.0| 0.0|  1.0| 22345| 6709.0| 249.0|
|1117.0| 3.0|  2.0| 39912| 4300.0|1132.0|
|1900.0| 3.0|  2.0| 67750|  101.0|2650.0|
| 715.0| 1.0|  1.0| 55177| 2591.0|1495.0|
| 828.0| 2.0|  1.0| 45537| 8121.0|1265.0|
|1183.0| 2.0|  2.0| 60513| 3792.0|1795.0|
+------+----+-----+------+-------+------+
only showing top 10 rows



In [28]:
# Pick the model columns from the matched sales, with the same size/bed/bath
# limits as the rentals (no price limit is applied here).
sale_query = """
SELECT 
    sqfeet,
    beds, 
    baths,
    income,
    density,
    price
FROM sale_values
WHERE 
    sqfeet <= 5000 AND
    beds <= 8 AND
    baths <= 8
"""
# Collapse to a single partition so the later write emits a single part.
sale_df = spark.sql(sale_query).repartition(1)
sale_df.cache()
sale_df.show(10)

+------+----+-----+------+-------+---------+
|sqfeet|beds|baths|income|density|    price|
+------+----+-----+------+-------+---------+
|4000.0| 4.0|  4.0|106132| 1105.0|1200000.0|
|3024.0| 3.0|  3.0|106132| 1105.0| 979900.0|
|1400.0| 3.0|  3.0|106132| 1105.0| 539900.0|
|2284.0| 4.0|  1.0|106132| 1105.0| 599900.0|
|3697.0| 4.0|  4.0|106132| 1105.0| 929900.0|
|4000.0| 4.0|  4.0|106132| 1105.0|1200000.0|
|3024.0| 3.0|  3.0|106132| 1105.0| 979900.0|
|1400.0| 3.0|  3.0|106132| 1105.0| 539900.0|
|2284.0| 4.0|  1.0|106132| 1105.0| 599900.0|
|3697.0| 4.0|  4.0|106132| 1105.0| 929900.0|
+------+----+-----+------+-------+---------+
only showing top 10 rows



In [29]:
# Collect the (income, density, label) location rows seen in either dataset.
# UNION (as opposed to UNION ALL) also removes duplicates across the two sides.
location_query = """
(SELECT DISTINCT
    income,
    density,
    label
FROM rental_values)
UNION
(SELECT DISTINCT
    income,
    density,
    label
FROM sale_values)
"""
# Collapse to a single partition so the later write emits a single part.
loc_df = spark.sql(location_query).repartition(1)
loc_df.cache()
loc_df.show(10)

+------+-------+-------------------+
|income|density|              label|
+------+-------+-------------------+
|111744|  895.0|       Cheshire, CT|
| 42581| 4513.0|    Casselberry, FL|
| 34283| 2849.0|New Port Richey, FL|
| 40972| 2408.0|       Columbus, GA|
| 51792|  197.0|      Middleton, ID|
| 21737| 7703.0|    Springfield, MA|
| 28929| 3237.0|        Pontiac, MI|
| 51814|  493.0|    Summerville, SC|
| 71628|  804.0|      Escondido, CA|
| 44775|  326.0|         Tulare, CA|
+------+-------+-------------------+
only showing top 10 rows



In [30]:
# Persist each result as a Parquet dataset under ./outputs, replacing the
# output of any previous run. Spark writes one part file per partition of the
# frame being written.
for frame, target in (
    (rent_df, "./outputs/rental"),
    (sale_df, "./outputs/sales"),
    (loc_df, "./outputs/location"),
):
    frame.write.mode("overwrite").parquet(target)

In [31]:
spark.stop() # shut down the Spark session once the notebook is finished