### Importing 

In [23]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import seaborn as sns
import matplotlib.pyplot as plt



import json
import requests
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import isnan
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StringType
from pyspark.sql.functions import expr
from pyspark.sql.functions import dayofmonth, month, hour, minute, dayofweek
from pyspark.sql.functions import isnan, isnull, sum
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import countDistinct
from pyspark.sql.types import IntegerType, FloatType


# ignore warnings
import warnings
warnings.filterwarnings('ignore')

### Creating Spark APp

In [24]:
spark = SparkSession.builder \
    .appName("Project") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

### Reading CSV File

In [28]:
merged = spark.read.csv("taxi19_cleaned.csv", header = True)

In [26]:
merged.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'tip_amount',
 'tolls_amount',
 'congestion_surcharge',
 'dropoff_day',
 'day_of_month',
 'dropoff_month',
 'dropoff_hour']

In [27]:
#merged = merged.filter(~(col("passenger_count") >5))

In [6]:
merged= merged.dropDuplicates()

In [29]:
taxi_zones = pd.read_csv("taxi_zones.csv")

In [30]:
manhattan = taxi_zones[taxi_zones["borough"] == "Manhattan"]
location_id = manhattan["LocationID"].tolist()

zone = manhattan["zone"].tolist()

manhattan_zones = dict(zip(location_id, zone))
manhattan_ids = list(manhattan_zones.keys())
#Filtering pickup and dropoff location if either is in manhattan

merged =  merged.filter(col("DOLocationID").isin(manhattan_ids))

### Converting datatypes to correct ones

In [31]:
float_columns = ['trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount','tolls_amount', 
                 'improvement_surcharge', 'total_amount', 'busyness']

date_columns = ['tpep_pickup_datetime', 'tpep_dropoff_datetime']

int_columns = ['passenger_count',
 'PULocationID',
 'DOLocationID',
 'dropoff_hour',
 'dropoff_day',
 'dropoff_month',
 'day_of_month']

category_columns = ['VendorID', 'RatecodeID', 'Store_and_fwd_flag', 'payment_type']

In [32]:
from pyspark.sql.functions import col, to_timestamp, regexp_replace
from pyspark.sql.types import StringType

for column in merged.columns:
    if column in float_columns: 
        merged = merged.withColumn(column, col(column).cast('float'))
    elif column in date_columns:
        merged = merged.withColumn(column, to_timestamp(regexp_replace(col(column).cast(StringType()), 'T', ' '), "yyyy-MM-dd HH:mm:ss"))
    elif column in int_columns:
        merged = merged.withColumn(column, col(column).cast('int'))
    else:
        merged = merged.withColumn(column, col(column).cast(StringType()))

In [33]:
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [34]:
merged.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'tip_amount',
 'tolls_amount',
 'congestion_surcharge',
 'dropoff_day',
 'day_of_month',
 'dropoff_month',
 'dropoff_hour']

from pyspark.sql.functions import count
grouped_df = merged.groupBy('DOLocationID', 'dropoff_hour_of_day', 'day_of_week',
                            'month'                            
                           ).agg(sum('passenger_count').alias('busyness'))

grouped_df = merged2.groupBy('DOLocationID', 'dropoff_hour_of_day', 'day_of_week', 'month').sum('passenger_count').alias('busyness')


# Join the grouped DataFrame back to the original DataFrame
merged = merged.join(grouped_df, on=['DOLocationID', 'dropoff_hour_of_day','day_of_week',
                            'month'], how='left')

In [35]:
grouped_df = merged.groupBy('DOLocationID','dropoff_hour', 'dropoff_day', 'dropoff_month') \
                    .sum('passenger_count') \
                    .withColumnRenamed('sum(passenger_count)', 'busyness')
# Join the grouped DataFrame back to the original Da
busyness = merged.join(grouped_df, on=['DOLocationID','dropoff_hour', 'dropoff_day', 'dropoff_month'
                            ], how='left')

In [13]:
merged= merged.orderBy('tpep_dropoff_datetime')

In [17]:
week = merged.filter(col('tpep_dropoff_datetime') <= '2019-01-02 00:00:00')

In [14]:
grouped_df.count()

                                                                                

524347

