In [1]:
!pip install pyspark
!pip install findspark



In [2]:
import findspark
findspark.init()

In [3]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import *
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("zillow").getOrCreate()
path="C:/Users/User/OneDrive - aueb.gr/Επιφάνεια εργασίας/Msc/Large scale data managment/zillow.csv" 
df = spark.read.option("header",True) \
     .csv(path)

In [4]:
df.printSchema()

root
 |-- title: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- price: string (nullable = true)
 |-- facts and features: string (nullable = true)
 |-- real estate provider: string (nullable = true)
 |-- url: string (nullable = true)



In [5]:
df=df.select(col("title"),col("address"),col("city"),col("state"),col("postal_code"),col("price"),col("facts and features").alias("fnfs"), \
         col("real estate provider").alias("real_estate_provider"),col("url"))

In [6]:
df.show()

+--------------+-------+------------+-----+-----------+----------+--------------------+--------------------+--------------------+
|         title|address|        city|state|postal_code|     price|                fnfs|real_estate_provider|                 url|
+--------------+-------+------------+-----+-----------+----------+--------------------+--------------------+--------------------+
|Condo for sale|   null|  Somerville|   MA|      02145|  $342,000|2 bds, 1.0 ba ,70...|William Raveis R....|https://www.zillo...|
|Condo for sale|   null|      Boston|   MA|      02116|$1,700,000|2 bds, 2.0 ba ,12...|Century 21 North ...|https://www.zillo...|
|Condo for sale|   null|      Boston|   MA|      02118|  $336,500|1 bds, 1.0 ba ,10...|Maloney Propertie...|https://www.zillo...|
|House for sale|   null|      Boston|   MA|      02118|$9,950,000|4 bds, 7.0 ba ,68...|Campion & Company...|https://www.zillo...|
|Condo for sale|   null|      Boston|   MA|      02128|  $479,000|2 bds, 3.0 ba ,10...|Ber

Finding number of beds:

In [7]:
def all_beds(st):
    return ('0' if st.split()[0]=='None' else st.split()[0])

convertUDF_all_beds= udf(lambda z: all_beds(z),StringType()) 
spark.udf.register("convertUDF_all_beds",all_beds,StringType())
df.createOrReplaceTempView("zillow")
spark.sql("select convertUDF_all_beds(fnfs) as num_beds from zillow") \
     .show(truncate=False)

+--------+
|num_beds|
+--------+
|2       |
|2       |
|1       |
|4       |
|2       |
|3       |
|2       |
|2       |
|1       |
|2       |
|2       |
|0       |
|3       |
|2       |
|2       |
|1       |
|2       |
|3       |
|4       |
|2       |
+--------+
only showing top 20 rows



Finding number of baths:

In [8]:
def all_baths(st):
    return (float('0' if st.split()[2]=='None' else st.split()[2]))

convertUDF_all_baths= udf(lambda z: all_baths(z),StringType()) 
spark.udf.register("convertUDF_all_baths",all_baths,StringType())
df.createOrReplaceTempView("zillow")
spark.sql("select convertUDF_all_baths(fnfs) as num_baths from zillow") \
     .show(truncate=False)

+---------+
|num_baths|
+---------+
|1.0      |
|2.0      |
|1.0      |
|7.0      |
|3.0      |
|3.0      |
|1.0      |
|1.0      |
|1.0      |
|1.0      |
|2.0      |
|0.0      |
|2.0      |
|1.0      |
|1.0      |
|1.0      |
|2.0      |
|4.0      |
|3.0      |
|2.0      |
+---------+
only showing top 20 rows



Finding sqft:

In [9]:
def sqfts(st):
    return (float('0' if st.split()[4][1:len(st.split()[4])]=='None' else st.split()[4][1:len(st.split()[4])]))

convertUDF_sqfts= udf(lambda z: sqfts(z),StringType()) 
spark.udf.register("convertUDF_sqfts",sqfts,StringType())
df.createOrReplaceTempView("zillow")
spark.sql("select convertUDF_sqfts(fnfs) as num_sqfts from zillow") \
     .show(truncate=False)

