In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import GBTRegressor, DecisionTreeRegressor
from xgboost.spark import SparkXGBRegressor
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import hour, dayofweek, log1p
from pyspark.sql.functions import round, when, col
from pyspark.sql.functions import log1p
import psutil
import threading
import builtins
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from sklearn.neighbors import KNeighborsRegressor
from sklearn.metrics import mean_squared_error, r2_score
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType
import numpy as np
import time

Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'manylinux_2_28' variant.


In [2]:
spark = SparkSession.builder.appName("NewYorkCityTaxiTripDuration") \
    .master("local[*]") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.memory.fraction", "0.9") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

25/04/08 18:09:44 WARN Utils: Your hostname, dgour-HP-Laptop-15-db1xxx resolves to a loopback address: 127.0.1.1; using 192.168.1.56 instead (on interface wlo1)
25/04/08 18:09:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/08 18:09:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
full_data = spark.read.csv("train.csv", header=True, inferSchema=True)

                                                                                

In [4]:
full_data = full_data.sample(fraction=0.1, seed=42)

In [5]:
# Indexers
vendor_indexer = StringIndexer(inputCol="vendor_id", outputCol="vendor_id_index", handleInvalid="keep")
store_flag_indexer = StringIndexer(inputCol="store_and_fwd_flag", outputCol="store_and_fwd_index", handleInvalid="keep")

# Δημιουργία χρονικών χαρακτηριστικών
full_data = full_data.withColumn("pickup_hour", hour(col("pickup_datetime")))
full_data = full_data.withColumn("pickup_weekday", dayofweek(col("pickup_datetime")))
full_data = full_data.withColumn("dropoff_hour", hour(col("dropoff_datetime")))
full_data = full_data.withColumn("dropoff_weekday", dayofweek(col("dropoff_datetime")))

full_feature_cols = [
    "vendor_id_index", "store_and_fwd_index",
    "passenger_count", "pickup_longitude", "pickup_latitude",
    "dropoff_longitude", "dropoff_latitude",
    "pickup_hour", "pickup_weekday",
    "dropoff_hour", "dropoff_weekday"
]

# VectorAssembler
assembler = VectorAssembler(inputCols=full_feature_cols, outputCol="features")

# Z-score κανονικοποίηση
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)

pipeline = Pipeline(stages=[
    vendor_indexer,
    store_flag_indexer,
    assembler,
    scaler
])

# Fit και Transform
pipeline_model = pipeline.fit(full_data)
full_data = pipeline_model.transform(full_data)

# Τελική επιλογή στηλών
full_data = full_data.select("scaled_features", "trip_duration")
full_data = full_data.withColumn("trip_duration", log1p(col("trip_duration")))


                                                                                

In [6]:
# Ορισμός κλάσης για την παρακολούθηση των τιμών των υπολογιστικών πόρων(CPU, memory, Disk I/O)

class ResourceMonitorListener:
    def __init__(self):
        self.cpu_percentages = []
        self.mem_percentages = []
        self.disk_reads = []
        self.disk_writes = []
        self.monitoring = False

    def start_monitoring(self):
        self.monitoring = True
        self.cpu_percentages = []
        self.mem_percentages = []
        self.disk_reads = [psutil.disk_io_counters().read_bytes]
        self.disk_writes = [psutil.disk_io_counters().write_bytes]

# Δημιουργία thread: κάθε 1 δευτερόλεπτο ενημερώνει τις λίστες που περιέχουν τις τιμές των πόρων
        def monitor_loop():
            while self.monitoring:
                self.cpu_percentages.append(psutil.cpu_percent(interval=1))
                self.mem_percentages.append(psutil.virtual_memory().percent)
                disk_io = psutil.disk_io_counters()
                self.disk_reads.append(disk_io.read_bytes)
                self.disk_writes.append(disk_io.write_bytes)

        self.monitor_thread = threading.Thread(target=monitor_loop, daemon=True)
        self.monitor_thread.start()

