### Setup drive

In [1]:
# record time
import time

# start record
start = time.time()

In [2]:
# NYC dataset

TRAIN_DATASET = "./train.csv"

In [3]:
""" Global Variables """

MAX_DURATION = 7200  # Maximum trip duration in seconds
MIN_DURATION = 180   # Minimum trip duration in seconds
MAX_DISTANCE = 60.0  # Maximum distance in kilometers
MIN_DISTANCE = 1.5   # Minimum distance in kilometers

DAY_OF_WEEK_MAPPING = {
    1: 'Sunday',
    2: 'Monday',
    3: 'Tuesday',
    4: 'Wednesday',
    5: 'Thursday',
    6: 'Friday',
    7: 'Saturday'
}

### Library imports

In [4]:
# normal imports
import os
import requests
from IPython.display import Image

# for dataframes
import numpy as np
import pandas as pd

# plots
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import seaborn as sns

# geodesic
from geopy.distance import geodesic

# calculation
from math import radians, cos, sin, asin, sqrt

### Setup Spark

In [5]:
# Initialize findspark and import SparkSession
import findspark
findspark.init()

In [6]:
# eda
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, dayofweek, hour, count, when, year, month, minute, quarter, monotonically_increasing_id
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql import functions as F
from pyspark.sql import types as T

# machine learning
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [7]:
# Create a Spark session and context
spark = SparkSession.builder.master("local").appName("Colab").config('spark.ui.port', '4050').getOrCreate()
sc = spark.sparkContext

# ENSF 612 - NYC Taxi Analysis


### Read CSV

In [8]:
# # Initialize SparkSession with further optimized settings
# spark = SparkSession.builder \
#     .appName("Large DataFrame Conversion") \
#     .config("spark.driver.memory", "16g") \
#     .config("spark.executor.memory", "16g") \
#     .config("spark.executor.memoryOverhead", "4g") \
#     .config("spark.driver.cores", "4") \
#     .config("spark.executor.cores", "4") \
#     .config("spark.network.timeout", "12000s") \
#     .config("spark.executor.heartbeatInterval", "12000s") \
#     .config("spark.sql.execution.arrow.enabled", "true") \
#     .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
#     .getOrCreate()

spark = SparkSession.builder.appName("TaxiDataAnalysis").getOrCreate()

