In [204]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [205]:
# Download Training dataset file.

import requests
URL = "https://www.dropbox.com/s/cem9ea35wllfsqg/train.csv?dl=1"
response = requests.get(URL)
open("train.csv", "wb").write(response.content)

200589097

In [206]:
# Download Testing dataset file.

URL = "https://www.dropbox.com/s/hrqzwaihgeibx4x/test.csv?dl=1"
response = requests.get(URL)
open("test.csv", "wb").write(response.content)

70794289

In [207]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from math import radians, cos, sin, asin, sqrt
from pyspark.sql.types import FloatType

spark = SparkSession.builder.master("local[1]").appName("SparkByExamples.com").getOrCreate()
df = spark.read.csv("./train.csv", header=True)
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: string (nullable = true)



In [208]:
df.show()

+---------+---------+-------------------+-------------------+---------------+-------------------+------------------+-------------------+------------------+------------------+-------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|   pickup_longitude|   pickup_latitude|  dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+-------------------+-------------------+---------------+-------------------+------------------+-------------------+------------------+------------------+-------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1|-73.982154846191406|40.767936706542969|-73.964630126953125|40.765602111816406|                 N|          455|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.980415344238281|40.738563537597656|-73.999481201171875|40.731151580810547|                 N|          663|
|id3858529|        2|2016-01-19 11:35:24|2016-01-19 12:

In [209]:
# Check for NaN or null values

df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+---+---------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+
| id|vendor_id|pickup_datetime|dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|store_and_fwd_flag|trip_duration|
+---+---------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+
|  0|        0|              0|               0|              0|               0|              0|                0|               0|                 0|            0|
+---+---------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+



In [210]:
# Removing id from `id` column.

df = df.withColumn("id", expr("replace(id, 'id', '')"))

In [211]:
# Covert Datatype of the dataframe.

def convert_datatype(df):
    df = df.withColumn("id",df.id.cast('int'))
    df = df.withColumn("vendor_id", df.vendor_id.cast('int'))
    df = df.withColumn("pickup_datetime", to_timestamp(df.pickup_datetime, "yyyy-MM-dd HH:mm:ss"))
    df = df.withColumn("dropoff_datetime", to_timestamp(df.dropoff_datetime, "yyyy-MM-dd HH:mm:ss"))
    df = df.withColumn("passenger_count",df.passenger_count.cast('int'))
    df = df.withColumn("pickup_longitude",df.pickup_longitude.cast('double'))
    df = df.withColumn("pickup_latitude",df.pickup_latitude.cast('double'))
    df = df.withColumn("dropoff_longitude",df.dropoff_longitude.cast('double'))
    df = df.withColumn("dropoff_latitude",df.dropoff_latitude.cast('double'))
    df = df.withColumn('store_and_fwd_flag', F.when(df.store_and_fwd_flag == 'N', 0).otherwise(1))
    df = df.withColumn("trip_duration",df.trip_duration.cast('int'))
    
    return df

In [212]:
# Call Convert datatype

df = convert_datatype(df)

In [213]:
# Check for columns data types

for col in df.dtypes:
    print(col[0]+" , "+col[1])

id , int
vendor_id , int
pickup_datetime , timestamp
dropoff_datetime , timestamp
passenger_count , int
pickup_longitude , double
pickup_latitude , double
dropoff_longitude , double
dropoff_latitude , double
store_and_fwd_flag , int
trip_duration , int


In [214]:
# Calculate difference in dropoff and pickup time

df = df.withColumn('diff_in_dropff_and_pickup',df["dropoff_datetime"].cast("double") - df['pickup_datetime'].cast("double"))

In [215]:
# Calculate the difference between trip duration and the difference of the pickup and dropoff time

df = df.withColumn('diff_in_trip_duration_and_manual_calc_time',df["trip_duration"].cast("double") - df['diff_in_dropff_and_pickup'].cast("double"))

In [216]:
# Checking for unique values in `diff_in_trip_duration_and_manual_calc_time`

df.select('diff_in_trip_duration_and_manual_calc_time').distinct().collect()

[Row(diff_in_trip_duration_and_manual_calc_time=0.0)]

In [217]:
# Checking if trip duration is 0 or negative, if there then we can remove it.

df.filter(df.trip_duration <= 0).count()

0

In [218]:
# Checking for unique values in `store_and_fwd_flag`

df.select('store_and_fwd_flag').distinct().collect()

[Row(store_and_fwd_flag=1), Row(store_and_fwd_flag=0)]

In [219]:
# Calculate distance between two latitute and longtitude.

def distance(pickUpLat1, dropOffLat2, pickUpLon1, dropOffLon2):
     
    # The math module contains a function named
    # radians which converts from degrees to radians.
    lon1 = radians(pickUpLon1)
    lon2 = radians(dropOffLon2)
    lat1 = radians(pickUpLat1)
    lat2 = radians(dropOffLat2)
      
    # Haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
 
    c = 2 * asin(sqrt(a))
    
    # Radius of earth in kilometers. Use 3956 for miles
    r = 6371
      
    # calculate the result
    return(c * r)

In [220]:
# Calculating distance between pickup and dropoff latitude and longtitude.

