# Spark to Compute KPIs

### Load libraries and set up Spark:

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
import os
import sys
import csv
import time

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

conf = SparkConf().setAppName("SparkTraining").setMaster("local[*]")
ctx = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("myApp") \
    .getOrCreate()

### Establish the connection. If this doesn't work, comment this and use local files (snippit below)

In [None]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("myApp") \
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .getOrCreate()

opendata_leisureRDD = spark.read.format("mongo") \
    .option('uri', f"mongodb://10.4.41.97:27017/persistent.opendatabcn-leisure") \
    .load() \
    .rdd

opendata_incomeRDD = spark.read.format("mongo") \
    .option('uri', f"mongodb://10.4.41.97:27017/persistent.opendatabcn-income") \
    .load() \
    .rdd

idealista = spark.read.format("mongo") \
    .option('uri', f"mongodb://10.4.41.97:27017/persistent.idealista") \
    .load() \

lookupRDD = spark.read.format("mongo") \
    .option('uri', f"mongodb://10.4.41.97:27017/persistent.lookup_tables") \
    .load() \
    .rdd

Use the local files if the connection doesn't work


In [None]:
idealista = ctx.read.json('./json/idealista.json')
lookupRDD = ctx.read.json('./json/lookup.json').rdd
opendata_incomeRDD = ctx.read.json('./json/opendata_income.json').rdd
opendata_leisureRDD = ctx.read.json('./json/opendata_leisure.json').rdd

Save the schema of idealista to be later used when building a df out of the transformed RDD

In [None]:
idealistaSchema = idealista.schema

### Auxiliary functions

In [None]:
def flatten(t):
    """
    Flattens a nested tuple containing property data, idealista data, and other features into a single tuple.
    """
    val = [t[1][0][0], t[0][0], t[0][1]]
    for v in t[1][0][1]:
        val.append(v)
    return tuple(val + [t[1][1][0], t[1][1][1]])

def partition_hash_neighbourhood_id(id):
    """
    Returns a partition hash for a given neighborhood ID, based on its first character.
    """
    val = ord(id[:1])  # Convert first character to its ASCII value
    return val % 2

def x_later_than_y(x_date, y_date):
    """
    Checks if x_date is later than y_date. Dates are in the format "YYYY_MM_DD".
    """
    xy, xm, xd = x_date.split("_")
    yy, ym, yd = y_date.split("_")
    if yy > xy: return False
    elif ym > xm: return False
    elif yd > xd: return False
    else: return True

def reconcile_idealista(x, y):
    """
    Returns the latest record between x and y based on their date fields.
    """
    if x_later_than_y(x[1], y[1]):
        return x
    else:
        return y

def get_partition_id(id, n=2):
    """
    Returns a partition ID for a given ID by hashing it and taking modulo n.
    """
    val = hash(id)
    return val % n

def merge_income_dict_count(x, y):
    """
    Merges two dictionaries, summing the values for the same keys.
    """
    out = {}
    xk = list(x.keys())
    yk = list(y.keys())
    for key in set(xk + yk):  # Union of keys from both dictionaries
        if key in xk and key in yk:
            out[key] = x[key] + y[key]
        elif key in yk:
            out[key] = y[key]
        elif key in xk:
            out[key] = x[key]
    return out

## Define the RDDs

`lookupRDD`

- Selects the neighborhood, the reconciled neighborhood name, and the numerical identifier.
- Removes all duplicates from the lookup table.

In [None]:
luRDD = lookupRDD.map(lambda t: (t['neighborhood'], (t['neighborhood_id'], t['neighborhood_n_reconciled'])))\
    .reduceByKey(lambda x, y: x)

`openIRDD`

- Selects the name of the neighborhood, the year, and the measured income level.
- Filters out all neighborhoods labeled "No consta" as they do not contain useful data.
- Partitions the data by calling `get_partition_id` on the neighborhood name.
- Joins the data with the `lookupRDD` to obtain the neighborhood ID.
- Transforms the data so that the neighborhood ID and reconciled name become the key of the tuple, while retaining the year and income as value entries in a dictionary.
- Merges all dictionaries corresponding to each neighborhood into a single dictionary.

In [None]:
openIRDD = opendata_incomeRDD.map(lambda t: (t['Nom_Barri'], (t['Any'], t['Índex RFD Barcelona = 100'])))\
    .filter(lambda t: t[0] != "No consta")\
    .partitionBy(2, lambda k: get_partition_id(k[0]))\
    .join(luRDD)\
    .map(lambda t: (t[1][1], {t[1][0][0]: t[1][0][1]}))\
    .reduceByKey(lambda x, y: {**x, **y})\
    .partitionBy(2, lambda k: get_partition_id(k[0]))

`openLRDD`

