In [1]:
import gc
import math
import os

from math import radians
from pprint import pprint
from pyspark import keyword_only
from pyspark.ml import Transformer, Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, RobustScaler, OneHotEncoder, VectorIndexer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, from_unixtime, month, dayofmonth, sin, cos, udf
from pyspark.sql.types import ArrayType, DoubleType

In [2]:
# Add here your team number teamx
TEAM = "team14"

# location of your Hive database in HDFS
WAREHOUSE = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName(f"{TEAM} - spark ML")\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", WAREHOUSE)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .config("spark.shuffle.service.enabled", "false")\
        .config("spark.dynamicAllocation.enabled", "false")\
        .config("spark.cores.max", "5")\
        .config("spark.executor.instances","5")\
        .config("spark.executor.memory","8g")\
        .config("spark.executor.cores","5")\
        .enableHiveSupport()\
        .getOrCreate()

spark.sql("SHOW DATABASES").show()

spark.sql("USE team14_projectdb")
spark.sql("SHOW TABLES").show()

spark.sql("SELECT * FROM team14_projectdb.houses_part").show()

spark.sql("SELECT * FROM team14_projectdb.real_estate_announcements_buck").show()

HOUSES = spark.read.format("avro").table('team14_projectdb.houses_part')

LISTINGS = spark.read.format("avro").table('real_estate_announcements_buck')

HOUSES.printSchema()

LISTINGS.printSchema()

+--------------------+
|           namespace|
+--------------------+
|             default|
|             root_db|
|     team0_projectdb|
|team12_hive_proje...|
|    team13_projectdb|
|    team14_projectdb|
|    team15_projectdb|
|    team16_projectdb|
|    team17_projectdb|
|    team18_projectdb|
|    team19_projectdb|
|     team1_projectdb|
|    team20_projectdb|
|    team21_projectdb|
|    team22_projectdb|
|    team23_projectdb|
|    team24_projectdb|
|    team25_projectdb|
|    team26_projectdb|
|    team27_projectdb|
+--------------------+
only showing top 20 rows

+----------------+--------------------+-----------+
|       namespace|           tableName|isTemporary|
+----------------+--------------------+-----------+
|team14_projectdb|          evaluation|      false|
|team14_projectdb|         houses_part|      false|
|team14_projectdb|  model1_predictions|      false|
|team14_projectdb|  model2_predictions|      false|
|team14_projectdb|       my_json_table|      false|
|team1

In [3]:
# Take only the Tatarstan flats, as there are too many instances
HOUSES = HOUSES.filter(HOUSES.id_region == 16)

df = HOUSES.join(LISTINGS, HOUSES.house_id == LISTINGS.house_id)

# The house_id is NOT a FEATURE, but a joining point. So, it is not needed after joining
# because all the info about house is reflected in the other features
# Region id contains only a single value after filtering the Tatarstan data
df = df.drop(LISTINGS['house_id'])
df = df.drop('house_id', 'id_region')

df.show()

HOUSES = None
LISTINGS = None
gc.collect()

+---------+-----------+-------------+---------+---------+-----------+---------------+----------------+--------+-----+------+-----+-------+------------+
|street_id|postal_code|building_type|  geo_lon|  geo_lat|object_type|announcement_id|publication_date|   price|level|levels|rooms|   area|kitchen_area|
+---------+-----------+-------------+---------+---------+-----------+---------------+----------------+--------+-----+------+-----+-------+------------+
|   529655|     423822|            2| 52.37455|55.728302|          0|        3379233|   1619384400000| 4560000|    8|     9|    4| 95.000|       0.000|
|   392690|     420011|            4|  49.1667|55.721046|          0|        3334501|   1619211600000| 4700000|    3|     9|    1| 36.000|       9.300|
|   536292|     420137|            2| 49.15724| 55.83636|          0|        3275651|   1619038800000| 4200000|    4|     9|    1| 33.700|       0.000|
|   434535|     420025|            2|49.223713|55.790405|          0|        3202132|   

407

In [4]:
numerical = ['street_id', 'postal_code', 'level', 'levels', 'rooms', 'area', 'kitchen_area']
categorical = ['building_type', 'object_type']
transformable_features = ['geo_lon', 'geo_lat', 'publication_date']

# We aim to predict the price
LABEL = 'price'

# Drop entries with null values
df = df.select(numerical + categorical + transformable_features + [LABEL]).na.drop()

# Drop duplicates
df = df.dropDuplicates()

# Translate the label column
df = df.withColumnRenamed(LABEL,"label")

# Replace -100 and -1 (studio appartment) with zeros
df = df.withColumn("kitchen_area", when(col("kitchen_area") == -100, 0)\
                   .otherwise(col("kitchen_area")))
df = df.withColumn("rooms", when(col("rooms") == -1, 0).otherwise(col("rooms")))

# Filter outliers. Even the smallest flats cannot cost less than 500,000,
# so it is probably malformed data, and flats that cost more than 100,000,000
# are elite flats for which the cost is determined be some extra elite
# features not included in the data, so they will confuse a model a lot
df = df.filter((col("price") >= 500_000) & (col("price") <= 100_000_000))

print("The dataframe that will be used")
df.show()

# Make sure we still satisfy the requirements
df.count()

