# Connect to Hive

In [1]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = 19

# location of your Hive database in HDFS
# warehouse = "project/hive/warehouse"
warehouse = "/user/team19/project/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/03 12:42:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/03 12:42:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/05/03 12:42:10 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/05/03 12:42:11 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
25/05/03 12:42:11 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [2]:
# pip freeze
!java -version

openjdk version "1.8.0_402"
OpenJDK Runtime Environment (build 1.8.0_402-b06)
OpenJDK 64-Bit Server VM (build 25.402-b06, mixed mode)


In [3]:
spark

# list Hive databases

In [4]:
print(spark.catalog.listDatabases())
spark.sql("SHOW DATABASES;").show()

                                                                                

[Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/apps/hive/warehouse'), Database(name='retake1', catalog='spark_catalog', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team36/retakedb1'), Database(name='root_db', catalog='spark_catalog', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/root/root_db'), Database(name='show', catalog='spark_catalog', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team36/data2'), Database(name='team0_projectdb', catalog='spark_catalog', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team0/project/hive/warehouse'), Database(name='team11_projectdb', catalog='spark_catalog', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team11/project/hive/warehouse'), Database(name='team12_hive_projectdb', catalog='spark_catalog', description='', locationUri

# Specify the input and output features

In [5]:
spark.sql("SHOW DATABASES").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|             retake1|
|             root_db|
|                show|
|     team0_projectdb|
|    team11_projectdb|
|team12_hive_proje...|
|    team12_projectdb|
|    team13_projectdb|
|    team14_projectdb|
|    team15_projectdb|
|    team16_projectdb|
|    team17_projectdb|
|    team18_projectdb|
|    team19_projectdb|
|     team1_projectdb|
|    team20_projectdb|
|    team21_projectdb|
| team21_projectdb_v2|
| team21_projectdb_v3|
+--------------------+
only showing top 20 rows



# Read hive tables

In [6]:
spark.sql("USE team19_projectdb").show()
spark.sql("SHOW TABLES").show()
# spark.sql("SELECT * FROM <db_name>.<table_name>").show()

++
||
++
++

+----------------+--------------------+-----------+
|       namespace|           tableName|isTemporary|
+----------------+--------------------+-----------+
|team19_projectdb|      hosts_bucketed|      false|
|team19_projectdb|listings_partitioned|      false|
|team19_projectdb|review_scores_buc...|      false|
+----------------+--------------------+-----------+



In [7]:
hosts_bucketed = spark.read.format("avro").table('team19_projectdb.hosts_bucketed')
listings_partitioned = spark.read.format("avro").table('team19_projectdb.listings_partitioned')
review_scores_bucketed = spark.read.format("avro").table('team19_projectdb.review_scores_bucketed')

In [8]:
# hosts_bucketed.show()
hosts_bucketed.columns

['host_id',
 'host_url',
 'host_name',
 'host_since',
 'host_location',
 'host_about',
 'host_response_time',
 'host_response_rate',
 'host_acceptance_rate',
 'host_listings_count',
 'host_total_listings_count',
 'host_verifications']

In [9]:
hosts_bucketed.show(1)

25/05/03 12:42:33 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
[Stage 3:>                                                          (0 + 1) / 1]

+----------------+--------------------+---------+----------+-------------+----------+------------------+------------------+--------------------+-------------------+-------------------------+-------------------+
|         host_id|            host_url|host_name|host_since|host_location|host_about|host_response_time|host_response_rate|host_acceptance_rate|host_listings_count|host_total_listings_count| host_verifications|
+----------------+--------------------+---------+----------+-------------+----------+------------------+------------------+--------------------+-------------------+-------------------------+-------------------+
|71592872.0000000|https://www.airbn...|      Ian|2016-05-12|         NULL|      NULL|within a few hours|       100.0000000|                NULL|          1.0000000|                1.0000000|email,phone,reviews|
+----------------+--------------------+---------+----------+-------------+----------+------------------+------------------+--------------------+------------

                                                                                

In [10]:
listings_partitioned.columns

['id',
 'listing_url',
 'scrape_id',
 'last_scraped',
 'name',
 'host_id',
 'street',
 'property_type',
 'room_type',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'price',
 'number_of_reviews',
 'reviews_per_month',
 'city',
 'country']

In [11]:
listings_partitioned.show(1)

+-------+--------------------+--------------+------------+--------------------+----------------+--------------------+-------------+---------------+------------+---------+---------+---------+-----------+-----------------+-----------------+-----------+-------------+
|     id|         listing_url|     scrape_id|last_scraped|                name|         host_id|              street|property_type|      room_type|accommodates|bathrooms| bedrooms|     beds|      price|number_of_reviews|reviews_per_month|       city|      country|
+-------+--------------------+--------------+------------+--------------------+----------------+--------------------+-------------+---------------+------------+---------+---------+---------+-----------+-----------------+-----------------+-----------+-------------+
|8867364|https://www.airbn...|20170502172350|  2017-05-03|Great LA one bedr...|46414548.0000000|Mid-Wilshire, Los...|    Apartment|Entire home/apt|   2.0000000|1.0000000|1.0000000|2.0000000|160.0000000|   

In [12]:
review_scores_bucketed.columns

['listing_id',
 'review_scores_rating',
 'review_scores_accuracy',
 'review_scores_cleanliness',
 'review_scores_checkin',
 'review_scores_communication',
 'review_scores_location',
 'review_scores_value']

In [13]:
review_scores_bucketed.show(1)

+----------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+
|listing_id|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|
+----------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+
|  17895559|                NULL|                  NULL|                     NULL|                 NULL|                       NULL|                  NULL|               NULL|
+----------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+
only showing top 1 row



In [14]:
from pyspark.sql.functions import col

# Основное соединение
result_df = (
    listings_partitioned
    .join(
        hosts_bucketed,
        on="host_id",       # Ключ из listings -> hosts
        how="left"          # Все записи listings, даже без host
    )
    .join(
        review_scores_bucketed,
        on=col("id") == col("listing_id"),  # Ключ listings.id -> reviews.listing_id
        how="left"                          # Все записи listings, даже без reviews
    )
)


In [15]:
result_df.columns

['host_id',
 'id',
 'listing_url',
 'scrape_id',
 'last_scraped',
 'name',
 'street',
 'property_type',
 'room_type',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'price',
 'number_of_reviews',
 'reviews_per_month',
 'city',
 'country',
 'host_url',
 'host_name',
 'host_since',
 'host_location',
 'host_about',
 'host_response_time',
 'host_response_rate',
 'host_acceptance_rate',
 'host_listings_count',
 'host_total_listings_count',
 'host_verifications',
 'listing_id',
 'review_scores_rating',
 'review_scores_accuracy',
 'review_scores_cleanliness',
 'review_scores_checkin',
 'review_scores_communication',
 'review_scores_location',
 'review_scores_value']

In [16]:
total_rows = result_df.count()
print(f"Total rows: {total_rows}")



Total rows: 247476


                                                                                

In [17]:
from pyspark.sql.functions import col, count, when

null_counts = result_df.agg(
    *[count(when(col(c).isNull(), c)).alias(c) for c in result_df.columns]
).show(vertical=True)

25/05/03 12:42:50 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 21:>                                                         (0 + 2) / 2]

