In [30]:
import pyspark
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from math import sin, cos, sqrt, atan2, radians

In [2]:
## Functions

In [3]:
def get_df_from_csv_paths(paths):
    df = spark.read.format("csv").option("header", "true").\
        option('inferschema','true').\
        load(paths.split(','))
    return df

In [4]:
def add_date_hour_minute_second(df):
    df = df.withColumn("date", to_date(col("start_time")))\
        .withColumn("hour",hour("start_time"))\
        .withColumn("minute",minute("start_time"))\
        .withColumn("second",second("start_time"))
    return df

In [5]:
def get_distance_from_lon_lat(lat1, lon1, lat2, lon2):
    # approximate radius of earth in km
    R = 6373.0
    lat1 = radians(lat1)
    lon1 = radians(lon1)
    lat2 = radians(lat2)
    lon2 = radians(lon2)
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat / 2)**2 + cos(lat1) * cos(lat2) * sin(dlon / 2)**2
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return R * c
get_distance_from_lon_lat(52.2296756, 21.0122287, 52.406374, 16.9251681)

278.54558935106695

In [31]:
BIKE_SCHEMA_201308_201906 = StructType([
	StructField('duration', IntegerType(), True),
	StructField('start_time', TimestampType(), True),
	StructField('end_time', TimestampType(), True),
	StructField('start_stationID', IntegerType(), True),
	StructField('start_station_name', StringType(), True),
	StructField('start_latitude', DoubleType(), True),
	StructField('start_longitude', DoubleType(), True),
	StructField('end_stationID', IntegerType(), True),
	StructField('end_station_name', StringType(), True),
	StructField('end_latitude', DoubleType(), True),
	StructField('end_longitude', DoubleType(), True),
	StructField('bikeID', IntegerType(), True),
	StructField('user_type', StringType(), True),
	StructField('birth_year', StringType(), True),
	StructField('gender', IntegerType(), True)
])

In [33]:
def create_df_from_csv_paths(spark, paths, schema = None):
    """
    Create a dataframe from csv paths. 
    """
    if schema:
        df = spark.read.format("csv").option("header", "true").\
            schema(schema).\
            load(paths.split(','))
    else:
        df = spark.read.format("csv").option("header", "true").\
            option('inferschema','true').\
            load(paths.split(','))
    return df

In [36]:
df = create_df_from_csv_paths(spark, bike_paths, BIKE_SCHEMA_201308_201906)

In [37]:
df.printSchema()