# Διακοπή του thread 
    def stop_monitoring(self):
        self.monitoring = False
        self.monitor_thread.join()

        self.disk_reads = [x for x in self.disk_reads if isinstance(x, (int, float))]
        self.disk_writes = [x for x in self.disk_writes if isinstance(x, (int, float))]

        # Υπολογισμός των τελικών τιμών
        cpu_avg = sum(self.cpu_percentages) / len(self.cpu_percentages) if self.cpu_percentages else 0
        mem_avg = sum(self.mem_percentages) / len(self.mem_percentages) if self.mem_percentages else 0

        # Χρήση των ενσωματωμένων max/min της Python(και όχι του PySpark)
        disk_read_mb = ((builtins.max(self.disk_reads) - builtins.min(self.disk_reads)) / (1024 * 1024)) if self.disk_reads else 0
        disk_write_mb = ((builtins.max(self.disk_writes) - builtins.min(self.disk_writes)) / (1024 * 1024)) if self.disk_writes else 0

        # Επιστρέφει ένα λεξικό που περιέχει τις μετρήσεις
        return {
            "cpu_usage": cpu_avg,
            "mem_usage": mem_avg,
            "disk_read_mb": disk_read_mb,
            "disk_write_mb": disk_write_mb
        }

In [7]:
train_data, test_data = full_data.randomSplit([0.8, 0.2], seed=42)

In [9]:
# Εύρεση των κατάλληλων παραμέτρων για το μοντέλο Random Forest Regressor
# Πραγματοποιήθηκε για όλα τα μοντέλα, αλλά παρουσιάζεται η διαδικασία για 2 μοντέλα(RF και DT).

# Ορισμός του Random Forest Regressor
rf = RandomForestRegressor(featuresCol="scaled_features", labelCol="trip_duration")

# Ορισμός παραμέτρων για Grid Search
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100])  
             .addGrid(rf.maxDepth, [5, 10])  
             .addGrid(rf.minInstancesPerNode, [1, 5])  
             .build())

evaluator_rmse = RegressionEvaluator(labelCol="trip_duration", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="trip_duration", predictionCol="prediction", metricName="r2")

# Ορισμός του CrossValidator
cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_rmse,  # Βασικό metric για βελτιστοποίηση
    numFolds=3,
    parallelism=2  # Επιτάχυνση με parallelism
)

# Εκπαίδευση του μοντέλου 
cv_model = cv.fit(train_data)

best_model = cv_model.bestModel
predictions = best_model.transform(test_data)

# Υπολογισμός RMSE και R-squared
rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

# Εμφάνιση αποτελεσμάτων
print(f"Best Model RMSE: {rmse:.4f} | R²: {r2:.4f}")
print(" Best Parameters:")
print(f"numTrees: {best_model.getNumTrees}")
print(f"maxDepth: {best_model.getOrDefault('maxDepth')}")
print(f"minInstancesPerNode: {best_model.getOrDefault('minInstancesPerNode')}")




Best Model RMSE: 0.6123 | R²: 0.4128
 Best Parameters:
numTrees: 100
maxDepth: 10
minInstancesPerNode: 1


                                                                                

In [8]:
# Εύρεση των κατάλληλων παραμέτρων για το μοντέλο Decision Tree Regressor

# Ορισμός του Decision Tree Regressor
dt = DecisionTreeRegressor(featuresCol="scaled_features", labelCol="trip_duration")

# Πλέγμα παραμέτρων για Grid Search
paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [5, 10, 15])
             .addGrid(dt.minInstancesPerNode, [1, 5, 10])
             .addGrid(dt.maxBins, [32, 64])  
             .build())

# Αξιολογητές
evaluator_rmse = RegressionEvaluator(labelCol="trip_duration", predictionCol="prediction", metricName="rmse")
evaluator_r2 = RegressionEvaluator(labelCol="trip_duration", predictionCol="prediction", metricName="r2")

# CrossValidator
cv = CrossValidator(
    estimator=dt,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator_rmse,
    numFolds=3,
    parallelism=2
)

# Εκπαίδευση
cv_model = cv.fit(train_data)

# Πρόβλεψη
best_model = cv_model.bestModel
predictions = best_model.transform(test_data)