+---------+
|num_sqfts|
+---------+
|705.0    |
|1228.0   |
|1000.0   |
|6836.0   |
|1000.0   |
|2313.0   |
|780.0    |
|856.0    |
|675.0    |
|511.0    |
|1099.0   |
|126.0    |
|1070.0   |
|624.0    |
|1165.0   |
|500.0    |
|932.0    |
|1680.0   |
|2043.0   |
|1200.0   |
+---------+
only showing top 20 rows



Extracting type:

In [10]:
def types(st): 
    return st.split()[0]

convertUDF_types= udf(lambda z: types(z),StringType()) 
spark.udf.register("convertUDF_types",types,StringType())
df.createOrReplaceTempView("zillow")
spark.sql("select convertUDF_types(title) as types from zillow") \
     .show(truncate=False)

+-----+
|types|
+-----+
|Condo|
|Condo|
|Condo|
|House|
|Condo|
|House|
|Condo|
|Condo|
|Condo|
|Condo|
|Condo|
|Condo|
|Condo|
|Condo|
|House|
|Condo|
|Condo|
|House|
|House|
|House|
+-----+
only showing top 20 rows



Offers:

In [11]:
def offers(st):    
    a=st.split()
    a = [w.replace(',', '') for w in a]
    for j in range(len(a)):
        if a[j]=="sale":
            return("sale")
            break
        elif a[j]=="rent":
            return("rent")
            
        elif a[j]=="sold":
            return("sold")
            break
            
        elif a[j]=="forclose":
            return("forclose")
            break
        else:
            return("sale")
            break
            
convertUDF_offers= udf(lambda z: offers(z),StringType()) 
spark.udf.register("convertUDF_offers",offers,StringType())
df.createOrReplaceTempView("zillow")
spark.sql("select convertUDF_offers(title) as offers from zillow") \
     .show(truncate=False)            

+------+
|offers|
+------+
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
|sale  |
+------+
only showing top 20 rows



Filter not sales:


In [12]:
spark.sql("select convertUDF_offers(title),price,city as offers from zillow where convertUDF_offers(title) like '%sale%'")\
.show(truncate=False)

+------------------------+----------+------------+
|convertUDF_offers(title)|price     |offers      |
+------------------------+----------+------------+
|sale                    |$342,000  |Somerville  |
|sale                    |$1,700,000|Boston      |
|sale                    |$336,500  |Boston      |
|sale                    |$9,950,000|Boston      |
|sale                    |$479,000  |Boston      |
|sale                    |$899,000  |East Boston |
|sale                    |$397,300  |Somerville  |
|sale                    |$619,900  |South Boston|
|sale                    |$850,000  |Boston      |
|sale                    |$649,900  |Boston      |
|sale                    |$625,000  |Boston      |
|sale                    |$80,000   |Somerville  |
|sale                    |$1,425,000|Boston      |
|sale                    |$199,000  |Boston      |
|sale                    |$1,200,000|Boston      |
|sale                    |$499,950  |South Boston|
|sale                    |$739,

In [13]:
def prices(st):
    st=st.replace(',', '')
    st=st.replace('+', '')
    st=st.replace('$', '') 
    return(int(st))

convertUDF_prices= udf(lambda z: prices(z),StringType()) 
spark.udf.register("convertUDF_prices",prices,StringType())
df.createOrReplaceTempView("zillow")
spark.sql("select convertUDF_prices(price) as prices from zillow") \
     .show(truncate=False)            

+-------+
|prices |
+-------+
|342000 |
|1700000|
|336500 |
|9950000|
|479000 |
|899000 |
|397300 |
|619900 |
|850000 |
|649900 |
|625000 |
|80000  |
|1425000|
|199000 |
|1200000|
|499950 |
|739000 |
|1119000|
|1699000|
|589000 |
+-------+
only showing top 20 rows



Filter out >10 number of beds:

In [14]:
spark.sql("select city,convertUDF_prices(price) as clear_price,convertUDF_all_beds(fnfs) as beds_num from zillow where convertUDF_all_beds(fnfs)<11 ") \
     .show(truncate=False)

