# Cleaning The Taxi Trip DataSet
The objective of this notebook is to perform an initial filtering of the Chicago taxi trips dataset, based on the analysis carried out in the notebook `exploratory_taxi_data.ipynb`.

## 1 Create our environment

#### Create the Spark Session

In [1]:
import findspark
# Must run before any pyspark import in the following cells: findspark.init()
# makes the pyspark package of the local Spark installation importable.
findspark.init()

In [2]:
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext

In [3]:
# Configuration object that is filled with settings in the next cell and then
# handed to the SparkContext constructor.
sc_conf = SparkConf()

In [4]:
# Settings for a local, single-machine Spark run: master 'local[*]' with the
# driver bound to 127.0.0.1, and 6g of memory requested for both driver and executor.
# setAll applies the pairs in order and returns the SparkConf itself, so the
# cell still displays the configuration object as its output.
sc_conf.setAll([
    ('spark.driver.port', '62678'),
    ('spark.rdd.compress', 'True'),
    ('spark.driver.host', '127.0.0.1'),
    ('spark.serializer.objectStreamReset', '100'),
    ('spark.master', 'local[*]'),
    ('spark.executor.id', 'driver'),
    ('spark.submit.deployMode', 'client'),
    ('spark.ui.showConsoleProgress', 'true'),
    ('spark.app.name', 'pyspark-shell'),
    ("spark.executor.memory", "6g"),
    ("spark.driver.memory", "6g"),
])

<pyspark.conf.SparkConf at 0x10dad6ac8>

In [5]:
# Show the key/value pairs stored in the configuration to double-check them
# before the SparkContext is created.
sc_conf.getAll()

dict_items([('spark.driver.port', '62678'), ('spark.rdd.compress', 'True'), ('spark.driver.host', '127.0.0.1'), ('spark.serializer.objectStreamReset', '100'), ('spark.master', 'local[*]'), ('spark.executor.id', 'driver'), ('spark.submit.deployMode', 'client'), ('spark.ui.showConsoleProgress', 'true'), ('spark.app.name', 'pyspark-shell'), ('spark.executor.memory', '6g'), ('spark.driver.memory', '6g')])

In [6]:
# Start Spark with the configuration defined above, or reuse the running one.
# SparkContext(conf=...) raises "Cannot run multiple SparkContexts at once" when
# this cell is executed a second time in the same kernel; getOrCreate makes the
# cell safe to re-run (the conf is only applied when a new context is created).
sc = SparkContext.getOrCreate(conf=sc_conf)

# SparkSession.builder is the supported entry point; it picks up the context
# created above instead of starting another one.
session = SparkSession.builder.config(conf=sc_conf).getOrCreate()

# SQLContext is deprecated in recent Spark releases; `sql` is kept only so that
# any code still referring to it keeps working. It wraps the same session.
sql = SQLContext(sc, sparkSession=session)

# Last expression of the cell: display the session summary.
session

In [7]:
# Show the configuration of the running context; compared with sc_conf it also
# contains the generated 'spark.app.id'.
session.sparkContext.getConf().getAll()