# Αξιολόγηση
rmse = evaluator_rmse.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"\nDecision Tree Regressor (Tuned) RMSE: {rmse:.4f} | R²: {r2:.4f}")
print("Best Parameters:")
print(f" - maxDepth: {best_model.getOrDefault('maxDepth')}")
print(f" - minInstancesPerNode: {best_model.getOrDefault('minInstancesPerNode')}")
print(f" - maxBins: {best_model.getOrDefault('maxBins')}")





Decision Tree Regressor (Tuned) RMSE: 0.5621 | R²: 0.5050
Best Parameters:
 - maxDepth: 15
 - minInstancesPerNode: 10
 - maxBins: 64


                                                                                

In [9]:
# Βελτίωση της απόδοσης του Spark:
# 1) Κατανομή των δεδομένων σε 20 partitions.
# 2) Αποθήκευση των δεδομένων στη μνήμη RAM.Αν ξαναχρειαστούμε τα δεδομένα δε θα διαβαστούν από το δίσκο.
train_data = train_data.repartition(20).cache()
train_count = train_data.count()

# Κλάση υπεύθυνη για την παρακολούθηση των τιμών των υπολογιστικών πόρων(CPU, memory, Disk I/O)
monitor = ResourceMonitorListener()

# Διαφορετικά μεγέθη δεδομένων για scaling test
sizes = [0.1, 0.25, 0.5, 1]

for size in sizes:
    print(f"Training with {size * 100}% of data")

    # Επιλογή τυχαίου δείγματος δεδομένων
    if size == 1:
        train_subset = train_data  # Χρησιμοποιούμε όλο το train_data για 100%
    else:
        # Επιλογή τυχαίου τμήματος δεδομένων για τα άλλα ποσοστά
        train_subset = train_data.sample(fraction=size, seed=42)

    # Μοντέλα 
    models = {
        "Random Forest Regressor": RandomForestRegressor(
            featuresCol="scaled_features",
            labelCol="trip_duration",
            numTrees=100,
            maxDepth=10,
            minInstancesPerNode=1
        ),

        "XGBoost Regressor": SparkXGBRegressor(features_col="scaled_features", label_col="trip_duration", num_workers=2),

        "GBT Regressor": GBTRegressor(
            featuresCol='scaled_features',
            labelCol='trip_duration',
            maxIter=100,
            maxDepth=5
        ),
        "Decision Tree Regressor": DecisionTreeRegressor(
            featuresCol='scaled_features',
            labelCol='trip_duration',
            maxDepth=15,
            minInstancesPerNode=10,
            maxBins=64
        )
    }

    # Αξιολόγηση(δημοφιλείς μετρικές)
    evaluator_rmse = RegressionEvaluator(labelCol="trip_duration", predictionCol="prediction", metricName="rmse")
    evaluator_r2 = RegressionEvaluator(labelCol="trip_duration", predictionCol="prediction", metricName="r2")

    for name, model in models.items():
        print(f"\nTraining {name}...")

        # Υπολογισμοί των πόρων που χρησιμοποιήθηκαν κατά το training και το inference
        monitor.start_monitoring()
        start_train = time.time()

        trained_model = model.fit(train_subset)
        training_time = time.time() - start_train

        train_metrics = monitor.stop_monitoring()

        # Καθυστέρηση για σταθεροποίηση των μετρήσεων
        time.sleep(3)

        start_infer = time.time()
        monitor.start_monitoring()

        predictions = trained_model.transform(test_data)
        inference_latency = time.time() - start_infer

        infer_metrics = monitor.stop_monitoring()

        # Δε θέλουμε predictions<0 (γιατί δεν υπάρχουν διαδρομές που διαρκούν<0 sec)
        # Θέλουμε ακέραιους αριθμούς ως prediction.
        predictions = predictions.withColumn(
            "prediction",
            when(col("prediction") < 0, 0).otherwise(round(col("prediction"), 0))
        )

        # Υπολογισμός μετρικών
        rmse = evaluator_rmse.evaluate(predictions)
        r2 = evaluator_r2.evaluate(predictions)

        # Εκτύπωση αποτελεσμάτων
        print(f"\n{name} (Data Size: {size * 100}%)")
        print(f"RMSE: {rmse:.4f}")
        print(f"R-Squared: {r2:.4f}")
        print(f"Training Time: {training_time:.2f} sec - Inference Latency: {inference_latency:.2f} sec")
        print(f"*Training* CPU: {train_metrics['cpu_usage']:.2f}% | Memory: {train_metrics['mem_usage']:.2f}%")
        print(f"*Training* Disk Read: {train_metrics['disk_read_mb']:.2f} MB | Write: {train_metrics['disk_write_mb']:.2f} MB")
        print(f"*Inference* CPU: {infer_metrics['cpu_usage']:.2f}% | Memory: {infer_metrics['mem_usage']:.2f}%")
        print(f"*Inference* Disk Read: {infer_metrics['disk_read_mb']:.2f} MB | Write: {infer_metrics['disk_write_mb']:.2f} MB")

                                                                                

