# Part 3 - Simple Analysis

In this step, we perform the first simple analysis of the taxi trip data in order to get a better understanding.

In [None]:
dwh_basedir = "file:///srv/jupyter/nyc-dwh"
structured_basedir = dwh_basedir + "/structured"
refined_basedir = dwh_basedir + "/refined"

# 0. Setup Environment

## 0.1 Spark Session

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

if not 'spark' in locals():
    spark = SparkSession.builder \
        .master("local[*]") \
        .config("spark.driver.memory","64G") \
        .getOrCreate()

spark

## 0.2 Matplotlib

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()

## 0.3 Geopandas and friends

In [None]:
import pandas as pd
import geopandas as gpd
import contextily as ctx
from shapely.geometry import Point

# Helper function to fetch background map tiles
def add_basemap(ax, zoom, url='http://tile.stamen.com/terrain/tileZ/tileX/tileY.png'):
    xmin, xmax, ymin, ymax = ax.axis()
    basemap, extent = ctx.bounds2img(xmin, ymin, xmax, ymax, zoom=zoom, url=url)
    ax.imshow(basemap, extent=extent, interpolation='bilinear')
    # restore original x/y limits
    ax.axis((xmin, xmax, ymin, ymax))

# 1. Read Taxi Data

Now we can read in the taxi data from the structured zone.

In [None]:
taxi_trips = spark.read.parquet(refined_basedir + "/taxi-trip")
taxi_trips.limit(10).toPandas()

Just to be sure, let us inspect the schema. It should match exactly the specified one.

In [None]:
taxi_trips.printSchema()

## 1.3 Create Sample