In [15]:
print('m : ', merged.count())
print('m : ', busyness.count())

                                                                                

m :  67265543




m :  67265543


                                                                                

In [18]:
week.count()

                                                                                

0

In [19]:
merged.show()

[Stage 24:>                                                         (0 + 1) / 1]

+--------+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+----------+------------+--------------------+-----------+------------+-------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|payment_type|fare_amount|tip_amount|tolls_amount|congestion_surcharge|dropoff_day|day_of_month|dropoff_month|dropoff_hour|
+--------+--------------------+---------------------+---------------+-------------+------------+------------+------------+-----------+----------+------------+--------------------+-----------+------------+-------------+------------+
|       1| 2019-01-20 17:59:44|  2019-01-20 18:07:51|              1|          1.5|         239|         143|           1|        7.5|       1.0|         0.0|                   0|          1|          20|            1|          18|
|       1| 2019-01-21 00:04:57|  2019-01-21 00:14:46|              1|   

                                                                                

In [36]:
total_passenger_count = grouped_df.select(sum('busyness')).collect()[0][0]
total_passenger_count

                                                                                

109886427

In [46]:
grouped_df2 = taxi18.groupBy('DOLocationID', 'dropoff_hour_of_day', 'day_of_week', 'month') \
                    .sum('passenger_count') \
                    .withColumnRenamed('sum(passenger_count)', 'busyness')
# Join the grouped DataFrame back to the original DataFrame
taxi18 = taxi18.join(grouped_df, on=['DOLocationID', 'dropoff_hour_of_day','day_of_week','day_of_month',
                            'month'], how='left')

In [21]:


# Select distinct values from 'busyness' column and convert to a list
distinct_busyness_list = df_with_busyness.select(col('busyness')).distinct().collect()
distinct_busyness_list = [row['busyness'] for row in distinct_busyness_list]


                                                                                

In [55]:
from pyspark.sql.functions import sum

df_grouped = merged.groupBy('DOLocationID', 'dropoff_hour_of_day', 'month', 'day_of_week') \
               .agg(sum('passenger_count').alias('busyness')) \
               .select('DOLocationID', 'dropoff_hour_of_day', 'month', 'day_of_week', 'busyness')


In [22]:
len(distinct_busyness_list)

7382

### General ALL features

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Assuming you have a Spark DataFrame named "merged" with features and target column
# Select the relevant columns
selected_features = ['PULocationID', 'DOLocationID', 'passenger_count','fare_amount',
                     'tip_amount', 'tolls_amount','trip_distance','day_of_month',
                     'dropoff_hour_of_day', 'day_of_week', 'month']
target_column = 'busyness'

# Assemble the features into a vector column
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
assembled_df = assembler.transform(merged).select("features", target_column)

# Split the data into training and testing sets (70% for training, 30% for testing)
train_data = assembled_df.limit(int(assembled_df.count() * 0.7))
test_data = assembled_df.subtract(train_data)

# Create a RandomForestRegressor
regressor = RandomForestRegressor(numTrees=2, featuresCol="features", labelCol=target_column)

# Create a pipeline for training
pipeline = Pipeline(stages=[regressor])

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Select the predicted and actual values
result = predictions.select(target_column, "prediction")

# Show the predicted and actual values
result.show()

# Evaluate the model's performance
evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

# Print the evaluation metrics
print("Root Mean Squared Error (RMSE):", rmse)
print("Mean Absolute Error (MAE):", mae)
print("R-squared (R2):", r2)


23/07/09 23:35:00 WARN MemoryStore: Not enough space to cache rdd_206_0 in memory! (computed 4.1 GiB so far)
23/07/09 23:35:00 WARN BlockManager: Persisting block rdd_206_0 to disk instead.
23/07/09 23:38:16 WARN MemoryStore: Not enough space to cache rdd_206_0 in memory! (computed 4.1 GiB so far)
23/07/09 23:40:03 WARN MemoryStore: Not enough space to cache rdd_206_0 in memory! (computed 4.1 GiB so far)
23/07/09 23:41:42 WARN MemoryStore: Not enough space to cache rdd_206_0 in memory! (computed 4.1 GiB so far)
23/07/09 23:43:31 WARN MemoryStore: Not enough space to cache rdd_206_0 in memory! (computed 4.1 GiB so far)
23/07/09 23:45:17 WARN MemoryStore: Not enough space to cache rdd_206_0 in memory! (computed 4.1 GiB so far)
                                                                                