Training with 10.0% of data

Training Random Forest Regressor...


                                                                                


Random Forest Regressor (Data Size: 10.0%)
RMSE: 0.6716
R-Squared: 0.2935
Training Time: 30.80 sec - Inference Latency: 0.69 sec
*Training* CPU: 81.81% | Memory: 88.64%
*Training* Disk Read: 25.69 MB | Write: 301.15 MB
*Inference* CPU: 13.90% | Memory: 92.80%
*Inference* Disk Read: 0.30 MB | Write: 0.00 MB

Training XGBoost Regressor...


2025-04-06 17:20:48,677 INFO XGBoost-PySpark: _fit Running xgboost-2.1.4 on 2 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'manylinux_2_28' variant.
Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'manylinux_2_28' variant.
2025-04-06 17:20:53,823 INFO XGBoost-PySpark: _train_booster Training on CPUs
[17:20:54] Task 1 got rank 1[17:20:54] Task 0 got rank 0

2025-04-06 17:20:56,823 INFO XG


XGBoost Regressor (Data Size: 10.0%)
RMSE: 0.5631
R-Squared: 0.5032
Training Time: 8.70 sec - Inference Latency: 0.26 sec
*Training* CPU: 27.69% | Memory: 92.39%
*Training* Disk Read: 91.49 MB | Write: 194.69 MB
*Inference* CPU: 23.20% | Memory: 92.70%
*Inference* Disk Read: 0.11 MB | Write: 29.16 MB

Training GBT Regressor...


                                                                                


GBT Regressor (Data Size: 10.0%)
RMSE: 0.5762
R-Squared: 0.4799
Training Time: 86.32 sec - Inference Latency: 0.14 sec
*Training* CPU: 67.13% | Memory: 92.40%
*Training* Disk Read: 20.22 MB | Write: 57.43 MB
*Inference* CPU: 4.40% | Memory: 87.20%
*Inference* Disk Read: 0.35 MB | Write: 0.48 MB

Training Decision Tree Regressor...


                                                                                


Decision Tree Regressor (Data Size: 10.0%)
RMSE: 0.6614
R-Squared: 0.3149
Training Time: 2.47 sec - Inference Latency: 0.08 sec
*Training* CPU: 56.23% | Memory: 87.20%
*Training* Disk Read: 1.22 MB | Write: 4.93 MB
*Inference* CPU: 3.30% | Memory: 87.20%
*Inference* Disk Read: 0.00 MB | Write: 0.57 MB
Training with 25.0% of data

Training Random Forest Regressor...


2025-04-06 17:23:47,586 INFO XGBoost-PySpark: _fit Running xgboost-2.1.4 on 2 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}



Random Forest Regressor (Data Size: 25.0%)
RMSE: 0.6568
R-Squared: 0.3242
Training Time: 37.51 sec - Inference Latency: 0.84 sec
*Training* CPU: 81.33% | Memory: 88.29%
*Training* Disk Read: 4.01 MB | Write: 44.23 MB
*Inference* CPU: 11.70% | Memory: 91.20%
*Inference* Disk Read: 0.00 MB | Write: 0.00 MB

Training XGBoost Regressor...


Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'manylinux_2_28' variant.
Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'manylinux_2_28' variant.
2025-04-06 17:23:49,638 INFO XGBoost-PySpark: _train_booster Training on CPUs
[17:23:50] Task 0 got rank 0[17:23:50] Task 1 got rank 1

