In [3]:
!hdfs dfs -ls hdfs:///

Found 3 items
drwxrwxrwt   - hdfs hadoop          0 2022-01-04 02:04 hdfs:///tmp
drwxrwxrwt   - hdfs hadoop          0 2022-01-04 02:06 hdfs:///user
drwx-wx-wx   - hive hadoop          0 2022-01-04 02:04 hdfs:///var


In [4]:
!hdfs dfs -cp file:///yellow_tripdata_2020-01.csv /user

In [5]:
!hdfs dfs -ls hdfs:///user

Found 11 items
drwxrwxrwt   - hdfs hadoop          0 2022-01-04 02:04 hdfs:///user/dataproc
drwxrwxrwt   - hdfs hadoop          0 2022-01-04 02:04 hdfs:///user/hbase
drwxrwxrwt   - hdfs hadoop          0 2022-01-04 02:04 hdfs:///user/hdfs
drwxrwxrwt   - hdfs hadoop          0 2022-01-04 02:04 hdfs:///user/hive
drwxrwxrwt   - hdfs hadoop          0 2022-01-04 02:04 hdfs:///user/mapred
drwxrwxrwt   - hdfs hadoop          0 2022-01-04 02:04 hdfs:///user/pig
drwxr-xr-x   - root hadoop          0 2022-01-04 02:06 hdfs:///user/root
drwxrwxrwt   - hdfs hadoop          0 2022-01-04 02:04 hdfs:///user/spark
drwxrwxrwt   - hdfs hadoop          0 2022-01-04 02:04 hdfs:///user/yarn
-rw-r--r--   2 root hadoop  593610736 2022-01-04 02:07 hdfs:///user/yellow_tripdata_2020-01.csv
drwxrwxrwt   - hdfs hadoop          0 2022-01-04 02:04 hdfs:///user/zookeeper


## GOALS:

1. Provide basic data exploration results on the dataset <br>

2. predicting the reception of Tips for a taxi New York City rider given the features bellow : <br>
"trip_distance","fare_amount","payment_type","tip_amount","tolls_amount"...etc <br>
The amount of tips received is numerical. Therefore, we will categorize this feature in order to apply **logistic regression** classifier.<br>  the categories are following : 
*  The driver didn't receive any tips is :**" NNtips"**
*  The driver receive a tips is **"Tip"**
*  The driver receive generous Tip **"GTips"**
3. To run this file, first you need to import the dataset with the command "wget https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2020-01.csv"

In [6]:
spark.read.csv("hdfs:///user/yellow_tripdata_2020-01.csv",header='true',inferSchema=True).count()

                                                                                

6405008

In [7]:
df=spark.read.csv("hdfs:///user/yellow_tripdata_2020-01.csv",header=True, inferSchema=True).sample(0.0001)

                                                                                

In [8]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [9]:
df=df.drop("VendorID","tpep_pickup_datetime","tpep_dropoff_datetime","RatecodeID","congestion_surcharge","DOLocationID","PULocationID","store_and_fwd_flag","total_amount")

In [10]:
df.show(5)

+---------------+-------------+------------+-----------+-----+-------+----------+------------+---------------------+
|passenger_count|trip_distance|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|
+---------------+-------------+------------+-----------+-----+-------+----------+------------+---------------------+
|              2|          2.4|           1|       11.0|  3.0|    0.5|       1.0|         0.0|                  0.3|
|              2|         0.91|           1|        7.5|  0.5|    0.5|      1.13|         0.0|                  0.3|
|              1|          3.8|           2|       17.5|  3.0|    0.5|       0.0|         0.0|                  0.3|
|              1|          3.4|           1|       12.0|  0.5|    0.5|      2.37|         0.0|                  0.3|
|              1|         3.94|           1|       14.0|  0.5|    0.5|      3.56|         0.0|                  0.3|
+---------------+-------------+------------+-----------+-----+--

In [11]:
from pyspark.sql.functions import col, expr, when
tipsCol = when(
    col("tip_amount")>2.2,"GTips").when(col("tip_amount")==0, "NNTips").otherwise("Tips")


