### Data Fields Description

- **id**: A unique identifier for each trip  
- **vendor_id**: A code indicating the provider associated with the trip record  
- **pickup_datetime**: Date and time when the meter was engaged  
- **dropoff_datetime**: Date and time when the meter was disengaged  
- **passenger_count**: The number of passengers in the vehicle (driver-entered value)  
- **pickup_longitude**: The longitude where the meter was engaged  
- **pickup_latitude**: The latitude where the meter was engaged  
- **dropoff_longitude**: The longitude where the meter was disengaged  
- **dropoff_latitude**: The latitude where the meter was disengaged  
- **store_and_fwd_flag**: Indicates if the trip record was held in vehicle memory before being sent to the vendor (Y = stored and forwarded, N = not)  
- **trip_duration**: Duration of the trip in seconds  
- **distance**: Distance of the trip (if included in your dataset)
 in seconds

In [1]:
import pyspark

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t

In [8]:
spark = SparkSession\
.builder\
.appName('spark_labs')\
.getOrCreate()

In [15]:
df=spark.read.format('csv').option("header",True).load('train.csv')

In [19]:
df.show(1)

+---------+---------+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+-------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|store_and_fwd_flag|trip_duration|     distance|
+---------+---------+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+-------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1|  -73.9821548462|  40.7679367065|    -73.964630127|   40.7656021118|                 N|          455|1.49899119898|
+---------+---------+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+------------------+-------------+-------------+
only showing top 1 row



In [17]:
df.columns

['id',
 'vendor_id',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude',
 'store_and_fwd_flag',
 'trip_duration',
 'distance']

In [18]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: string (nullable = true)
 |-- distance: string (nullable = true)



In [99]:
schema=t.StructType([
t.StructField('id',t.StringType(),True),
 t.StructField('vendor_id',t.IntegerType(),True),
 t.StructField('pickup_datetime',t.TimestampType(),True),
 t.StructField('dropoff_datetime',t.TimestampType(),True),
 t.StructField('passenger_count',t.IntegerType(),True),
 t.StructField('pickup_longitude',t.StringType(),True),
 t.StructField('pickup_latitude',t.StringType(),True),
 t.StructField('dropoff_longitude',t.StringType(),True),
 t.StructField('dropoff_latitude',t.StringType(),True),
 t.StructField('store_and_fwd_flag',t.StringType(),True),
 t.StructField('trip_duration',t.DoubleType(),True),
 t.StructField('distance',t.DoubleType(),True)]



)

In [100]:
df = spark.read \
    .option("header", True) \
    .schema(schema) \
    .csv("train.csv")


In [101]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: double (nullable = true)
 |-- distance: double (nullable = true)



In [102]:
df.count()

1458644

In [103]:
df.select(f.col('passenger_count')).distinct().show()

+---------------+
|passenger_count|
+---------------+
|              1|
|              6|
|              3|
|              5|
|              4|
|              2|
|              0|
|              9|
|              7|
|              8|
+---------------+



In [104]:
df.filter(f.col('id').isNull()).count()

0

In [105]:
df.where(f.col('vendor_id').isNull()).count()

0

In [106]:
df.describe('trip_duration').show()
df.describe('distance').show()

+-------+-----------------+
|summary|    trip_duration|
+-------+-----------------+
|  count|          1458644|
|   mean|959.4922729603659|
| stddev|5237.431724497664|
|    min|              1.0|
|    max|        3526282.0|
+-------+-----------------+

+-------+-----------------+
|summary|         distance|
+-------+-----------------+
|  count|          1458644|
|   mean|3.441944066475521|
| stddev|4.297886810313032|
|    min|              0.0|
|    max|    1241.29822576|
+-------+-----------------+



In [107]:
df.where(f.col('distance')==0.0).groupBy('distance').count().withColumnRenamed('count','trips_numbers').show()
df.where(f.col('distance')==1241.29822576).groupBy('distance').count().withColumnRenamed('count','trips_numbers').show()

+--------+-------------+
|distance|trips_numbers|
+--------+-------------+
|     0.0|         5897|
+--------+-------------+

+-------------+-------------+
|     distance|trips_numbers|
+-------------+-------------+
|1241.29822576|            1|
+-------------+-------------+



In [108]:
df.where(f.col('distance')==0.0).select(['trip_duration','pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude']).show()