- Ignores the 'created' field as it does not accurately reflect when the amenity was actually built.
- Selects the neighborhood and the type of amenity as a key, incrementing a count for each amenity.
- Filters out all neighborhoods with empty names.
- Immediately joins the data with the `lookupRDD` to simplify subsequent operations.
- Excludes neighborhoods in the `lookupRDD` that are not present in the leisure data. A left outer join is used to ensure this.
- Rearranges the data so the key becomes a tuple containing the ID, reconciled name, and amenity type, with a value of 1 for each amenity.
- Performs a `reduceByKey` operation to count the total number of amenities per neighborhood, using the tuple `(neighborhood, amenity)` as the key.

In [None]:
openLRDD = opendata_leisureRDD.map(lambda t: (t['addresses_neighborhood_name'],
                                              (t['secondary_filters_name'], 1)))\
    .filter(lambda t: t[0] != '')\
    .partitionBy(2, lambda k: get_partition_id(k[0]))\
    .join(luRDD)\
    .map(lambda t: ((t[1][1][0], t[1][1][1]), {str(t[1][0][0]): t[1][0][1]}))\
    .reduceByKey(lambda x, y: merge_income_dict_count(x, y))\
    .partitionBy(2, lambda k: get_partition_id(k[0]))

`joinOpenRDD`

- Performs a full outer join on the neighborhood ID between `openLRDD` and `openIRDD`.
- Accounts for keys that may not appear in either dataset by setting any `None` values to empty dictionaries.
- Utilizes `mapValues` to transform the values without changing the keys, preserving the partition information, which aids in subsequent joins with `idealistaRDD`.

In [None]:
joinOpenRDD = openLRDD.fullOuterJoin(openIRDD)\
    .mapValues(lambda t: ({} if t[0] == None else t[0], t[1]))\
    .mapValues(lambda t: (t[0], {} if t[1] == None else t[1]))\
    .partitionBy(2, lambda k: get_partition_id(k[0]))\
    .cache()

`ilRDD`

- Creates rows with the property ID as the key and saves the neighborhood name along with other data.
- Performs a `reduceByKey` operation on the property ID to remove duplicates, retaining the record with the latest date through the `reconcile_idealista` function.
- Transforms the key to the neighborhood name for joining with `lookupRDD`.
- Filters out records where the neighborhood name is not a string.
- Joins with `lookupRDD` to get the neighborhood ID and reconciled name.
- Maps the key to a tuple containing the reconciled neighborhood ID and name.
- Joins the RDD with `joinOpenRDD`, which contains both leisure and income data.
- Flattens all values into a single, non-nested tuple.
- Checks for duplicates based on the latest scraping date and retains the most recent data.

In [None]:
ilRDD = idealista.rdd.map(lambda t: (t['propertyCode'], (t['neighborhood'], t['scrap_date'], t[1:])))\
    .reduceByKey(lambda x, y: reconcile_idealista(x, y))\
    .map(lambda t: (t[1][0], (t[0], t[1][2])))\
    .filter(lambda t: isinstance(t[0], str))\
    .partitionBy(2, lambda k: get_partition_id(k[0]))\
    .join(luRDD)\
    .map(lambda t: (t[1][1], t[1][0]))\
    .partitionBy(2, lambda k: get_partition_id(k[0]))\
    .join(joinOpenRDD)\
    .map(lambda t: flatten(t)).cache()

Run first part of the pipeline and cache it

In [None]:
joinOpenRDD.take(1)

Get the dataframe

In [None]:
from pyspark.sql.types import StringType, StructType, StructField

schema_list = [StructField("propertyCode", StringType(), False)] # Not nullable as it is an ID
schema_list.append(StructField("NeighbourhoodID", StringType(), True)) # Nullable, but should not have any null values due to the pipeline
schema_list.append(StructField("NeighbourhoodName", StringType(), True)) # Nullable, but should not have any null values due to the pipeline
for i, field in enumerate(idealistaSchema):
    if i == 0:
        pass
    else:
        schema_list.append(field)
schema_list.append(StructField("LeisureDict", StringType(), True)) # Nullable, but should not have any null values due to the pipeline (at most an empty dict)
schema_list.append(StructField("IncomeDict", StringType(), True)) # Nullable, but should not have any null values due to the pipeline (at most an empty dict)
schema = StructType(schema_list)
df = ctx.createDataFrame(data=ilRDD.collect(), schema=schema)

## Compute the KPIs

### KPI 1: Information Rating of Listing
This KPI measures the "commonness" of the information provided in each property listing. It is defined as the sum of the following features:
- `hasVideo`: Presence of a video in the listing, normalized by the total number of listings with a video.
- `has360`: Presence of a 360-degree view in the listing, normalized by the total number of listings with a 360-degree view.
- `hasPlan`: Presence of a floor plan in the listing, normalized by the total number of listings with a floor plan.
- `has3DTour`: Presence of a 3D tour in the listing, normalized by the total number of listings with a 3D tour.
- `hasStaging`: Presence of staging in the listing, normalized by the total number of listings with staging.
- `numPhotos`: Number of photos in the listing, normalized by the average number of photos across all listings.
- `showAddress`: Whether the address is shown in the listing, normalized by the total number of listings that show the address.

