# Connect to Hive

In [253]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = "team23"

# Location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()
print("session started")
# Now you can proceed with your Spark operations


ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
spark

ConnectionRefusedError: [Errno 111] Connection refused

<pyspark.sql.session.SparkSession at 0x7ff80e843a10>

# list Hive databases

In [3]:
# print(spark.catalog.listDatabases())
spark.sql("SHOW DATABASES").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|             retake1|
|             root_db|
|                show|
|     team0_projectdb|
|    team11_projectdb|
|           team12_db|
|team12_hive_proje...|
|    team12_projectdb|
|    team13_projectdb|
|    team14_projectdb|
|    team15_projectdb|
|    team16_projectdb|
|    team17_projectdb|
|    team18_projectdb|
|    team19_projectdb|
|     team1_projectdb|
|    team20_projectdb|
|    team21_projectdb|
| team21_projectdb_v2|
+--------------------+
only showing top 20 rows



# Specify the input and output features

In [4]:
# We will use the following features
# Excluded 'comm' because it has a lot of nulls
# Excuded hiredate because it is given as practice to implement the cos_sin_transformation for the student
features = [
    "vendor_id", 
    "passenger_count", 
    "trip_distance", 
    "rate_code", 
    "payment_type", 
    "pickup_location_id", 
    "dropoff_location_id",
    "year",  # keep as numerical
    # cyclical encoded
    "month_sin", "month_cos",
    "day_sin", "day_cos",
    "hour_sin", "hour_cos",
    "minute_sin", "minute_cos",
    "second_sin", "second_cos",
]

# The output/target of our model
label = "tip_amount"

# Read hive tables

In [6]:
taxi = spark.read.format("avro").table('team23_projectdb.trips_part_buck')


In [7]:
taxi.dtypes