-RECORD 0-----------------------------
 host_id                     | 0      
 id                          | 0      
 listing_url                 | 0      
 scrape_id                   | 0      
 last_scraped                | 0      
 name                        | 223    
 street                      | 0      
 property_type               | 4      
 room_type                   | 0      
 accommodates                | 39     
 bathrooms                   | 764    
 bedrooms                    | 301    
 beds                        | 476    
 price                       | 4076   
 number_of_reviews           | 0      
 reviews_per_month           | 60968  
 city                        | 235    
 country                     | 0      
 host_url                    | 0      
 host_name                   | 260    
 host_since                  | 259    
 host_location               | 1143   
 host_about                  | 98959  
 host_response_time          | 57987  
 host_response_rate      

                                                                                

In [18]:
features = ['last_scraped', 'street', 'property_type', 'room_type', 'accommodates','bathrooms', 'bedrooms', 'beds', 'number_of_reviews', 'reviews_per_month', 'city',
           'country', 'host_since', 'host_location', 'host_response_time','host_response_rate','host_listings_count', 'host_total_listings_count', 'host_verifications', 'review_scores_rating',
           'review_scores_accuracy','review_scores_cleanliness','review_scores_checkin','review_scores_communication','review_scores_location','review_scores_value']
label = 'price'

In [19]:
result_df = result_df.select(features + [label]).na.drop()
# emps = emps.withColumn("ename_job", F.concat(F.col('ename'), F.lit("_"), F.col('job')))
result_df = result_df.withColumnRenamed("price","label")

