In [35]:
import os, sys, pprint

# Make the project root importable so that `from src.connections import ...`
# resolves when the notebook is launched from <root>/src/notebooks.
# The suffix is built from os.sep so it is stripped on POSIX as well as on
# Windows; a hard-coded "\\src\\notebooks" only ever matches Windows paths.
notebooks_suffix = os.sep + os.path.join("src", "notebooks")
path_mod = os.getcwd().replace(notebooks_suffix, "")
sys.path.insert(0, path_mod)

from src.connections import connect

# Open the project's connection helper and keep its `sc` handle, which the
# cells below use to load the CSV via sc.textFile(...).
# NOTE(review): the positional arguments look like (application name, log
# level, number of cores) — confirm against src.connections.connect.ps_connection.
context = connect.ps_connection("real_estate", "ERROR", 3)
sc = context.sc

In [None]:
'''
Create a Spark program that reads the house data from in/RealEstate.csv and
outputs the average price of houses for each number of bedrooms.

The houses dataset contains a collection of recent real estate listings in San Luis Obispo county and
around it. 

The dataset contains the following fields:
1. MLS: Multiple listing service number for the house (unique ID).
2. Location: city/town where the house is located. Most locations are in San Luis Obispo county and
northern Santa Barbara county (Santa Maria-Orcutt, Lompoc, Guadalupe, Los Alamos), but there
are some out-of-area locations as well.
3. Price: the most recent listing price of the house (in dollars).
4. Bedrooms: number of bedrooms.
5. Bathrooms: number of bathrooms.
6. Size: size of the house in square feet.
7. Price/SQ.ft: price of the house per square foot.
8. Status: type of sale. Three types are represented in the dataset: Short Sale, Foreclosure and Regular.

Each field is comma separated.

Sample output:

   (3, 325000)
   (1, 266356)
   (2, 325000)
   ...

The first value in each pair (3, 1, 2) is the number of bedrooms and the second is the average price;
e.g. (3, 325000) means that houses with 3 bedrooms have an average price of 325000.

'''

In [3]:
# Location of the listings CSV, rooted at the PROJECT_HOME environment variable.
# A KeyError is raised if PROJECT_HOME is not set.
file_path = "/".join(
    (os.environ["PROJECT_HOME"], "python-spark-tutorial", "in", "RealEstate.csv")
)

In [21]:
def _parse_bedrooms_and_price(line):
    """Split one CSV row once and return ``(bedrooms, price)``.

    Column 3 is the bedroom count (kept as a string key) and column 2 is the
    listing price, converted to ``float``.
    """
    fields = line.split(",")
    return fields[3], float(fields[2])


# Parse the file a single time and cache the (bedrooms, price) pairs so that
# the sum and the count below reuse them instead of re-reading and
# re-splitting the CSV once per aggregate.
bedroom_prices = (
    sc.textFile(file_path)
    .filter(lambda line: "Bedrooms" not in line)  # skip lines containing the header name
    .map(_parse_bedrooms_and_price)
    .cache()
)

# Total listing price per bedroom count.
sum_bedrooms = bedroom_prices.reduceByKey(lambda x, y: x + y)

# Number of listings per bedroom count.
count_bedrooms = (
    bedroom_prices
    .mapValues(lambda _price: 1)
    .reduceByKey(lambda x, y: x + y)
)

sum_dict = dict(sum_bedrooms.collect())
count_dict = dict(count_bedrooms.collect())

# Average = total price / number of listings, printed as "<bedrooms> : <average>".
for bedroom, total in sum_dict.items():
    print(f"{bedroom} : {total / count_dict[bedroom]}")

4 : 483475.6497175141
1 : 169981.81818181818
0 : 293450.0
10 : 699000.0
3 : 359062.20649651974
2 : 266356.3739837398
7 : 325000.0
5 : 657858.0645161291
6 : 603225.0


In [32]:
def _count_and_price_by_bedrooms(row):
    """Turn one CSV row into ``(bedrooms, (1, price))``."""
    cols = row.split(",")
    return cols[3], (1, float(cols[2]))


def _add_count_total(left, right):
    """Add two ``(count, total)`` tuples element by element."""
    return left[0] + right[0], left[1] + right[1]


# Carry (count, total) together so one reduceByKey yields both, then divide.
avg_bedrooms = (
    sc.textFile(file_path)
    .filter(lambda row: "Bedrooms" not in row)
    .map(_count_and_price_by_bedrooms)
    .reduceByKey(_add_count_total)
    .mapValues(lambda count_total: count_total[1] / count_total[0])
)

avg_dict = dict(avg_bedrooms.collect())
avg_dict

{'4': 483475.6497175141,
 '1': 169981.81818181818,
 '0': 293450.0,
 '10': 699000.0,
 '3': 359062.20649651974,
 '2': 266356.3739837398,
 '7': 325000.0,
 '5': 657858.0645161291,
 '6': 603225.0}

In [37]:
from avg_count import AvgCount

def _row_to_avg_count(row):
    """Turn one CSV row into ``(bedrooms, AvgCount(1, price))``."""
    cols = row.split(",")
    return cols[3], AvgCount(1, float(cols[2]))


def _merge_avg_counts(first, second):
    """Combine two AvgCount values by adding their counts and their totals."""
    return AvgCount(first.count + second.count, first.total + second.total)


# Same aggregation as the tuple version, with AvgCount naming the two fields.
avg_bedrooms = (
    sc.textFile(file_path)
    .filter(lambda row: "Bedrooms" not in row)
    .map(_row_to_avg_count)
    .reduceByKey(_merge_avg_counts)
    .mapValues(lambda acc: acc.total / acc.count)
)

avg_dict = dict(avg_bedrooms.collect())
avg_dict

{'4': 483475.6497175141,
 '1': 169981.81818181818,
 '0': 293450.0,
 '10': 699000.0,
 '3': 359062.20649651974,
 '2': 266356.3739837398,
 '7': 325000.0,
 '5': 657858.0645161291,
 '6': 603225.0}

In [40]:
from avg_count import AvgCount

def _row_to_int_keyed_avg_count(row):
    """Turn one CSV row into ``(int(bedrooms), AvgCount(1, price))``."""
    cols = row.split(",")
    return int(cols[3]), AvgCount(1, float(cols[2]))


def _sum_avg_counts(first, second):
    """Combine two AvgCount values by adding their counts and their totals."""
    return AvgCount(first.count + second.count, first.total + second.total)


# Integer keys make sortByKey order numerically (10 before 7) rather than
# lexicographically, as it would with string keys.
keyed_listings = (
    sc.textFile(file_path)
    .filter(lambda row: "Bedrooms" not in row)
    .map(_row_to_int_keyed_avg_count)
)
avg_bedrooms = (
    keyed_listings
    .reduceByKey(_sum_avg_counts)
    .mapValues(lambda acc: acc.total / acc.count)
    .sortByKey(ascending=False)
)

avg_dict = dict(avg_bedrooms.collect())
avg_dict

{10: 699000.0,
 7: 325000.0,
 6: 603225.0,
 5: 657858.0645161291,
 4: 483475.6497175141,
 3: 359062.20649651974,
 2: 266356.3739837398,
 1: 169981.81818181818,
 0: 293450.0}