+--------+------------------+
|busyness|        prediction|
+--------+------------------+
|  5240.0| 3177.641486960656|
|  1954.0|2504.2754554649746|
|  1939.0|2504.2754554649746|
|  2396.0|1906.0266136001214|
|  2162.0|2720.2507188221207|
|   856.0|2504.2754554649746|
|   275.0|2504.2754554649746|
|   897.0|2504.2754554649746|
|   476.0|   372.45373024402|
|  1251.0|2322.4567904752025|
|   323.0| 2995.822821970884|
|  5831.0|2720.2507188221207|
|  1871.0|2504.2754554649746|
|  1757.0| 3177.641486960656|
|  2386.0|2720.2507188221207|
|  3875.0|2720.2507188221207|
|    29.0|2169.2755786044845|
|  6335.0|2720.2507188221207|
|  3689.0| 3663.849604650041|
|  1917.0|2720.2507188221207|
+--------+------------------+
only showing top 20 rows



                                                                                

Root Mean Squared Error (RMSE): 1630.140224082292
Mean Absolute Error (MAE): 1270.2846925770416
R-squared (R2): 0.3290205213096953


In [35]:
# Assuming you have already trained the model and made predictions

# Get the trained RandomForestRegressor model from the pipeline
trained_model = model.stages[0]

# Get the feature importance values
feature_importance = trained_model.featureImportances

# Create a list of tuples with feature names and their importance values
feature_importance_list = [(feature, importance) for feature, importance in zip(selected_features, feature_importance)]

# Sort the feature importance list by importance values in descending order
sorted_feature_importance = sorted(feature_importance_list, key=lambda x: x[1], reverse=True)

# Print the sorted feature importance list
for feature, importance in sorted_feature_importance:
    print(f"Feature: {feature}, Importance: {importance}")


Feature: dropoff_hour_of_day, Importance: 0.31561950397606175
Feature: DOLocationID, Importance: 0.2859065808401694
Feature: trip_distance, Importance: 0.23859211646214631
Feature: fare_amount, Importance: 0.055598592952849246
Feature: PULocationID, Importance: 0.04561896241373737
Feature: month, Importance: 0.030446803821005862
Feature: day_of_week, Importance: 0.02584469632081911
Feature: tip_amount, Importance: 0.0023727432132108865
Feature: passenger_count, Importance: 0.0
Feature: tolls_amount, Importance: 0.0
Feature: day_of_month, Importance: 0.0


In [12]:
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [40]:
merged2 = spark.read.csv("new_sort.csv", header = True)

In [58]:
from pyspark.sql.functions import count
"""grouped_df = merged2.groupBy('DOLocationID', 'dropoff_hour_of_day', 'day_of_week',
                            'month'                            
                           ).agg(sum('passenger_count').alias('busyness'))"""
grouped_df = merged.groupBy('DOLocationID', 'dropoff_hour_of_day', 'day_of_week', 'month').sum('passenger_count').alias('busyness')


# Join the grouped DataFrame back to the original DataFrame
merged2 = merged2.join(grouped_df, on=['DOLocationID', 'dropoff_hour_of_day','day_of_week',
                            'month'], how='left')

In [43]:


# Select distinct values from 'busyness' column and convert to a list
distinct_busyness_list1 = merged2.select(col('busyness')).distinct().collect()
distinct_busyness_list1 = [row['busyness'] for row in distinct_busyness_list1]


                                                                                

In [44]:
from pyspark.sql.functions import col, to_timestamp, regexp_replace
from pyspark.sql.types import StringType

for column in merged2.columns:
    if column in float_columns: 
        merged2 = merged2.withColumn(column, col(column).cast('float'))
    elif column in date_columns:
        merged2 = merged2.withColumn(column, to_timestamp(regexp_replace(col(column).cast(StringType()), 'T', ' '), "yyyy-MM-dd HH:mm:ss"))
    elif column in int_columns:
        merged2 = merged2.withColumn(column, col(column).cast('int'))
    else:
        merged2 = merged2.withColumn(column, col(column).cast(StringType()))
        
        

