In [59]:
import pyspark as ps
import urllib.request
from pyspark.sql.types import *
from pyspark.sql.functions import concat, col, split
import pyspark.sql.functions as F

# Shared Spark entry points for the notebook: a local-mode session
# (master "local[4]") named "case study", reused if one already exists.
spark = (
    ps.sql.SparkSession.builder
    .master("local[4]")
    .appName("case study")
    .getOrCreate()
)

# Lower-level SparkContext belonging to the session above.
sc = spark.sparkContext

In [190]:
def clean_real_prop_account(path='rawdata/real_property_account.csv'):
    """Build a (Zip, pin) lookup from the real property account CSV.

    Reads the CSV at ``path`` (first row is the header), keeps only rows
    whose ``CityState`` value ends in ``WA``, and returns two columns:

    * ``Zip`` -- ``ZipCode`` with trailing whitespace stripped, cast to int.
    * ``pin`` -- ``Major`` and ``Minor`` concatenated; used as the join key
      against the other property tables.

    Side effect: (re)registers the unfiltered data as the temporary view
    ``prop_account`` on the module-level ``spark`` session.

    Parameters
    ----------
    path : str
        Location of the real property account CSV.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``Zip`` (int) and ``pin``, in that order.
    """
    accounts = spark.read.csv(path, header=True)

    # createOrReplaceTempView is the non-deprecated spelling of
    # registerTempTable; the view name is unchanged so existing SQL that
    # queries "prop_account" keeps working.
    accounts.createOrReplaceTempView("prop_account")
    query = "SELECT * from prop_account WHERE CityState LIKE '%WA'"
    wa_accounts = spark.sql(query)

    # rtrim is reached through the module-level `F` alias: the bare name
    # `rtrim` is not imported anywhere in the notebook's import cell, so the
    # previous code raised NameError on a fresh kernel.
    # The columns are named with alias() instead of renaming Spark's
    # auto-generated 'concat(Major, Minor)' column name afterwards.
    return wa_accounts.select(
        F.rtrim(wa_accounts['ZipCode']).cast('int').alias('Zip'),
        F.concat(F.col("Major"), F.col("Minor")).alias('pin'),
    )

def clean_res_building(path='rawdata/res_building.csv'):
    """Load the residential building CSV and return typed feature columns.

    The result has, in order: ``pin`` (``Major`` + ``Minor`` concatenated),
    ``Zip`` (``ZipCode`` with trailing whitespace stripped, left as a
    string), ``NbrLivingUnits`` (as read from the CSV), ``SqFtTotLiving``,
    ``YrBuilt``, ``Bedrooms`` and ``BathFullCount`` (each cast to int), and
    ``NbrLivingUnits_n`` (int copy of ``NbrLivingUnits``).

    Parameters
    ----------
    path : str
        Location of the residential building CSV.

    Returns
    -------
    pyspark.sql.DataFrame
    """
    buildings = spark.read.csv(path, header=True)

    # Key columns: trimmed zip code and the pin used to join the
    # property tables together.
    keyed = (
        buildings
        .withColumn('Zip', F.rtrim(buildings['ZipCode']))
        .withColumn('pin', F.concat(F.col("Major"), F.col("Minor")))
    )
    features = keyed.select(
        'pin', 'Zip', 'NbrLivingUnits', 'SqFtTotLiving',
        'YrBuilt', 'Bedrooms', 'BathFullCount',
    )

    # NbrLivingUnits is the odd one out: its int version is appended as a
    # new column while the original string column is kept.
    # NOTE(review): possibly unintentional -- confirm before relying on it.
    typed = features.withColumn(
        "NbrLivingUnits_n", features.NbrLivingUnits.cast("int")
    )

    # The remaining numeric features are cast to int in place.
    for name in ("SqFtTotLiving", "YrBuilt", "Bedrooms", "BathFullCount"):
        typed = typed.withColumn(name, typed[name].cast("int"))
    return typed