Additionally, the neighborhood name and price are captured for each listing. These will be used for joining datasets and creating meaningful plots in Tableau.

In [None]:
def set_value(avg, t, feature):
    """
    Sets the value of a specific feature in the average dictionary.
    """
    avg['total_' + feature] = 1 if t[feature] else 0
    return avg

def init_averages(t):
    """
    Initializes the average dictionary for each listing.
    """
    avg = {}
    features = ["hasVideo", "has360", "hasPlan", "has3DTour", "hasStaging", "showAddress"]
    for feature in features:
        avg = set_value(avg, t, feature)
    avg["avgNumPhotos"] = t["numPhotos"]
    avg["count"] = 1
    return avg

def calc_totals(x, y):
    """
    Calculates the total counts for all features.
    """
    return {key: x[key] + y[key] for key in x.keys()}

def calc_averages(t):
    """
    Calculates the average number of photos.
    """
    t["avgNumPhotos"] /= t['count']
    return t

def calc_kpi1(t):
    """
    Calculates the KPI1 value for each listing.
    """
    kpi1 = sum(t[1][0][0][key] / t[1][1][key] for key in t[1][0][0].keys())
    return (t[1][0][1], kpi1, t[1][0][2], t[1][0][3])

# Initialize RDD from DataFrame
KPI1rdd = df.rdd.map(lambda t: ('key', (init_averages(t), t['propertyCode'], t['NeighbourhoodName'], t['price'])))\
    .cache()

# Calculate averages
averages = KPI1rdd.mapValues(lambda t: t[0])\
    .reduceByKey(calc_totals)\
    .mapValues(calc_averages).cache()

# Calculate KPI1 and join with averages
KPI1rdd = KPI1rdd.join(averages)\
    .map(calc_kpi1)

# Collect and write results to CSV
kpi1 = KPI1rdd.collect()
features = ['PropertyID', 'InformationScore', 'District', 'Price']
kpi1.sort()
with open('KPIs/kpi1.csv', 'w') as f:
    write = csv.writer(f, lineterminator='\n')
    write.writerow(features)
    write.writerows(kpi1)

### KPI 2: Amenities-to-Price Ratio
This KPI measures the ratio of neighborhood amenities to the average listing price. It is calculated both for selling and renting properties. The KPI is defined as:


$\text{KPI 2} = \frac{\text{Average Listing Price}}{\text{Total Number of Amenities} + 1}$

The '+1' in the denominator is to avoid division by zero.

- `Average Listing Price`: The average price of all listings in the neighborhood for a specific type of operation (selling or renting).
- `Total Number of Amenities`: The total count of different types of amenities in the neighborhood.

The output features are 'District', 'Type of Offer', and 'Price to Leisure Ratio'.

In [None]:
def leisure_str2amount(lstr):
    """
    Converts a leisure string into the total count of amenities.
    """
    amount = 0
    if lstr != '{}':
        for item in lstr.strip("{}").split(","):
            key, val = item.split("=")
            amount += int(val)
    return amount

def init_kpi2(t):
    """
    Initializes the KPI2 values for each listing.
    """
    return ((t['NeighbourhoodName'], t['operation']), (t['price'], leisure_str2amount(t['LeisureDict']), 1))

# Initialize and calculate KPI2
KPI2rdd = df.rdd.map(init_kpi2)\
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1], x[2] + y[2]))\
    .map(lambda t: (t[0][0], t[0][1], (t[1][0] / t[1][2]) / (t[1][1] + 1)))  # Adding 1 to avoid division by zero

# Collect and write results to CSV
kpi2 = KPI2rdd.collect()
features = ['District', 'Type of offer', 'Price to Leisure Ratio']
kpi2.sort()
with open('KPIs/kpi2.csv', 'w') as f:
    write = csv.writer(f, lineterminator='\n')
    write.writerow(features)
    write.writerows(kpi2)

### KPI 3: Monthly Listings per District
This KPI measures the number of listings posted per month for each district (municipality). It is defined as the count of listings grouped by:

- `District`: The name of the district or municipality.
- `Month`: The month in which the listings were posted.

The output features are 'District', 'Month', and 'Number of Listings'.

In [None]:
def init_kpi3(t):
    """
    Initializes the KPI3 values for each listing.
    """
    return ((t['NeighbourhoodName'], t['scrap_date'][:-3]), 1)

# Initialize and calculate KPI3
KPI3rdd = df.rdd.map(init_kpi3)\
    .reduceByKey(lambda x, y: x + y)\
    .map(lambda t: (t[0][0], t[0][1], t[1]))

# Collect and write results to CSV
kpi3 = KPI3rdd.collect()
features = ['District', 'Month', 'Number of Listings']
kpi3.sort()
with open('KPIs/kpi3.csv', 'w') as f:
    write = csv.writer(f, lineterminator='\n')
    write.writerow(features)
    write.writerows(kpi3)