In [23]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [24]:
sp = SparkSession.builder.appName(' Home work ').getOrCreate()

In [25]:
train_data = f'./data/fhv_tripdata_2021-01.parquet'
test_data = f'./data/fhv_tripdata_2021-02.parquet'

In [26]:
def get_dataframe(filename):
#     df = sp.read.parquet(filename, inferSchema=True).cache
    df = sp.read.parquet(filename, inferSchema=True)
#     Getting the number of Record in dataframe
    print("Number of Row in this Dataframe is : ", record_counter(df))
    df = time_conversion(df)
    print("Average Duration is: ", getAvgDuration(df))
    df_betw_1_60 = df_filter(df)
    print("Number Record droped is : ",record_counter(df) - record_counter(df_betw_1_60) )
    df_betw_1_60.show()
    print("Number of Null/NA for each Row")
    Null_count = count_null(df, 'PUlocationID', 'DOLocationID')
    Null_count.show()
    print('Percentageof NA in PUlocationID is :')
    print((extrac_row_value(Null_count) * 100) / record_counter(df))
    df = replace_null(df_betw_1_60, 'PUlocationID', 'DOLocationID')
    print("Select Needed Column for ML")
    df = selectCol(df)
    df = castCol(df)
    df = ohe(df)
    df = pruneCol(df)
    return df

In [27]:
def time_conversion(df):
    df = df.withColumn('duration', (unix_timestamp(df.dropOff_datetime) - unix_timestamp(df.pickup_datetime)) / 60)
    return  df

In [28]:
def castCol(df):
    #  Column Casting Process:
    feature_columns = df.withColumn("PUlocationID", col("PUlocationID").cast(StringType())) \
                .withColumn("DOLocationID", col("DOLocationID").cast(StringType()))
    
    return feature_columns

In [29]:
def selectCol(df):
    # Feature Extraction
    categorical = ["PULocationID","DOLocationID"]
    numerical = ["duration"]
    # In future project, tips-amount should be use as label
    # Read Article : https://stackoverflow.com/questions/47871874/does-spark-do-one-pass-through-the-data-for-multiple-withcolumn
    feature_columns = df.select(categorical + numerical)
    
    return feature_columns


In [30]:
# Data preparation and feature engineering

In [31]:
#  Implementing One Hot encoding on "PULocationID" and "DOLocationID" column
def ohe(feature_columns):
    from pyspark.ml.feature import StringIndexer
    feature_columns = StringIndexer(
    inputCol='PUlocationID', 
    outputCol='Pick_UP', 
    handleInvalid='keep').fit(feature_columns).transform(feature_columns)
    
    feature_columns = StringIndexer(
    inputCol='DOLocationID', 
    outputCol='Drop_OFF', 
    handleInvalid='keep').fit(feature_columns).transform(feature_columns)

    return feature_columns

In [32]:
# Drop unnecessary columns
def pruneCol(df):
    feature_columns = df.drop('PUlocationID', 'DOLocationID')
    return feature_columns

In [33]:
def record_counter(df):
    return df.count()

In [34]:
def getAvgDuration(df):
    return df.agg(avg(col('duration'))).show()

In [35]:
def df_filter(df):
#     df = df.select(df.where(df.duration.between(1,60)))
#     return df.show()
#     df.filter(df.where(df.duration.between(1,60)))
    return df.filter(df.duration.between(1,61))

In [36]:
# https://sparkbyexamples.com/pyspark/pyspark-find-count-of-null-none-nan-values/#:~:text=In%20PySpark%20DataFrame%20you%20can,count()%20and%20when().
def count_null(df, col1, col2):
    df_Columns=[col1,col1]

    return df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_Columns])

In [37]:
def extrac_row_value(df):
    return df.head()[0]

In [38]:
def replace_null(df, col1, col2):
    return df.na.fill(value=-1,subset=[col1, col2])


In [39]:
#  Test
train_data = get_dataframe(train_data)
train_data.show()

Number of Row in this Dataframe is :  1154112
+----------------+
|   avg(duration)|
+----------------+
|19.1672240937939|
+----------------+

Average Duration is:  None
Number Record droped is :  43014
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|          duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+------------------+
|              B00009|2021-01-01 01:27:00|2021-01-01 01:44:00|        null|        null|   null|                B00009|              17.0|
|              B00009|2021-01-01 01:50:00|2021-01-01 02:07:00|        null|        null|   null|                B00009|              17.0|
|              B00037|2021-01-01 01:13:09|2021-01-01 01:21:26|        null|        72.0|   null|       

                                                                                

+------------------+-------+--------+
|          duration|Pick_UP|Drop_OFF|
+------------------+-------+--------+
|              17.0|    0.0|     0.0|
|              17.0|    0.0|     0.0|
| 8.283333333333333|    0.0|    30.0|
|15.216666666666667|    0.0|     5.0|
|              9.05|    0.0|    67.0|
|11.866666666666667|    0.0|    76.0|
| 8.883333333333333|    0.0|    42.0|
|17.966666666666665|    0.0|    24.0|
|23.366666666666667|    0.0|    42.0|
| 2.033333333333333|    0.0|    30.0|
| 4.316666666666666|    0.0|    30.0|
|14.183333333333334|    0.0|    55.0|
|              3.05|    0.0|    46.0|
|              10.7|    0.0|    40.0|
|11.166666666666666|    0.0|    34.0|
|              61.0|    0.0|     0.0|
|21.533333333333335|    0.0|   109.0|
|10.233333333333333|    0.0|    64.0|
| 33.13333333333333|    0.0|     5.0|
|             23.55|    0.0|    45.0|
+------------------+-------+--------+
only showing top 20 rows



In [40]:
# Assemble all the features with VectorAssembler
def feature_assembler(df):
    from pyspark.ml.feature import VectorAssembler
    x_features = ['Pick_UP',
                    'Drop_OFF'                   ]
    feature_assembler = VectorAssembler(
    inputCols=x_features, 
    outputCol='features')
    transformed_data = feature_assembler.transform(df)
    
    return transformed_data

In [41]:
transformed_train_data = feature_assembler(train_data)
transformed_test_data = feature_assembler(test_data)

In [42]:
#  Apply Model Function
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol = 'features', labelCol='duration', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(transformed_train_data)

print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

22/05/22 21:53:41 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/05/22 21:53:41 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS

Coefficients: [0.03965500955265982,0.006032146879372192]
Intercept: 15.638580165296206


                                                                                

In [43]:
# Evaluation

trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 11.532699
r2: 0.018638


In [None]:
# Model validation
# Test Prediction
predictions = lr_model.transform(transformed_test_data)
predictions.select("prediction","duration","features").show()