def clean_sales_data(path='rawdata/real_property_sales.csv'):
    """Load the property sales CSV and return filtered (pin, Year, SalePrice).

    Rows are kept only when ``Year`` is between 2005 and 2015 inclusive and
    ``SalePrice`` is at least 20000.

    Parameters
    ----------
    path : str
        Location of the real property sales CSV.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``pin``, ``Year`` (int) and ``SalePrice``.
    """
    # NOTE(review): inferSchema=True lets Spark pick column types; if Major
    # or Minor were ever inferred as numeric, leading zeros would be lost
    # and `pin` would stop matching the other tables -- confirm on new data.
    sales = spark.read.csv(
        path,
        header=True,
        quote='"',
        sep=",",
        inferSchema=True,
    )

    # Year is the third '/'-separated piece of DocumentDate
    # (presumably MM/DD/YYYY -- TODO confirm).
    sale_year = split(sales.DocumentDate, '/').getItem(2).cast("int")

    trimmed = (
        sales
        .withColumn("pin", concat(sales.Major, sales.Minor))
        .withColumn('Year', sale_year)
        .select('pin', 'Year', 'SalePrice')
    )

    return (
        trimmed
        .filter((col('Year') >= 2005) & (col('Year') <= 2015))
        .filter(col('SalePrice') >= 20000)
    )

In [193]:
# Residential building features, loaded from the function's default CSV path.
res_df=clean_res_building()

In [195]:
# Preview the first 3 rows of the building features.
res_df.show(3)

+----------+-----+--------------+-------------+-------+--------+-------------+----------------+
|       pin|  Zip|NbrLivingUnits|SqFtTotLiving|YrBuilt|Bedrooms|BathFullCount|NbrLivingUnits_n|
+----------+-----+--------------+-------------+-------+--------+-------------+----------------+
|0421049114|98003|             1|          910|   1955|       3|            1|               1|
|0421049129|98003|             1|         1420|   1958|       4|            1|               1|
|0421049137|98003|             1|         1010|   1959|       3|            1|               1|
+----------+-----+--------------+-------------+-------+--------+-------------+----------------+
only showing top 3 rows



In [191]:
# Zip/pin lookup for WA property accounts, loaded from the default CSV path.
rp_df=clean_real_prop_account()

In [192]:
# Preview the first 3 rows of the Zip/pin lookup.
rp_df.show(3)

+-----+----------+
|  Zip|       pin|
+-----+----------+
|98002|0000800027|
|98071|0000800047|
|98002|0001000035|
+-----+----------+
only showing top 3 rows



In [199]:
# Join building features to the account lookup on the shared `pin` key
# (no `how` given, so Spark's default join type applies).
# NOTE(review): both inputs carry a `Zip` column (string in res_df, int in
# rp_df), so the result has two columns named `Zip`; selecting 'Zip' by name
# on feat_df is ambiguous. Consider dropping or renaming one side before the
# join once downstream usage has been checked.
feat_df=res_df.join(rp_df, on='pin')

In [201]:
# Preview the first 3 joined rows.
feat_df.show(3)

+----------+-----+--------------+-------------+-------+--------+-------------+----------------+-----+
|       pin|  Zip|NbrLivingUnits|SqFtTotLiving|YrBuilt|Bedrooms|BathFullCount|NbrLivingUnits_n|  Zip|
+----------+-----+--------------+-------------+-------+--------+-------------+----------------+-----+
|0006200017|98032|             1|         1340|   1945|       3|            1|               1|98032|
|0006200017|98032|             1|         1340|   1945|       3|            1|               1|98032|
|0006600059|98032|             1|          840|   1940|       2|            1|               1|98032|
+----------+-----+--------------+-------------+-------+--------+-------------+----------------+-----+
only showing top 3 rows



In [202]:
# Mark the joined frame for caching so later cells can reuse it; cache()
# returns the DataFrame itself, which is what this cell displays.
feat_df.cache()

DataFrame[pin: string, Zip: string, NbrLivingUnits: string, SqFtTotLiving: int, YrBuilt: int, Bedrooms: int, BathFullCount: int, NbrLivingUnits_n: int, Zip: int]

In [203]:
# Filtered sales records (pin, Year, SalePrice) from the default CSV path.
sales_df=clean_sales_data()

In [204]:
# Preview the first 3 rows of the filtered sales data.
sales_df.show(3)

+----------+----+---------+
|       pin|Year|SalePrice|
+----------+----+---------+
|1388600110|2014|   245000|
|3303951610|2012|   335000|
|6385800110|2015|   190000|
+----------+----+---------+
only showing top 3 rows

