
# PySpark Assignment

## RealEstate Housing Data

1. Extract: Load the data
 - Read all the CSV files as text into a single RDD
2. Transform: Exploratory data analysis using rdd
 - Unique records count
 - Extract full address from the column url*
 - from http://www.zillow.com/homes/for_sale//homedetails/V-l-Buell-Newstead-NY-10001/2089629334_zpid/
 - to V-l-Buell-Newstead-NY-10001
 - Replace NA by zero in all numerical columns
 - Concatenate bedrooms* and bathrooms* as bed_bath_rooms*, e.g. 3b2bh
 - GroupBy zip,bed_bath_rooms* and avg, max, min
3. Load: Save analysis report
 - GroupBy zip,bed_bath_rooms* and avg, max, min, save as files


In [91]:
from random import random
import os
import pyspark
from pyspark.sql import SparkSession

In [92]:
# Build (or reuse) a Spark session running on the local master, with the
# driver bound to localhost and the web UI on port 4041.
spark = (
    SparkSession.builder
    .master("local")
    .appName("SparkApplication")
    .config("spark.driver.bindAddress", "localhost")
    .config("spark.ui.port", "4041")
    .getOrCreate()
)

In [93]:
sc = spark.sparkContext

### To read multiple CSV files in Spark into single RDD.

In [94]:
# The three CSV paths are passed as one comma-separated string so that all files
# are read, line by line, into a single RDD of raw text lines.
data=sc.textFile("2018-05-12_154616.csv,2018-05-12_155104.csv,2018-05-12_155435.csv")

In [95]:
# Capture the first line of the RDD, which is the CSV header row
# (it is only stored here; the filtering happens in the next step)
header=data.first()

In [96]:
print(header)

address,city,state,zip,price,sqft,bedrooms,bathrooms,days_on_zillow,sale_type,url


In [97]:
# Remove the header: drop every line whose text equals the captured header,
# which covers the header line of each of the input files as long as they are identical
step1= data.filter(lambda line: line !=header)

In [98]:
step1.take(2)

['V/l Buell,Newstead,NY,10001,49000,NA,NA,NA,2,Lot/Land For Sale,http://www.zillow.com/homes/for_sale//homedetails/V-l-Buell-Newstead-NY-10001/2089629334_zpid/',
 '263 9th Ave # PHD,New York,NY,10001,4495000,2250,3,2,1,Condo For Sale,http://www.zillow.com/homes/for_sale//homedetails/263-9th-Ave-PHD-New-York-NY-10001/2103425273_zpid/']

### Total records count

In [99]:
step1.count()

1117

### Total unique records count

In [100]:
step1.distinct().count()

1064

In [101]:
### Filtering out duplicate records

In [102]:
step2=step1.distinct()

In [103]:
step2.count()

1064

### Extract full address from url

In [104]:
# Split each line on commas to form a list of field strings.
# NOTE(review): a plain split does not honour CSV quoting, so a quoted field that
# itself contains a comma would be split in two - confirm the data has none.
step3= step2.map(lambda line: line.split(","))

In [105]:
step3.first()

['252 7th Ave APT 4L',
 'NEW YORK',
 'NY',
 '10001',
 '1529000',
 '980',
 '0',
 '1',
 '2',
 'Condo For Sale',
 'http://www.zillow.com/homes/for_sale//homedetails/252-7th-Ave-APT-4L-New-York-NY-10001/55501383_zpid/']

In [106]:
step3.take(2)

[['252 7th Ave APT 4L',
  'NEW YORK',
  'NY',
  '10001',
  '1529000',
  '980',
  '0',
  '1',
  '2',
  'Condo For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/252-7th-Ave-APT-4L-New-York-NY-10001/55501383_zpid/'],
 ['133 W 28th St APT 6-C',
  'New York',
  'NY',
  '10001',
  '1550000',
  '1300',
  '2',
  '2',
  'NA',
  'Co-op For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/133-W-28th-St-APT-6-C-New-York-NY-10001/79496201_zpid/']]

In [107]:
# Helper for extracting the address slug from a Zillow listing url
def extract_address(url):
    """Return the address segment of a Zillow ``homedetails`` url.

    The urls in this data set look like ``.../<address>/<id>_zpid/``, so the
    address is the path segment just before the final ``<id>_zpid`` one.
    Trailing slashes are stripped first, so the result is the same whether
    or not the url ends with ``/``.

    Args:
        url: Listing url as a string.

    Returns:
        The address segment, e.g. ``'252-7th-Ave-APT-4L-New-York-NY-10001'``.

    Raises:
        IndexError: If the url contains no ``/`` separator to split on.
    """
    # Without the rstrip, a url lacking the trailing slash would shift every
    # negative index by one and silently return the wrong segment.
    segments = url.rstrip("/").split("/")
    return segments[-2]