For some actions, we only need a subset of the whole data (some visualizations won't event work with the full data), therefore we create a subsample that is small enough to be handeled efficiently by Python. We'd like to have around 100,000 records in the sample.

Spark offers the required functionality to create a *random* sample of the data, but we need to specify a fraction instead of an absolute number. Therefore we first count the number of records and then make up an appropriate fraction.

In [None]:
taxi_trips.count()

In order to get around 100,000 records, we use a fraction of 0.001. This will give us 170,000 records, which is good enough for us.

In [None]:
taxi_trips_sample = # YOUR CODE HERE

taxi_trips_sample.count()

# 2. Simple Geo Visualisation

In order to get an understanding of the data, let us first make a geo visualization using some Python functionality. We'd eventually want to draw the pickup locations on top of a map, so we understand the whole area that is served by the taxi cabs. We might want to use this information later when it comes to the ML part.

## 2.1 Estimate Extent

As a first step, let us estimate a realistic extent of the pickup location. There are some broken records in the data set, which would render the visualisation meaningless, therefore we try to estimate extends such that 95% of all points lie within that border.

In [None]:
quantile = taxi_trips_sample \
    .filter((taxi_trips_sample["pickup_longitude"] > -75) & (taxi_trips_sample["pickup_longitude"] < -65)) \
    .filter((taxi_trips_sample["pickup_latitude"] > 35) & (taxi_trips_sample["pickup_latitude"] < 45)) \
    .stat.approxQuantile(["pickup_longitude", "pickup_latitude"], [0.025,0.975], 0.01)

min_pickup_longitude = quantile[0][0]
max_pickup_longitude = quantile[0][1]
min_pickup_latitude = quantile[1][0]
max_pickup_latitude = quantile[1][1]

print("min_pickup_longitude=" + str(min_pickup_longitude))
print("max_pickup_longitude=" + str(max_pickup_longitude))
print("min_pickup_latitude=" + str(min_pickup_latitude))
print("max_pickup_latitude=" + str(max_pickup_latitude))

## 2.2 Visualize pickup location

Now by using some appropriate Python libraries, we can visualize the pickup locations nicely on a map.

Since the data contains some bogus coordinates and some (maybe correct) outliers, we limit the area to the extends that we estimated above. This means that we throw away all records which lie outside of the core area (only for this visualization, of course!)

In [None]:
df = taxi_trips_sample.select("pickup_longitude","pickup_latitude") \
    .filter((taxi_trips_sample["pickup_longitude"] >= min_pickup_longitude) & (taxi_trips_sample["pickup_longitude"] <= max_pickup_longitude)) \
    .filter((taxi_trips_sample["pickup_latitude"] >= min_pickup_latitude) & (taxi_trips_sample["pickup_latitude"] <= max_pickup_latitude)) \
    .toPandas()

# Convert DataFrame to GeoDataFrame  
coords = pd.Series(zip(df["pickup_longitude"], df["pickup_latitude"]))
geo_df = gpd.GeoDataFrame(df, crs = {'init': 'epsg:4326'}, geometry = coords.apply(Point)).to_crs(epsg=3857)

# ... and make the plot
ax = geo_df.plot(figsize=(15, 10), alpha=0.1)

# Add basemap below
add_basemap(ax, 12)

Retrieve the geo extends for later reuse for more visualizations.

In [None]:
geo_min_x, geo_max_x = ax.get_xlim()
geo_min_y, geo_max_y = ax.get_ylim()

print("geo_min_x=" + str(geo_min_x))
print("geo_max_x=" + str(geo_max_x))
print("geo_min_y=" + str(geo_min_y))
print("geo_max_y=" + str(geo_max_y))

# 3. Simple Questions

Using the taxi trips table, we can already answer some simple questions.

## 3.1 Average Fare per Mile

In [None]:
# Perform aggregation
df = # YOUR CODE HERE

result = # YOUR CODE HERE

result.toPandas()

## 3.2 Average fare per minute

In [None]:
# Perform aggregation
df = taxi_trips.withColumn("fare_per_minute", taxi_trips["total_amount"]/taxi_trips["trip_time_in_secs"]*60)

result = df.select(
    f.avg(df["fare_per_minute"]).alias("avg_fare_per_minute"),
    f.stddev(df["fare_per_minute"]).alias("stddev_fare_per_minute")
)

result.toPandas()

# 4. Make some Pictures

Just to get a rough feeling about the data, we make some pictures of the taxi trip data.

## 4.1 Average trips per day of week

Let us see if the average number of trips is the same for every week day.

In [None]:
# Step 1: Calculate the number of trips for every day
trips_per_day = # YOUR CODE HERE

# Step 2: Calculate average number of trips per day of week
result = # YOUR CODE HERE

pdf = result.toPandas()

plt.figure(figsize=(16, 6), dpi=80, facecolor='w', edgecolor='k')
plt.bar(pdf["pickup_dayofweek"], pdf["avg_count"], align='center', alpha=0.5)
plt.ylabel('Frequency')
plt.title('Day of week')

## 4.2 Make a Plot of Fare per Day

The next picture contains the total fare amount (including tip and other expenses) for every day in 2013.

In [None]:
# Calculate two aggregations
#  1. Total fare amount per day
#  2. Total number of trips per day
daily = # YOUR CODE HERE

# Convert to Pandas    
pdf = daily.toPandas()

# Make a Plot
plt.figure(figsize=(16, 6), dpi=80, facecolor='w', edgecolor='k')
plt.plot(pdf["date"],pdf["amount"], color="red")
plt.plot(pdf["date"],pdf["count"], color="green")

plt.legend(handles=[
    mpatches.Patch(color='red', label='Total Fare Amount per Day'),
    mpatches.Patch(color='green', label='Total Number of Trips der Day')
])

## 4.3 Make a plot of average trips for each hour

Let us plot the average number of trips and amount of income per hour. This will be done using a two step aggregation:
1. Calculate the total number of trips and total fare for every hour in the whole year
2. Calculate the average numbers per hour from this data

In [None]:
# Step 1: Calculate totals for every hour of the year
hourly_trips = taxi_trips \
    .withColumn("date", f.to_date('pickup_datetime')) \
    .withColumn("hour", f.hour('pickup_datetime')) \
    .groupBy("hour", "date").agg(
        f.sum("total_amount").alias("total_amount"),
        f.count("total_amount").alias("total_count")
    )

# Step 2: Calculate average values per hour of day
hourly_avg = hourly_trips \
    .groupBy("hour").agg(
        f.avg("total_amount").alias("avg_amount"),
        f.avg("total_count").alias("avg_count")
    )\
    .orderBy("hour")

# Convert to Pandas    
pdf = hourly_avg.toPandas()

# Make a Plot
plt.figure(figsize=(16, 6), dpi=80, facecolor='w', edgecolor='k')
plt.plot(pdf["hour"],pdf["avg_amount"], color="red")
plt.plot(pdf["hour"],pdf["avg_count"], color="green")

plt.legend(handles=[
    mpatches.Patch(color='red', label='Average Total Fare per Hour'),
    mpatches.Patch(color='green', label='Average Trip Count per Hour')
])

## 4.3 Passenger Counts

Another simple question is a histogram of the passenger count of all trips. Note that again the data contains some bogus data, therefore we limit the analysis to records with a passenger count less than 20.

In [None]:
result = taxi_trips \
    .filter(f.col("passenger_count") < 20) \
    .groupBy("passenger_count") \
    .count()
pdf = result.toPandas()

plt.figure(figsize=(16, 6), dpi=80, facecolor='w', edgecolor='k')
plt.bar(pdf["passenger_count"], pdf["count"], align='center', alpha=0.5)
plt.ylabel('Frequency')
plt.title('Number of Passengers')

## 4.4 Average income by hour by driver

The next plot is a slight variation of the previous one, focusing on the individual driver. The question is, how much money does a driver make on average for a specific hour of the day. Again, this requires a two step aggregation

1. Calculate the total amount for every hour of the year for each driver
2. Calculate the average income per hour over all days and all drivers

In [None]:
# Step 1: Calculate totals per driver per hour per day
hourly_totals = taxi_trips \
    .withColumn("date", f.to_date('pickup_datetime')) \
    .withColumn("hour", f.hour('pickup_datetime')) \
    .groupBy("date", "hour", "hack_license").agg(
        f.sum("fare_amount").alias("fare_amount"),
        f.sum("tip_amount").alias("tip_amount"),
        f.count("fare_amount").alias("trip_count")
    )

# Step 2: Calculate average per hour
hourly_driver_avg = hourly_totals \
    .groupBy("hour").agg(
        f.avg("fare_amount").alias("avg_fare_amount"),
        f.avg("tip_amount").alias("avg_tip_amount"),
        f.avg("trip_count").alias("avg_trip_count")
    ) \
    .orderBy("hour")

# Convert to Pandas    
pdf = hourly_driver_avg.toPandas()

# Make a Plot
plt.figure(figsize=(16, 6), dpi=80, facecolor='w', edgecolor='k')
plt.plot(pdf["hour"],pdf["avg_fare_amount"], color="red")
plt.plot(pdf["hour"],pdf["avg_tip_amount"], color="green")
plt.plot(pdf["hour"],pdf["avg_trip_count"], color="blue")

plt.legend(handles=[
    mpatches.Patch(color='red', label='Average Fare Amount per Hour per Driver'),
    mpatches.Patch(color='green', label='Average Tip Amount per Hour per Driver'),
    mpatches.Patch(color='blue', label='Average Trip Count per Hour per Driver')
])

## 4.5 Average income per day

The next plot is a slight variation of the previous one, now looking at the income on a whole day. The question is, how much money does a driver make on average for a specific day. Again, this requires a two step aggregation

1. Calculate the total amount for every day of the year for each driver
2. Calculate the average income per day over all days and all drivers

In [None]:
# Step 1: Calculate totals per driver per date
daily_totals = taxi_trips \
    .withColumn("date", f.to_date('pickup_datetime')) \
    .groupBy("date", "hack_license").agg(
        f.sum("total_amount").alias("amount"),
        f.count("total_amount").alias("count")
    )

# Step 2: Calculate average per date 
daily_average = daily_totals \
    .groupBy("date").agg(
        f.avg("amount").alias("avg_amount"),
        f.avg("count").alias("avg_count")
    ) \
    .orderBy("date")

# Convert to Pandas    
pdf = daily_average.toPandas()

# Make a Plot
plt.figure(figsize=(16, 6), dpi=80, facecolor='w', edgecolor='k')
plt.plot(pdf["date"],pdf["avg_amount"], color="red")
plt.plot(pdf["date"],pdf["avg_count"], color="blue")

plt.legend(handles=[
    mpatches.Patch(color='red', label='Average Fare Amount per Day per Driver'),
    mpatches.Patch(color='blue', label='Average Trip Count per Day per Driver')
])

## 4.6 Tip by passenger count

Does the tip depend on the number of passengers? Let us display the average tip amount for each number of passengers.

In [None]:
tip_by_passengers = taxi_trips \
    .filter(taxi_trips["passenger_count"] < 10) \
    .groupBy("passenger_count").agg(
        f.avg("tip_amount").alias("tip_amount")
    ) \
    .orderBy("passenger_count")

# Convert to Pandas    
pdf = tip_by_passengers.toPandas()

# Make a Plot
plt.figure(figsize=(16, 6), dpi=80, facecolor='w', edgecolor='k')
plt.bar(pdf["passenger_count"], pdf["tip_amount"], align='center', alpha=0.5)
plt.ylabel('Tip Amount')
plt.title('Number of Passengers')

# 5. Preaggregated Taxi Trips

Before we start to include additional data sets from other sources, let us first focus a reasonable question which we'd like to give an answer to with machine learning.

We are not so much interested into the individual trips, but we'd like to understand at which time a driver can make most money. We already saw in the pictures above, that the average amount of money per hour per driver doesn't vary very much, although most money seems to be made in the evening hours. Unfortunately these numbers do not neccessarily tell the whole truth, since we don't have any information about how long a driver was actually working.

So the question is: *"Can we predict the overall fares for a specific hour on a specific day?"* We will refine that question a little bit and clarify what information may be used to create the prediction, such that it makes sense from a business point of view.

We now prepare the joined trip data to contain data for precisely this question - we will remove the drivers hack license and medallion.

## 5.1 Extend Information

As a first step, we add some more columns:
* **date** and **hour** - The pickup date and hour (without minutes or seconds)
* **lat_idx** and **long_idx** - We map the whole geo range into a grid and these two columns contains logical coordinates in this grid.

In [None]:
min_pickup_longitude=-74.007698
max_pickup_longitude=-73.776711
min_pickup_latitude=40.706902
max_pickup_latitude=40.799072

longitude_grid_size = 10
latitude_grid_size = 5
longitude_grid_diff = (max_pickup_longitude - min_pickup_longitude) / longitude_grid_size
latitude_grid_diff = (max_pickup_latitude - min_pickup_latitude) / latitude_grid_size

In [None]:
extended_trips = taxi_trips \
    .withColumn("date", f.to_date(taxi_trips["pickup_datetime"])) \
    .withColumn("hour", f.hour(taxi_trips["pickup_datetime"])) \
    .withColumn("lat_idx", f.rint((taxi_trips["pickup_latitude"] - min_pickup_latitude)/latitude_grid_diff)) \
    .withColumn("long_idx", f.rint((taxi_trips["pickup_longitude"] - min_pickup_longitude)/longitude_grid_diff)) \
    .withColumn("lat_idx", f.when((f.col("lat_idx") >= 0) & (f.col("lat_idx") < latitude_grid_size), f.col("lat_idx")).otherwise(-1).cast("int")) \
    .withColumn("long_idx", f.when((f.col("long_idx") >= 0) & (f.col("long_idx") < longitude_grid_size), f.col("long_idx")).otherwise(-1).cast("int")) \

extended_trips.limit(10).toPandas()

## 5.2 Preaggregate and Store into Refined Zone

Now we preaggregate the extended data using the following dimensions
* **date** and **hour**
* **lat_idx** and **long_idx**

In addition to the dimensions, the result will aggregate (sum up) the following metrics
* **passenger_count**
* **fare_amount**
* **tip_amount**
* **total_amount**

In [None]:
hourly_taxi_trips = # YOUR CODE HERE

hourly_taxi_trips.write.mode("overwrite").parquet(refined_basedir + "/taxi-trips-hourly")

In [None]:
hourly_taxi_trips = spark.read.parquet(refined_basedir + "/taxi-trips-hourly")
hourly_taxi_trips.limit(10).toPandas()

# 6. More Pictures

Using the preaggregated data set, we can now draw some more pictures.

## 6.1 Daily Aggregates

In [None]:
daily = hourly_taxi_trips \
    .groupBy("date").agg(
        f.sum("fare_amount").alias("fare_amount"),
        f.sum("tip_amount").alias("tip_amount"),
        f.sum("total_amount").alias("total_amount"),
        f.sum("trip_count").alias("trip_count")
    )\
    .orderBy("date")

# Convert to Pandas    
pdf = daily.toPandas()

# Make a Plot
plt.figure(figsize=(16, 6), dpi=80, facecolor='w', edgecolor='k')
plt.plot(pdf["date"],pdf["fare_amount"], color="red")
plt.plot(pdf["date"],pdf["tip_amount"], color="green")
plt.plot(pdf["date"],pdf["total_amount"], color="blue")
plt.plot(pdf["date"],pdf["trip_count"], color="violet")

plt.legend(handles=[
    mpatches.Patch(color='red', label='Fare Amount'),
    mpatches.Patch(color='green', label='Tip Amount'),
    mpatches.Patch(color='blue', label='Total Amount'),
    mpatches.Patch(color='violet', label='Trip Count')
])

## 6.2 Average fare and tip per hour

In [None]:
hourly = hourly_taxi_trips \
    .groupBy("hour").agg(
        f.avg(hourly_taxi_trips["fare_amount"] / hourly_taxi_trips["trip_count"]).alias("avg_fare_amount"),
        f.avg(hourly_taxi_trips["tip_amount"] / hourly_taxi_trips["trip_count"]).alias("avg_tip_amount")
    )\
    .orderBy("hour")

# Convert to Pandas    
pdf = hourly.toPandas()

# Make a Plot
plt.figure(figsize=(16, 6), dpi=80, facecolor='w', edgecolor='k')
plt.plot(pdf["hour"],pdf["avg_fare_amount"], color="red")
plt.plot(pdf["hour"],pdf["avg_tip_amount"], color="green")

plt.legend(handles=[
    mpatches.Patch(color='red', label='Average Fare Amount'),
    mpatches.Patch(color='green', label='Average Tip Amount')
])