The dataframe that will be used
+---------+-----------+-----+------+-----+------+------------+-------------+-----------+---------+---------+----------------+-------+
|street_id|postal_code|level|levels|rooms|  area|kitchen_area|building_type|object_type|  geo_lon|  geo_lat|publication_date|  label|
+---------+-----------+-----+------+-----+------+------------+-------------+-----------+---------+---------+----------------+-------+
|   509685|     422551|    5|     5|    4|66.800|       6.500|            4|          0| 48.55926|55.848755|   1612904400000|2360000|
|   452753|     423457|    3|     9|    3|55.100|       0.000|            1|          0|52.273163|54.901543|   1618520400000|3450000|
|   557192|     423259|    2|     5|    2|44.000|       0.000|            2|          0| 52.43451|54.590355|   1627333200000|1300000|
|   123266|     423826|    1|     5|    3|58.300|       0.000|            2|          0| 52.42587|55.763092|   1616360400000|3550000|
|   255669|     422985|    3| 

161425

In [11]:
class TimeEncoder(Transformer, HasInputCol, DefaultParamsReadable):
    """
    A custom transformer to encode date using sin and cos
    """
    pi = 3.141592653589793
    
    @keyword_only
    def __init__(self, **kwargs):
        super(TimeEncoder, self).__init__()
        self._setDefault(inputCol="publication_date")
        # kwargs = self._input_kwargs
        self._set(**kwargs)

    def getInputCol(self):
        return self.getOrDefault(self.inputCol)

    def _transform(self, dataset):
        # Convert timestamp to date format
        dataset = dataset.withColumn("date", from_unixtime(col(self.getInputCol())))
        
        # Extract month, and day of month
        dataset = dataset.withColumn("month", month(col("date"))) \
                         .withColumn("day", dayofmonth(col("date")))

        # Encode month, day using sine and cosine functions
        dataset = dataset.withColumn("month_sin", sin(col("month") * (2 * self.pi / 12))) \
                         .withColumn("month_cos", cos(col("month") * (2 * self.pi / 12))) \
                         .withColumn("day_sin", sin(col("day") * (2 * self.pi / 31))) \
                         .withColumn("day_cos", cos(col("day") * (2 * self.pi / 31)))

        # Drop intermediate columns
        dataset = dataset.drop(self.getInputCol(), "date", "month", "day")
        return dataset

In [7]:
class GeoToECEF(Transformer, HasOutputCol, DefaultParamsReadable):
    """
    Custom transformer to encode longtitude and latitude columns to the
    ECEF format
    """
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(GeoToECEF, self).__init__()
        self._setDefault(outputCol="ecef_coordinates")
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def getOutputCol(self):
        return self.getOrDefault(self.outputCol)

    def _transform(self, dataset):
        def geo_to_ecef(lat, lon):
            a_cst = 6378137.0  # WGS-84 semi-major axis
            e_2 = 6.6943799901377997e-3  # WGS-84 first eccentricity squared

            lat_rad = radians(lat)
            lon_rad = radians(lon)

            n_cst = a_cst / ((1 - e_2 * (math.sin(lat_rad) ** 2)) ** 0.5)
            ecef_x = n_cst * math.cos(lat_rad) * math.cos(lon_rad)
            ecef_y = n_cst * math.cos(lat_rad) * math.sin(lon_rad)
            ecef_z = n_cst * (1 - e_2) * math.sin(lat_rad)
            return [ecef_x, ecef_y, ecef_z]

        geo_to_ecef_udf = udf(geo_to_ecef, ArrayType(DoubleType()))
        dataset = dataset.withColumn(self.getOutputCol(), geo_to_ecef_udf(dataset["geo_lat"], dataset["geo_lon"]))
        return dataset

In [8]:
class Disassembler(Transformer):
    """
    A custom transformer to disassemble a list column to a list of columns
    """

    def __init__(self, inputCol, outputCol):
        super().__init__()
        self.input_col = inputCol
        self.output_col = outputCol

    def _transform(self, dataset):
        for i, column in enumerate(self.output_col):
            dataset = dataset.withColumn(column, col(self.input_col)[i])
        return dataset.drop(self.input_col)

In [9]:
enc = TimeEncoder()
print(enc.getInputCol())

publication_date


In [12]:
time_features = ["month_sin", "month_cos", "day_sin", "day_cos"]

ecef_features = ["ecef_x", "ecef_y", "ecef_z"]

# Encode the categorical data
encoders = [ OneHotEncoder(inputCol=c, outputCol=f"{c}_encoded") for c in categorical]

# Assemble the features to one column
assembler = VectorAssembler(
    inputCols=numerical+[f"{c}_encoded" for c in categorical]+time_features+ecef_features,
    outputCol="unscaled_features"
)

# Scalr the features using the robust scaler
scaler = RobustScaler(inputCol="unscaled_features", outputCol="features")

# Pipeline to execute transformations
pipeline = Pipeline(stages=[
    TimeEncoder(),
    GeoToECEF(),
    Disassembler("ecef_coordinates", ecef_features)
] + encoders + [assembler, scaler])

# Fit the pipeline to the data
pipeline_model = pipeline.fit(df)

# Apply transformations
TRANSFORMED_DF = pipeline_model.transform(df)

print("The transformed dataframe with all stages")
TRANSFORMED_DF.show(truncate=False)

The transformed dataframe with all stages
+---------+-----------+-----+------+-----+------+------------+-------------+-----------+---------+---------+-------+-----------------------+-----------------------+--------------------+--------------------+------------------+------------------+-----------------+---------------------+-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|street_id|postal_code|level|levels|rooms|area  |kitchen_area|building_type|object_type|geo_lon  |geo_lat  |label  |month_sin              |month_cos              |day_sin             |