[('spark.executor.memory', '6g'),
 ('spark.driver.port', '62678'),
 ('spark.driver.host', '127.0.0.1'),
 ('spark.driver.memory', '6g'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1557511777630'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'pyspark-shell')]

In [8]:
import os
import shutil
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import types as T
import pandas as pd
# Do not truncate wide DataFrames when they are displayed: None removes the column limit.
pd.set_option("display.max_columns", None)

## 2 Read the data

In [9]:
# Explicit schema for the raw taxi trips CSV, so that Spark does not have to
# infer the column types. Each pair is (column name, Spark type); every column
# is nullable. Both timestamps are read as plain strings and parsed later on.
# The names are kept exactly as declared here (including the double space in
# "Dropoff Centroid  Location") because later cells derive the snake_case names
# from them.
taxi_schema = StructType([
    StructField(column_name, column_type, nullable=True)
    for column_name, column_type in [
        ("Trip ID", StringType()),
        ("Taxi ID", StringType()),
        ("Trip Start Timestamp", StringType()),
        ("Trip End Timestamp", StringType()),
        ("Trip Seconds", IntegerType()),
        ("trip_miles", DoubleType()),
        ("Pickup Census Tract", LongType()),
        ("Dropoff Census Tract", LongType()),
        ("Pickup Community Area", IntegerType()),
        ("Dropoff Community Area", IntegerType()),
        ("Fare", DoubleType()),
        ("Tips", DoubleType()),
        ("Tolls", DoubleType()),
        ("Extras", DoubleType()),
        ("Trip Total", DoubleType()),
        ("Payment Type", StringType()),
        ("Company", StringType()),
        ("Pickup Centroid Latitude", DoubleType()),
        ("Pickup Centroid Longitude", DoubleType()),
        ("Pickup Centroid Location", StringType()),
        ("Dropoff Centroid Latitude", DoubleType()),
        ("Dropoff Centroid Longitude", DoubleType()),
        ("Dropoff Centroid  Location", StringType()),
        ("Community Areas", IntegerType()),
    ]
])

In [10]:
# Load the raw trips file. The first line of the CSV is a header row, and the
# column names and types are taken from taxi_schema rather than inferred.
taxi_df = (session.read
                  .format('csv')
                  .option('header', True)
                  .schema(taxi_schema)
                  .load('../Data/Taxi_Trips.csv.gz'))

## 3 Change the column names

In [11]:
# Normalise every column name to snake_case (lower case, spaces replaced by
# underscores) by renaming all the columns in a single toDF call.
taxi_df = taxi_df.toDF(*(name.lower().replace(" ", "_") for name in taxi_df.columns))
taxi_df.printSchema()

root
 |-- trip_id: string (nullable = true)
 |-- taxi_id: string (nullable = true)
 |-- trip_start_timestamp: string (nullable = true)
 |-- trip_end_timestamp: string (nullable = true)
 |-- trip_seconds: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- pickup_census_tract: long (nullable = true)
 |-- dropoff_census_tract: long (nullable = true)
 |-- pickup_community_area: integer (nullable = true)
 |-- dropoff_community_area: integer (nullable = true)
 |-- fare: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- extras: double (nullable = true)
 |-- trip_total: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- company: string (nullable = true)
 |-- pickup_centroid_latitude: double (nullable = true)
 |-- pickup_centroid_longitude: double (nullable = true)
 |-- pickup_centroid_location: string (nullable = true)
 |-- dropoff_centroid_latitude: double (nullable = true)
 |-- dropoff_centroid_

## 4 Convert the date from string format to datetime

In [12]:
# Parse the trip start and end timestamps, which arrive as strings such as
# "01/31/2017 11:45:00 PM", and rewrite them in Spark's default
# "yyyy-MM-dd HH:mm:ss" form. from_unixtime returns a string, so the columns
# keep the string type, but the new form sorts and compares chronologically.
#
# The AM/PM marker must be written with a single "a": the datetime parser of
# Spark 3 rejects the pattern "aa", while "a" is accepted by Spark 2.x as well.
#
# NOTE: run this cell only once per load of taxi_df; applying the pattern to
# already converted values does not match and would yield nulls.
TIMESTAMP_FORMAT = "MM/dd/yyyy hh:mm:ss a"
for timestamp_col in ("trip_start_timestamp", "trip_end_timestamp"):
    taxi_df = taxi_df.withColumn(
        timestamp_col,
        F.from_unixtime(F.unix_timestamp(F.col(timestamp_col),
                                         format=TIMESTAMP_FORMAT)))

## 5 Filter the dataset and save it as a CSV file

In [13]:
# Treat the payment type 'Unknown' as missing. The value is kept only when it
# differs from 'Unknown'; a `when` without `otherwise` yields null for the rest
# (rows equal to 'Unknown' and rows that were already null).
taxi_df = taxi_df.withColumn(
    'payment_type',
    F.when(F.col('payment_type') != 'Unknown', F.col('payment_type')))

In [14]:
# Filter the dataset and write the result as a single CSV part file.
# Steps:
#   1. drop the columns that are not needed any more;
#   2. drop the trips with a null value in any of the remaining columns;
#   3. keep only the trips with plausible values (see `valid_trip`);
#   4. collapse the result to one partition so Spark writes exactly one part
#      file inside the output folder.

# Columns that are not used in the rest of the project.
useless_columns = ['pickup_census_tract',
                   'dropoff_census_tract',
                   'pickup_centroid_location',
                   'dropoff_centroid__location',  # double underscore: the raw name has two spaces
                   'community_areas']

# Columns that must all be informed for a trip to be kept.
required_columns = ['trip_id',
                    'taxi_id',
                    'trip_start_timestamp',
                    'trip_end_timestamp',
                    'trip_seconds',
                    'trip_miles',
                    'pickup_community_area',
                    'dropoff_community_area',
                    'fare',
                    'tips',
                    'tolls',
                    'extras',
                    'trip_total',
                    'payment_type',
                    'company',
                    'pickup_centroid_latitude',
                    'pickup_centroid_longitude',
                    'dropoff_centroid_latitude',
                    'dropoff_centroid_longitude']

# Sanity rules: the trip does not end before it starts, it is longer than 60
# (trip_seconds) and 0.5 (trip_miles), fare and total are strictly positive and
# no amount is negative. The timestamps are "yyyy-MM-dd HH:mm:ss" strings at
# this point, so comparing them as strings follows chronological order.
valid_trip = ((F.col("trip_start_timestamp") <= F.col("trip_end_timestamp")) &
              (F.col("trip_seconds") > 60) &
              (F.col("trip_miles") > 0.5) &
              (F.col("fare") > 0) &
              (F.col("tips") >= 0) &
              (F.col("tolls") >= 0) &
              (F.col("extras") >= 0) &
              (F.col("trip_total") > 0))

# mode="overwrite": the output folder is only a temporary location (it is
# removed at the end of the notebook), so a leftover folder from an interrupted
# run must not make this cell fail with "path already exists".
(taxi_df.drop(*useless_columns)
        .dropna(how='any', subset=required_columns)
        .filter(valid_trip)
        .repartition(1)
        .write.save("../Data/taxi_chicago_filter",
                    format="csv", header=True, mode="overwrite"))

In [15]:
# Spark wrote the data as a part file with a generated name inside the output
# folder. Move that file next to the folder under a stable name:
# ../Data/taxi_chicago_filter/part-....csv -> ../Data/taxi_chicago_filter.csv
path = "../Data/taxi_chicago_filter/"
extension = ".csv"

# Only the data file ends with ".csv"; any other file in the folder is ignored.
csv_files = [name for name in os.listdir(path) if name.endswith(extension)]

# Exactly one part file is expected because the data was written from a single
# partition. Fail loudly otherwise instead of raising a bare IndexError or
# silently keeping only one of several files (the folder is deleted afterwards).
if len(csv_files) != 1:
    raise RuntimeError("Expected exactly one %s file in %s, found %d: %r"
                       % (extension, path, len(csv_files), csv_files))

old_path = os.path.join(path, csv_files[0])
new_path = path[:-1] + extension  # strip the trailing "/" and append ".csv"

# os.replace overwrites an existing destination on every platform, whereas
# os.rename fails on Windows when the target file already exists.
os.replace(old_path, new_path)

## 6 Delete the folder `taxi_chicago_filter`

In [16]:
# Remove the Spark output folder together with everything still inside it.
# The renamed CSV lives next to the folder (../Data/taxi_chicago_filter.csv),
# so it is not affected.
shutil.rmtree('../Data/taxi_chicago_filter')

## 7 Conclusion

Now that a first filtering of the taxi trips dataset has been carried out, the next step is to remove the trips whose pickup or dropoff point lies in the water. This is done in the notebook `delete_location_water.ipynb`.