result_df.show()



+------------+--------------------+-------------+---------------+------------+---------+---------+---------+-----------------+-----------------+-------------+--------------+----------+--------------------+------------------+------------------+-------------------+-------------------------+--------------------+--------------------+----------------------+-------------------------+---------------------+---------------------------+----------------------+-------------------+-----------+
|last_scraped|              street|property_type|      room_type|accommodates|bathrooms| bedrooms|     beds|number_of_reviews|reviews_per_month|         city|       country|host_since|       host_location|host_response_time|host_response_rate|host_listings_count|host_total_listings_count|  host_verifications|review_scores_rating|review_scores_accuracy|review_scores_cleanliness|review_scores_checkin|review_scores_communication|review_scores_location|review_scores_value|      label|
+------------+--------------

                                                                                

In [20]:
total_rows = result_df.count()
print(f"Total rows: {total_rows}")



Total rows: 151055


                                                                                

In [21]:
result_df.columns

['last_scraped',
 'street',
 'property_type',
 'room_type',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'number_of_reviews',
 'reviews_per_month',
 'city',
 'country',
 'host_since',
 'host_location',
 'host_response_time',
 'host_response_rate',
 'host_listings_count',
 'host_total_listings_count',
 'host_verifications',
 'review_scores_rating',
 'review_scores_accuracy',
 'review_scores_cleanliness',
 'review_scores_checkin',
 'review_scores_communication',
 'review_scores_location',
 'review_scores_value',
 'label']

In [22]:
from pyspark.sql.functions import to_date, year, month, dayofmonth

df_with_dates = result_df.withColumn(
    "date_parsed",
    to_date("last_scraped", format="yyyy-MM-dd")
).withColumn(
    "last_scraped_year", year("date_parsed")
).withColumn(
    "month", month("date_parsed")
).withColumn(
    "day", dayofmonth("date_parsed")
)

In [23]:
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCols
from pyspark.sql.functions import sin, cos, pi
import math

class CyclicDateEncoder(Transformer, HasInputCol, HasOutputCols):
    def __init__(self, inputCol=None, outputCols=None):
        super(CyclicDateEncoder, self).__init__()
        self._set(inputCol=inputCol, outputCols=outputCols)
        
    def _transform(self, df):
        input_col = self.getInputCol()
        output_cols = self.getOutputCols()
        if input_col == "month":
            return df.withColumn(
                output_cols[0],  # Например, "month_sin"
                sin(2 * pi() * col(input_col) / 12)  # Нормировка для месяца
            ).withColumn(
                output_cols[1],  # Например, "month_cos"
                cos(2 * pi() * col(input_col) / 12)
            )
        elif input_col == "day":
            return df.withColumn(
                output_cols[0],  # Например, "month_sin"
                sin(2 * pi() * col(input_col) / 31)  # Нормировка для месяца
            ).withColumn(
                output_cols[1],  # Например, "month_cos"
                cos(2 * pi() * col(input_col) / 31)
            )

In [24]:
# Для месяца
month_encoder = CyclicDateEncoder(
    inputCol="month",
    outputCols=["last_scrapped_month_sin", "last_scrapped_month_cos"]
)

# Для дня (аналогично, нормировка на 31)
day_encoder = CyclicDateEncoder(
    inputCol="day",
    outputCols=["last_scrapped_day_sin", "last_scrapped_day_cos"]
)

# Применяем трансформеры
df_encoded = month_encoder.transform(df_with_dates)
df_encoded = day_encoder.transform(df_encoded)

# Результат
# df_encoded.select("last_scraped_year", "month", "day", "month_sin", "month_cos", "day_sin", "day_cos").show(5)

In [25]:
df_encoded.columns

