In [47]:
import os
import sys

# Point this kernel at the local Spark installation and expose Spark's bundled
# Python libraries before anything imports pyspark.
spark_home = "/home/talentum/spark"
py_lib_dir = spark_home + "/python/lib"

os.environ.update({
    "SPARK_HOME": spark_home,
    "PYLIB": py_lib_dir,
    # For Python 2, point both interpreter entries at /usr/bin/python2.7.
    # NOTE(review): the worker path names python3.6 while the driver path is
    # the generic python3 - confirm both resolve to the same version.
    "PYSPARK_PYTHON": "/usr/bin/python3.6",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
    # Extra packages (spark-xml, spark-avro) requested when the shell starts.
    "PYSPARK_SUBMIT_ARGS": "--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell",
})

# pyspark.zip ends up first on sys.path, followed by the py4j archive.
sys.path[:0] = [py_lib_dir + "/pyspark.zip", py_lib_dir + "/py4j-0.10.7-src.zip"]

In [48]:
from pyspark.sql import SparkSession
# Reuse an existing session if there is one; otherwise create one named
# "Zillow_dataset" with Hive support enabled.
session_builder = SparkSession.builder.appName("Zillow_dataset")
spark = session_builder.enableHiveSupport().getOrCreate()

# Handle to the session's underlying SparkContext.
sc = spark.sparkContext

In [49]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Explicit schema for the combined CSV: (column name, Spark type) in file order.
# Price, LivingArea and LotArea are read as strings and converted later.
# NOTE(review): Bathrooms is read as an integer while Bedrooms is a double -
# confirm the source never carries fractional bathroom counts.
column_types = [
    ("Zpid", StringType()),
    ("Price", StringType()),
    ("PriceLabel", StringType()),
    ("Address", StringType()),
    ("Bedrooms", DoubleType()),
    ("Bathrooms", IntegerType()),
    ("LivingArea", StringType()),
    ("Latitude", DoubleType()),
    ("Longitude", DoubleType()),
    ("Zipcode", StringType()),
    ("City", StringType()),
    ("State", StringType()),
    ("LotArea", StringType()),
    ("LotAreaUnit", StringType()),
    ("Country", StringType()),
]

# Every column is nullable.
schema_z = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in column_types]
)

In [50]:
# Load the combined dataset from the local filesystem using the explicit
# schema; the first line of the file is treated as the header.
dataset_path = 'file:///home/talentum/shared/project_dataset/Dataset/combined_dataset.csv'
df = spark.read.schema(schema_z).option("header", True).csv(dataset_path)

In [51]:
# Print the column names, types and nullability of the loaded DataFrame to
# confirm that the explicit schema was applied.
df.printSchema()

root
 |-- Zpid: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- PriceLabel: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- Bedrooms: double (nullable = true)
 |-- Bathrooms: integer (nullable = true)
 |-- LivingArea: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LotArea: string (nullable = true)
 |-- LotAreaUnit: string (nullable = true)
 |-- Country: string (nullable = true)



Data Pre-Processing

In [52]:
import pyspark.sql.functions as F

# Both area columns arrive as text: keep only digits and '.', then cast the
# remainder to double.
for area_col in ("LotArea", "LivingArea"):
    numeric_text = F.regexp_replace(F.col(area_col), "[^0-9.]", "")
    df = df.withColumn(area_col, numeric_text.cast("double"))

Handling null values

In [53]:
# Count rows where LivingArea and LotArea are both missing.
both_areas_missing = F.col("LivingArea").isNull() & F.col("LotArea").isNull()
null_count = df.where(both_areas_missing).count()
null_count

4

In [54]:
# Total number of rows before any rows are dropped.
df.count()

1186

In [55]:
# Drop a row only when LivingArea and LotArea are both null, then show how
# many rows remain.
area_columns = ["LivingArea", "LotArea"]
df = df.na.drop(how="all", subset=area_columns)
df.count()

1182

In [56]:
# Count rows where both area columns are exactly zero.
both_areas_zero = (F.col("LivingArea") == 0) & (F.col("LotArea") == 0)
df.where(both_areas_zero).count()

0

Price Handling

In [57]:
# Count rows with no Price value.
df.where(F.col("Price").isNull()).count()

2

In [58]:
# Discard rows that have no Price, then show the remaining row count.
df = df.na.drop(subset=["Price"])
df.count()

1180