In [52]:
merged.columns

['tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'PULocationID',
 'DOLocationID',
 'fare_amount',
 'tip_amount',
 'tolls_amount',
 'journey_length_in_minutes',
 'day_of_month',
 'dropoff_minute_of_hour',
 'pickup_minute_of_hour',
 'dropoff_hour_of_day',
 'pickup_hour_of_day',
 'day_of_week',
 'month']

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession



# Assuming you have a Spark DataFrame named "merged" with features and target column
# Select the relevant columns
selected_features = ['PULocationID', 'DOLocationID', 'passenger_count','fare_amount',
                     'tip_amount', 'tolls_amount','trip_distance','dropoff_hour', 'dropoff_day', 'dropoff_month', 'day_of_month']
target_column = 'busyness'

# Assemble the features into a vector column
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
assembled_df = assembler.transform(merged).select("features", target_column)

# Split the data into training and testing sets (70% for training, 30% for testing)
train_data = assembled_df.limit(int(assembled_df.count() * 0.7))
test_data = assembled_df.subtract(train_data)

# Create a RandomForestRegressor
regressor = RandomForestRegressor(numTrees=2, featuresCol="features", labelCol=target_column)

# Create a pipeline for training
pipeline = Pipeline(stages=[regressor])

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Select the predicted and actual values
result = predictions.select(target_column, "prediction")

# Show the predicted and actual values
result.show()

# Evaluate the model's performance
evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

# Print the evaluation metrics
print("Root Mean Squared Error (RMSE):", rmse)
print("Mean Absolute Error (MAE):", mae)
print("R-squared (R2):", r2)


23/07/11 10:32:57 WARN MemoryStore: Not enough space to cache rdd_354_0 in memory! (computed 4.1 GiB so far)
23/07/11 10:32:57 WARN BlockManager: Persisting block rdd_354_0 to disk instead.
23/07/11 10:34:03 WARN MemoryStore: Not enough space to cache rdd_354_0 in memory! (computed 4.1 GiB so far)
23/07/11 10:34:40 WARN MemoryStore: Not enough space to cache rdd_354_0 in memory! (computed 4.1 GiB so far)
23/07/11 10:35:19 WARN MemoryStore: Not enough space to cache rdd_354_0 in memory! (computed 4.1 GiB so far)
23/07/11 10:35:59 WARN MemoryStore: Not enough space to cache rdd_354_0 in memory! (computed 4.1 GiB so far)
23/07/11 10:36:40 WARN MemoryStore: Not enough space to cache rdd_354_0 in memory! (computed 4.1 GiB so far)
                                                                                

+--------+------------------+
|busyness|        prediction|
+--------+------------------+
|       8| 253.1792791993746|
|     158| 258.9214328374647|
|     126| 334.2635796503811|
|     790|  488.776433393954|
|     515| 515.5278005958105|
|     558|  365.539249323331|
|     233|334.89102936410745|
|     187| 345.7211654202948|
|      46|255.96326191881093|
|     846| 410.1282851547487|
|     248| 382.4267910374152|
|     284|222.70836173148197|
|     559|  535.709833230047|
|      68| 370.5612437741019|
|     117| 363.2063754310085|
|     472| 373.1270266110638|
|     480| 363.2063754310085|
|     331| 399.5306963216479|
|     380| 532.1099580339934|
|       4|231.62493852530557|
+--------+------------------+
only showing top 20 rows



                                                                                

Root Mean Squared Error (RMSE): 245.3499950858821
Mean Absolute Error (MAE): 187.72172456109138
R-squared (R2): 0.32581389659718585


In [46]:
len(distinct_busyness_list1)

7877

In [51]:
grouped_df.select(col('busyness')).distinct().count()

                                                                                

7877

In [9]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession



# Assuming you have a Spark DataFrame named "merged" with features and target column
# Select the relevant columns
selected_features = ['DOLocationID','dropoff_hour', 'dropoff_day', 'dropoff_month', 'day_of_month']
target_column = 'busyness'

# Assemble the features into a vector column
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
assembled_df = assembler.transform(grouped_df).select("features", target_column)

