In [81]:
from pyspark.sql import SparkSession

# MongoDB instance / database that holds the persistent-zone collections.
MONGO_URI = "mongodb://10.4.41.93:27017"
MONGO_DATABASE = "persistent"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("myApp") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .getOrCreate()


def _load_mongo_collection(collection):
    """Read `collection` from MONGO_DATABASE through the Mongo Spark connector.

    Args:
        collection: collection name inside MONGO_DATABASE.

    Returns:
        A cached DataFrame (each collection is reused by later cells).
    """
    return spark.read.format("mongo") \
        .option('uri', f"{MONGO_URI}/{MONGO_DATABASE}.{collection}") \
        .load().cache()


leisure = _load_mongo_collection("opendatabcn-leisure")
income = _load_mongo_collection("opendatabcn-income")
idealista = _load_mongo_collection("idealista")
lookup = _load_mongo_collection("lookup_tables")

In [84]:
def map_joined_cases(row):
    """Re-key one element of the idealista/lookup leftOuterJoin.

    `row` looks like (neighborhood, (listing, lookup_hit)) where `lookup_hit`
    is either a (reconciled_name, neighborhood_id) pair or None.

    Returns:
        (reconciled_name, (listing, neighborhood_id)) when the lookup matched,
        otherwise (neighborhood, (listing, 'unknown')).
    """
    listing = row[1][0]
    lookup_hit = row[1][1]
    if lookup_hit is None:
        # No lookup entry: keep the dataset's own neighborhood name.
        return (row[0], (listing, 'unknown'))
    reconciled_name, neighborhood_id = lookup_hit[0], lookup_hit[1]
    return (reconciled_name, (listing, neighborhood_id))

def flatten(row):
    """Flatten one joined listing record into a single tuple matching `final_schema`.

    `row` is one element of the final leftOuterJoin:
        (neighborhood_key, (((propertyCode, attributes), neighborhood_id), income_leisure))
    where `attributes` is the tuple of remaining idealista fields and
    `income_leisure` is either an (income, leisure) pair or None when the
    neighborhood had no income/leisure match.

    Returns:
        (propertyCode, *attributes, neighborhood_id, income, leisure), with
        income and leisure both set to "unknown" when there was no match.
    """
    joined = row[1]
    listing, neighborhood_id = joined[0][0], joined[0][1]
    property_code, attributes = listing[0], listing[1]
    income_leisure = joined[1]
    if income_leisure is None:
        income, leisure = "unknown", "unknown"
    else:
        income, leisure = income_leisure[0], income_leisure[1]
    # No print() here: this runs inside Spark tasks, and the previous debug
    # print fired once per field of every listing.
    return (property_code, *attributes, neighborhood_id, income, leisure)

In [85]:
# Neighborhood lookup: (neighborhood, (neighborhood_n_reconciled, neighborhood_id)).
# Cached because it is very small and reused by several joins below.
# Duplicates are deliberately kept: removing them would only shrink it from 127 to 111 rows.
lookupRDD = (
    lookup.rdd
    .map(lambda t: (t['neighborhood'], (t['neighborhood_n_reconciled'], t['neighborhood_id'])))
    .cache()
)

# Leisure: (neighborhood, {leisure_type: count, ...}).
# - map + reduceByKey count every (neighborhood, leisure type) combination,
#   e.g. (('Raval', 'Biblioteques'), 20).
# - coalesce(2) is artificial here and only serves as a proof of concept
#   (see the report for details).
# - map + groupByKey + mapValues(dict) collect the counts into one dict per
#   neighborhood, keyed by neighborhood for the joins below.
leisureRDD = (
    leisure.rdd
    .map(lambda t: ((t['addresses_neighborhood_name'], t['secondary_filters_name']), 1))
    .reduceByKey(lambda x, y: x + y)
    .coalesce(2)
    .map(lambda t: (t[0][0], (t[0][1], t[1])))
    .groupByKey()
    .mapValues(dict)
)

# Income: (neighborhood, {year: index, ...}), e.g. {2007: '64.7', 2008: '62.6', ...},
# i.e. the same per-neighborhood dict shape as leisure.
incomeRDD = (
    income.rdd
    .map(lambda t: (t['Nom_Barri'], (t['Any'], t['Índex RFD Barcelona = 100'])))
    .groupByKey()
    .mapValues(dict)
)

# (neighborhood_n_reconciled, (income_dict, leisure_dict)).
# Per the original notes, the income/leisure/lookup joins can run in one stage
# because all three are keyed by the same neighborhood name, and a plain join
# with lookup suffices (no leftOuterJoin) because the row count does not change,
# i.e. every key has a lookup entry.
income_join_leisure = (
    incomeRDD
    .join(leisureRDD)
    .join(lookupRDD)
    .map(lambda t: (t[1][-1][0], t[1][0]))
)

idealista_header = idealista.schema