# Define the schema corresponding to the CSV file format
schema = StructType([
    StructField("id", StringType(), True),
    StructField("vendor_id", IntegerType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("pickup_longitude", FloatType(), True),
    StructField("pickup_latitude", FloatType(), True),
    StructField("dropoff_longitude", FloatType(), True),
    StructField("dropoff_latitude", FloatType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("trip_duration", IntegerType(), True)
])

# Read the CSV file into a DataFrame
taxi_df = spark.read.csv(TRAIN_DATASET, schema=schema, header=True)

## Initial Explaratory Data Analysis

### Data Cleaning

In [None]:
trip_durations = taxi_df.select("trip_duration").rdd.flatMap(lambda x: x).collect()

# Now, using Matplotlib to plot the histogram with a log scale
plt.figure(figsize=(10,6))
sns.histplot(trip_durations, log_scale=True, bins=100, kde=False, color="red")

plt.title("Trip Duration Distribution")
plt.xlabel("trip_duration")
plt.ylabel("count")
plt.xscale("log") # Set the scale of the x-axis to logarithmic
plt.yscale("linear") # The y-scale is linear in the graph provided
plt.grid(True, which="both", ls="--", linewidth=0.5) # Adding a grid

# Show the plot
plt.show()

In [9]:
# drop the columns we don't need
taxi_df = taxi_df.drop("id", "vendor_id", "store_and_fwd_flag")
taxi_df.show(5)

+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+-------------+
|    pickup_datetime|   dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|trip_duration|
+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+-------------+
|2016-03-14 17:24:55|2016-03-14 17:32:30|              1|      -73.982155|      40.767937|        -73.96463|       40.765602|          455|
|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|      -73.980415|      40.738564|        -73.99948|        40.73115|          663|
|2016-01-19 11:35:24|2016-01-19 12:10:48|              1|       -73.97903|       40.76394|        -74.00533|       40.710087|         2124|
|2016-04-06 19:32:31|2016-04-06 19:39:40|              1|       -74.01004|       40.71997|        -74.01227|        40.70672|          429|
|2016-03-26 13:30:55

In [10]:
# print current schema
taxi_df.printSchema()

root
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- trip_duration: integer (nullable = true)



In [11]:
# print current summary
taxi_df.describe().show()

+-------+------------------+-------------------+------------------+-------------------+-------------------+-----------------+
|summary|   passenger_count|   pickup_longitude|   pickup_latitude|  dropoff_longitude|   dropoff_latitude|    trip_duration|
+-------+------------------+-------------------+------------------+-------------------+-------------------+-----------------+
|  count|           1458644|            1458644|           1458644|            1458644|            1458644|          1458644|
|   mean|1.6645295219395548| -73.97348630489282|40.750920908391734|  -73.9734159469458|   40.7517995149002|959.4922729603659|
| stddev| 1.314242167823114|0.07090185842270283| 0.032881186257633|0.07064326809720287|0.03589055560563683|5237.431724497642|
|    min|                 0|         -121.93334|         34.359695|        -121.933304|           32.18114|                1|
|    max|                 9|          -61.33553|         51.881084|          -61.33553|           43.92103|          3

In [12]:
# check for null values
null_counts = taxi_df.select([count(when(col(c).isNull(), c)).alias(c) for c in taxi_df.columns])
null_counts.show()

+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+-------------+
|pickup_datetime|dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|trip_duration|
+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+-------------+
|              0|               0|              0|               0|              0|                0|               0|            0|
+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+-------------+



### Pre-processing

In [13]:
def calculate_haversine_distance(pick_up_lat, drop_off_lat, pick_up_lon, drop_off_lon):
    # Convert decimal degrees to radians
    pick_up_lat, drop_off_lat, pick_up_lon, drop_off_lon = map(radians, [pick_up_lat, drop_off_lat, pick_up_lon, drop_off_lon])

    # Haversine formula
    dlon = drop_off_lon - pick_up_lon
    dlat = drop_off_lat - pick_up_lat
    a = sin(dlat / 2)**2 + cos(pick_up_lat) * cos(drop_off_lat) * sin(dlon / 2)**2
    c = 2 * asin(sqrt(a))

    # Radius of earth in kilometers
    earth_radius = 6371
    return c * earth_radius

In [14]:
def calculate_velocity(distance, duration):
    if duration and duration > 0:
        # Convert duration from seconds to hours and calculate velocity
        return distance / (duration / 3600)
    else:
        # Return None or a default value if duration is zero or negative
        return None

In [15]:
# Read the zip code data and create a broadcast variable
def read_zipcode_file(filename):
    """Reads the zip code data from a file and returns a mapping of zip codes to coordinates."""
    zipcode_mapping = {}

    with open(filename, 'r') as file:
        for line in file:
            zip_code, lat, lon = line.strip().split(', ')
            lat_lon = (float(lat), float(lon))
            zipcode_mapping[zip_code] = lat_lon

    return zipcode_mapping

In [16]:
# Define the UDF
def find_nearest_zipcode_udf(lat, lon):
    nearest_zip = None
    shortest_distance = None

    for zip_code, (zip_lat, zip_lon) in broadcast_zipcode_mapping.value.items():
        distance = geodesic((lat, lon), (zip_lat, zip_lon)).miles
        if shortest_distance is None or distance < shortest_distance:
            nearest_zip = zip_code
            shortest_distance = distance

    return nearest_zip

In [17]:
# Register UDF with FloatType return type
haversine_distance_udf = udf(calculate_haversine_distance, FloatType())

# Add distance column using the UDF
taxi_df_with_distance = taxi_df.withColumn(
    "distance",
    haversine_distance_udf(
        col("pickup_latitude"),
        col("dropoff_latitude"),
        col("pickup_longitude"),
        col("dropoff_longitude")
    )
)

taxi_df_with_distance.show(5)

+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+-------------+---------+
|    pickup_datetime|   dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|trip_duration| distance|
+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+-------------+---------+
|2016-03-14 17:24:55|2016-03-14 17:32:30|              1|      -73.982155|      40.767937|        -73.96463|       40.765602|          455|1.4985207|
|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|      -73.980415|      40.738564|        -73.99948|        40.73115|          663|1.8055072|
|2016-01-19 11:35:24|2016-01-19 12:10:48|              1|       -73.97903|       40.76394|        -74.00533|       40.710087|         2124|6.3850985|
|2016-04-06 19:32:31|2016-04-06 19:39:40|              1|       -74.01004|       40.71997|        -7

In [18]:
print(f"Removing samples with distance less than 0. Current number of samples = {taxi_df_with_distance.count()}")
taxi_df_with_distance = taxi_df_with_distance.filter(taxi_df_with_distance.distance > 0)
print(f"Current number of samples = {taxi_df_with_distance.count()}")

Removing samples with distance less than 0. Current number of samples = 1458644
Current number of samples = 1452747


In [19]:
print(f"Only keeping samples between {MIN_DURATION} seconds and {MAX_DURATION} seconds.\nOnly keeping samples between {MIN_DISTANCE} km and {MAX_DISTANCE} km.")
taxi_df_with_distance = taxi_df_with_distance.filter((taxi_df_with_distance.trip_duration > MIN_DURATION) & (taxi_df_with_distance.trip_duration < MAX_DURATION) & (taxi_df_with_distance.distance > MIN_DISTANCE) & (taxi_df_with_distance.distance < MAX_DISTANCE))
print(f"Current number of samples = {taxi_df_with_distance.count()}")

Only keeping samples between 180 seconds and 7200 seconds.
Only keeping samples between 1.5 km and 60.0 km.
Current number of samples = 961378


In [20]:
calculate_velocity_udf = udf(calculate_velocity, FloatType())

# Add velocity column using the UDF
taxi_df_with_velocity = taxi_df_with_distance.withColumn(
    "velocity",
    calculate_velocity_udf("distance", "trip_duration")
)

taxi_df_with_velocity.show(5)

+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+-------------+---------+---------+
|    pickup_datetime|   dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|trip_duration| distance| velocity|
+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+-------------+---------+---------+
|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|      -73.980415|      40.738564|        -73.99948|        40.73115|          663|1.8055072|9.8036585|
|2016-01-19 11:35:24|2016-01-19 12:10:48|              1|       -73.97903|       40.76394|        -74.00533|       40.710087|         2124|6.3850985|10.822201|
|2016-05-21 07:54:58|2016-05-21 08:20:49|              1|       -73.96928|       40.79778|        -73.92247|        40.76056|         1551|5.7149806|13.264945|
|2016-03-10 21:45:01|2016-03-10 22:05:26

In [21]:
# Calculate the quintiles for the 'velocity' column
quintiles = taxi_df_with_velocity.approxQuantile("velocity", [0.2, 0.4, 0.6, 0.8], 0.01)

# Reverse categorize the velocity into severity scales
# Now, lower velocity indicates higher severity (heavier traffic)
def reverse_categorize_velocity(value):
    for i, threshold in enumerate(reversed(quintiles), start=1):
        if value <= threshold:
            return 5 - i + 1  # Reverse the scale
    return 1  # Lowest severity for highest velocity

# Register the function as a UDF (User Defined Function)
reverse_categorize_velocity_udf = F.udf(reverse_categorize_velocity, T.IntegerType())

# Create a new column in the DataFrame with the reversed severity scale
taxi_df_with_reversed_severity = taxi_df_with_velocity.withColumn("severity_scale", reverse_categorize_velocity_udf(F.col("velocity")))

# Show the DataFrame with the new severity_scale column
taxi_df_with_reversed_severity.show(5)

+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+-------------+---------+---------+--------------+
|    pickup_datetime|   dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|trip_duration| distance| velocity|severity_scale|
+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+-------------+---------+---------+--------------+
|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|      -73.980415|      40.738564|        -73.99948|        40.73115|          663|1.8055072|9.8036585|             5|
|2016-01-19 11:35:24|2016-01-19 12:10:48|              1|       -73.97903|       40.76394|        -74.00533|       40.710087|         2124|6.3850985|10.822201|             5|
|2016-05-21 07:54:58|2016-05-21 08:20:49|              1|       -73.96928|       40.79778|        -73.92247|        40.76056|

In [22]:
"""

  Day of the week
  ========================
  1 = Sunday
  2 = Monday
  3 = Tuesday
  4 = Wednesday
  5 = Thursday
  6 = Friday
  7 = Saturday

"""

# transform pickup datetime
taxi_df_transformed = taxi_df_with_reversed_severity.withColumn("year", year("pickup_datetime")) \
                             .withColumn("quarter_of_year", quarter("pickup_datetime")) \
                             .withColumn("month", month("pickup_datetime")) \
                             .withColumn("week_day", dayofweek("pickup_datetime")) \
                             .withColumn("hour", hour("pickup_datetime")) \
                             .withColumn("minute", minute("pickup_datetime"))

# drop pickup and dropoff datetime
taxi_df_week = taxi_df_transformed.drop("pickup_datetime", "dropoff_datetime")
taxi_df_week.show(5)

+---------------+----------------+---------------+-----------------+----------------+-------------+---------+---------+--------------+----+---------------+-----+--------+----+------+
|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|trip_duration| distance| velocity|severity_scale|year|quarter_of_year|month|week_day|hour|minute|
+---------------+----------------+---------------+-----------------+----------------+-------------+---------+---------+--------------+----+---------------+-----+--------+----+------+
|              1|      -73.980415|      40.738564|        -73.99948|        40.73115|          663|1.8055072|9.8036585|             5|2016|              2|    6|       1|   0|    43|
|              1|       -73.97903|       40.76394|        -74.00533|       40.710087|         2124|6.3850985|10.822201|             5|2016|              1|    1|       3|  11|    35|
|              1|       -73.96928|       40.79778|        -73.92247|        40.76056|

In [23]:
# create zip code mapping
all_coordinates_text = "./all_coordinates.txt"
zipcode_mapping = read_zipcode_file(all_coordinates_text)
broadcast_zipcode_mapping = spark.sparkContext.broadcast(zipcode_mapping)

In [24]:
# Register UDF
nearest_zipcode_udf = udf(find_nearest_zipcode_udf, StringType())

# Apply UDF on DataFrame
taxi_df_with_zip_code = taxi_df_week.withColumn("pickup", nearest_zipcode_udf(F.col("pickup_latitude"), F.col("pickup_longitude")))

In [25]:
taxi_df_with_zip_code = taxi_df_with_zip_code.withColumn("dropoff", nearest_zipcode_udf(F.col("dropoff_latitude"), F.col("dropoff_longitude")))

In [26]:
taxi_df_final = taxi_df_with_zip_code.drop("pickup_latitude", "pickup_longitude", "dropoff_latitude", "dropoff_longitude", "passenger_count")
taxi_df_final.show()

+-------------+---------+---------+--------------+----+---------------+-----+--------+----+------+------+-------+
|trip_duration| distance| velocity|severity_scale|year|quarter_of_year|month|week_day|hour|minute|pickup|dropoff|
+-------------+---------+---------+--------------+----+---------------+-----+--------+----+------+------+-------+
|          663|1.8055072|9.8036585|             5|2016|              2|    6|       1|   0|    43| 10010|  10012|
|         2124|6.3850985|10.822201|             5|2016|              1|    1|       3|  11|    35| 10103|  10038|
|         1551|5.7149806|13.264945|             5|2016|              2|    5|       7|   7|    54| 10025|  11106|
|         1225|5.1211615|15.049944|             5|2016|              1|    3|       5|  21|    45| 10016|  10024|
|         1274|3.8061395|10.755182|             5|2016|              2|    5|       3|  22|     8| 10019|  10014|
|         1128|3.7730958|12.041796|             5|2016|              2|    5|       1|  

In [27]:
taxi_df_final.count()

961378

In [28]:
pickup_pd = taxi_df_final.select("pickup").toPandas()

# Plotting the histogram
plt.figure(figsize=(10, 6))
plt.hist(pickup_pd['pickup'], bins=50, color='blue', edgecolor='black')
plt.title('Histogram of Pickup')
plt.xlabel('Pickup')
plt.ylabel('Frequency')
plt.grid(True)
plt.show()

In [None]:
pandas_df = taxi_df_final.toPandas()
output_path = "train_cleaned.csv"
pandas_df.to_csv(output_path, index=False)

  PyArrow >= 4.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "c:\Users\chris\AppData\Local\Programs\Python\Python310\lib\socket.py", line 708, in readinto
    raise
TimeoutError: timed out


In [None]:
# Stop the SparkSession
spark.stop()

Py4JJavaError: An error occurred while calling o226.csv.
: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1249)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1454)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:192)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$writeAndCommit$3(FileFormatWriter.scala:275)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:552)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:275)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:402)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:374)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:850)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


### End of notebook

In [None]:
end = time.time()
delta_time = end - start
print(F"It took {delta_time} s to run the notebook.")