udf_func = udf(distance, FloatType())
df = df.withColumn("distance", udf_func(df.pickup_latitude, df.dropoff_latitude, df.pickup_longitude, df.dropoff_longitude))

In [221]:
# distance is in K.M.

df.select("distance").show()

+----------+
|  distance|
+----------+
| 1.4985207|
| 1.8055072|
| 6.3850985|
| 1.4854984|
| 1.1885885|
| 1.0989425|
| 1.3262786|
| 5.7149806|
| 1.3103533|
| 5.1211615|
| 3.8061395|
| 3.7730958|
|  1.859483|
|0.99168485|
|  6.382836|
|  0.656578|
|  3.428086|
| 2.5386717|
| 4.6052012|
| 1.3032712|
+----------+
only showing top 20 rows



In [222]:
# Only keep records where distance between non overlapped points

print("Count unique values Before filtering for distance < 0 is ", df.count()) 
df = df.filter(df.distance > 0)

Count unique values Before filtering for distance < 0 is  1458644


In [223]:
# Counting total number of unique values in dataframe.

df.count()

1452747

In [224]:
######################################################################################################################

In [225]:
print(df.show(5))

+-------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+-------------------------+------------------------------------------+---------+
|     id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|diff_in_dropff_and_pickup|diff_in_trip_duration_and_manual_calc_time| distance|
+-------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+-------------------------+------------------------------------------+---------+
|2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 0|          455|                    455.

In [227]:
# get the pickup hour dayofweek, month number and year
from pyspark.sql.functions import col
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

df = df.withColumn('day_week', date_format(col("pickup_datetime"), "u"))
df = df.withColumn('year', date_format(col("pickup_datetime"), "y"))
df = df.withColumn('month', date_format(col("pickup_datetime"), "M"))
df = df.withColumn('hour', date_format(col("pickup_datetime"), "H"))

df = df.withColumn("day_week",col("day_week").cast("int"))
df = df.withColumn("year",col("year").cast("int"))
df = df.withColumn("month",col("month").cast("int"))
df = df.withColumn("hour",col("hour").cast("int"))

# Drop datetime column
#df = df.drop('vendor_id', 'pickup_datetime', 'dropoff_datetime', 'store_and_fwd_flag', 'diff_in_dropff_and_pickup', 'diff_in_trip_duration_and_manual_calc_time')
#df = df.drop('id')

df = df.select(col('distance'), col('trip_duration'))

for col in df.dtypes:
    print(col[0]+" , "+col[1])

print(df.show(5))

distance , float
trip_duration , int
+---------+-------------+
| distance|trip_duration|
+---------+-------------+
|1.4985207|          455|
|1.8055072|          663|
|6.3850985|         2124|
|1.4854984|          429|
|1.1885885|          435|
+---------+-------------+
only showing top 5 rows

None


In [262]:
import numpy as np
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

#vecAssembler2.setParams(handleInvalid="skip").transform(dfWithNullsAndNaNs).show()

# Make sure the column data types are correct
df = df.withColumn("trip_duration",df.trip_duration.cast('int'))
df = df.withColumn("distance",df.distance.cast('float'))

# Combine all your features into a single vector
assemble = VectorAssembler(inputCols=df.columns, outputCol='features')
assembled_data = assemble.setParams(handleInvalid="skip").transform(df)
print(assembled_data.count())
assembled_data.show(5)

from pyspark.ml.feature import StandardScaler
scale=StandardScaler(inputCol='features',outputCol='standardized')
data_scale=scale.fit(assembled_data)
data_scale_output=data_scale.transform(assembled_data)
data_scale_output.show(2)

1452747
+---------+-------------+--------------------+
| distance|trip_duration|            features|
+---------+-------------+--------------------+
|1.4985207|          455|[1.49852073192596...|
|1.8055072|          663|[1.80550718307495...|
|6.3850985|         2124|[6.38509845733642...|
|1.4854984|          429|[1.48549842834472...|
|1.1885885|          435|[1.18858850002288...|
+---------+-------------+--------------------+
only showing top 5 rows

+---------+-------------+--------------------+--------------------+
| distance|trip_duration|            features|        standardized|
+---------+-------------+--------------------+--------------------+
|1.4985207|          455|[1.49852073192596...|[0.34852223397543...|
|1.8055072|          663|[1.80550718307495...|[0.41992038114496...|
+---------+-------------+--------------------+--------------------+
only showing top 2 rows



In [255]:
#Check column data types
for col in assembled_data.dtypes:
    print(col[0]+" , "+col[1])

distance , float
trip_duration , int
features , vector


In [260]:
df2 = data_scale_output.select('standardized')
df2.show(5)

print(type)

print(data_scale_output.count())

+--------------------+
|        standardized|
+--------------------+
|[0.34852223397543...|
|[0.41992038114496...|
|[1.48503035766737...|
|[0.34549353891707...|
|[0.27643896442665...|
+--------------------+
only showing top 5 rows

<class 'type'>
1452747


In [263]:
# Try creating the model
kmeans = KMeans(featuresCol='standardized', k=2)
kmeans.setMaxIter(10)
model = kmeans.fit(df2)

PythonException: ignored