# Some imports

In [63]:
import os

from IPython.display import HTML, display
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql import SparkSession, functions as F

# Point the process at a specific JRE install. This absolute path is tied to
# one machine/user account and must be adjusted when running elsewhere.
# NOTE(review): presumably this has to be set before the SparkSession is
# created so the JVM launcher picks it up - confirm.
os.environ["JAVA_HOME"] = "/home/legorge/jre1.8.0_441"

# Inject CSS that stops <pre> blocks from wrapping, so wide text output stays
# on one line per row (scrolls horizontally instead of folding).
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# Spark Session

In [2]:
# Create the Spark session for this notebook, or reuse one that already exists
# in the kernel (getOrCreate), registering it under the application name "demo".
spark = (
    SparkSession.builder
    .appName("demo")
    .getOrCreate()
)

In [10]:
# Echo the session object so the notebook renders it as this cell's output.
spark

# Get the data

In [None]:
# Load the raw dataset from a Parquet file (relative path) into a Spark DataFrame.
df = (
    spark.read
    .parquet("data/data.parquet")
)

# Preview the first rows using show()'s default row limit.
df.show()

# Clean the data

In [55]:

# Columns kept from the raw DataFrame. The four *_amount columns are only
# needed to derive `real_amount` (or are discarded outright) and are dropped
# again below.
cols = [
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "trip_distance",
    "fare_amount",
    "total_amount",
    "tip_amount",
    "tolls_amount",
    "congestion_surcharge",
]

# real_amount = total_amount - tolls_amount - tip_amount, rounded to 2 decimals.
clean_df = (
    df.select(*cols)
    .withColumn(
        "real_amount",
        F.round(F.col("total_amount") - F.col("tolls_amount") - F.col("tip_amount"), 2),
    )
    .drop("total_amount", "tip_amount", "tolls_amount", "fare_amount")
)

# Preview the first 10 cleaned rows.
clean_df.show(10)


# Feature extraction

In [56]:
# Derive time-based features from the pickup/dropoff timestamps, then drop the
# raw timestamp columns and cache the result for the cells that follow.
#
# NOTE: this cell reads the timestamp columns from `clean_df` and then rebinds
# `clean_df` to a frame without them, so it is meant to run once right after
# the cleaning cell; re-running it on its own will not find those columns.
clean_df = (
    clean_df
    # Day-of-week and hour-of-day for the pickup time.
    .withColumn("pickup_day_of_week", F.dayofweek("tpep_pickup_datetime"))
    .withColumn("pickup_hour", F.hour("tpep_pickup_datetime"))
    # Same two features for the dropoff time.
    .withColumn("dropoff_day_of_week", F.dayofweek("tpep_dropoff_datetime"))
    .withColumn("dropoff_hour", F.hour("tpep_dropoff_datetime"))
    # Trip duration: difference in seconds divided by 3600, i.e. expressed in hours.
    .withColumn(
        "duration",
        F.expr("timestampdiff(SECOND, tpep_pickup_datetime, tpep_dropoff_datetime) / 3600"),
    )
    .drop("tpep_pickup_datetime", "tpep_dropoff_datetime")
    .cache()
)

clean_df.show()

# Split train/test data

In [None]:
def split_dataframe(df, train_weight, test_weight, seed=None):
    """Randomly split ``df`` into a training part and a test part.

    Args:
        df: Object exposing ``randomSplit(weights, seed)`` (here a Spark DataFrame).
        train_weight: Weight given to the training part.
        test_weight: Weight given to the test part.
        seed: Value forwarded to ``randomSplit`` as its seed; ``None`` (the
            default) is forwarded unchanged.

    Returns:
        A ``(train, test)`` tuple, in the order returned by ``randomSplit``.
    """
    weights = [train_weight, test_weight]
    parts = df.randomSplit(weights, seed)
    # Unpack explicitly so anything other than exactly two parts raises here.
    train_part, test_part = parts
    return train_part, test_part

# Use a fixed seed for the 70/30 split so the train/test partition (and every
# number derived from it) is repeatable across kernel restarts, given the same
# input data. NOTE(review): repeatability also assumes the input partitioning
# is stable between runs - confirm.
SPLIT_SEED = 42
train_df, test_df = split_dataframe(clean_df, 0.7, 0.3, seed=SPLIT_SEED)
# Split sizes recorded from an earlier, unseeded run: (2 075 719, 888 905).
# Exact counts depend on the seed, so the numbers may differ slightly now.


In [None]:
# Row counts of the full, training, and test DataFrames, returned as one tuple
# so the notebook displays all three together.
clean_df.count(), train_df.count(), test_df.count()


# Random Forest Regression

In [None]:


def run_random_forest_regression(df, features, label):
    """Fit a random-forest regression pipeline on ``df`` and return the fitted model.

    The pipeline has two stages: a ``VectorAssembler`` that packs the
    ``features`` columns into a single ``"features"`` vector column, followed
    by a ``RandomForestRegressor`` trained against ``label``.

    As a side effect the function prints the forest's feature importances and
    shows a sample of predictions. Those predictions are computed on the same
    ``df`` the model was fitted on, so they are in-sample.

    Args:
        df: DataFrame used both for fitting and for the displayed predictions.
        features: Column names passed to the assembler as ``inputCols``.
        label: Name of the label column.

    Returns:
        The fitted pipeline model (the result of ``Pipeline.fit``).
    """
    feature_col = "features"

    # Stage 1: gather the input columns into one vector column.
    vector_stage = VectorAssembler(inputCols=features, outputCol=feature_col)

    # Stage 2: the regressor, reading the assembled vector and the label column.
    forest_stage = RandomForestRegressor(featuresCol=feature_col, labelCol=label)

    fitted = Pipeline(stages=[vector_stage, forest_stage]).fit(df)

    # The regressor is the last pipeline stage; report its feature importances.
    forest_model = fitted.stages[-1]
    print("Feature Importances: " + str(forest_model.featureImportances))

    # Score the fitting data and show features, label, and prediction side by side.
    scored = fitted.transform(df)
    scored.select(feature_col, label, "prediction").show()

    return fitted