# Split the data into training and testing sets (70% for training, 30% for testing)
train_data = assembled_df.limit(int(assembled_df.count() * 0.7))
test_data = assembled_df.subtract(train_data)

# Create a RandomForestRegressor
regressor = RandomForestRegressor(numTrees=2, featuresCol="features", labelCol=target_column)

# Create a pipeline for training
pipeline = Pipeline(stages=[regressor])

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Select the predicted and actual values
result = predictions.select(target_column, "prediction")

# Show the predicted and actual values
result.show()

# Evaluate the model's performance
evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

# Print the evaluation metrics
print("Root Mean Squared Error (RMSE):", rmse)
print("Mean Absolute Error (MAE):", mae)
print("R-squared (R2):", r2)


                                                                                

+--------+------------------+
|busyness|        prediction|
+--------+------------------+
|      59| 47.45243096967346|
|      11|138.03111921435945|
|      17|143.47356133290634|
|     162|138.03111921435945|
|     280|154.40721865713093|
|      41|166.97667842978294|
|     267|147.06870174599013|
|     163|166.07047140841624|
|       2| 35.32143266701962|
|     420|209.57475357423579|
|     352| 223.4541572222643|
|      47| 47.45243096967346|
|     227|  79.2823421931985|
|      64| 75.29488309067432|
|      10| 59.14717331350285|
|      14|  79.4248013912474|
|      69| 129.8899557312832|
|      35| 47.45243096967346|
|      45|  79.2823421931985|
|      58| 129.8899557312832|
+--------+------------------+
only showing top 20 rows



[Stage 208:>                                                        (0 + 8) / 8]

Root Mean Squared Error (RMSE): 177.8238185948071
Mean Absolute Error (MAE): 129.9281161012205
R-squared (R2): 0.3879057736174324


                                                                                

In [36]:
grouped_df.select(col('dropoff_day')).distinct().show()



+-----------+
|dropoff_day|
+-----------+
|          1|
|          6|
|          3|
|          5|
|          4|
|          7|
|          2|
+-----------+



                                                                                

In [39]:
grouped_df= grouped_df.orderBy('dropoff_hour',
 'dropoff_day',
 'dropoff_month',
 'day_of_month')

In [38]:
grouped_df.columns

['DOLocationID',
 'dropoff_hour',
 'dropoff_day',
 'dropoff_month',
 'day_of_month',
 'busyness']

### Grouped DF

In [30]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession



# Assuming you have a Spark DataFrame named "merged" with features and target column
# Select the relevant columns
selected_features = ['DOLocationID', 'dropoff_hour_of_day', 'day_of_week', 'month']
target_column = 'busyness'

# Assemble the features into a vector column
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
assembled_df = assembler.transform(grouped_df).select("features", target_column)

# Split the data into training and testing sets (70% for training, 30% for testing)
train_data = assembled_df.limit(int(assembled_df.count() * 0.7))
test_data = assembled_df.subtract(train_data)

# Create a RandomForestRegressor
regressor = RandomForestRegressor(numTrees=2, featuresCol="features", labelCol=target_column)

# Create a pipeline for training
pipeline = Pipeline(stages=[regressor])

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Select the predicted and actual values
result = predictions.select(target_column, "prediction")

# Show the predicted and actual values
result.show()

# Evaluate the model's performance
evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

# Print the evaluation metrics
print("Root Mean Squared Error (RMSE):", rmse)
print("Mean Absolute Error (MAE):", mae)
print("R-squared (R2):", r2)


IllegalArgumentException: dropoff_hour_of_day does not exist. Available: DOLocationID, dropoff_hour, dropoff_day, dropoff_month, day_of_month, busyness

In [32]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession



# Assuming you have a Spark DataFrame named "merged" with features and target column
# Select the relevant columns
selected_features = ['DOLocationID', 'dropoff_hour_of_day', 'day_of_week', 'month']
target_column = 'busyness'

# Assemble the features into a vector column
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
assembled_df = assembler.transform(grouped_df).select("features", target_column)

# Split the data into training and testing sets (70% for training, 30% for testing)
train_data = assembled_df.limit(int(assembled_df.count() * 0.7))
test_data = assembled_df.subtract(train_data)

# Create a RandomForestRegressor
regressor = RandomForestRegressor(numTrees=2, featuresCol="features", labelCol=target_column)