In [108]:
extract_address("http://www.zillow.com/homes/for_sale//homedetails/252-7th-Ave-APT-4L-New-York-NY-10001/55501383_zpid/")

'252-7th-Ave-APT-4L-New-York-NY-10001'

In [109]:
# Keep the 11 original fields and append, as a 12th element, the address slug
# parsed from the url (the last field of each row).
step4=step3.map(lambda x: (x[0],x[1],x[2],x[3],x[4],x[5],x[6],x[7],x[8],x[9],x[10],extract_address(x[-1])))

In [110]:
step4.take(2)

[('252 7th Ave APT 4L',
  'NEW YORK',
  'NY',
  '10001',
  '1529000',
  '980',
  '0',
  '1',
  '2',
  'Condo For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/252-7th-Ave-APT-4L-New-York-NY-10001/55501383_zpid/',
  '252-7th-Ave-APT-4L-New-York-NY-10001'),
 ('133 W 28th St APT 6-C',
  'New York',
  'NY',
  '10001',
  '1550000',
  '1300',
  '2',
  '2',
  'NA',
  'Co-op For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/133-W-28th-St-APT-6-C-New-York-NY-10001/79496201_zpid/',
  '133-W-28th-St-APT-6-C-New-York-NY-10001')]

In [111]:
step4.take(2)

[('252 7th Ave APT 4L',
  'NEW YORK',
  'NY',
  '10001',
  '1529000',
  '980',
  '0',
  '1',
  '2',
  'Condo For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/252-7th-Ave-APT-4L-New-York-NY-10001/55501383_zpid/',
  '252-7th-Ave-APT-4L-New-York-NY-10001'),
 ('133 W 28th St APT 6-C',
  'New York',
  'NY',
  '10001',
  '1550000',
  '1300',
  '2',
  '2',
  'NA',
  'Co-op For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/133-W-28th-St-APT-6-C-New-York-NY-10001/79496201_zpid/',
  '133-W-28th-St-APT-6-C-New-York-NY-10001')]

### Replacing NA by 0 in all numerical columns

In [112]:
# Convert one numerical column value from string to int, replacing NA by 0
def replace_na_0(column_val):
    """Return ``column_val`` as an int, or 0 when it is not a usable number.

    The value is parsed as a float first so that strings such as ``"2.5"``
    are accepted; the fractional part is then truncated by ``int``.

    Args:
        column_val: Raw field value, normally a string such as ``"980"`` or ``"NA"``.

    Returns:
        The integer value, or 0 for ``"NA"``, empty or non-numeric text,
        ``None``, NaN and infinities.
    """
    try:
        return int(float(column_val))
    # Only conversion failures mean "missing value"; a bare except would also
    # swallow KeyboardInterrupt / SystemExit and hide unrelated bugs.
    except (TypeError, ValueError, OverflowError):
        return 0

In [113]:
# Indices of the numerical columns in the header order: zip, price, sqft, bedrooms, bathrooms, days_on_zillow.
# NOTE(review): this list is not referenced by the code visible in this notebook; the next step hard-codes the same indices.
num_columns=[3,4,5,6,7,8]

In [114]:
# Convert fields 3-8 to int (NA and other unparsable values become 0);
# the text fields 0-2 and 9-11 are passed through unchanged.
step5=step4.map(lambda x: (x[0],x[1],x[2],replace_na_0(x[3]),replace_na_0(x[4]),replace_na_0(x[5]),
                           replace_na_0(x[6]),replace_na_0(x[7]),replace_na_0(x[8]),x[9],x[10],x[11]))

In [115]:
step4.take(2)

[('252 7th Ave APT 4L',
  'NEW YORK',
  'NY',
  '10001',
  '1529000',
  '980',
  '0',
  '1',
  '2',
  'Condo For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/252-7th-Ave-APT-4L-New-York-NY-10001/55501383_zpid/',
  '252-7th-Ave-APT-4L-New-York-NY-10001'),
 ('133 W 28th St APT 6-C',
  'New York',
  'NY',
  '10001',
  '1550000',
  '1300',
  '2',
  '2',
  'NA',
  'Co-op For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/133-W-28th-St-APT-6-C-New-York-NY-10001/79496201_zpid/',
  '133-W-28th-St-APT-6-C-New-York-NY-10001')]