['last_scraped',
 'street',
 'property_type',
 'room_type',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'number_of_reviews',
 'reviews_per_month',
 'city',
 'country',
 'host_since',
 'host_location',
 'host_response_time',
 'host_response_rate',
 'host_listings_count',
 'host_total_listings_count',
 'host_verifications',
 'review_scores_rating',
 'review_scores_accuracy',
 'review_scores_cleanliness',
 'review_scores_checkin',
 'review_scores_communication',
 'review_scores_location',
 'review_scores_value',
 'label',
 'date_parsed',
 'last_scraped_year',
 'month',
 'day',
 'last_scrapped_month_sin',
 'last_scrapped_month_cos',
 'last_scrapped_day_sin',
 'last_scrapped_day_cos']

# Feature selection

In [26]:
df_encoded = df_encoded.drop("last_scraped", "date_parsed","day","month")
df_encoded.columns

['street',
 'property_type',
 'room_type',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'number_of_reviews',
 'reviews_per_month',
 'city',
 'country',
 'host_since',
 'host_location',
 'host_response_time',
 'host_response_rate',
 'host_listings_count',
 'host_total_listings_count',
 'host_verifications',
 'review_scores_rating',
 'review_scores_accuracy',
 'review_scores_cleanliness',
 'review_scores_checkin',
 'review_scores_communication',
 'review_scores_location',
 'review_scores_value',
 'label',
 'last_scraped_year',
 'last_scrapped_month_sin',
 'last_scrapped_month_cos',
 'last_scrapped_day_sin',
 'last_scrapped_day_cos']

In [27]:
from pyspark.sql.functions import to_date, year, month, dayofmonth

df_with_dates = df_encoded.withColumn(
    "date_parsed",
    to_date("host_since", format="yyyy-MM-dd")  # Уточните формат вашей даты!
).withColumn(
    "year_host_since", year("date_parsed")
).withColumn(
    "month", month("date_parsed")
).withColumn(
    "day", dayofmonth("date_parsed")
)

In [28]:
df_with_dates.columns

['street',
 'property_type',
 'room_type',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'number_of_reviews',
 'reviews_per_month',
 'city',
 'country',
 'host_since',
 'host_location',
 'host_response_time',
 'host_response_rate',
 'host_listings_count',
 'host_total_listings_count',
 'host_verifications',
 'review_scores_rating',
 'review_scores_accuracy',
 'review_scores_cleanliness',
 'review_scores_checkin',
 'review_scores_communication',
 'review_scores_location',
 'review_scores_value',
 'label',
 'last_scraped_year',
 'last_scrapped_month_sin',
 'last_scrapped_month_cos',
 'last_scrapped_day_sin',
 'last_scrapped_day_cos',
 'date_parsed',
 'year_host_since',
 'month',
 'day']

In [29]:
# Для месяца
month_encoder = CyclicDateEncoder(
    inputCol="month",
    outputCols=["host_since_month_sin", "host_since_month_cos"]
)

# Для дня (аналогично, нормировка на 31)
day_encoder = CyclicDateEncoder(
    inputCol="day",
    outputCols=["host_since_day_sin", "host_since_day_cos"]
)

# Применяем трансформеры
df_encoded = month_encoder.transform(df_with_dates)
df_encoded = day_encoder.transform(df_encoded)


In [30]:
# df_encoded.columns
df_encoded = df_encoded.drop("month", "day","date_parsed","host_since")
df_encoded.columns

['street',
 'property_type',
 'room_type',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'number_of_reviews',
 'reviews_per_month',
 'city',
 'country',
 'host_location',
 'host_response_time',
 'host_response_rate',
 'host_listings_count',
 'host_total_listings_count',
 'host_verifications',
 'review_scores_rating',
 'review_scores_accuracy',
 'review_scores_cleanliness',
 'review_scores_checkin',
 'review_scores_communication',
 'review_scores_location',
 'review_scores_value',
 'label',
 'last_scraped_year',
 'last_scrapped_month_sin',
 'last_scrapped_month_cos',
 'last_scrapped_day_sin',
 'last_scrapped_day_cos',
 'year_host_since',
 'host_since_month_sin',
 'host_since_month_cos',
 'host_since_day_sin',
 'host_since_day_cos']

In [31]:
df_encoded.columns