+-------------+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+
|trip_duration|    pickup_datetime|   dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+-------------+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+
|        227.0|2016-02-29 18:39:12|2016-02-29 18:42:59|              1|  -73.9818191528|  40.7689628601|   -73.9818191528|   40.7689628601|
|       1109.0|2016-05-10 18:07:52|2016-05-10 18:26:21|              2|  -73.9590682983|  40.7756614685|   -73.9590682983|   40.7756614685|
|        947.0|2016-05-16 23:15:13|2016-05-16 23:31:00|              6|  -73.8084869385|   40.687335968|   -73.8084869385|    40.687335968|
|        580.0|2016-01-25 19:45:12|2016-01-25 19:54:52|              1|  -73.7867202759|  40.6470413208|   -73.7867202759|   40.6470413208|
|         27.0|2016-

In [109]:
df.where(f.col('trip_duration')==1.0).groupBy('trip_duration').count().withColumnRenamed('count','trips_numbers').show()
df.where(f.col('trip_duration')==3526282.0).groupBy('trip_duration').count().withColumnRenamed('count','trips_numbers').show()

+-------------+-------------+
|trip_duration|trips_numbers|
+-------------+-------------+
|          1.0|           33|
+-------------+-------------+

+-------------+-------------+
|trip_duration|trips_numbers|
+-------------+-------------+
|    3526282.0|            1|
+-------------+-------------+



In [110]:
df.where(f.col('trip_duration')==1.0).select(['distance','pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude']).show()

+----------------+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+
|        distance|    pickup_datetime|   dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+----------------+-------------------+-------------------+---------------+----------------+---------------+-----------------+----------------+
|0.00896551914398|2016-04-17 11:44:49|2016-04-17 11:44:50|              1|  -73.9403839111|  40.7864227295|   -73.9402999878|   40.7863731384|
|6.42970707264E-4|2016-05-17 09:03:38|2016-05-17 09:03:39|              1|  -73.8198928833|  40.7408218384|   -73.8198852539|   40.7408218384|
|   0.13306521099|2016-04-11 19:01:35|2016-04-11 19:01:36|              1|  -73.9459991455|  40.7922019958|   -73.9474105835|   40.7927398682|
|4.24308140677E-4|2016-04-24 11:35:40|2016-04-24 11:35:41|              1|  -73.8414230347|  40.6952781677|   -73.8414230347|    40.695274353|

In [180]:
# Calculate the number of trips by hour for each vendor?
df=df.withColumn("hours",f.hour(f.col('pickup_datetime')))
df.groupBy("hours") \
  .pivot("vendor_id") \
  .agg(f.count("*").alias("trips")) \
  .orderBy("hours") \
  .show()

+-----+-----+-----+
|hours|    1|    2|
+-----+-----+-----+
|    0|24561|28687|
|    1|18090|20481|
|    2|13570|14402|
|    3|10399|10496|
|    4| 7827| 7965|
|    5| 7581| 7421|
|    6|16218|17030|
|    7|26068|29532|
|    8|31103|35950|
|    9|31145|36518|
|   10|29793|35644|
|   11|31262|37214|
|   12|33247|38626|
|   13|33717|37756|
|   14|35194|39098|
|   15|34192|37619|
|   16|30091|34222|
|   17|35092|41391|
|   18|41242|49358|
|   19|41474|48834|
+-----+-----+-----+
only showing top 20 rows



In [141]:
#pickup mostly on weekends or working days?
df = df.withColumn("day_of_week", f.date_format(f.col("pickup_datetime"), "E"))
df = df.withColumn(
    "day_type",
    f.when(f.col("day_of_week").isin("Sat", "Sun"), "Weekend").otherwise("Weekday")
)

df.groupBy("day_type").count().withColumnRenamed("count","Trips Number").show()

+--------+------------+
|day_type|Trips Number|
+--------+------------+
| Weekday|     1042410|
| Weekend|      416234|
+--------+------------+



In [147]:
#pickup mostly on nights or mornings
df = df.withColumn("period", f.date_format(f.col("pickup_datetime"), "a"))
df = df.withColumn(
    "shifts",
    f.when(f.col("period").isin("AM"), "nights").otherwise("mornings")
)
df.groupBy("period").count().withColumnRenamed("count","Trips Number").show()