# Final listing rows, flattened so that final_schema can be applied:
# 1. Key by propertyCode and keep one row per code (duplicates: 21073 -> 10421).
#    t[1:] keeps every idealista field except the first one.
# 2. Re-key by neighborhood and drop listings without one (10421 -> 6718).
# 3. leftOuterJoin with the cached lookup; map_joined_cases falls back to the
#    dataset's own neighborhood name (with id 'unknown') when there is no match.
# 4. leftOuterJoin with income/leisure and flatten; a missing income/leisure
#    match becomes "unknown".
# Cached because the KPIs and model training reuse it.
final = (
    idealista.repartition(4).rdd
    .map(lambda t: (t['propertyCode'], (t['neighborhood'], t[1:])))
    .reduceByKey(lambda x, y: x)
    .map(lambda t: (t[1][0], (t[0], t[1][1])))
    .filter(lambda t: t[0] is not None)
    .leftOuterJoin(lookupRDD)
    .map(map_joined_cases)
    .leftOuterJoin(income_join_leisure)
    .map(flatten)
    .cache()
)

In [86]:
## Apply an explicit schema to the flattened RDD
from pyspark.sql.types import StructType,StructField, StringType, MapType
# Schema for the rows produced by flatten(): propertyCodeId takes the place of
# the first idealista field, the remaining idealista fields follow unchanged,
# and the three joined columns are appended at the end.
final_schema = StructType(
    [StructField('propertyCodeId', StringType(), True)]
    + list(idealista_header)[1:]
    + [
        StructField('NeighborhoodId', StringType(), True),
        StructField('Income', StringType(), True),
        StructField('Leisure', StringType(), True),
    ]
)
df = spark.createDataFrame(data=final.collect(), schema=final_schema)

In [None]:
## KPIs

## KPI I: Avg. price per area, grouped by the number of theatres / parks / libraries in the neighborhood
import re
import matplotlib.pyplot as plt
def extract_leisure(row, leisure_type):
    """Return '<leisure_type>: <count>' for one listing row.

    `row['Leisure']` is the neighborhood's leisure dict stored as a string
    (the schema declares it StringType). It is presumably rendered like
    `{Biblioteques=2, Teatres=3}` (the previous parser split on '=');
    Python-repr style `{'Teatres': 3}` is accepted as well.

    The previous implementation took the digits in front of the key name,
    which belong to the PREVIOUS entry. It also returned 0 when the key came
    first, leaked a trailing '}' and truncated counts of 3+ digits.

    Args:
        row: a Row (or mapping) with a 'Leisure' field.
        leisure_type: the leisure key to look up, e.g. 'Teatres'.

    Returns:
        e.g. 'Teatres: 3'. The count is 0 when the key is absent or the field is
        empty / "unknown".
    """
    leisure = row['Leisure']
    if not leisure:
        return leisure_type + ': 0'
    # The key must start an entry (after '{' or ','), optionally quoted, and
    # is followed by '=' or ':' and the integer count.
    pattern = (r"(?:^|[{,])\s*['\"]?" + re.escape(leisure_type)
               + r"['\"]?\s*[=:]\s*(\d+)")
    match = re.search(pattern, leisure)
    amount = match.group(1) if match else '0'
    return leisure_type + ': ' + amount

def _avg_price_per_area_by_leisure(leisure_type):
    """Average priceByArea per '<leisure_type>: <count>' bucket, as an RDD.

    Taking leisure_type as a function parameter (rather than a loop variable
    captured by a lambda) guarantees each RDD's closure sees its own value.
    """
    return df.rdd.map(lambda t: (extract_leisure(t, leisure_type), (float(t['priceByArea']), 1))) \
        .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
        .mapValues(lambda t: t[0] / t[1])


theatres = _avg_price_per_area_by_leisure('Teatres')
parcs = _avg_price_per_area_by_leisure('Parcs i jardins')
libraries = _avg_price_per_area_by_leisure('Biblioteques')

kpi1 = theatres.union(parcs).union(libraries).collect()
kpi1.sort()

fields = ['Leisure: Amount', 'Avg. Price / Area']
import csv
# newline='' is what the csv module expects; without it Windows output gets
# blank lines between rows.
with open('kpi1.csv', 'w', newline='', encoding='utf-8') as f:
    write = csv.writer(f)
    write.writerow(fields)
    write.writerows(kpi1)

In [22]:
## KPI II: Monthly evolution of the listings' avg. price per area (based on scrape date)

def extract_mmyy(row):
    """Return the (year, month) strings from a row's 'scrap_date'.

    Assumes scrap_date is underscore-separated with the year first and the
    month second (e.g. '2020_01_15'); raises ValueError when it has fewer
    than two parts.
    """
    year, month, *_ = row['scrap_date'].split('_')
    return year, month
    