['street',
 'property_type',
 'room_type',
 'accommodates',
 'bathrooms',
 'bedrooms',
 'beds',
 'number_of_reviews',
 'reviews_per_month',
 'city',
 'country',
 'host_location',
 'host_response_time',
 'host_response_rate',
 'host_listings_count',
 'host_total_listings_count',
 'host_verifications',
 'review_scores_rating',
 'review_scores_accuracy',
 'review_scores_cleanliness',
 'review_scores_checkin',
 'review_scores_communication',
 'review_scores_location',
 'review_scores_value',
 'label',
 'last_scraped_year',
 'last_scrapped_month_sin',
 'last_scrapped_month_cos',
 'last_scrapped_day_sin',
 'last_scrapped_day_cos',
 'year_host_since',
 'host_since_month_sin',
 'host_since_month_cos',
 'host_since_day_sin',
 'host_since_day_cos']

In [32]:
df_encoded = df_encoded.drop("last_scraped")

In [33]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Word2Vec, Tokenizer, RegexTokenizer
from pyspark.sql.functions import col

categoricalCols = ['city','country','host_location','host_verifications', 'property_type','room_type', 'host_response_time']
textCols = ['street']
others = ['accommodates','bathrooms','bedrooms','beds','number_of_reviews','reviews_per_month',
          'host_response_rate','host_listings_count', 'host_total_listings_count','review_scores_rating','review_scores_accuracy',
          'review_scores_cleanliness', 'review_scores_checkin','review_scores_communication', 'review_scores_location', 'review_scores_value',
         'last_scraped_year','host_since_month_sin','host_since_month_cos','host_since_day_sin','host_since_day_cos','year_host_since']

In [34]:
df_encoded.select('street').show(5)
# df_encoded.select('city').show(5)
# df_encoded.select('country').show(5)
# df_encoded.select('host_location').show(5)
# df_encoded.select('host_verifications').show(5)



+--------------------+
|              street|
+--------------------+
|Mission District,...|
|New York, NY 1002...|
|Ficial District, ...|
|Panthéon, Paris, ...|
|Copenhagen, Capit...|
+--------------------+
only showing top 5 rows



                                                                                

In [35]:
# unique_count = df_encoded.select("city").distinct().count()
# print(unique_count)

In [36]:
# unique_count = df_encoded.select("host_location").distinct().count()
# print(unique_count)

In [37]:
# unique_count = df_encoded.select("host_verifications").distinct().count()
# print(unique_count)

In [38]:
# unique_count = df_encoded.select("country").distinct().count()
# print(unique_count)

In [39]:
# unique_count = df_encoded.select("street").distinct().count()
# print(unique_count)

In [40]:
from pyspark.ml.feature import RegexTokenizer, Word2Vec
from pyspark.sql.functions import col

tokenizer = RegexTokenizer(
    inputCol="street", 
    outputCol="city_tokens",
    pattern="[,\s]+"
)

# df_encoded_tok = tokenizer.transform(df_encoded)
# df_encoded_tok.select('city_tokens').show(1)


In [41]:
word2Vec = Word2Vec(
    vectorSize=10,
    minCount=1,
    windowSize=5,
    inputCol="city_tokens",
    outputCol="city_vec"
)
# word2VecModel = word2Vec.fit(df_encoded_tok)
# print(word2VecModel)

# df_encoded_tok = word2VecModel.transform(df_encoded_tok)
# # df_encoded_tok.show()

# # Adding the encoded ename_job to the list of other columns
# others += [city_vec]


In [42]:
indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)).setHandleInvalid("skip") for c in categoricalCols ]
encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers ]
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + others, outputCol= "features")

In [43]:
# df_encoded_tok.show(1)

In [44]:
# You can create a pipeline to use only a single fit and transform on the data.
pipeline = Pipeline(stages=[tokenizer, word2Vec] + indexers + encoders + [assembler])


# Fit the pipeline ==> This will call the fit functions for all transformers if exist
model=pipeline.fit(df_encoded)
# Fit the pipeline ==> This will call the transform functions for all transformers
data = model.transform(df_encoded)

# data.show()

# We delete all features and keep only the features and label columns
data = data.select(["features", "label"])


from pyspark.ml.feature import VectorIndexer

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4
# distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)
transformed = featureIndexer.transform(data)

# Display the output Spark DataFrame
# transformed.show()


25/05/03 12:46:09 WARN DAGScheduler: Broadcasting large task binary with size 1584.1 KiB
                                                                                

# Feature extraction

# Split the dataset

In [45]:
transformed = transformed.withColumn("label", col("label").cast("double"))