root
 |-- duration: integer (nullable = true)
 |-- start_time: timestamp (nullable = true)
 |-- end_time: timestamp (nullable = true)
 |-- start_stationID: integer (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_latitude: double (nullable = true)
 |-- start_longitude: double (nullable = true)
 |-- end_stationID: integer (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_latitude: double (nullable = true)
 |-- end_longitude: double (nullable = true)
 |-- bikeID: integer (nullable = true)
 |-- user_type: string (nullable = true)
 |-- birth_year: string (nullable = true)
 |-- gender: integer (nullable = true)



In [45]:
# Generate path monthly between time1 and time2
def generate_paths(front, tail, time1, time2, output_format):
    months = pd.date_range(time1,time2, 
              freq='MS').strftime(output_format).tolist()
    paths = [front + month + tail for month in months]
    return paths

In [46]:
head = 's3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_'
tail = '.csv'
generate_paths(head,tail,'2013-08-01','2014-12-01','%Y-%m')

['s3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_2013-08.csv',
 's3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_2013-09.csv',
 's3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_2013-10.csv',
 's3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_2013-11.csv',
 's3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_2013-12.csv',
 's3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_2014-01.csv',
 's3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_2014-02.csv',
 's3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_2014-03.csv',
 's3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_2014-04.csv',
 's3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_2014-05.csv',
 's3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_2014-06.csv',
 's3a://ny-taxi-trip-data/yellow_taxi/yellow_taxi/yellow_tripdata_2014-07.csv',
 's3a://ny-taxi-trip-data/yellow_taxi/ye

In [6]:
spark = SparkSession\
    .builder\
    .appName("test_local")\
    .getOrCreate()

In [7]:
# read in
y_taxi_path = '/Users/apple/Desktop/Insight/Project/Datasets/yellow_tripdata_2018-01.csv'
yellow_taxi_trips = get_df_from_csv_paths(y_taxi_path)
yellow_taxi_trips = yellow_taxi_trips.withColumn('duration', (col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long"))/60)

g_taxi_path = '/Users/apple/Desktop/Insight/Project/Datasets/green_tripdata_2018-01.csv'
green_taxi_trips = get_df_from_csv_paths(g_taxi_path)
green_taxi_trips = green_taxi_trips.withColumn('duration', (col("lpep_dropoff_datetime").cast("long") - col("lpep_pickup_datetime").cast("long"))/60)

bike_paths = "/Users/apple/Desktop/Insight/Project/Datasets/201801-citibike-tripdata.csv,/Users/apple/Desktop/Insight/Project/Datasets/201306-citibike-tripdata.csv"
bike_trips = get_df_from_csv_paths(bike_paths)

In [8]:
p = "/Users/apple/Desktop/Insight/Project/Datasets/yellow_tripdata_2018-0[12].csv"
t = get_df_from_csv_paths(p)

In [9]:
t.count()

8759874

In [10]:
yellow_taxi_trips.count()

8759874

In [11]:
yellow_taxi_trips.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- duration: double (nullable = true)



In [12]:
# paths = "/Users/apple/Desktop/Insight/Project/Datasets/*-citibike-tripdata.csv"
# test_trips = get_df_from_csv_paths(paths)

In [13]:
# select and generate columns
yellow = yellow_taxi_trips.select(['tpep_pickup_datetime','duration','PULocationID','DOLocationID','trip_distance','total_amount'])
yellow = yellow.withColumnRenamed("tpep_pickup_datetime","start_time")
yellow = add_date_hour_minute_second(yellow)

green = green_taxi_trips.select(['lpep_pickup_datetime','duration','PULocationID','DOLocationID','trip_distance','total_amount'])
green = green.withColumnRenamed("lpep_pickup_datetime","start_time")
green = add_date_hour_minute_second(green)

bike = bike_trips.select(['starttime','tripduration','start station latitude','start station longitude','end station latitude','end station longitude'])
bike = bike.withColumnRenamed("starttime","start_time")\
    .withColumnRenamed("tripduration","duration")\
    .withColumnRenamed("start station latitude","start_lat")\
    .withColumnRenamed("start station longitude","start_lon")\
    .withColumnRenamed("end station latitude","end_lat")\
    .withColumnRenamed("end station longitude","end_lon")
bike = add_date_hour_minute_second(bike)

In [14]:
taxi = yellow.union(green)

In [15]:
taxi.count()

9553403

In [29]:
bike_trips.printSchema()

root
 |-- tripduration: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- start station id: integer (nullable = true)
 |-- start station name: string (nullable = true)
 |-- start station latitude: double (nullable = true)
 |-- start station longitude: double (nullable = true)
 |-- end station id: string (nullable = true)
 |-- end station name: string (nullable = true)
 |-- end station latitude: string (nullable = true)
 |-- end station longitude: string (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birth year: string (nullable = true)
 |-- gender: integer (nullable = true)



In [16]:
bike.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- duration: integer (nullable = true)
 |-- start_lat: double (nullable = true)
 |-- start_lon: double (nullable = true)
 |-- end_lat: string (nullable = true)
 |-- end_lon: string (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- second: integer (nullable = true)



In [17]:
taxi.printSchema()

root
 |-- start_time: timestamp (nullable = true)
 |-- duration: double (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- second: integer (nullable = true)



In [18]:
taxi.show(1)

+-------------------+--------+------------+------------+-------------+------------+----------+----+------+------+
|         start_time|duration|PULocationID|DOLocationID|trip_distance|total_amount|      date|hour|minute|second|
+-------------------+--------+------------+------------+-------------+------------+----------+----+------+------+
|2018-01-01 00:21:05|     3.3|          41|          24|          0.5|         5.8|2018-01-01|   0|    21|     5|
+-------------------+--------+------------+------------+-------------+------------+----------+----+------+------+
only showing top 1 row



In [19]:
taxi.createOrReplaceTempView("taxi")

In [20]:
# Beacause the LocationID 264 & 265 represents the unknown areas, so remove them from taxi data
taxi_cleaned = spark.sql("SELECT *\
    FROM taxi\
    WHERE PULocationID <= 263")

In [21]:
taxi_cleaned.createOrReplaceTempView("taxi")

In [22]:
taxi_pop = spark.sql("SELECT date,hour,PULocationID,DOLocationID,COUNT(*) AS count, AVG(trip_distance) AS avg_dis \
    FROM taxi\
    GROUP BY date,hour,PULocationID,DOLocationID")
taxi_pop.show(1)

+----------+----+------------+------------+-----+------------------+
|      date|hour|PULocationID|DOLocationID|count|           avg_dis|
+----------+----+------------+------------+-----+------------------+
|2018-01-01|   0|          74|          75|   27|0.8940740740740741|
+----------+----+------------+------------+-----+------------------+
only showing top 1 row



In [23]:
bike.show(1)

+--------------------+--------+-----------+------------+-----------+------------+----------+----+------+------+
|          start_time|duration|  start_lat|   start_lon|    end_lat|     end_lon|      date|hour|minute|second|
+--------------------+--------+-----------+------------+-----------+------------+----------+----+------+------+
|2018-01-01 13:50:...|     970|40.76727216|-73.99392888|40.74901271|-73.98848395|2018-01-01|  13|    50|    57|
+--------------------+--------+-----------+------------+-----------+------------+----------+----+------+------+
only showing top 1 row



In [24]:
pwd

'/Users/apple/Desktop/Insight/Project/repo'

In [25]:
taxi_locID_path = '/Users/apple/Desktop/Insight/Project/repo/taxi_LocID_convert/taxi_locID_lon_lat.csv'
taxi_locIDs = get_df_from_csv_paths(taxi_locID_path)
taxi_locIDs.show(1)

+----------+------------------+------------------+-------+--------------+
|location_i|         longitude|          latitude|borough|          zone|
+----------+------------------+------------------+-------+--------------+
|         1|-74.17153349999995|40.689483499999895|    EWR|Newark Airport|
+----------+------------------+------------------+-------+--------------+
only showing top 1 row



In [26]:
taxi_locIDs.createOrReplaceTempView("loc_id")
taxi_pop.createOrReplaceTempView("taxi_pop")

In [27]:
taxi_lon_lat = spark.sql("SELECT date,hour,P.longitude AS start_lon,P.latitude AS start_lat,D.longitude AS end_lon,D.latitude AS end_lat,count,avg_dis\
    FROM taxi_pop,loc_id as P,loc_id as D\
    WHERE PULocationID = P.location_i\
    AND DOLocationID = D.location_i")
taxi_lon_lat.show(1)

+----------+----+------------------+-----------------+------------------+-----------------+-----+------------------+
|      date|hour|         start_lon|        start_lat|           end_lon|          end_lat|count|           avg_dis|
+----------+----+------------------+-----------------+------------------+-----------------+-----+------------------+
|2018-01-01|   0|-73.93741040191821|40.80561276899989|-73.94541517951379|40.79047907499991|   27|0.8940740740740741|
+----------+----+------------------+-----------------+------------------+-----------------+-----+------------------+
only showing top 1 row



In [28]:
taxi_lon_lat.printSchema()

root
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)
 |-- start_lon: double (nullable = true)
 |-- start_lat: double (nullable = true)
 |-- end_lon: double (nullable = true)
 |-- end_lat: double (nullable = true)
 |-- count: long (nullable = false)
 |-- avg_dis: double (nullable = true)