2025-04-06 17:23:52,501 INFO XGBoost-PySpark: _fit Finished xgboost training!   
Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'm


XGBoost Regressor (Data Size: 25.0%)
RMSE: 0.5312
R-Squared: 0.5579
Training Time: 5.10 sec - Inference Latency: 0.10 sec
*Training* CPU: 21.97% | Memory: 89.42%
*Training* Disk Read: 0.66 MB | Write: 0.38 MB
*Inference* CPU: 2.50% | Memory: 89.70%
*Inference* Disk Read: 0.00 MB | Write: 0.00 MB

Training GBT Regressor...


                                                                                


GBT Regressor (Data Size: 25.0%)
RMSE: 0.5615
R-Squared: 0.5062
Training Time: 82.13 sec - Inference Latency: 0.14 sec
*Training* CPU: 65.31% | Memory: 92.34%
*Training* Disk Read: 39.24 MB | Write: 109.80 MB
*Inference* CPU: 11.30% | Memory: 87.00%
*Inference* Disk Read: 0.00 MB | Write: 0.41 MB

Training Decision Tree Regressor...


                                                                                


Decision Tree Regressor (Data Size: 25.0%)
RMSE: 0.6387
R-Squared: 0.3609
Training Time: 2.89 sec - Inference Latency: 0.08 sec
*Training* CPU: 66.20% | Memory: 86.97%
*Training* Disk Read: 0.08 MB | Write: 0.50 MB
*Inference* CPU: 2.50% | Memory: 86.90%
*Inference* Disk Read: 0.00 MB | Write: 0.00 MB
Training with 50.0% of data

Training Random Forest Regressor...


2025-04-06 17:26:55,276 INFO XGBoost-PySpark: _fit Running xgboost-2.1.4 on 2 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}



Random Forest Regressor (Data Size: 50.0%)
RMSE: 0.6579
R-Squared: 0.3221
Training Time: 55.47 sec - Inference Latency: 0.94 sec
*Training* CPU: 83.47% | Memory: 87.21%
*Training* Disk Read: 81.64 MB | Write: 195.86 MB
*Inference* CPU: 13.80% | Memory: 91.00%
*Inference* Disk Read: 0.00 MB | Write: 0.18 MB

Training XGBoost Regressor...


Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'manylinux_2_28' variant.
Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'manylinux_2_28' variant.
2025-04-06 17:26:57,218 INFO XGBoost-PySpark: _train_booster Training on CPUs
[17:26:58] Task 0 got rank 0[17:26:58] Task 1 got rank 1

2025-04-06 17:27:00,449 INFO XGBoost-PySpark: _fit Finished xgboost training!   
Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'm


XGBoost Regressor (Data Size: 50.0%)
RMSE: 0.5215
R-Squared: 0.5740
Training Time: 5.33 sec - Inference Latency: 0.12 sec
*Training* CPU: 23.08% | Memory: 91.42%
*Training* Disk Read: 0.86 MB | Write: 0.32 MB
*Inference* CPU: 5.90% | Memory: 91.90%
*Inference* Disk Read: 0.00 MB | Write: 0.00 MB

Training GBT Regressor...


                                                                                


GBT Regressor (Data Size: 50.0%)
RMSE: 0.5560
R-Squared: 0.5158
Training Time: 87.65 sec - Inference Latency: 0.11 sec
*Training* CPU: 66.51% | Memory: 93.24%
*Training* Disk Read: 164.70 MB | Write: 191.29 MB
*Inference* CPU: 3.00% | Memory: 88.50%
*Inference* Disk Read: 0.00 MB | Write: 0.49 MB

Training Decision Tree Regressor...


                                                                                


Decision Tree Regressor (Data Size: 50.0%)
RMSE: 0.6263
R-Squared: 0.3856
Training Time: 3.40 sec - Inference Latency: 0.07 sec
*Training* CPU: 59.52% | Memory: 88.57%
*Training* Disk Read: 0.55 MB | Write: 0.08 MB
*Inference* CPU: 2.00% | Memory: 88.60%
*Inference* Disk Read: 0.00 MB | Write: 0.00 MB
Training with 100% of data