[('trip_id', 'int'),
 ('vendor_id', 'int'),
 ('pickup_datetime', 'bigint'),
 ('dropoff_datetime', 'bigint'),
 ('passenger_count', 'int'),
 ('trip_distance', 'string'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('payment_type', 'int'),
 ('fare_amount', 'string'),
 ('extra', 'string'),
 ('mta_tax', 'string'),
 ('tip_amount', 'string'),
 ('tolls_amount', 'string'),
 ('imp_surcharge', 'string'),
 ('total_amount', 'string'),
 ('pickup_location_id', 'int'),
 ('dropoff_location_id', 'int'),
 ('pickup_date', 'string')]

In [8]:
taxi.show()

25/05/04 16:33:27 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
                                                                                

+-------+---------+---------------+----------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+-----------+
|trip_id|vendor_id|pickup_datetime|dropoff_datetime|passenger_count|trip_distance|rate_code|store_and_fwd_flag|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|imp_surcharge|total_amount|pickup_location_id|dropoff_location_id|pickup_date|
+-------+---------+---------------+----------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------+------------------+-------------------+-----------+
|9282024|        2|  1514728527000|   1514729473000|              4|         8.29|        1|                 N|           1|      24.50| 0.50|   0.50|      5.16|        0.00|         0.30|       30.96|               230|                243| 

# Feature selection

In [9]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

# --- Step 1: Split pickup_date into year and month ---
taxi = taxi.withColumn('pickup_year', F.substring('pickup_date', 1, 4).cast('int'))
taxi = taxi.withColumn('pickup_month', F.substring('pickup_date', 6, 2).cast('int'))
taxi = taxi.drop('pickup_date')

# --- Step 2: Cast necessary columns from string to double ---
string_cols = ['trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'imp_surcharge', 'total_amount']
for col_name in string_cols:
    taxi = taxi.withColumn(col_name, F.col(col_name).cast('double'))

# --- Step 3: Check for missing or invalid values ---
print("\n==== Null / Missing Values Check ====\n")
null_counts = taxi.select([F.count(F.when(F.col(c).isNull() | F.isnan(c) | (F.col(c) == ''), c)).alias(c) for c in taxi.columns])
null_counts.show(vertical=True)

# --- Step 4: Basic statistics ---
stats = taxi.describe().toPandas()

print("\n==== Summary Statistics ====\n")
for i, row in stats.iterrows():
    print(f"\n{row['summary'].capitalize()}:")
    for col in stats.columns[1:]:
        print(f"{col} : {row[col]}")

# --- Step 5: Correlation Analysis (numerical features only) ---
numeric_cols = [field for (field, dtype) in taxi.dtypes if dtype in ['int', 'bigint', 'double'] and field != 'tip_amount']

# Assemble numeric features into a feature vector
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features_numeric")
taxi_vector = assembler.transform(taxi).select('features_numeric', 'tip_amount')

# Compute Pearson correlation matrix
print("\n==== Correlation with Tip Amount ====\n")
for feature in numeric_cols:
    corr = taxi.stat.corr(feature, 'tip_amount')
    if corr is not None:
        print(f"{feature}: {corr:.4f}")

# --- Step 6: Distribution analysis (percentiles) ---
print("\n==== Percentiles (25%, 50%, 75%) for Numeric Features ====\n")
for col_name in numeric_cols:
    percentiles = taxi.select(F.percentile_approx(col_name, [0.25, 0.5, 0.75], 1000)).collect()[0][0]
    print(f"{col_name} -> 25%: {percentiles[0]}, 50% (median): {percentiles[1]}, 75%: {percentiles[2]}")

# --- Step 7: Explore Categorical Columns ---
print("\n==== Categorical Columns ====\n")
print("\nStore and Forward Flag distinct values:")
taxi.select('store_and_fwd_flag').distinct().show()

print("\nPayment Type distinct values:")
taxi.select('payment_type').distinct().show()

print("\nRate Code distinct values:")
taxi.select('rate_code').distinct().show()

# --- Step 8: Final Data Types ---
print("\n==== Final Data Types after Processing ====\n")
for name, dtype in taxi.dtypes:
    print(f"{name}: {dtype}")



==== Null / Missing Values Check ====



                                                                                

-RECORD 0------------------
 trip_id             | 0   
 vendor_id           | 0   
 pickup_datetime     | 0   
 dropoff_datetime    | 0   
 passenger_count     | 0   
 trip_distance       | 0   
 rate_code           | 0   
 store_and_fwd_flag  | 0   
 payment_type        | 0   
 fare_amount         | 0   
 extra               | 0   
 mta_tax             | 0   
 tip_amount          | 0   
 tolls_amount        | 0   
 imp_surcharge       | 0   
 total_amount        | 0   
 pickup_location_id  | 0   
 dropoff_location_id | 0   
 pickup_year         | 0   
 pickup_month        | 0   



25/05/04 16:34:36 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                


==== Summary Statistics ====


Count:
trip_id : 10000000
vendor_id : 10000000
pickup_datetime : 10000000
dropoff_datetime : 10000000
passenger_count : 10000000
trip_distance : 10000000
rate_code : 10000000
store_and_fwd_flag : 10000000
payment_type : 10000000
fare_amount : 10000000
extra : 10000000
mta_tax : 10000000
tip_amount : 10000000
tolls_amount : 10000000
imp_surcharge : 10000000
total_amount : 10000000
pickup_location_id : 10000000
dropoff_location_id : 10000000
pickup_year : 10000000
pickup_month : 10000000

Mean:
trip_id : 5000000.5
vendor_id : 1.6143277
pickup_datetime : 1.5298667605022021E12
dropoff_datetime : 1.5298689075286394E12
passenger_count : 1.6029494
trip_distance : 8.84928020999992
rate_code : 1.2012385
store_and_fwd_flag : None
payment_type : 1.1892995
fare_amount : 31.652551951000152
extra : 0.33837806500000006
mta_tax : 0.481928941
tip_amount : 5.5985269889998746
tolls_amount : 2.1379179970021194
imp_surcharge : 0.29788313800085975
total_amount : 40.5160735290

                                                                                

trip_id: -0.0238


                                                                                

vendor_id: 0.0179


                                                                                

pickup_datetime: -0.0178


                                                                                

dropoff_datetime: -0.0178


                                                                                

passenger_count: -0.0045


                                                                                

trip_distance: 0.3530


                                                                                

rate_code: 0.1342


                                                                                

payment_type: -0.5048


                                                                                

fare_amount: 0.0506


                                                                                

extra: 0.0133


                                                                                

mta_tax: -0.1362


                                                                                

tolls_amount: 0.3315


                                                                                

imp_surcharge: 0.0621


                                                                                

total_amount: 0.0882


                                                                                

pickup_location_id: -0.0177


                                                                                

dropoff_location_id: -0.0264


                                                                                

pickup_year: -0.0004


                                                                                

pickup_month: -0.0172

==== Percentiles (25%, 50%, 75%) for Numeric Features ====



                                                                                

trip_id -> 25%: 2496140, 50% (median): 4994989, 75%: 7497136


                                                                                

vendor_id -> 25%: 1, 50% (median): 2, 75%: 2


                                                                                

pickup_datetime -> 25%: 1521729819000, 50% (median): 1528910757000, 75%: 1538042749000


                                                                                

dropoff_datetime -> 25%: 1521737358000, 50% (median): 1528909736000, 75%: 1538033772000


                                                                                

passenger_count -> 25%: 1, 50% (median): 1, 75%: 2


                                                                                

trip_distance -> 25%: 5.83, 50% (median): 8.49, 75%: 11.1


                                                                                

rate_code -> 25%: 1, 50% (median): 1, 75%: 1


                                                                                

payment_type -> 25%: 1, 50% (median): 1, 75%: 1


                                                                                

fare_amount -> 25%: 23.5, 50% (median): 28.5, 75%: 37.0


                                                                                

extra -> 25%: 0.0, 50% (median): 0.0, 75%: 0.5


                                                                                

mta_tax -> 25%: 0.5, 50% (median): 0.5, 75%: 0.5


                                                                                

tolls_amount -> 25%: 0.0, 50% (median): 0.0, 75%: 5.76


                                                                                

imp_surcharge -> 25%: 0.3, 50% (median): 0.3, 75%: 0.3


ERROR:root:KeyboardInterrupt while sending command.               (39 + 1) / 72]
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt

KeyboardInterrupt: 



In [10]:
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.ml import Transformer
import math

In [11]:
# Step 1: Decompose pickup_datetime and other time-related features into cyclical components
taxi = taxi.withColumn("pickup_timestamp", F.from_unixtime(F.col("pickup_datetime"))) \
    .withColumn("pickup_year", F.year("pickup_timestamp")) \
    .withColumn("pickup_month", F.month("pickup_timestamp")) \
    .withColumn("pickup_day", F.dayofmonth("pickup_timestamp")) \
    .withColumn("pickup_hour", F.hour("pickup_timestamp")) \
    .withColumn("pickup_minute", F.minute("pickup_timestamp")) \
    .withColumn("pickup_second", F.second("pickup_timestamp"))



In [12]:
# Step 2: Build a custom cyclical transformer
class CyclicalTransformer(Transformer):
    def __init__(self, inputCol=None, maxVal=None):
        super(CyclicalTransformer, self).__init__()
        self.inputCol = inputCol
        self.maxVal = maxVal

    def _transform(self, df):
        return df.withColumn(f"{self.inputCol}_sin", F.sin(2 * math.pi * F.col(self.inputCol) / self.maxVal)) \
                 .withColumn(f"{self.inputCol}_cos", F.cos(2 * math.pi * F.col(self.inputCol) / self.maxVal))




In [13]:
# Step 3: Apply cyclical transformations to time columns
month_cyclical = CyclicalTransformer(inputCol="pickup_month", maxVal=12)
day_cyclical = CyclicalTransformer(inputCol="pickup_day", maxVal=31)
hour_cyclical = CyclicalTransformer(inputCol="pickup_hour", maxVal=24)
minute_cyclical = CyclicalTransformer(inputCol="pickup_minute", maxVal=60)
second_cyclical = CyclicalTransformer(inputCol="pickup_second", maxVal=60)


                                                                                

In [14]:
taxi = month_cyclical.transform(taxi)
taxi = day_cyclical.transform(taxi)
taxi = hour_cyclical.transform(taxi)
taxi = minute_cyclical.transform(taxi)
taxi = second_cyclical.transform(taxi)

In [15]:
# Step 4: Select relevant features and drop unnecessary ones based on correlation
# Based on the correlation, we will exclude:
# 'trip_id', 'dropoff_datetime', 'passenger_count', vendor_id , 'pickup_location_id', 'dropoff_location_id',
feature_cols = [
    "trip_distance",
    "payment_type",
    "rate_code", 
    "mta_tax", 
    "tolls_amount", 
    "imp_surcharge",
    "total_amount",
    "pickup_year",
    "pickup_month_sin", "pickup_month_cos",
    "pickup_day_sin", "pickup_day_cos",
    "pickup_hour_sin", "pickup_hour_cos",
    "pickup_minute_sin", "pickup_minute_cos",
    "pickup_second_sin", "pickup_second_cos"
]

In [16]:
taxi.dtypes

[('trip_id', 'int'),
 ('vendor_id', 'int'),
 ('pickup_datetime', 'bigint'),
 ('dropoff_datetime', 'bigint'),
 ('passenger_count', 'int'),
 ('trip_distance', 'double'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('payment_type', 'int'),
 ('fare_amount', 'double'),
 ('extra', 'double'),
 ('mta_tax', 'double'),
 ('tip_amount', 'double'),
 ('tolls_amount', 'double'),
 ('imp_surcharge', 'double'),
 ('total_amount', 'double'),
 ('pickup_location_id', 'int'),
 ('dropoff_location_id', 'int'),
 ('pickup_year', 'int'),
 ('pickup_month', 'int'),
 ('pickup_timestamp', 'string'),
 ('pickup_day', 'int'),
 ('pickup_hour', 'int'),
 ('pickup_minute', 'int'),
 ('pickup_second', 'int'),
 ('pickup_month_sin', 'double'),
 ('pickup_month_cos', 'double'),
 ('pickup_day_sin', 'double'),
 ('pickup_day_cos', 'double'),
 ('pickup_hour_sin', 'double'),
 ('pickup_hour_cos', 'double'),
 ('pickup_minute_sin', 'double'),
 ('pickup_minute_cos', 'double'),
 ('pickup_second_sin', 'double'),
 ('pickup_sec

# Feature extraction

In [17]:
from pyspark.ml.feature import StringIndexer, StandardScaler, VectorAssembler
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

# Step 1: Define categorical columns
categorical_columns = ["rate_code", "payment_type"]

# Step 2: Define numerical columns
numerical_columns = [
    "trip_distance", "tolls_amount", "imp_surcharge", 
    "total_amount", "pickup_year", "pickup_month_sin", "pickup_month_cos", 
    "pickup_day_sin", "pickup_day_cos", "pickup_hour_sin", "pickup_hour_cos", 
    "pickup_minute_sin", "pickup_minute_cos"# , "pickup_second_sin", "pickup_second_cos"
]

for c in numerical_columns:
    taxi = taxi.withColumn(c, col(c).cast(DoubleType()))
taxi = taxi.withColumn("tip_amount", col("tip_amount").cast(DoubleType()))

# Step 3: Create StringIndexer for categorical features
indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_index") for col in categorical_columns]

# Step 4: Create VectorAssembler for numerical features
assembler = VectorAssembler(inputCols=numerical_columns, outputCol="numerical_features")

# Step 5: Apply StandardScaler to scale the numerical features
scaler = StandardScaler(inputCol="numerical_features", outputCol="scaled_features", withStd=True, withMean=True)

# Step 6: Create a pipeline to apply both StringIndexer for categorical features and StandardScaler for numerical features
pipeline = Pipeline(stages=indexers + [assembler, scaler])

# Step 7: Fit and transform the data
scaling_and_encoding_model = pipeline.fit(taxi)
taxi_transformed = scaling_and_encoding_model.transform(taxi)

# Step 8: Combine scaled features and encoded categorical features into a single vector
final_assembler = VectorAssembler(
    inputCols=[f"{col}_index" for col in categorical_columns] + ["scaled_features"],
    outputCol="features"
)

# Apply final assembler to create the 'features' column
final_df = final_assembler.transform(taxi_transformed)

# Step 9: Select only the 'features' and rename 'tip_amount' to 'label'
final_df = final_df.select("features", col("tip_amount").alias("label"))

# Show the final DataFrame
final_df.show()


Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.ServerSocket.implAccept(ServerSocket.java:560)
	at java.net.ServerSocket.accept(ServerSocket.java:528)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)
                                                                                

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,0.0,0.433646...| 11.2|
|[0.0,0.0,-0.51840...| 4.95|
|[0.0,0.0,-0.55240...| 5.55|
|[0.0,0.0,-0.34839...| 6.65|
|[0.0,0.0,0.110628...| 10.0|
|[0.0,0.0,-0.33139...| 6.25|
|[0.0,0.0,-0.65441...|  6.0|
|[0.0,0.0,1.011678...| 8.75|
|[0.0,0.0,0.348641...|  7.8|
|[0.0,0.0,0.773665...|  6.0|
|[0.0,0.0,0.722662...| 6.04|
|[0.0,0.0,0.535651...| 6.95|
|[0.0,0.0,0.042624...|  3.5|
|[0.0,0.0,-0.04237...| 5.35|
|[0.0,0.0,-0.07638...|  6.4|
|[0.0,0.0,0.110628...|  7.0|
|[0.0,0.0,-0.73941...|  2.5|
|[0.0,0.0,0.620656...| 9.96|
|[0.0,0.0,-0.58640...|  7.3|
|[0.0,0.0,0.042624...| 5.75|
+--------------------+-----+
only showing top 20 rows



In [19]:
final_df.dtypes

[('features', 'vector'), ('label', 'double')]

# Split the dataset

In [20]:
transformed=final_df
(train_data, test_data) = transformed.randomSplit([0.7, 0.3], seed = 10)


In [21]:
def run(command):
    import os
    return os.popen(command).read()


In [None]:
train_data.select("features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/train")


test_data.select("features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")



In [94]:
# Run it from root directory of the repository
run("hdfs dfs -cat project/data/train/*.json > ../data/train.json")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/test/*.json > ../data/test.json")

''

# First model

## Build a model

In [28]:
from pyspark.ml.regression import LinearRegression
# Create Linear Regression Model
lr = LinearRegression()

# Fit the data to the pipeline stages
model_lr = lr.fit(train_data)

25/05/04 17:05:44 WARN Instrumentation: [a09401f2] regParam is zero, which might cause numerical instability and overfitting.
25/05/04 17:06:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/05/04 17:06:53 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

## Predict for test data

In [29]:
predictions = model_lr.transform(test_data)
predictions.select("label", "prediction").show()

[Stage 134:>                                                        (0 + 1) / 1]

+-----+-----------------+
|label|       prediction|
+-----+-----------------+
|  0.0|7.836390419527919|
|  0.0|7.837974345419144|
|  0.0|7.829877867200599|
|  0.0|4.502877050142552|
|  0.0| 4.50021801538909|
|  0.0|4.503570411441559|
|  0.0|4.488905024721694|
|  0.0|4.490926347873421|
| 0.25|4.502323500594206|
| 0.49|4.507184464598853|
| 0.49|4.500972632151903|
| 0.49|4.499051427753117|
| 0.49|4.497901868125037|
| 0.49|4.493495394995312|
|  0.0|4.495943145751278|
|  0.0|4.503616676738039|
|  0.0|4.498549495817342|
|  0.0|4.497380772864993|
|  0.0|4.496338413572973|
|  0.0|4.498158738215477|
+-----+-----------------+
only showing top 20 rows



                                                                                

## Evaluate the model

In [30]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator1_rmse.evaluate(predictions)
r2 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse))
print("R^2 on test data = {}".format(r2))



Root Mean Squared Error (RMSE) on test data = 3.6724492908058566
R^2 on test data = 0.4254532595381808


                                                                                

## Hyperparameter optimization

In [31]:
model_lr.params

[Param(parent='LinearRegression_e536c8e7c678', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'),
 Param(parent='LinearRegression_e536c8e7c678', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'),
 Param(parent='LinearRegression_e536c8e7c678', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'),
 Param(parent='LinearRegression_e536c8e7c678', name='featuresCol', doc='features column name.'),
 Param(parent='LinearRegression_e536c8e7c678', name='fitIntercept', doc='whether to fit an intercept term.'),
 Param(parent='LinearRegression_e536c8e7c678', name='labelCol', doc='label column name.'),
 Param(parent='LinearRegression_e536c8e7c678', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'),
 Param(parent='LinearRegression_e536c8e7c678', name='m

In [32]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

lr = LinearRegression(featuresCol="features", labelCol="label")

grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.0, 1.0]) \
    .build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="label", predictionCol="prediction")

cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=4,
    seed=42
)

cvModel = cv.fit(train_data)

bestModel = cvModel.bestModel
print(f"Best parameters: regParam={bestModel.getRegParam()}, elasticNet={bestModel.getElasticNetParam()}")



Best parameters: regParam=0.1, elasticNet=1.0


                                                                                

In [34]:
bestModel = cvModel.bestModel
print(f"Best parameters: regParam={bestModel.getRegParam()}, elasticNet={bestModel.getElasticNetParam()}")

Best parameters: regParam=0.1, elasticNet=1.0


## Best model 1


In [35]:
from pprint import pprint
model1 = bestModel
pprint(model1.extractParamMap())

{Param(parent='LinearRegression_eb028268bea9', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LinearRegression_eb028268bea9', name='tol', doc='the convergence tolerance for iterative algorithms (>= 0).'): 1e-06,
 Param(parent='LinearRegression_eb028268bea9', name='regParam', doc='regularization parameter (>= 0).'): 0.1,
 Param(parent='LinearRegression_eb028268bea9', name='maxIter', doc='max number of iterations (>= 0).'): 100,
 Param(parent='LinearRegression_eb028268bea9', name='solver', doc='The solver algorithm for optimization. Supported options: auto, normal, l-bfgs.'): 'auto',
 Param(parent='LinearRegression_eb028268bea9', name='standardization', doc='whether to standardize the training features before fitting the model.'): True,
 Param(parent='LinearRegression_eb028268bea9', name='maxBlockSizeInMB', doc='maximum memory in MB for stacking input data into blocks. Data is stacked within partitions. If more than remaining data size in a partition 

## Save the model to HDFS

In [42]:
model1.write().overwrite().save("project/models/model1")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model1 ../models/model1")

''

25/05/05 04:32:57 ERROR YarnClientSchedulerBackend: YARN application has exited unexpectedly with state KILLED! Check the YARN application logs for more details.
25/05/05 04:32:57 ERROR YarnClientSchedulerBackend: Diagnostics message: Application is killed by ResourceManager as it has exceeded the lifetime period.
25/05/05 04:32:57 WARN TransportChannelHandler: Exception in connection from /10.100.30.58:46386
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:254)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
	at io.netty.channel.nio

## Predict for test data using best model1

In [37]:
predictions = model1.transform(test_data)
predictions.show()

[Stage 267:>                                                        (0 + 1) / 1]

+--------------------+-----+-----------------+
|            features|label|       prediction|
+--------------------+-----+-----------------+
|[0.0,0.0,-1.50446...|  0.0|6.747157660411386|
|[0.0,0.0,-1.50446...|  0.0|4.601947217104696|
|[0.0,0.0,-1.50446...|  0.0|4.601947217104696|
|[0.0,0.0,-1.50446...|  0.0|4.601947217104696|
|[0.0,0.0,-1.50446...|  0.0|4.601947217104696|
|[0.0,0.0,-1.50446...|  0.0|4.601947217104696|
|[0.0,0.0,-1.50446...|  0.0|4.601947217104696|
|[0.0,0.0,-1.50446...|  0.0|4.601947217104696|
|[0.0,0.0,-1.50446...| 0.01|4.601957437294171|
|[0.0,0.0,-1.50446...| 0.05|4.601998318052072|
|[0.0,0.0,-1.50446...|  0.1|4.602049418999448|
|[0.0,0.0,-1.50446...| 0.17|4.602120960325774|
|[0.0,0.0,-1.50446...| 0.49|4.602448006388978|
|[0.0,0.0,-1.50446...| 0.49|4.602448006388978|
|[0.0,0.0,-1.50446...| 0.49|4.602448006388978|
|[0.0,0.0,-1.50446...| 0.49|4.602448006388978|
|[0.0,0.0,-1.50446...| 0.49|4.602448006388978|
|[0.0,0.0,-1.50446...| 0.49|4.602448006388978|
|[0.0,0.0,-1.

                                                                                

In [38]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model1_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model1_predictions.csv/*.csv > ../output/model1_predictions.csv")

                                                                                

''

## Evaluate the best model1

In [39]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse1 = evaluator1_rmse.evaluate(predictions)
r21 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse1))
print("R^2 on test data = {}".format(r21))



Root Mean Squared Error (RMSE) on test data = 3.6681455890473145
R^2 on test data = 0.40207776140191664


                                                                                

# Second model

## Build a model

In [24]:
from pyspark.ml.regression import GBTRegressor

# Create Linear Regression Model
gbt = GBTRegressor()

# Fit the data to the pipeline stages
model_gbt = gbt.fit(train_data)

                                                                                

## Predict for test data

In [25]:
predictions = model_gbt.transform(test_data)
predictions.show()

[Stage 217:>                                                        (0 + 1) / 1]

+--------------------+-----+-----------------+
|            features|label|       prediction|
+--------------------+-----+-----------------+
|[0.0,0.0,-1.50446...| 0.02|0.883268410047259|
|[0.0,0.0,-1.50446...|  0.0|0.883268410047259|
|[0.0,0.0,-1.50446...|  0.0|0.883268410047259|
|[0.0,0.0,-1.50446...|  0.0|0.883268410047259|
|[0.0,0.0,-1.50446...|  0.0|0.883268410047259|
|[0.0,0.0,-1.50446...|  0.0|0.883268410047259|
|[0.0,0.0,-1.50446...|  0.0|0.883268410047259|
|[0.0,0.0,-1.50446...|  0.0|0.883268410047259|
|[0.0,0.0,-1.50446...| 0.02|0.883268410047259|
|[0.0,0.0,-1.50446...| 0.16|0.883268410047259|
|[0.0,0.0,-1.50446...| 0.49|0.883268410047259|
|[0.0,0.0,-1.50446...| 0.49|0.883268410047259|
|[0.0,0.0,-1.50446...| 0.49|0.883268410047259|
|[0.0,0.0,-1.50446...| 0.49|0.883268410047259|
|[0.0,0.0,-1.50446...|  0.0|0.883268410047259|
|[0.0,0.0,-1.50446...|  0.0|0.883268410047259|
|[0.0,0.0,-1.50446...|  0.0|0.883268410047259|
|[0.0,0.0,-1.50446...|  0.0|0.883268410047259|
|[0.0,0.0,-1.

                                                                                

## Evaluate the model

In [26]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))



Root Mean Squared Error (RMSE) on test data = 2.576290415629213
R^2 on test data = 0.7141985849993504


                                                                                

## Hyperparameter optimization

In [27]:
model_gbt.params

[Param(parent='GBTRegressor_326f2a155795', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'),
 Param(parent='GBTRegressor_326f2a155795', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'),
 Param(parent='GBTRegressor_326f2a155795', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'a

In [30]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="label",
    predictionCol="prediction", 
    metricName="rmse"
)

gbt = GBTRegressor(featuresCol="features", labelCol="label")

grid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [3, 5]) \
    .addGrid(gbt.stepSize, [0.05, 0.1]) \
    .build()

cv = CrossValidator(
    estimator=gbt,
    estimatorParamMaps=grid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=4,
    seed=42,
    collectSubModels=False
)

cvModel = cv.fit(train_data)

bestModel = cvModel.bestModel
bestModel


25/05/04 03:38:38 WARN CacheManager: Asked to cache already cached data.
25/05/04 03:38:38 WARN CacheManager: Asked to cache already cached data.
25/05/04 04:20:06 WARN BlockManagerMaster: Failed to remove RDD 3586 - Block rdd_3586_63 does not exist
org.apache.spark.SparkException: Block rdd_3586_63 does not exist
	at org.apache.spark.errors.SparkCoreErrors$.blockDoesNotExistError(SparkCoreErrors.scala:318)
	at org.apache.spark.storage.BlockInfoManager.blockInfo(BlockInfoManager.scala:269)
	at org.apache.spark.storage.BlockInfoManager.removeBlock(BlockInfoManager.scala:547)
	at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:2093)
	at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:2057)
	at org.apache.spark.storage.BlockManager.$anonfun$removeRdd$4(BlockManager.scala:1993)
	at org.apache.spark.storage.BlockManager.$anonfun$removeRdd$4$adapted(BlockManager.scala:1993)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.c

GBTRegressionModel: uid=GBTRegressor_9b18197766ee, numTrees=20, numFeatures=15

## Best model 2


In [38]:
from pprint import pprint
model2 = bestModel
pprint(model2.extractParamMap())

{Param(parent='GBTRegressor_9b18197766ee', name='validationTol', doc='Threshold for stopping early when fit with validation is used. If the error rate on the validation input changes by less than the validationTol, then learning will stop early (before `maxIter`). This parameter is ignored when fit without validation is used.'): 0.01,
 Param(parent='GBTRegressor_9b18197766ee', name='minWeightFractionPerNode', doc='Minimum fraction of the weighted sample count that each child must have after split. If a split causes the fraction of the total weight in the left or right child to be less than minWeightFractionPerNode, the split will be discarded as invalid. Should be in interval [0.0, 0.5).'): 0.0,
 Param(parent='GBTRegressor_9b18197766ee', name='seed', doc='random seed.'): -8040837446833772744,
 Param(parent='GBTRegressor_9b18197766ee', name='stepSize', doc='Step size (a.k.a. learning rate) in interval (0, 1] for shrinking the contribution of each estimator.'): 0.1,
 Param(parent='GBTReg

## Save the model to HDFS

In [34]:
model2.write().overwrite().save("project/models/model2")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model2 ../models/model2")

25/05/04 16:23:56 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
                                                                                

''

## Predict for test data using best model2

In [23]:
from pyspark.ml.regression import GBTRegressionModel

# Load the model
model2 = GBTRegressionModel.load("project/models/model2")

                                                                                

In [24]:
predictions = model2.transform(test_data)
predictions.show()

[Stage 123:>                                                        (0 + 1) / 1]

+--------------------+-----+-----------------+
|            features|label|       prediction|
+--------------------+-----+-----------------+
|[0.0,0.0,-1.50446...|  0.0|1.075400044122528|
|[0.0,0.0,-1.50446...|  0.0|1.075400044122528|
|[0.0,0.0,-1.50446...|  0.0|1.075400044122528|
|[0.0,0.0,-1.50446...|  0.0|1.075400044122528|
|[0.0,0.0,-1.50446...|  0.0|1.075400044122528|
|[0.0,0.0,-1.50446...|  0.0|1.075400044122528|
|[0.0,0.0,-1.50446...|  0.0|1.075400044122528|
|[0.0,0.0,-1.50446...|  0.0|1.075400044122528|
|[0.0,0.0,-1.50446...|  0.0|1.075400044122528|
|[0.0,0.0,-1.50446...|  0.0|1.075400044122528|
|[0.0,0.0,-1.50446...|  0.0|1.075400044122528|
|[0.0,0.0,-1.50446...| 0.01|1.075400044122528|
|[0.0,0.0,-1.50446...| 0.08|1.075400044122528|
|[0.0,0.0,-1.50446...| 0.09|1.075400044122528|
|[0.0,0.0,-1.50446...| 0.49|1.075400044122528|
|[0.0,0.0,-1.50446...| 0.49|1.075400044122528|
|[0.0,0.0,-1.50446...| 0.49|1.075400044122528|
|[0.0,0.0,-1.50446...| 0.49|1.075400044122528|
|[0.0,0.0,-1.

                                                                                

In [26]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model2_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model2_predictions.csv/*.csv > ../output/model2_predictions.csv")

                                                                                

''

## Evaluate the best model2

In [27]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))



Root Mean Squared Error (RMSE) on test data = 2.552106406030934
R^2 on test data = 0.7310987322461698


                                                                                

# Compare best models

In [40]:
models = [[str(model1),rmse1, r21], [str(model2),rmse2, r22]]

df = spark.createDataFrame(models, ["model", "RMSE", "R2"])
df.show(truncate=False)

[Stage 274:>                                                        (0 + 1) / 1]

+------------------------------------------------------------------------------+------------------+-------------------+
|model                                                                         |RMSE              |R2                 |
+------------------------------------------------------------------------------+------------------+-------------------+
|LinearRegressionModel: uid=LinearRegression_eb028268bea9, numFeatures=15      |3.6681455890473145|0.40207776140191664|
|GBTRegressionModel: uid=GBTRegressor_9b18197766ee, numTrees=20, numFeatures=15|2.552106406030934 |0.7310987322461698 |
+------------------------------------------------------------------------------+------------------+-------------------+



                                                                                

In [41]:
df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/evaluation.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/evaluation.csv/*.csv > ../output/evaluation.csv")

''