* In the above output we can see the second record has NA values for a numerical column.
* This has been replaced in step5 and this is reflected in the following output

In [116]:
step5.take(2)

[('252 7th Ave APT 4L',
  'NEW YORK',
  'NY',
  10001,
  1529000,
  980,
  0,
  1,
  2,
  'Condo For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/252-7th-Ave-APT-4L-New-York-NY-10001/55501383_zpid/',
  '252-7th-Ave-APT-4L-New-York-NY-10001'),
 ('133 W 28th St APT 6-C',
  'New York',
  'NY',
  10001,
  1550000,
  1300,
  2,
  2,
  0,
  'Co-op For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/133-W-28th-St-APT-6-C-New-York-NY-10001/79496201_zpid/',
  '133-W-28th-St-APT-6-C-New-York-NY-10001')]

### Concat - bedrooms*, bathrooms* as bed_bath_rooms* 3b2bh

In [117]:
# Build the bed_bath_rooms label from the bedrooms and bathrooms values
def bed_n_bath_combined(val1, val2):
    """Return ``"<val1>b<val2>bh"``, e.g. ``3`` and ``2`` give ``"3b2bh"``.

    Both values are converted with ``str`` before being joined.
    """
    return f"{val1!s}b{val2!s}bh"

In [118]:
# Replace the separate bedrooms (x[6]) and bathrooms (x[7]) fields with one combined
# label at index 6; the remaining fields shift left by one, giving 11 fields per row.
step6=step5.map(lambda x: (x[0],x[1],x[2],x[3],x[4],x[5],bed_n_bath_combined(x[6],x[7]),x[8],x[9],x[10],x[11]))

In [119]:
step5.take(2)

[('252 7th Ave APT 4L',
  'NEW YORK',
  'NY',
  10001,
  1529000,
  980,
  0,
  1,
  2,
  'Condo For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/252-7th-Ave-APT-4L-New-York-NY-10001/55501383_zpid/',
  '252-7th-Ave-APT-4L-New-York-NY-10001'),
 ('133 W 28th St APT 6-C',
  'New York',
  'NY',
  10001,
  1550000,
  1300,
  2,
  2,
  0,
  'Co-op For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/133-W-28th-St-APT-6-C-New-York-NY-10001/79496201_zpid/',
  '133-W-28th-St-APT-6-C-New-York-NY-10001')]

In [120]:
step6.take(2)

[('252 7th Ave APT 4L',
  'NEW YORK',
  'NY',
  10001,
  1529000,
  980,
  '0b1bh',
  2,
  'Condo For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/252-7th-Ave-APT-4L-New-York-NY-10001/55501383_zpid/',
  '252-7th-Ave-APT-4L-New-York-NY-10001'),
 ('133 W 28th St APT 6-C',
  'New York',
  'NY',
  10001,
  1550000,
  1300,
  '2b2bh',
  0,
  'Co-op For Sale',
  'http://www.zillow.com/homes/for_sale//homedetails/133-W-28th-St-APT-6-C-New-York-NY-10001/79496201_zpid/',
  '133-W-28th-St-APT-6-C-New-York-NY-10001')]

###  GroupBy zip,bed_bath_rooms* and avg, max, min


In [121]:
# Create an RDD that keeps only the zip (x[3]), bed_bath_rooms (x[6]) and price (x[4]) fields, in that order
step7 = step6.map(lambda x: (x[3],x[6],x[4]))

In [122]:
step7.take(10)

[(10001, '0b1bh', 1529000),
 (10001, '2b2bh', 1550000),
 (10001, '2b2bh', 3300000),
 (10001, '3b3bh', 6495000),
 (10001, '2b3bh', 4350000),
 (10001, '2b2bh', 2700000),
 (10001, '0b1bh', 410000),
 (10001, '3b3bh', 4450000),
 (10001, '2b2bh', 1995000),
 (10001, '5b5bh', 6995000)]

In [123]:
# Group by the composite key (zip, bed_bath_rooms); each group's value is an
# iterable of the full (zip, bed_bath_rooms, price) tuples that share that key
step8 = step7.groupBy(lambda x: (x[0],x[1]))

In [124]:
step8.take(10)