# Create a pipeline for training
pipeline = Pipeline(stages=[regressor])

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Select the predicted and actual values
result = predictions.select(target_column, "prediction")

# Show the predicted and actual values
result.show()

# Evaluate the model's performance
evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

# Print the evaluation metrics
print("Root Mean Squared Error (RMSE):", rmse)
print("Mean Absolute Error (MAE):", mae)
print("R-squared (R2):", r2)


IllegalArgumentException: dropoff_hour_of_day does not exist. Available: DOLocationID, dropoff_hour, dropoff_day, dropoff_month, day_of_month, busyness

In [14]:
grouped_df.show()



+------------+-------------------+-----------+-----+--------+
|DOLocationID|dropoff_hour_of_day|day_of_week|month|busyness|
+------------+-------------------+-----------+-----+--------+
|          68|                 10|          1|    1|    3129|
|          90|                 16|          1|    1|    3803|
|         249|                 19|          1|    1|    4526|
|          68|                 23|          2|    1|    3128|
|         164|                  9|          3|    1|    6235|
|         144|                 11|          3|    1|    1873|
|         231|                 16|          3|    1|    3062|
|         233|                 17|          3|    1|    2977|
|          48|                  7|          4|    1|    2551|
|         239|                 11|          4|    1|    4114|
|         243|                 14|          4|    1|     173|
|         113|                 22|          4|    1|    3447|
|         141|                 10|          5|    1|    2985|
|       

[Stage 6:>                                                          (0 + 1) / 1]                                                                                

In [34]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession



# Assuming you have a Spark DataFrame named "merged" with features and target column
# Select the relevant columns
selected_features = ['DOLocationID', 'dropoff_hour_of_day', 'day_of_week', 'day_of_month','month']
target_column = 'busyness'

# Assemble the features into a vector column
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
assembled_df = assembler.transform(grouped_df2).select("features", target_column)

# Split the data into training and testing sets (70% for training, 30% for testing)
train_data = assembled_df.limit(int(assembled_df.count() * 0.7))
test_data = assembled_df.subtract(train_data)

# Create a RandomForestRegressor
regressor = RandomForestRegressor(numTrees=2, featuresCol="features", labelCol=target_column)

# Create a pipeline for training
pipeline = Pipeline(stages=[regressor])

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Select the predicted and actual values
result = predictions.select(target_column, "prediction")

# Show the predicted and actual values
result.show()

# Evaluate the model's performance
evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

# Print the evaluation metrics
print("Root Mean Squared Error (RMSE):", rmse)
print("Mean Absolute Error (MAE):", mae)
print("R-squared (R2):", r2)


                                                                                

+--------+------------------+
|busyness|        prediction|
+--------+------------------+
|     182|192.60974411828676|
|      11|183.02707651089622|
|      22| 36.93785143716613|
|     229|192.60974411828676|
|     187|156.88475311678036|
|      10|  92.1977939321148|
|       8| 90.32598484348091|
|     102|183.02707651089622|
|      11| 36.93785143716613|
|     366|204.04156114740928|
|      15|155.89170478584305|
|     216| 207.7164088501429|
|     448|271.34246957403315|
|     466|158.22854126732014|
|     547|  248.982188505142|
|     136|223.56363170926045|
|     518|246.72098736462448|
|     480|271.34246957403315|
|     251| 225.2649509667736|
|     305|223.56363170926045|
+--------+------------------+
only showing top 20 rows





Root Mean Squared Error (RMSE): 203.2281787477033
Mean Absolute Error (MAE): 146.4695838133056
R-squared (R2): 0.307839082920046


                                                                                

In [38]:
ordered_df= grouped_df2.orderBy('dropoff_hour_of_day', 'day_of_week', 'day_of_month', 'month')

In [39]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import SparkSession



# Assuming you have a Spark DataFrame named "merged" with features and target column
# Select the relevant columns
selected_features = ['DOLocationID', 'dropoff_hour_of_day', 'day_of_week', 'day_of_month','month']
target_column = 'busyness'

# Assemble the features into a vector column
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
assembled_df = assembler.transform(ordered_df).select("features", target_column)