In [12]:
df=df.withColumn("tipsCol", tipsCol)

In [13]:
df.show(5)

+---------------+-------------+------------+-----------+-----+-------+----------+------------+---------------------+-------+
|passenger_count|trip_distance|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|tipsCol|
+---------------+-------------+------------+-----------+-----+-------+----------+------------+---------------------+-------+
|              2|          2.4|           1|       11.0|  3.0|    0.5|       1.0|         0.0|                  0.3|   Tips|
|              2|         0.91|           1|        7.5|  0.5|    0.5|      1.13|         0.0|                  0.3|   Tips|
|              1|          3.8|           2|       17.5|  3.0|    0.5|       0.0|         0.0|                  0.3| NNTips|
|              1|          3.4|           1|       12.0|  0.5|    0.5|      2.37|         0.0|                  0.3|  GTips|
|              1|         3.94|           1|       14.0|  0.5|    0.5|      3.56|         0.0|                  0.3|  GTips|


In [14]:
df.select('tipsCol').distinct().collect()

                                                                                

[Row(tipsCol='NNTips'), Row(tipsCol='GTips'), Row(tipsCol='Tips')]

In [15]:
from pyspark.ml.feature import StringIndexer
df= StringIndexer(inputCol="tipsCol",outputCol="label").fit(df).transform(df)

                                                                                

In [16]:
df.select('label').distinct().collect()

                                                                                

[Row(label=0.0), Row(label=1.0), Row(label=2.0)]

In [17]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=[
    'passenger_count','trip_distance','payment_type','fare_amount','extra','mta_tax','tolls_amount','improvement_surcharge'
], outputCol='features',handleInvalid="skip")

In [18]:
df_assembled = assembler.transform(df)

In [19]:
df_assembled.select('features', 'tip_amount').show(5)