[((10001, '0b1bh'), <pyspark.resultiterable.ResultIterable at 0x1cf6861fc10>),
 ((10001, '0b0bh'), <pyspark.resultiterable.ResultIterable at 0x1cf6861f7f0>),
 ((10003, '2b2bh'), <pyspark.resultiterable.ResultIterable at 0x1cf6861f880>),
 ((10003, '2b4bh'), <pyspark.resultiterable.ResultIterable at 0x1cf6861ff70>),
 ((10003, '8b10bh'), <pyspark.resultiterable.ResultIterable at 0x1cf6861feb0>),
 ((10002, '1b1bh'), <pyspark.resultiterable.ResultIterable at 0x1cf6861f250>),
 ((10002, '3b2bh'), <pyspark.resultiterable.ResultIterable at 0x1cf6861f310>),
 ((10002, '2b3bh'), <pyspark.resultiterable.ResultIterable at 0x1cf6861ff40>),
 ((10002, '0b0bh'), <pyspark.resultiterable.ResultIterable at 0x1cf6861fc40>),
 ((10004, '4b5bh'), <pyspark.resultiterable.ResultIterable at 0x1cf6861fd60>)]

In [125]:
step8.mapValues(list).take(2)

[((10001, '0b1bh'),
  [(10001, '0b1bh', 1529000),
   (10001, '0b1bh', 410000),
   (10001, '0b1bh', 435000),
   (10001, '0b1bh', 625000),
   (10001, '0b1bh', 435000),
   (10001, '0b1bh', 449500),
   (10001, '0b1bh', 1295000)]),
 ((10001, '0b0bh'),
  [(10001, '0b0bh', 0), (10001, '0b0bh', 49000), (10001, '0b0bh', 0)])]

In [126]:
# Aggregate min: tuples compare element by element, and zip / bed_bath_rooms are
# identical within a group, so min() returns the tuple with the lowest price
step9= step8.map(lambda x: min(x[1]))

In [127]:
step9.take(10)

[(10001, '0b1bh', 410000),
 (10001, '0b0bh', 0),
 (10003, '2b2bh', 1395000),
 (10003, '2b4bh', 7000000),
 (10003, '8b10bh', 17800000),
 (10002, '1b1bh', 400000),
 (10002, '3b2bh', 1100000),
 (10002, '2b3bh', 1850000),
 (10002, '0b0bh', 0),
 (10004, '4b5bh', 2500000)]

In [128]:
# Aggregate max: tuples compare element by element, and zip / bed_bath_rooms are
# identical within a group, so max() returns the tuple with the highest price
step10= step8.map(lambda x: max(x[1]))

In [129]:
step10.take(10)

[(10001, '0b1bh', 1529000),
 (10001, '0b0bh', 49000),
 (10003, '2b2bh', 7350000),
 (10003, '2b4bh', 23000000),
 (10003, '8b10bh', 17800000),
 (10002, '1b1bh', 2750000),
 (10002, '3b2bh', 3527000),
 (10002, '2b3bh', 9995000),
 (10002, '0b0bh', 9750000),
 (10004, '4b5bh', 10995000)]

In [130]:
# Mean of the price field over one group of records
def mean_val(x):
    """Return the average of element ``[2]`` of every record in ``x``, rounded to 2 decimals.

    ``x`` is any iterable of indexable records; an empty iterable raises
    ``ZeroDivisionError``.
    """
    prices = [record[2] for record in x]
    total = 0
    for price in prices:
        total = price + total
    return round(total / len(prices), 2)

In [131]:
# Aggregate by mean: unpack the (zip, bed_bath_rooms) key and append the group's average price
step11= step8.map(lambda x: (x[0][0],x[0][1], mean_val(x[1])))

In [132]:
step11.take(10)

[(10001, '0b1bh', 739785.71),
 (10001, '0b0bh', 16333.33),
 (10003, '2b2bh', 2505388.88),
 (10003, '2b4bh', 13300000.0),
 (10003, '8b10bh', 17800000.0),
 (10002, '1b1bh', 1093295.45),
 (10002, '3b2bh', 2388217.14),
 (10002, '2b3bh', 3885400.0),
 (10002, '0b0bh', 2295454.55),
 (10004, '4b5bh', 6535750.0)]

In [133]:
# Aggregate all 3 in one step, in the order average, min, max;
# [2] picks the price out of the tuple returned by min()/max()
step12= step8.map(lambda x: (x[0][0],x[0][1],mean_val(x[1]),min(x[1])[2],max(x[1])[2]))

In [134]:
step12.take(10)

[(10001, '0b1bh', 739785.71, 410000, 1529000),
 (10001, '0b0bh', 16333.33, 0, 49000),
 (10003, '2b2bh', 2505388.88, 1395000, 7350000),
 (10003, '2b4bh', 13300000.0, 7000000, 23000000),
 (10003, '8b10bh', 17800000.0, 17800000, 17800000),
 (10002, '1b1bh', 1093295.45, 400000, 2750000),
 (10002, '3b2bh', 2388217.14, 1100000, 3527000),
 (10002, '2b3bh', 3885400.0, 1850000, 9995000),
 (10002, '0b0bh', 2295454.55, 0, 9750000),
 (10004, '4b5bh', 6535750.0, 2500000, 10995000)]