+------------+-----------+--------+
|city        |clear_price|beds_num|
+------------+-----------+--------+
|Somerville  |342000     |2       |
|Boston      |1700000    |2       |
|Boston      |336500     |1       |
|Boston      |9950000    |4       |
|Boston      |479000     |2       |
|East Boston |899000     |3       |
|Somerville  |397300     |2       |
|South Boston|619900     |2       |
|Boston      |850000     |1       |
|Boston      |649900     |2       |
|Boston      |625000     |2       |
|Somerville  |80000      |0       |
|Boston      |1425000    |3       |
|Boston      |199000     |2       |
|Boston      |1200000    |2       |
|South Boston|499950     |1       |
|Charlestown |739000     |2       |
|Boston      |1119000    |3       |
|South Boston|1699000    |4       |
|Boston      |589000     |2       |
+------------+-----------+--------+
only showing top 20 rows



Price boundaries filtering:

In [15]:
spark.sql("select city,convertUDF_prices(price) as clear_price,convertUDF_all_beds(fnfs) as beds_num from zillow where convertUDF_prices(price) between 100000 and 20000000 ") \
     .show(truncate=False)

+------------+-----------+--------+
|city        |clear_price|beds_num|
+------------+-----------+--------+
|Somerville  |342000     |2       |
|Boston      |1700000    |2       |
|Boston      |336500     |1       |
|Boston      |9950000    |4       |
|Boston      |479000     |2       |
|East Boston |899000     |3       |
|Somerville  |397300     |2       |
|South Boston|619900     |2       |
|Boston      |850000     |1       |
|Boston      |649900     |2       |
|Boston      |625000     |2       |
|Boston      |1425000    |3       |
|Boston      |199000     |2       |
|Boston      |1200000    |2       |
|South Boston|499950     |1       |
|Charlestown |739000     |2       |
|Boston      |1119000    |3       |
|South Boston|1699000    |4       |
|Boston      |589000     |2       |
|Boston      |919000     |2       |
+------------+-----------+--------+
only showing top 20 rows



Filter out listings that are not houses:

In [16]:
spark.sql("select city,convertUDF_prices(price) as clear_price,title from zillow where title like '%house%' or title like '%House%'") \
     .show(truncate=False)

+------------+-----------+--------------+
|city        |clear_price|title         |
+------------+-----------+--------------+
|Boston      |9950000    |House for sale|
|East Boston |899000     |House for sale|
|Boston      |1200000    |House for sale|
|Boston      |1119000    |House for sale|
|South Boston|1699000    |House for sale|
|Boston      |589000     |House for sale|
|Boston      |9750000    |House for sale|
|Somerville  |2075000    |House for sale|
|Boston      |3200000    |House for sale|
|South Boston|1175000    |House for sale|
|South Boston|1250000    |House for sale|
|Boston      |9950000    |House for sale|
|East Boston |899000     |House for sale|
|Boston      |1200000    |House for sale|
|Boston      |1119000    |House for sale|
|South Boston|1699000    |House for sale|
|Boston      |9750000    |House for sale|
|Somerville  |2075000    |House for sale|
|Boston      |3200000    |House for sale|
|South Boston|1175000    |House for sale|
+------------+-----------+--------

Calculate average price per sqft for houses for sale grouping them by the number of bedrooms.:

In [17]:
spark.sql("select convertUDF_all_beds(fnfs) as Number_of_Beds , avg(convertUDF_prices(price)/convertUDF_sqfts(fnfs)) as average_price_per_sqft  from zillow where title like '%House for sale%'  group by convertUDF_all_beds(fnfs)") \
     .show(truncate=False)

+--------------+----------------------+
|Number_of_Beds|average_price_per_sqft|
+--------------+----------------------+
|7             |1126.0252348993286    |
|11            |433.6545589325427     |
|3             |678.9521125584432     |
|8             |1567.647058823529     |
|0             |1250.0                |
|5             |908.8325677804119     |
|6             |422.3111656297147     |
|9             |1108.1412183984849    |
|4             |909.1473996440552     |
|2             |716.0381965996941     |
+--------------+----------------------+