# Average priceByArea per scrape (year, month): sum the prices and counts per key, then divide.
kpi2RDD = df.rdd.map(lambda t: (extract_mmyy(t), (float(t['priceByArea']), 1))) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
    .mapValues(lambda t: t[0] / t[1]).cache()

kpi2 = kpi2RDD.collect()
kpi2.sort()

fields = ['scraped: (year, month) ', 'Avg. Price / Area']
import csv
# newline='' is what the csv module expects; without it Windows output gets
# blank lines between rows.
with open('kpi2.csv', 'w', newline='', encoding='utf-8') as f:
    write = csv.writer(f)
    write.writerow(fields)
    write.writerows(kpi2)

In [25]:
## KPI III: Number of listings of each property type per neighborhood

# Number of listings per (neighborhood, propertyType) pair.
kpi3RDD = df.rdd.map(lambda t: ((t['neighborhood'], t['propertyType']), 1)) \
    .reduceByKey(lambda x, y: x + y)

kpi3 = kpi3RDD.collect()
kpi3.sort()

fields = ['(neighboorhood, propertyType)', '#listings']
import csv
# newline='' is what the csv module expects (no blank lines on Windows);
# utf-8 makes the accented neighborhood names independent of the platform locale.
with open('kpi3.csv', 'w', newline='', encoding='utf-8') as f:
    write = csv.writer(f)
    write.writerow(fields)
    write.writerows(kpi3)

In [87]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer,IndexToString
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.regression import GeneralizedLinearRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

In [88]:
# Fixed seed so the train/test split and the random forest are reproducible
# across Restart & Run All.
SEED = 42
train, test = df.randomSplit([0.8, 0.2], seed=SEED)

# Index the categorical columns; propertyTypeNum doubles as the label.
indexer = StringIndexer(inputCols=["neighborhood", "municipality", "propertyType"],
                        outputCols=["neighborhoodNum", "municipalityNum", "propertyTypeNum"],
                        handleInvalid='keep')

# One-hot encode the location indices (the label index is left as-is).
ohe = OneHotEncoder(inputCols=["neighborhoodNum", "municipalityNum"],
                    outputCols=["neighborhoodOhe", "municipalityOhe"],
                    handleInvalid='keep')

vec_assembler = VectorAssembler(inputCols=['bathrooms', 'has360', 'has3DTour', 'hasLift', 'hasPlan', 'hasStaging',
        'hasVideo', 'neighborhoodOhe', 'municipalityOhe', 'newDevelopment', 'newDevelopmentFinished', 'numPhotos', 'rooms',
        'showAddress', 'size', 'topNewDevelopment', 'price'], outputCol='features', handleInvalid='keep')

model = RandomForestClassifier(featuresCol='features', labelCol='propertyTypeNum', seed=SEED)

pipeline = Pipeline(stages=[indexer, ohe, vec_assembler, model])
pipelineModel = pipeline.fit(train)

In [None]:
results = pipelineModel.transform(test)

# Score the held-out predictions with each multiclass metric in turn.
scores = {}
for metric in ('weightedRecall', 'accuracy', 'f1'):
    evaluator = MulticlassClassificationEvaluator(
        labelCol="propertyTypeNum",
        predictionCol="prediction",
        metricName=metric,
    )
    scores[metric] = evaluator.evaluate(results)

recall = scores['weightedRecall']
accuracy = scores['accuracy']
f1 = scores['f1']

print("weighted recall = %s" % recall)
print("accuracy = %s" % accuracy)
print("f1 = %s" % f1)

pipelineModel.write().overwrite().save('pipeline.model')

In [None]:
# Price regression using only the reconciled NeighborhoodId as input.

# Stage 1: string-index NeighborhoodId into NeighborhoodNum.
indexer2 = StringIndexer(
    inputCols=["NeighborhoodId"],
    outputCols=["NeighborhoodNum"],
    handleInvalid='keep',
)

# Stage 2: one-hot encode NeighborhoodNum into NeighborhoodOhe.
ohe2 = OneHotEncoder(
    inputCols=["NeighborhoodNum"],
    outputCols=["NeighborhoodOhe"],
    handleInvalid='keep',
)

# Stage 3: assemble the 'feature' column read by the regressor.
vec_assembler2 = VectorAssembler(
    inputCols=['NeighborhoodOhe'],
    outputCol='feature',
    handleInvalid='keep',
)

# Stage 4: generalized linear regression with price as the label.
model2 = GeneralizedLinearRegression(labelCol='price', featuresCol='feature')

pipeline2 = Pipeline(stages=[indexer2, ohe2, vec_assembler2, model2])
pipelineModel2 = pipeline2.fit(train)

# Evaluate on the held-out split and persist the fitted pipeline.
results2 = pipelineModel2.transform(test)
evaluator = RegressionEvaluator(metricName='rmse', predictionCol="prediction", labelCol="price")
rmse = evaluator.evaluate(results2)
print("rmse = %s" % rmse)

pipelineModel2.write().overwrite().save('pipeline2.model')