In [59]:
# Convert the textual Price into a whole-number (long) amount.
#
# The numeric part keeps digits and the decimal point. A value containing K/k
# is scaled by 1,000, one containing M/m by 1,000,000, and anything else is
# left unscaled. K is checked first, as before.
#
# Two changes from the previous version:
#   * the scaled value is rounded before the cast to long, so a product that
#     floating point leaves just below the intended integer is not truncated
#     down by one;
#   * the unscaled branch no longer strips the decimal point, so a price with
#     a fractional part is not inflated by dropping the '.'.
price_text = F.col("Price")
price_number = F.regexp_replace(price_text, "[^0-9.]", "").cast("double")
price_scale = (
    F.when(price_text.rlike("[Kk]"), 1000)
    .when(price_text.rlike("[Mm]"), 1000000)
    .otherwise(1)
)

df = df.withColumn("Price", F.round(price_number * price_scale).cast("long"))
df.show(5)

+---------+------+----------+--------------------+--------+---------+----------+---------+---------+-------+------------+-----+-------+-----------+-------+
|     Zpid| Price|PriceLabel|             Address|Bedrooms|Bathrooms|LivingArea| Latitude|Longitude|Zipcode|        City|State|LotArea|LotAreaUnit|Country|
+---------+------+----------+--------------------+--------+---------+----------+---------+---------+-------+------------+-----+-------+-----------+-------+
|109496581|120000|     $120K|1153 County Road ...|     3.0|        2|    1680.0|34.191063|-86.76373|  35058|     Cullman|   AL|    3.3|      acres|    USA|
|    90299|260000|     $260K|7950 Joy Rd, Blou...|     4.0|        3|    2292.0|34.061863|-86.59546|  35031|Blountsville|   AL|   21.1|      acres|    USA|
|   166673|160000|     $160K|500 30th St N, Pe...|     3.0|        1|    1333.0|33.593388|-86.27587|  35125|   Pell City|   AL|   0.26|      acres|    USA|
|   171603|820000|     $820K|200 River Oaks Ci...|     3.0|     

In [60]:
# Count listings whose parsed Price is zero.
price_is_zero = F.col("Price") == 0
zero_count = df.where(price_is_zero).count()
zero_count

6

In [61]:
# Remove zero-priced listings. Price is a long at this point, so it is compared
# with the integer 0 (not the string "0") to avoid relying on implicit
# string-to-number coercion and to match the check on the next line.
df = df.filter(F.col("Price") != 0)

# Sanity check: no zero-priced rows should remain.
df.filter(F.col("Price") == 0).count()

0

In [62]:
# Disabled step: floor Bathrooms and cast it to int, then preview the result.
# NOTE(review): `floor` and `col` are referenced by bare name but no cell shown
# here imports them that way; use F.floor / F.col if this is re-enabled.
#df = df.withColumn('Bathrooms',floor(col('Bathrooms')).cast('int'))
#df.show(5)

LotArea Handling

In [63]:
# Rows with no LotAreaUnit are assumed to be in acres; afterwards list the
# distinct unit labels present in the data.
default_units = {"LotAreaUnit": "acres"}
df = df.na.fill(default_units)
df.select("LotAreaUnit").distinct().show()

+-----------+
|LotAreaUnit|
+-----------+
|       sqft|
|Square Feet|
|      Acres|
|      acres|
+-----------+



In [64]:
# Count rows that still have no LotArea.
lot_area_missing = F.col("LotArea").isNull()
null_count_lot = df.where(lot_area_missing).count()
null_count_lot

99

In [65]:
# Count rows whose LotArea is exactly zero.
df.where(F.col("LotArea") == 0).count()

0

In [66]:
# Impute a missing LotArea as LivingArea + 20%.
#
# At this point LotArea is still expressed in each row's LotAreaUnit, and a
# later cell multiplies the rows labelled acres by 43560. LivingArea has no
# unit column and is treated here as square feet (assumption - TODO confirm).
# Writing a square-foot estimate into an acres row would therefore be inflated
# 43560-fold by that later conversion, so for acres rows the estimate is
# expressed in acres instead.
sqft_per_acre = 43560
lot_estimate_sqft = F.col("LivingArea") + (0.2 * F.col("LivingArea"))
lot_estimate = F.when(
    F.col("LotAreaUnit").isin("acres", "Acres"),
    lot_estimate_sqft / sqft_per_acre,
).otherwise(lot_estimate_sqft)

df = df.withColumn(
    "LotArea",
    F.when(F.col("LotArea").isNull(), lot_estimate).otherwise(F.col("LotArea")),
)