Training Random Forest Regressor...


2025-04-06 17:30:34,340 INFO XGBoost-PySpark: _fit Running xgboost-2.1.4 on 2 workers with
	booster params: {'objective': 'reg:squarederror', 'device': 'cpu', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}



Random Forest Regressor (Data Size: 100%)
RMSE: 0.6641
R-Squared: 0.3091
Training Time: 79.06 sec - Inference Latency: 1.14 sec
*Training* CPU: 86.23% | Memory: 88.16%
*Training* Disk Read: 8.12 MB | Write: 213.23 MB
*Inference* CPU: 13.10% | Memory: 85.70%
*Inference* Disk Read: 0.00 MB | Write: 0.00 MB

Training XGBoost Regressor...


Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'manylinux_2_28' variant.
Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'manylinux_2_28' variant.
2025-04-06 17:30:36,356 INFO XGBoost-PySpark: _train_booster Training on CPUs
[17:30:37] Task 1 got rank 1[17:30:37] Task 0 got rank 0

2025-04-06 17:30:40,012 INFO XGBoost-PySpark: _fit Finished xgboost training!   
Note: You have installed the 'manylinux2014' variant of XGBoost. Certain features such as GPU algorithms or federated learning are not available. To use these features, please upgrade to a recent Linux distro with glibc 2.28+, and install the 'm


XGBoost Regressor (Data Size: 100%)
RMSE: 0.5118
R-Squared: 0.5897
Training Time: 5.82 sec - Inference Latency: 0.09 sec
*Training* CPU: 20.67% | Memory: 87.53%
*Training* Disk Read: 9.62 MB | Write: 6.05 MB
*Inference* CPU: 2.40% | Memory: 88.00%
*Inference* Disk Read: 0.00 MB | Write: 0.00 MB

Training GBT Regressor...


                                                                                


GBT Regressor (Data Size: 100%)
RMSE: 0.5589
R-Squared: 0.5107
Training Time: 93.29 sec - Inference Latency: 0.10 sec
*Training* CPU: 67.43% | Memory: 91.32%
*Training* Disk Read: 0.41 MB | Write: 38.70 MB
*Inference* CPU: 2.30% | Memory: 86.50%
*Inference* Disk Read: 0.00 MB | Write: 0.00 MB

Training Decision Tree Regressor...





Decision Tree Regressor (Data Size: 100%)
RMSE: 0.6146
R-Squared: 0.4084
Training Time: 4.51 sec - Inference Latency: 0.07 sec
*Training* CPU: 65.68% | Memory: 86.52%
*Training* Disk Read: 0.25 MB | Write: 0.48 MB
*Inference* CPU: 2.60% | Memory: 86.60%
*Inference* Disk Read: 0.00 MB | Write: 16.70 MB


                                                                                

In [8]:
# Ξεχωριστό cell για το μοντέλο KNN, καθώς είναι διαφορετική η όλη διαδικασία για το συγκεκριμένο μοντέλο σε αντίθεση με τα προηγούμενα.

# UDF για μετατροπή Vector σε λίστα
vector_to_array_udf = udf(lambda v: v.toArray().tolist(), ArrayType(DoubleType()))

monitor = ResourceMonitorListener()

# Repartition και cache
train_data = train_data.repartition(20).cache()
train_count = train_data.count()

sizes = [0.1, 0.25, 0.5, 1]