+--------------------+----------+
|            features|tip_amount|
+--------------------+----------+
|[2.0,2.4,1.0,11.0...|       1.0|
|[2.0,0.91,1.0,7.5...|      1.13|
|[1.0,3.8,2.0,17.5...|       0.0|
|[1.0,3.4,1.0,12.0...|      2.37|
|[1.0,3.94,1.0,14....|      3.56|
+--------------------+----------+
only showing top 5 rows



In [20]:
df_assembled.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- tipsCol: string (nullable = false)
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)



In [21]:
# Split into training and testing sets in a 80:20 ratio
df_assembled_train, df_assembled_test = df_assembled.randomSplit([0.8,0.2], seed=17)

In [22]:
[df_assembled_train.count(), df_assembled_test.count()]

                                                                                

[495, 108]

In [23]:
training_ratio = df_assembled_train.count() / df_assembled.count()
print(training_ratio)



0.8208955223880597


                                                                                

In [24]:
# Import the Decision Tree Classifier class
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(df_assembled_train)

                                                                                

In [25]:
# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(df_assembled_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

[Stage 45:>                                                         (0 + 1) / 1]

+-----+----------+------------------------------------------------------------+
|label|prediction|probability                                                 |
+-----+----------+------------------------------------------------------------+
|2.0  |2.0       |[0.0,0.0,1.0]                                               |
|1.0  |1.0       |[0.05128205128205128,0.9145299145299145,0.03418803418803419]|
|2.0  |2.0       |[0.0,0.0,1.0]                                               |
|0.0  |1.0       |[0.05128205128205128,0.9145299145299145,0.03418803418803419]|
|1.0  |1.0       |[0.05128205128205128,0.9145299145299145,0.03418803418803419]|
+-----+----------+------------------------------------------------------------+
only showing top 5 rows



                                                                                

In [26]:
prediction.groupBy('label', 'prediction').count().show()



+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0|    1|
|  1.0|       1.0|   23|
|  0.0|       1.0|    6|
|  2.0|       2.0|   24|
|  1.0|       0.0|   14|
|  2.0|       1.0|    2|
|  0.0|       0.0|   38|
+-----+----------+-----+



                                                                                

### <ins> Metrics for our model:<ins>

In [27]:
y_true = prediction.select(['label']).collect()
y_pred = prediction.select(['prediction']).collect()

                                                                                

In [28]:
from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

         0.0       0.72      0.86      0.78        44
         1.0       0.74      0.62      0.68        37
         2.0       1.00      0.89      0.94        27

    accuracy                           0.79       108
   macro avg       0.82      0.79      0.80       108
weighted avg       0.80      0.79      0.79       108



<ins> **Summary** </ins> <br>
From the classification report above, the precision of the labeled data (0,1,2) is high. <br>
This means that most of the labeled data from a class wasn't predicted as another type of class. <br>
For example, the labeled data 0 has 74 % precision. This means that not many labeled data "0" was predicted as "1" or "2" <br>
<br>
The recall shows also that most of our labels "0","1",and "2" are correctly predicted with our model decision tree. 
<br>
The average weighted accuracy is also high with 84%. Our model is more likely to classify our classes correctly 

<ins> Confusion Matrix <ins>

In [29]:
print(confusion_matrix(y_true, y_pred))

[[38  6  0]
 [14 23  0]
 [ 1  2 24]]


In [30]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Find weighted precision
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "weightedPrecision"})
print("weighted precision %.2f " % weighted_precision)



weighted precision 0.80 


                                                                                

<ins> **Conclusion** </ins> <br>
Given the results above, I have been able to build a classifier that will predict where the trip is more likely to result <br>
in either "No tip", "Tip"or "Generous tip". <br>
This classifier can help in fraud detection. For instance, a taxi driver has received tips. however, he desclaims <br>
receiving any tips in order to avoid tax consequences.<br>
Our model can detect the fraud with 80% accuracy 

*****************************************************************************************

<ins>In this section, I will build **LogisticRegression Model** using  **Pipeline** method <ins>

In [148]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

In [149]:
df_rg=spark.read.csv("hdfs:///user/yellow_tripdata_2020-01.csv",header=True, inferSchema=True).sample(0.0001)

                                                                                

In [150]:
df_rg=df_rg.drop("VendorID","tpep_pickup_datetime","tpep_dropoff_datetime","RatecodeID","congestion_surcharge","DOLocationID","PULocationID","store_and_fwd_flag","total_amount")

In [151]:
from pyspark.sql.functions import col, expr, when
tipsCol = when(
    col("tip_amount")>2.2,"GTips").when(col("tip_amount")==0, "NNTips").otherwise("Tips")
df_rg=df_rg.withColumn("tipsCol", tipsCol)

In [152]:
indexer_rg= StringIndexer(inputCol="tipsCol",outputCol="label_rg")
assembler_rg = VectorAssembler(inputCols=[
    'passenger_count','trip_distance','payment_type','fare_amount','extra','mta_tax','tolls_amount','improvement_surcharge'
], outputCol='features',handleInvalid="skip")
rg = LogisticRegression(labelCol="label_rg")

In [153]:
pipeline = Pipeline(stages=[indexer_rg, assembler_rg, rg])

In [154]:
df_rg.printSchema()

root
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- tipsCol: string (nullable = false)



In [155]:
# Split into training and testing sets in a 80:20 ratio
df_rg_train, df_rg_test = df_rg.randomSplit([0.8,0.2], seed=17)

In [156]:
pipeline = pipeline.fit(df_rg_train)

                                                                                

In [157]:
predictions = pipeline.transform(df_rg_test)

In [158]:
y_true1 = predictions.select(['label_rg']).collect()
y_pred1 = predictions.select(['prediction']).collect()

                                                                                

In [159]:
from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true1, y_pred1))

              precision    recall  f1-score   support

         0.0       0.72      0.68      0.70        50
         1.0       0.61      0.71      0.66        35
         2.0       1.00      0.91      0.96        35

    accuracy                           0.76       120
   macro avg       0.78      0.77      0.77       120
weighted avg       0.77      0.76      0.76       120



In [160]:
print(confusion_matrix(y_true1, y_pred1))

[[34 16  0]
 [10 25  0]
 [ 3  0 32]]


From the classification report, the accuracy of our model is 77% less than the decision tree classifier <br>