# Sanity check: number of rows still missing LotArea.
df.filter(df["LotArea"].isNull()).count()

0

In [67]:
# For rows whose unit label is "acres"/"Acres" and whose LotArea is present,
# multiply LotArea by 43560 and round to two decimals; all other rows keep
# their value. LotAreaUnit itself is not modified by this step.
lot_area = F.col("LotArea")
labelled_acres = F.col("LotAreaUnit").isin("acres", "Acres") & lot_area.isNotNull()
lot_area_scaled = F.round(43560 * lot_area, 2)

df = df.withColumn("LotArea", F.when(labelled_acres, lot_area_scaled).otherwise(lot_area))
df.show(5)

+---------+------+----------+--------------------+--------+---------+----------+---------+---------+-------+------------+-----+--------+-----------+-------+
|     Zpid| Price|PriceLabel|             Address|Bedrooms|Bathrooms|LivingArea| Latitude|Longitude|Zipcode|        City|State| LotArea|LotAreaUnit|Country|
+---------+------+----------+--------------------+--------+---------+----------+---------+---------+-------+------------+-----+--------+-----------+-------+
|109496581|120000|     $120K|1153 County Road ...|     3.0|        2|    1680.0|34.191063|-86.76373|  35058|     Cullman|   AL|143748.0|      acres|    USA|
|    90299|260000|     $260K|7950 Joy Rd, Blou...|     4.0|        3|    2292.0|34.061863|-86.59546|  35031|Blountsville|   AL|919116.0|      acres|    USA|
|   166673|160000|     $160K|500 30th St N, Pe...|     3.0|        1|    1333.0|33.593388|-86.27587|  35125|   Pell City|   AL| 11325.6|      acres|    USA|
|   171603|820000|     $820K|200 River Oaks Ci...|     3.0

In [68]:
# For one-bedroom listings with zero or one bathroom, replace LotArea with
# LivingArea + 10%.
#
# The override is applied only when LivingArea is a positive number. Without
# that guard, a null or zero LivingArea would overwrite an existing LotArea
# with null/0, and a later cell drops rows where both areas are zero.
# (A null LivingArea makes the comparison null, which F.when treats as false.)
small_unit = (F.col("Bedrooms") == 1) & (F.col("Bathrooms").isin(1, 0))
has_living_area = F.col("LivingArea") > 0
lot_from_living = F.col("LivingArea") + 0.1 * F.col("LivingArea")

df = df.withColumn(
    "LotArea",
    F.when(small_unit & has_living_area, lot_from_living).otherwise(F.col("LotArea")),
)

Living Area Handling

In [69]:
# Count rows that have no LivingArea.
living_area_missing = F.col("LivingArea").isNull()
null_count_living = df.where(living_area_missing).count()
null_count_living

3

In [70]:
# Where LivingArea is null, fall back to LotArea minus 20% of LotArea.
lot_area = F.col("LotArea")
living_estimate = lot_area - (0.2 * lot_area)
df = df.withColumn("LivingArea", F.coalesce(F.col("LivingArea"), living_estimate))

# Sanity check: number of rows still missing LivingArea.
df.where(F.col("LivingArea").isNull()).count()

0

In [71]:
# Count rows whose LivingArea is exactly zero.
df.where(F.col("LivingArea") == 0).count()

14

In [72]:
# Keep a row when at least one of the two areas is non-zero, i.e. discard
# rows where LivingArea and LotArea are both zero.
living_nonzero = F.col("LivingArea") != 0
lot_nonzero = F.col("LotArea") != 0
df = df.where(living_nonzero | lot_nonzero)

In [73]:
# A LivingArea of zero is replaced by LotArea minus 5% of LotArea.
living_area = F.col("LivingArea")
lot_area = F.col("LotArea")
living_from_lot = lot_area - (0.05 * lot_area)
df = df.withColumn(
    "LivingArea",
    F.when(living_area == 0, living_from_lot).otherwise(living_area),
)

# Sanity check: number of rows whose LivingArea is still zero.
df.where(F.col("LivingArea") == 0).count()

0

In [74]:
# Row count after the price and area clean-up steps.
df.count()

1163