+------+------------+
|period|Trips Number|
+------+------------+
|    PM|      929687|
|    AM|      528957|
+------+------------+



In [148]:
#numbers of trips per number of passenger?
df.groupBy('passenger_count').count().withColumnRenamed("count","Trips Number").show()

+---------------+------------+
|passenger_count|Trips Number|
+---------------+------------+
|              1|     1033540|
|              6|       48333|
|              3|       59896|
|              5|       78088|
|              4|       28404|
|              2|      210318|
|              0|          60|
|              9|           1|
|              7|           3|
|              8|           1|
+---------------+------------+



In [152]:
# numbers of trips per group of passenger or numbers of trips per one passenger ?
df=df.withColumn("passenger_type",f.when(f.col('passenger_count').isin('1'),'indevdual').otherwise('group'))
df.groupBy('passenger_type').count().withColumnRenamed("count","Trips Number").show()

+--------------+------------+
|passenger_type|Trips Number|
+--------------+------------+
|         group|      425104|
|     indevdual|     1033540|
+--------------+------------+



In [192]:
#Average Trip Distance by Hour of the Day (by Vendor)?
df.createOrReplaceTempView('df_taxis')
spark.sql("""
SELECT
  hours,
  AVG(CASE WHEN vendor_id = '1' THEN distance END) AS vendor_1_avg,
  AVG(CASE WHEN vendor_id = '2' THEN distance END) AS vendor_2_avg
FROM df_taxis
GROUP BY hours
ORDER BY hours


""").show()

+-----+------------------+------------------+
|hours|      vendor_1_avg|      vendor_2_avg|
+-----+------------------+------------------+
|    0| 3.950332664159948| 4.042105190429521|
|    1|3.9529790654384396|3.9405278700869504|
|    2|3.8263518226143214| 3.906989928821063|
|    3| 4.120809896019147| 4.093569472995954|
|    4| 4.743824883419862|5.0031034014771425|
|    5|  5.15582874527346| 5.623020177757666|
|    6|3.9885436228283977| 4.234857261813437|
|    7| 3.312511199825404|3.4546098763022863|
|    8| 2.974064619245385| 3.055264414962803|
|    9|2.9207535381776637|2.9359324613518836|
|   10|2.9671389720233834| 3.026167927346365|
|   11| 3.052363968198795|3.1018983661323274|
|   12|3.0247739548857164|3.1006005533016476|
|   13|3.2236894404348435| 3.287538376634198|
|   14|3.3363056974225853|3.4640759017200584|
|   15| 3.348922497662815|3.4705730064710987|
|   16|3.4096260824623044|  3.59216904979076|
|   17|3.2134226203215768|3.3976227716734764|
|   18|3.1119905190629167|3.120671

In [193]:
#Average Trip Duration by Hour of the Day (by Vendor)

df.createOrReplaceTempView('df_taxis')
spark.sql("""
SELECT
  hours,
  AVG(CASE WHEN vendor_id = '1' THEN trip_duration END) AS vendor_1_avg,
  AVG(CASE WHEN vendor_id = '2' THEN trip_duration END) AS vendor_2_avg
FROM df_taxis
GROUP BY hours
ORDER BY hours


""").show()

+-----+-----------------+------------------+
|hours|     vendor_1_avg|      vendor_2_avg|
+-----+-----------------+------------------+
|    0|853.0281747485851|1008.2582005786593|
|    1|737.8971807628524|1048.9923831844148|
|    2|692.4378039793662|1076.2985002083044|
|    3|707.1176074622559|1071.3333650914635|
|    4|715.6201609812189| 1124.757438794727|
|    5|689.3605065294817| 958.1036248484032|
|    6|790.0834258231595| 804.4359365825014|
|    7|742.0170707380697|  910.973283218204|
|    8|824.5490467157509|1011.0853685674548|
|    9|841.0579868357681|1012.3943260857659|
|   10|844.9043735105562|1007.3095051060487|
|   11|873.9017017465293|1044.1598591927768|
|   12|876.0890606671279|1094.6220162584787|
|   13| 893.661061185752|1156.0063036338595|
|   14|936.8904074558163| 1200.818839838355|
|   15|958.3235552175947|1264.7195300247215|
|   16|951.0301086703665|1193.6766115364385|
|   17|919.6063490254189|1124.6683578555724|
|   18|856.0129237185394|1086.5058551805178|
|   19|790