In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Udemy").getOrCreate()


# Importing Data & Peeking 

In [21]:
df = spark.read.csv(r'.\nyc-taxi-trip-duration\train.csv',
                    inferSchema=True, header=True)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/d:/Subhrajyoti/Self Study/pyspark/nyc-taxi-trip-duration/train.csv.

In [4]:
df.show(5)

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|
|id3858529|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|    

In [38]:
df.describe().show()

+-------+---------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+------------------+------------------+
|summary|       id|          vendor_id|   passenger_count|   pickup_longitude|     pickup_latitude|  dropoff_longitude|   dropoff_latitude|store_and_fwd_flag|     trip_duration|
+-------+---------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+------------------+------------------+
|  count|  1458644|            1458644|           1458644|            1458644|             1458644|            1458644|            1458644|           1458644|           1458644|
|   mean|     NULL| 1.5349502688798637|1.6645295219395548| -73.97348630489282|  40.750920908391734|  -73.9734159469458|   40.7517995149002|              NULL| 959.4922729603659|
| stddev|     NULL|0.49877715390740074| 1.314242167823115|0.07090185842270368|0.032881186257633095|0.070643268

# Data preprocessing

### Changing Schema

In [78]:
from pyspark.sql.types import (StructField,StringType,
                               IntegerType,StructType,
                               TimestampType,DoubleType)

In [69]:
dtypes  =   [StringType(),StringType(),TimestampType(),TimestampType(),
            IntegerType(),DoubleType(),DoubleType(),
            DoubleType(),DoubleType(),StringType(),IntegerType()]


In [80]:
data_schema = [StructField(i,j,True) for i,j in zip(df.columns,dtypes)]
data_schema

[StructField('id', StringType(), True),
 StructField('vendor_id', StringType(), True),
 StructField('pickup_datetime', TimestampType(), True),
 StructField('dropoff_datetime', TimestampType(), True),
 StructField('passenger_count', IntegerType(), True),
 StructField('pickup_longitude', DoubleType(), True),
 StructField('pickup_latitude', DoubleType(), True),
 StructField('dropoff_longitude', DoubleType(), True),
 StructField('dropoff_latitude', DoubleType(), True),
 StructField('store_and_fwd_flag', StringType(), True),
 StructField('trip_duration', IntegerType(), True)]

In [79]:
final_struct = StructType(fields =data_schema)