In [75]:
# One-row summary: for every column, the number of null values it contains.
null_tallies = [
    F.sum(F.when(F.col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in df.columns
]
df.select(null_tallies).show()

+----+-----+----------+-------+--------+---------+----------+--------+---------+-------+----+-----+-------+-----------+-------+
|Zpid|Price|PriceLabel|Address|Bedrooms|Bathrooms|LivingArea|Latitude|Longitude|Zipcode|City|State|LotArea|LotAreaUnit|Country|
+----+-----+----------+-------+--------+---------+----------+--------+---------+-------+----+-----+-------+-----------+-------+
|   0|    0|       601|      0|       2|        2|         0|       3|        3|      0|   0|    0|      0|          0|      0|
+----+-----+----------+-------+--------+---------+----------+--------+---------+-------+----+-----+-------+-----------+-------+



In [76]:
# Missing room counts become 0 for now.
room_defaults = {"Bathrooms": 0, "Bedrooms": 0}
df = df.na.fill(room_defaults)

Handling zeros in Bedrooms and Bathrooms

In [77]:
# Count listings recorded with zero bedrooms.
df.where(F.col("Bedrooms") == 0).count()

6

In [78]:
from pyspark.sql.functions import desc

# Most frequent Bedrooms value, used to replace the zero placeholders.
#
# Zero rows are excluded from the tally: 0 is the value being replaced, so it
# must never be chosen as the replacement. Ties on the count are broken by the
# smaller Bedrooms value so the result is the same on every run.
bedroom_counts = (
    df.filter(F.col("Bedrooms") != 0)
    .groupBy("Bedrooms")
    .count()
    .orderBy(desc("count"), "Bedrooms")
)
top_bedrooms = bedroom_counts.first()

# With no non-zero rows at all there is nothing to impute from; fall back to
# 0.0 so that the replacement step leaves the data unchanged.
mode_val = top_bedrooms[0] if top_bedrooms is not None else 0.0
mode_val

3.0

In [79]:
# Substitute mode_val wherever Bedrooms is zero; other rows are unchanged.
bedrooms = F.col("Bedrooms")
df = df.withColumn(
    "Bedrooms",
    F.when(bedrooms == 0, F.lit(mode_val)).otherwise(bedrooms),
)

# Sanity check: number of rows still showing zero bedrooms.
df.where(F.col("Bedrooms") == 0).count()

0

In [80]:
# Count listings recorded with zero bathrooms.
df.where(F.col("Bathrooms") == 0).count()

11

In [81]:
# Replace zero Bathrooms with the most frequent non-zero Bathrooms value.
#
# Zero rows are excluded from the tally: 0 is the value being replaced, so it
# must never be chosen as the replacement. Ties on the count are broken by the
# smaller Bathrooms value so the result is the same on every run.
bathroom_counts = (
    df.filter(F.col("Bathrooms") != 0)
    .groupBy("Bathrooms")
    .count()
    .orderBy(F.desc("count"), "Bathrooms")
)
top_bathrooms = bathroom_counts.first()

# With no non-zero rows at all, fall back to 0 so the replacement is a no-op.
mode_val = top_bathrooms[0] if top_bathrooms is not None else 0

df = df.withColumn('Bathrooms',F.when(F.col('Bathrooms') == 0,mode_val).otherwise(F.col("Bathrooms")))

# Sanity check: number of rows still showing zero bathrooms.
df.filter(df["Bathrooms"]== 0).count()

0

In [82]:
# Final check before export: number of null values in each column.
remaining_nulls = [
    F.sum(F.when(F.col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in df.columns
]
df.select(remaining_nulls).show()

+----+-----+----------+-------+--------+---------+----------+--------+---------+-------+----+-----+-------+-----------+-------+
|Zpid|Price|PriceLabel|Address|Bedrooms|Bathrooms|LivingArea|Latitude|Longitude|Zipcode|City|State|LotArea|LotAreaUnit|Country|
+----+-----+----------+-------+--------+---------+----------+--------+---------+-------+----+-----+-------+-----------+-------+
|   0|    0|       601|      0|       0|        0|         0|       3|        3|      0|   0|    0|      0|          0|      0|
+----+-----+----------+-------+--------+---------+----------+--------+---------+-------+----+-----+-------+-----------+-------+



Writing to a CSV file

In [84]:
# Export the cleaned data as CSV with a header row.
# The save mode is set to "overwrite" because the default mode raises an error
# when the target directory already exists, which made every run after the
# first fail at this cell. Any previous export at this path is replaced.
(
    df.write
    .mode("overwrite")
    .option("header", True)
    .csv("file:///home/talentum/Project Workspace/zillow_data")
)