# Split the data into training and testing sets (70% for training, 30% for testing)
train_data = assembled_df.limit(int(assembled_df.count() * 0.7))
test_data = assembled_df.subtract(train_data)

# Create a RandomForestRegressor
regressor = RandomForestRegressor(numTrees=2, featuresCol="features", labelCol=target_column)

# Create a pipeline for training
pipeline = Pipeline(stages=[regressor])

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the test data
predictions = model.transform(test_data)

# Select the predicted and actual values
result = predictions.select(target_column, "prediction")

# Show the predicted and actual values
result.show()

# Evaluate the model's performance
evaluator = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

# Print the evaluation metrics
print("Root Mean Squared Error (RMSE):", rmse)
print("Mean Absolute Error (MAE):", mae)
print("R-squared (R2):", r2)


                                                                                

+--------+------------------+
|busyness|        prediction|
+--------+------------------+
|     120|137.52713478997873|
|      82|   129.61040607411|
|     154|   129.61040607411|
|     182|139.70813745229728|
|      11|139.70813745229728|
|      14|139.70813745229728|
|     229|139.70813745229728|
|     187|142.34056395789304|
|     165|139.70813745229728|
|     102|139.70813745229728|
|      92|142.34056395789304|
|     120|139.70813745229728|
|     157|142.34056395789304|
|      56| 194.5580890455339|
|     112|198.98400742775397|
|     257|198.98400742775397|
|     366|198.98400742775397|
|      32|198.98400742775397|
|      48|198.98400742775397|
|    1014| 194.5580890455339|
+--------+------------------+
only showing top 20 rows





Root Mean Squared Error (RMSE): 243.38463414167302
Mean Absolute Error (MAE): 180.3343209058462
R-squared (R2): 0.11190836619367084


                                                                                

In [40]:
merged.filter(col('passenger_count')< 0).count()

                                                                                

0

In [44]:
grouped_df2.count()

                                                                                

555330

In [13]:
grouped_df.count()

                                                                                

524714

In [45]:
merged.show()

[Stage 701:>                                                        (0 + 1) / 1]

+--------------------+---------------------+---------------+-------------+------------+------------+-----------+----------+------------+-------------------------+------------+----------------------+---------------------+-------------------+------------------+-----------+-----+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|fare_amount|tip_amount|tolls_amount|journey_length_in_minutes|day_of_month|dropoff_minute_of_hour|pickup_minute_of_hour|dropoff_hour_of_day|pickup_hour_of_day|day_of_week|month|
+--------------------+---------------------+---------------+-------------+------------+------------+-----------+----------+------------+-------------------------+------------+----------------------+---------------------+-------------------+------------------+-----------+-----+
| 2017-01-01 00:00:55|  2017-01-01 00:14:52|              2|         4.41|         233|          74|       14.5|       2.0|         0.0|                    13.95|    

                                                                                

In [48]:
grouped_df2 = taxi18.groupBy('DOLocationID', 'dropoff_hour_of_day', 'day_of_week', 'month') \
                    .sum('passenger_count') \
                    .withColumnRenamed('sum(passenger_count)', 'busyness')
# Join the grouped DataFrame back to the original DataFrame
taxi18 = taxi18.join(grouped_df, on=['DOLocationID', 'dropoff_hour_of_day','day_of_week','day_of_month',
                            'month'], how='left')

In [15]:
week = merged.filter(~(col('tpep_dropoff_datetime')> "2019-01-02 00:00:00") )

In [None]:
merged.columns

In [16]:
merged.count()

                                                                                

68971099

In [17]:
week.count()

                                                                                

0

In [21]:
merged.show()



+------------+------------+-----------+-------------+------------+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+----------+------------+--------------------+--------+
|DOLocationID|dropoff_hour|dropoff_day|dropoff_month|day_of_month|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|payment_type|fare_amount|tip_amount|tolls_amount|congestion_surcharge|busyness|
+------------+------------+-----------+-------------+------------+--------+--------------------+---------------------+---------------+-------------+------------+------------+-----------+----------+------------+--------------------+--------+
|         141|          21|          5|            1|           3|       1| 2019-01-03 21:43:46|  2019-01-03 21:45:47|              1|          0.6|         263|           1|        4.0|       1.5|         0.0|                   0|       1|
|         151|          21|         

                                                                                