for size in sizes:
    print(f"\nKNN Training with {size * 100}% of data")

    # Υποσύνολο δεδομένων
    train_subset = train_data if size == 1 else train_data.sample(fraction=size, seed=42)

    # Μετατροπή σε pandas
    train_pd = train_subset.select(
        vector_to_array_udf("scaled_features").alias("features"), "trip_duration"
    ).dropna().toPandas()

    test_pd = test_data.select(
        vector_to_array_udf("scaled_features").alias("features"), "trip_duration"
    ).dropna().toPandas()

    X_train = np.vstack(train_pd["features"].values)
    y_train = train_pd["trip_duration"].values
    X_test = np.vstack(test_pd["features"].values)
    y_test = test_pd["trip_duration"].values

    # Εκπαίδευση
    print("Training K-Nearest Neighbors Regressor (KNN)...")
    monitor.start_monitoring()
    start_train = time.time()

    knn_model = KNeighborsRegressor(
        n_neighbors=7,
        weights='distance',
        algorithm='auto',
        leaf_size=30,
        p=2,
        n_jobs=-1
    )
    knn_model.fit(X_train, y_train)

    training_time = time.time() - start_train
    train_metrics = monitor.stop_monitoring()

    # Inference
    time.sleep(3)
    monitor.start_monitoring()
    start_infer = time.time()

    y_pred = knn_model.predict(X_test)

    inference_latency = time.time() - start_infer
    infer_metrics = monitor.stop_monitoring()

    # Καθαρισμός αρνητικών προβλέψεων
    y_pred = np.where(y_pred < 0, 0, np.round(y_pred))

    # Μετρικές
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    r2 = r2_score(y_test, y_pred)

    # Εκτύπωση αποτελεσμάτων
    print(f"\nKNN Results for {size * 100}% data:")
    print(f"RMSE: {rmse:.4f}")
    print(f"R-Squared: {r2:.4f}")
    print(f"Training Time: {training_time:.2f} sec | Inference Latency: {inference_latency:.2f} sec")
    print(f"*Training* CPU: {train_metrics['cpu_usage']:.2f}% | Memory: {train_metrics['mem_usage']:.2f}%")
    print(f"*Training* Disk Read: {train_metrics['disk_read_mb']:.2f} MB | Write: {train_metrics['disk_write_mb']:.2f} MB")
    print(f"*Inference* CPU: {infer_metrics['cpu_usage']:.2f}% | Memory: {infer_metrics['mem_usage']:.2f}%")
    print(f"*Inference* Disk Read: {infer_metrics['disk_read_mb']:.2f} MB | Write: {infer_metrics['disk_write_mb']:.2f} MB")


                                                                                


KNN Training with 10.0% of data


                                                                                

Training K-Nearest Neighbors Regressor (KNN)...

KNN Results for 10.0% data:
RMSE: 0.6926
R-Squared: 0.2486
Training Time: 0.02 sec | Inference Latency: 1.53 sec
*Training* CPU: 10.40% | Memory: 77.00%
*Training* Disk Read: 0.00 MB | Write: 0.00 MB
*Inference* CPU: 72.35% | Memory: 77.00%
*Inference* Disk Read: 0.38 MB | Write: 0.00 MB

KNN Training with 25.0% of data


                                                                                

Training K-Nearest Neighbors Regressor (KNN)...

KNN Results for 25.0% data:
RMSE: 0.6662
R-Squared: 0.3048
Training Time: 0.07 sec | Inference Latency: 1.46 sec
*Training* CPU: 12.10% | Memory: 77.20%
*Training* Disk Read: 0.00 MB | Write: 0.00 MB
*Inference* CPU: 71.50% | Memory: 77.10%
*Inference* Disk Read: 0.00 MB | Write: 0.12 MB

KNN Training with 50.0% of data


                                                                                

Training K-Nearest Neighbors Regressor (KNN)...

KNN Results for 50.0% data:
RMSE: 0.6431
R-Squared: 0.3522
Training Time: 0.15 sec | Inference Latency: 2.13 sec
*Training* CPU: 10.30% | Memory: 77.40%
*Training* Disk Read: 0.00 MB | Write: 0.00 MB
*Inference* CPU: 68.60% | Memory: 77.40%
*Inference* Disk Read: 0.39 MB | Write: 0.07 MB

KNN Training with 100% of data


                                                                                

Training K-Nearest Neighbors Regressor (KNN)...

KNN Results for 100% data:
RMSE: 0.6288
R-Squared: 0.3807
Training Time: 0.40 sec | Inference Latency: 3.21 sec
*Training* CPU: 13.60% | Memory: 79.00%
*Training* Disk Read: 0.00 MB | Write: 0.00 MB
*Inference* CPU: 77.60% | Memory: 79.00%
*Inference* Disk Read: 0.00 MB | Write: 0.05 MB