### Saving outputs as csv files

In [135]:
# Column names for each report; their order matches the tuple layout of the
# corresponding RDD (step9 = min, step10 = max, step11 = avg, step12 = avg/min/max)
col1=["zip_code","bed_bath_rooms","min_price"]
col2=["zip_code","bed_bath_rooms","max_price"]
col3=["zip_code","bed_bath_rooms","avg_price"]
col=["zip_code","bed_bath_rooms","avg_price","min_price","max_price"]

# Convert each aggregated RDD to a DataFrame with the column names above
f1=step9.toDF(col1)
f2=step10.toDF(col2)
f3=step11.toDF(col3)
f=step12.toDF(col)

In [136]:
f1.show(10)

+--------+--------------+---------+
|zip_code|bed_bath_rooms|min_price|
+--------+--------------+---------+
|   10001|         0b1bh|   410000|
|   10001|         0b0bh|        0|
|   10003|         2b2bh|  1395000|
|   10003|         2b4bh|  7000000|
|   10003|        8b10bh| 17800000|
|   10002|         1b1bh|   400000|
|   10002|         3b2bh|  1100000|
|   10002|         2b3bh|  1850000|
|   10002|         0b0bh|        0|
|   10004|         4b5bh|  2500000|
+--------+--------------+---------+
only showing top 10 rows



In [137]:
f2.show(10)

+--------+--------------+---------+
|zip_code|bed_bath_rooms|max_price|
+--------+--------------+---------+
|   10001|         0b1bh|  1529000|
|   10001|         0b0bh|    49000|
|   10003|         2b2bh|  7350000|
|   10003|         2b4bh| 23000000|
|   10003|        8b10bh| 17800000|
|   10002|         1b1bh|  2750000|
|   10002|         3b2bh|  3527000|
|   10002|         2b3bh|  9995000|
|   10002|         0b0bh|  9750000|
|   10004|         4b5bh| 10995000|
+--------+--------------+---------+
only showing top 10 rows



In [138]:
f3.show(10)

+--------+--------------+----------+
|zip_code|bed_bath_rooms| avg_price|
+--------+--------------+----------+
|   10001|         0b1bh| 739785.71|
|   10001|         0b0bh|  16333.33|
|   10003|         2b2bh|2505388.88|
|   10003|         2b4bh|    1.33E7|
|   10003|        8b10bh|    1.78E7|
|   10002|         1b1bh|1093295.45|
|   10002|         3b2bh|2388217.14|
|   10002|         2b3bh| 3885400.0|
|   10002|         0b0bh|2295454.55|
|   10004|         4b5bh| 6535750.0|
+--------+--------------+----------+
only showing top 10 rows



In [139]:
f.show(10)

+--------+--------------+----------+---------+---------+
|zip_code|bed_bath_rooms| avg_price|min_price|max_price|
+--------+--------------+----------+---------+---------+
|   10001|         0b1bh| 739785.71|   410000|  1529000|
|   10001|         0b0bh|  16333.33|        0|    49000|
|   10003|         2b2bh|2505388.88|  1395000|  7350000|
|   10003|         2b4bh|    1.33E7|  7000000| 23000000|
|   10003|        8b10bh|    1.78E7| 17800000| 17800000|
|   10002|         1b1bh|1093295.45|   400000|  2750000|
|   10002|         3b2bh|2388217.14|  1100000|  3527000|
|   10002|         2b3bh| 3885400.0|  1850000|  9995000|
|   10002|         0b0bh|2295454.55|        0|  9750000|
|   10004|         4b5bh| 6535750.0|  2500000| 10995000|
+--------+--------------+----------+---------+---------+
only showing top 10 rows



In [50]:
# Save each report as a CSV file via pandas.
# index=False keeps pandas' row index out of the files; without it every
# report gets an extra unnamed first column.
f1.toPandas().to_csv("min.csv", index=False)
f2.toPandas().to_csv("max.csv", index=False)
f3.toPandas().to_csv("average.csv", index=False)
f.toPandas().to_csv("combined.csv", index=False)

### Submitted By:
* **Lakshmi V Aji         (20BDA09)**
* **Josmi Agnes Jose      (20BDA27)**
* **Aishwarya Nair M J    (20BDA42)**
* **Mariya Biju           (20BDA61)**
    