StructType([StructField('id', StringType(), True), StructField('vendor_id', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropoff_datetime', TimestampType(), True), StructField('passenger_count', IntegerType(), True), StructField('pickup_longitude', DoubleType(), True), StructField('pickup_latitude', DoubleType(), True), StructField('dropoff_longitude', DoubleType(), True), StructField('dropoff_latitude', DoubleType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('trip_duration', IntegerType(), True)])

In [99]:
df = spark.read.csv(r'.\nyc-taxi-trip-duration\train.csv',
                                           schema = final_struct,header=True)
df.show(3)

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|
|id3858529|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|    

In [100]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)



### Selecting portion / grouping of data

#### By select

In [101]:
df.select(['vendor_id','pickup_datetime']).show(5)

+---------+-------------------+
|vendor_id|    pickup_datetime|
+---------+-------------------+
|        2|2016-03-14 17:24:55|
|        1|2016-06-12 00:43:35|
|        2|2016-01-19 11:35:24|
|        2|2016-04-06 19:32:31|
|        2|2016-03-26 13:30:55|
+---------+-------------------+
only showing top 5 rows



#### By Filter

In [102]:
df.filter("passenger_count > 7").select(['vendor_id','pickup_datetime',
                                         'passenger_count']).show()

+---------+-------------------+---------------+
|vendor_id|    pickup_datetime|passenger_count|
+---------+-------------------+---------------+
|        2|2016-06-24 08:09:21|              9|
|        2|2016-01-01 01:15:20|              8|
+---------+-------------------+---------------+



In [103]:
df.filter(df['passenger_count'] > 7).select(['vendor_id','pickup_datetime',
                                         'passenger_count']).show()

+---------+-------------------+---------------+
|vendor_id|    pickup_datetime|passenger_count|
+---------+-------------------+---------------+
|        2|2016-06-24 08:09:21|              9|
|        2|2016-01-01 01:15:20|              8|
+---------+-------------------+---------------+



- & (and), |(or) ans ~(not) can be used to combine filter logics

#### By createOrReplaceTempView using SQL

In [104]:
df.createOrReplaceTempView("Trips")

In [106]:
df_portion = spark.sql(" Select * from Trips where passenger_count > 4")
df_portion.show(5)

+---------+---------+-------------------+-------------------+---------------+------------------+-----------------+------------------+------------------+------------------+-------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|  pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+-------------------+-------------------+---------------+------------------+-----------------+------------------+------------------+------------------+-------------+
|id0801584|        2|2016-01-30 22:01:40|2016-01-30 22:09:03|              6|-73.98285675048828|40.74219512939453|-73.99208068847656|40.749183654785156|                 N|          443|
|id0675800|        2|2016-02-15 09:25:15|2016-02-15 09:35:49|              6|-73.97775268554688|40.75463104248047|-74.00167846679688| 40.75642013549805|                 N|          634|
|id0913838|        2|2016-02-03 16:22:50|2016-02-03 16:37:20|         

### column manipulation 

In [50]:
#df.withColumn("NewColumn",df['col'] operations).show()

#df.withColumn("NewColumn", (df['dropoff_datetime']-df['pickup_datetime'])).show(5)

#### GroupBy & Aggregate method

In [112]:
from pyspark.sql.functions import stddev,max,min

In [52]:
df.groupBy("vendor_id").mean().show()

+---------+--------------------+---------------------+--------------------+----------------------+---------------------+
|vendor_id|avg(passenger_count)|avg(pickup_longitude)|avg(pickup_latitude)|avg(dropoff_longitude)|avg(dropoff_latitude)|
+---------+--------------------+---------------------+--------------------+----------------------+---------------------+
|        1|               -74.0|    40.75086441572701|  -73.97345260053869|     40.75172198288115|                 NULL|
|        2|               -74.0|    40.75098219127021|  -73.97346282851106|    40.751897855869316|                 NULL|
+---------+--------------------+---------------------+--------------------+----------------------+---------------------+



In [53]:
df.groupBy("vendor_id").agg({'passenger_count':'mean',
                             'pickup_longitude':'max',
                             'store_and_fwd_flag':'count'}).show()

+---------+-------------------------+---------------------+--------------------+
|vendor_id|count(store_and_fwd_flag)|max(pickup_longitude)|avg(passenger_count)|
+---------+-------------------------+---------------------+--------------------+
|        1|                        0|   42.814937591552734|               -74.0|
|        2|                        0|      41.670166015625|               -74.0|
+---------+-------------------------+---------------------+--------------------+



In [54]:
df.orderBy('pickup_datetime').show(3)

+---------+---------+-------------------+----------------+---------------+------------------+------------------+------------------+----------------+------------------+
|       id|vendor_id|    pickup_datetime|dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|dropoff_latitude|store_and_fwd_flag|
+---------+---------+-------------------+----------------+---------------+------------------+------------------+------------------+----------------+------------------+
|id0621643|        2|2016-01-01 00:00:22|            NULL|           NULL|40.716880798339844|-73.96932983398438|40.769378662109375|            NULL|              NULL|
|id1384355|        1|2016-01-01 00:00:28|            NULL|           NULL| 40.73356246948242|-73.85426330566406|40.891788482666016|            NULL|              NULL|
|id2568735|        1|2016-01-01 00:01:24|            NULL|           NULL|40.759864807128906|-73.87660217285156| 40.74866485595703|            NULL|            

In [55]:
df.orderBy(df['pickup_datetime'].desc()).show(3)

+---------+---------+-------------------+----------------+---------------+-----------------+------------------+------------------+----------------+------------------+
|       id|vendor_id|    pickup_datetime|dropoff_datetime|passenger_count| pickup_longitude|   pickup_latitude| dropoff_longitude|dropoff_latitude|store_and_fwd_flag|
+---------+---------+-------------------+----------------+---------------+-----------------+------------------+------------------+----------------+------------------+
|id3004672|        1|2016-06-30 23:59:58|            NULL|           NULL|40.73202896118164|-73.99017333984375| 40.75667953491211|            NULL|              NULL|
|id3505355|        1|2016-06-30 23:59:53|            NULL|           NULL|40.67999267578125|-73.95980834960938| 40.65540313720703|            NULL|              NULL|
|id1217141|        1|2016-06-30 23:59:47|            NULL|           NULL|40.73758316040039|-73.98616027832031|40.729522705078125|            NULL|              NULL

#### Dealing NA values

In [56]:
#df.na.drop()
#df.na.fill("Value",subset=["cilumns"])

#### Datetime manipulation

In [57]:
from pyspark.sql.functions import (dayofmonth,hour,dayofyear,
                                   month,year,weekofyear,dayofweek,
                                   date_format,format_number)

In [58]:
df.select(dayofweek(df['pickup_datetime'])).show(3)

+--------------------------+
|dayofweek(pickup_datetime)|
+--------------------------+
|                         5|
|                         5|
|                         5|
+--------------------------+
only showing top 3 rows



In [117]:
new_df = df.withColumn("Dayofweek",dayofweek(df['pickup_datetime']))

In [120]:
new_df.sample(fraction=0.00001).show(5)

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+---------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|Dayofweek|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+---------+
|id2478415|        2|2016-04-12 18:18:10|2016-04-12 18:21:27|              5|-73.96436309814453|40.767677307128906|-73.96813201904297| 40.76789093017578|                 N|          197|        3|
|id0929764|        1|2016-05-26 08:05:21|2016-05-26 08:10:51|              1|-73.98748016357422|40.738887786865234|-73.97865295410156| 40.75092697143555|                 N|          330|        5|
|id1697121|    

In [127]:
new_df.groupBy('Dayofweek').mean()\
    .select(['Dayofweek',
            format_number('avg(passenger_count)',3).alias("Avg passenger count")
             ]).show()


+---------+-------------------+
|Dayofweek|Avg passenger count|
+---------+-------------------+
|        1|              1.717|
|        6|              1.662|
|        3|              1.636|
|        5|              1.638|
|        4|              1.633|
|        7|              1.729|
|        2|              1.634|
+---------+-------------------+



# MLlib

In [1]:
df

NameError: name 'df' is not defined