In [2]:
import findspark
findspark.init("D:\spark-3.2.1-bin-hadoop3.2")
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression

In [3]:
spark = SparkSession.builder.appName("lr_exmaple").getOrCreate()

In [4]:
data = spark.read.csv("data/dummy_data.csv", inferSchema=True, header=True)

In [5]:
data.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: string (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [6]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [7]:
data.columns

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [17]:
import pyspark.sql.functions as F
import random
# data_modified = data.select("*", (F.col("fare_amount") - F.col("total_amount")).alias("weather"))
# data_modified.columns
data_modified = data.withColumn("weather", F.round(F.rand() * 10))

In [18]:
from datetime import datetime
from pyspark.sql.types import DoubleType
def getTripDuration(datetime_start, datetime_end):
    try:
        result = (datetime.strptime(datetime_end, "%Y-%m-%d %H:%M:%S") - datetime.strptime(datetime_start, "%Y-%m-%d %H:%M:%S")).total_seconds()
        return result
    except Exception:
        return 0

tripDurationFunction = F.udf(getTripDuration, DoubleType())
# datetime_object = datetime.strptime("2019-12-18 15:52:30", "%Y-%m-%d %H:%M:%S")
# datetime_object2 = datetime.strptime("2019-12-18 15:54:39", "%Y-%m-%d %H:%M:%S")
# print(f"This is datetime: {(datetime_object2 - datetime_object).total_seconds()}")
data_modified = data_modified.withColumn("trip_duration", tripDurationFunction("lpep_pickup_datetime", "lpep_dropoff_datetime"))
data_modified.show(3)

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+-------+-------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|weather|trip_duration|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+-------+-------------+
|       2| 2019-12-18 15:52:30|  2019-12-18 15:54:39|                 N|         1|         264| 

In [19]:
data_modified.columns

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge',
 'weather',
 'trip_duration']

In [22]:
assembler = VectorAssembler(inputCols=['PULocationID','DOLocationID','passenger_count','trip_distance','fare_amount','extra','mta_tax','tip_amount','tolls_amount','improvement_surcharge', 'total_amount','payment_type','trip_type','congestion_surcharge','weather','trip_duration'], outputCol="features")


In [24]:
output = assembler.transform(data_modified)
output.head(1)

[Row(VendorID=2, lpep_pickup_datetime='2019-12-18 15:52:30', lpep_dropoff_datetime='2019-12-18 15:54:39', store_and_fwd_flag='N', RatecodeID=1, PULocationID=264, DOLocationID=264, passenger_count=5, trip_distance=0.0, fare_amount=3.5, extra=0.5, mta_tax=0.5, tip_amount=0.01, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=4.81, payment_type=1, trip_type=1, congestion_surcharge=0.0, weather=2.0, trip_duration=129.0, features=DenseVector([264.0, 264.0, 5.0, 0.0, 3.5, 0.5, 0.5, 0.01, 0.0, 0.3, 4.81, 1.0, 1.0, 0.0, 2.0, 129.0]))]

In [25]:
final_data = output.select("features", "total_amount")

In [27]:
final_data.show()

+--------------------+------------+
|            features|total_amount|
+--------------------+------------+
|[264.0,264.0,5.0,...|        4.81|
|[66.0,65.0,2.0,1....|       24.36|
|[181.0,228.0,1.0,...|       15.34|
|[129.0,263.0,2.0,...|       25.05|
|[210.0,150.0,1.0,...|        11.3|
|[35.0,39.0,1.0,3....|        14.8|
|[25.0,61.0,1.0,2....|        12.3|
|[225.0,89.0,1.0,4...|        21.8|
|[129.0,129.0,1.0,...|         6.8|
|[129.0,83.0,1.0,0...|         6.8|
|[82.0,173.0,1.0,1...|        10.8|
|[74.0,69.0,1.0,3....|        15.3|
|[74.0,41.0,1.0,1....|         7.8|
|[41.0,127.0,1.0,5...|        20.3|
|[7.0,260.0,1.0,1....|        10.8|
|[7.0,7.0,1.0,1.42...|         8.3|
|[7.0,133.0,1.0,15...|       53.16|
|[134.0,28.0,1.0,1...|         7.8|
|[89.0,39.0,1.0,2....|        11.3|
|[66.0,65.0,3.0,1....|         7.8|
+--------------------+------------+
only showing top 20 rows



In [28]:
train_data, test_data = final_data.randomSplit([0.7,0.3])

In [29]:
train_data.describe().show()

+-------+------------------+
|summary|      total_amount|
+-------+------------------+
|  count|                67|
|   mean|15.569701492537295|
| stddev| 9.920033750270859|
|    min|               4.8|
|    max|             53.16|
+-------+------------------+



In [30]:
test_data.describe().show()

+-------+------------------+
|summary|      total_amount|
+-------+------------------+
|  count|                31|
|   mean|17.497419354838712|
| stddev|15.890901373163604|
|    min|               4.8|
|    max|              78.1|
+-------+------------------+



In [31]:
lr = LinearRegression(labelCol="total_amount")
lr_model = lr.fit(train_data)

In [32]:
test_results = lr_model.evaluate(test_data)

In [33]:
test_results.residuals.show()



+--------------------+
|           residuals|
+--------------------+
|  1.1156543055488335|
|2.856515024518558E-7|
|1.020307571764078...|
|  -0.572135291787589|
|1.181408038064546...|
|1.204721709058276...|
|1.097926105586566...|
|-3.00240863282397...|
| 9.72314833092014E-8|
|1.293392513090907...|
|-2.66408530791295...|
|5.384456081003463...|
| -0.7437696236711648|
| -0.5721315556215458|
|4.242300040147029E-7|
|7.688969994035233E-8|
|2.537640142463715E-8|
|3.612127308372237E-8|
| -0.2860673466072967|
|-2.93007900609154...|
+--------------------+
only showing top 20 rows



In [34]:
test_results.rootMeanSquaredError

0.3319012336608198

In [35]:
test_results.r2

0.9995492236652422

In [36]:
final_data.describe().show()

+-------+------------------+
|summary|      total_amount|
+-------+------------------+
|  count|                98|
|   mean|16.179489795918347|
| stddev|12.077604508545692|
|    min|               4.8|
|    max|              78.1|
+-------+------------------+