In [46]:
#  split the data into 60% training and 40% test (it is not stratified)
(train_data, test_data) = transformed.randomSplit([0.6, 0.4], seed = 10)

def run(command):
    import os
    return os.popen(command).read()

train_data.select("features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/train")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/train/*.json > ../data/train.json")

test_data.select("features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/test/*.json > ../data/test.json")

25/05/03 12:46:48 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
25/05/03 12:47:08 WARN DAGScheduler: Broadcasting large task binary with size 3.3 MiB
                                                                                

''

# First model

## Build a model

In [47]:
from pyspark.ml.regression import LinearRegression
# Create Linear Regression Model
lr = LinearRegression()

# Fit the data to the pipeline stages
model_lr = lr.fit(train_data)

25/05/03 12:47:42 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:47:47 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/05/03 12:47:47 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:47:50 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:47:50 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:47:51 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:47:51 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:47:51 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:47:51 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:47:51 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:47:52 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:47:52 WARN DAGSchedul

## Predict for test data

In [48]:
predictions = model_lr.transform(test_data)
predictions.show()

25/05/03 12:48:40 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
[Stage 791:>                                                        (0 + 1) / 1]

+--------------------+-----+--------------------+------------------+
|            features|label|     indexedFeatures|        prediction|
+--------------------+-----+--------------------+------------------+
|(9436,[0,3082,309...| 56.0|(9436,[0,3082,309...| 76.61436773831156|
|(9436,[0,3082,309...| 65.0|(9436,[0,3082,309...| 93.57470102039406|
|(9436,[0,3082,309...| 99.0|(9436,[0,3082,309...| 98.17849918884895|
|(9436,[0,3082,309...| 55.0|(9436,[0,3082,309...| 69.24144230063212|
|(9436,[0,3082,309...|175.0|(9436,[0,3082,309...|  99.5360312167013|
|(9436,[0,3082,309...| 70.0|(9436,[0,3082,309...| 97.17015544617061|
|(9436,[0,3082,309...| 98.0|(9436,[0,3082,309...|107.99071511178227|
|(9436,[0,3082,309...| 80.0|(9436,[0,3082,309...|102.53600719051792|
|(9436,[0,3082,309...| 85.0|(9436,[0,3082,309...|61.426270517215016|
|(9436,[0,3082,309...| 70.0|(9436,[0,3082,309...|102.45182203371223|
|(9436,[0,3082,309...| 85.0|(9436,[0,3082,309...| 85.78625609815208|
|(9436,[0,3082,309...| 58.0|(9436,

                                                                                

## Evaluate the model

In [49]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator1_rmse.evaluate(predictions)
r2 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse))
print("R^2 on test data = {}".format(r2))

25/05/03 12:48:55 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:49:13 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB

Root Mean Squared Error (RMSE) on test data = 73.41824838925476
R^2 on test data = 0.7233085281452736


                                                                                

## Hyperparameter optimization

In [50]:
model_lr.params

[Param(parent='LinearRegression_2d3a4007ad99', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'),
 Param(parent='LinearRegression_2d3a4007ad99', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'),
 Param(parent='LinearRegression_2d3a4007ad99', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'),
 Param(parent='LinearRegression_2d3a4007ad99', name='featuresCol', doc='features column name.'),
 Param(parent='LinearRegression_2d3a4007ad99', name='fitIntercept', doc='whether to fit an intercept term.'),
 Param(parent='LinearRegression_2d3a4007ad99', name='labelCol', doc='label column name.'),
 Param(parent='LinearRegression_2d3a4007ad99', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'),
 Param(parent='LinearRegression_2d3a4007ad99', name='m

In [51]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
import numpy as np

# Подготовка данных
train_data.cache().count()
sample_data = train_data.sample(False, 0.001, seed=42)

# Уменьшенный набор параметров
grid = (ParamGridBuilder()
        .addGrid(model_lr.aggregationDepth, [2, 3])
        .addGrid(model_lr.regParam, np.logspace(-2, -1, 3))
        .build())


25/05/03 12:49:31 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:49:50 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
                                                                                

In [52]:
print(sample_data.count())

25/05/03 12:50:01 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB

98


                                                                                

In [55]:
train_data.rdd.getNumPartitions()

200

25/05/03 13:06:10 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:06:11 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:06:12 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:06:15 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:06:16 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:06:21 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:06:21 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:06:26 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:06:27 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:06:28 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:06:35 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:06:36 WARN DAGScheduler: Broadcasting larg

In [53]:
# Быстрая оценка с TrainValidationSplit
tvs = TrainValidationSplit(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=RegressionEvaluator(metricName="mse"),  # Более быстрая метрика
    trainRatio=0.8,
    parallelism=5)

# Запуск на уменьшенных данных
tvsModel = tvs.fit(sample_data)
bestModel = tvsModel.bestModel

# Финализация на полных данных с лучшими параметрами
final_model = lr.setParams(**{param.name: bestModel.getOrDefault(param) 
                            for param in bestModel.extractParamMap()})
final_model.fit(train_data)

25/05/03 12:56:55 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:56:55 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:56:55 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:56:55 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:56:55 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:57:06 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:57:09 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:57:14 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:57:19 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:57:24 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:57:35 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 12:57:36 WARN DAGScheduler: Broadcasting larg

KeyboardInterrupt: 

25/05/03 13:05:45 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:05:46 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:05:51 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
25/05/03 13:05:51 WARN DAGScheduler: Broadcasting large task binary with size 3.1 MiB
[Stage 1057:(144 + 2) / 200][Stage 1060:(0 + 0) / 200][Stage 1062:(0 + 0) / 200]

In [53]:
# from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 

# import numpy as np


# grid = ParamGridBuilder()
# grid = grid.addGrid(
#                     model_lr.aggregationDepth, [2, 3, 4])\
#                     .addGrid(model_lr.regParam, np.logspace(1e-3,1e-1)
#                     )\
#                     .build()

# cv = CrossValidator(estimator = lr, 
#                     estimatorParamMaps = grid, 
#                     evaluator = evaluator1_rmse,
#                     parallelism = 5,
#                     numFolds=3)

# cvModel = cv.fit(train_data)
# bestModel = cvModel.bestModel
# bestModel

## Best model 1


In [None]:
from pprint import pprint
model1 = bestModel
pprint(model1.extractParamMap())

## Save the model to HDFS

In [None]:
model1.write().overwrite().save("project/big_data_project/models/model1")

# Run it from root directory of the repository
run("hdfs dfs -get project/big_data_project/models/model1 models/model1")

## Predict for test data using best model1

In [None]:
predictions = model1.transform(test_data)
predictions.show()

In [None]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model1_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model1_predictions.csv/*.csv > output/model1_predictions.csv")

## Evaluate the best model1

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse1 = evaluator1_rmse.evaluate(predictions)
r21 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse1))
print("R^2 on test data = {}".format(r21))

# Second model

## Build a model

In [None]:
from pyspark.ml.regression import GBTRegressor

# Create Linear Regression Model
gbt = GBTRegressor()

# Fit the data to the pipeline stages
model_gbt = gbt.fit(train_data)

## Predict for test data

In [None]:
predictions = model_gbt.transform(test_data)
predictions.show()

## Evaluate the model

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))

## Hyperparameter optimization

In [None]:
model_gbt.params

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 

import numpy as np


grid = ParamGridBuilder()
grid = grid.addGrid(model_gbt.maxDepth, [2, 5, 10]).addGrid(model_gbt.lossType, ['squared', 'absolute']).build()

cv = CrossValidator(estimator = gbt, 
                    estimatorParamMaps = grid, 
                    evaluator = evaluator2_rmse,
                    parallelism = 5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

## Best model 2


In [None]:
from pprint import pprint
model2 = bestModel
pprint(model2.extractParamMap())

## Save the model to HDFS

In [None]:
model2.write().overwrite().save("project/models/model2")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model2 models/model2")

## Predict for test data using best model2

In [None]:
predictions = model2.transform(test_data)
predictions.show()

In [None]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model2_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model2_predictions.csv/*.csv > output/model2_predictions.csv")

## Evaluate the best model2

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))

# Compare best models

In [None]:
models = [[str(model1),rmse1, r21], [str(model2),rmse2, r22]]

df = spark.createDataFrame(models, ["model", "RMSE", "R2"])
df.show(truncate=False)

In [None]:
df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/evaluation.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/evaluation.csv/*.csv > output/evaluation.csv")