# Taxi Trips (exercise)

# Kernel installation

- Run the following line (script) once, if the required kernel (big-data-dda-kernel) is not installed.
- You need to install this kernel only once

In [None]:
%%bash
mkdir -p ~/.local/share/jupyter/kernels/ai4seismology-bigdata
cp /data/horse/ws/s4122485-ai4seismology_dev/thursday_bigdata/kernel.json ~/.local/share/jupyter/kernels/ai4seismology-bigdata

## Important!!!
Once the Kernel is installed, 
1. Reload the notebook (reload/refresh the web page)
2. Select the kernel: Menu -> Kernel -> Change Kernel -> Select "dda-kernel"
Always use this kernel for upcoming exercises.

# Select ai4seismology-bigdata kernel

In [None]:
import sys

In [None]:
!{sys.executable} -m pip install --user --upgrade ipympl jupyter_leaflet leafmap ipyleaflet

# Restart the Jupyter Server

In [None]:
# To enable horizontal scrolling
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# Initialisation of Spark cluster

In [None]:
# Note: Skip if running on local machine
# Import utilities required to run big data frameworks on ZIH HPC systems
from big_data_utils.environment_utils import ClusterConfig
from big_data_utils.cluster_utils import ClusterService
from big_data_utils.utils import kill_java_processes_by_name

In [None]:
# Note: Skip if running on local machine
# Configure the cluster environment
myconfig = ClusterConfig(fw_name="spark")
#myconfig.configure_env(conf_dest="./my-conf", conf_template="/projects/p_scads_bigdatahpc/.template/spark")
myconfig.configure_env(conf_dest="./my-conf",randomize_ports=True)

In [None]:
# Initialize the cluster service class
mycluster = ClusterService("spark")

# Check which processes are running
mycluster.check_status()

In [None]:
# Note: Skip if running on local machine
# Start Spark standalone cluster
mycluster.start_cluster()

In [None]:
# Note: Skip if running on local machine
# Check if the master and worker processes are started or not
mycluster.check_status()

# Download of NYC taxi trips and taxi zone file

Modify the base directory in the following cell if you want to save data files in different directories.

In [None]:
base_directory = "./data"

In [None]:
import os
import wget
import zipfile

base_directory = os.path.abspath(base_directory)
os.environ["BASEDIRECTORY"] = base_directory

# Download yellow trip data
data_directory = base_directory + "/taxidata"
data_file = "yellow_tripdata_2022-01.parquet"
data_path = data_directory + "/" + data_file
if not os.path.exists(data_path):
    os.makedirs(data_directory, exist_ok=True)
if not os.path.exists(data_path):
    wget.download("https://d37ci6vzurychx.cloudfront.net/trip-data/" + data_file, out = data_directory)   

# Download zone data
zone_directory = base_directory + "/taxizonesdata"
if not os.path.isdir(zone_directory):
    os.makedirs(zone_directory, exist_ok=True)

zone_zipfile = "taxi_zones.zip"
zone_zipfile_path = zone_directory + "/" + zone_zipfile
if not os.path.exists(zone_zipfile_path):
    wget.download("https://d37ci6vzurychx.cloudfront.net/misc/" + zone_zipfile, out = zone_directory)
    with zipfile.ZipFile(zone_zipfile_path, "r") as zip_ref:
        zip_ref.extractall(zone_directory)
        zip_ref.close()
    
zone_lookup_file = "taxi_zone_lookup.csv"
if not os.path.exists(zone_directory + "/" + zone_lookup_file):
    wget.download("https://d37ci6vzurychx.cloudfront.net/misc/" + zone_lookup_file, out = zone_directory)

# Initialisation of Spark context

In [None]:
import findspark
import os
findspark.init(os.environ['SPARK_HOME'])
print(os.environ['SPARK_HOME'])

In [None]:
import platform
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master(f"spark://{myconfig.get_master_host()}:{myconfig.get_master_port()}") \
    .appName("Python Spark Map Visualization of NYC taxi trips") \
    .getOrCreate()

sc = spark.sparkContext

In [None]:
# Check running java processes
mycluster.check_status()

In [None]:
trips = spark.read.parquet(data_path)

In [None]:
trips.dtypes

In [None]:
trips.show()

# Grouping using groupBy

In [None]:
# using groupBy
trips.groupBy("VendorID").count().show()

# Exercise 1
Count trips grouped by passengers.

Are there unexpected values? How can they be interpreted? Find more information on https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page.

In [None]:
trips. #add something here

# Exercise 2
Find the minimal distance for these groups. Are there unexpected values? How can they be interpreted?

In [None]:
trips. #add something here

# Exercise 3
Remove all trip distances of 0.0 miles from the previous result. What do you expect?

In [None]:
trips. #add something here

# SQL Queries

In [None]:
from pyspark.sql.types import *

sqlContext = SparkSession.builder.getOrCreate()

In [None]:
# Create a temporary view from the DataFrame
trips.createOrReplaceTempView("trips")

In [None]:
# Apply a SQL query
query = "SELECT fare_amount FROM trips WHERE trip_distance>=5"
sqlContext.sql(query).show()

# Exercise 4
Rewrite the previous statement without SQL, but with a functional statement.

In [None]:
trips. # add something here

In [None]:
# Compute summary statistics
trips.describe().show()

# Exercise 5
Find the distance for tips larger than $5  - Formulate a SQL query and apply it on the DataFrame.

In [None]:
query = # add something here
sqlContext.sql(query).show()

# Exercise 6
Formulate a query to get total amount of trip for distances larger than 30 miles.

In [None]:
query = # add something here
sqlContext.sql(query).show()

# Exercise 7
Create a box-and-whisker plot of the numerical columns. What do these say about the data?

In [None]:
%matplotlib inline

In [None]:
import matplotlib.pyplot as plt

In [None]:
for column in trips.dtypes:
    name = column[0]
    colType = column[1]
    if colType != 'string' and colType != 'timestamp' and colType != 'timestamp_ntz':
        columnQuantiles = trips. # add something here
        print("{} quantiles: {}".format(name,columnQuantiles))
        stats = [{
            "whislo": columnQuantiles # add something here
            "q1": columnQuantiles # add something here
            "med": columnQuantiles # add something here
            "q3": columnQuantiles # add something here
            "whishi": columnQuantiles # add something here
        }]
        fig, axes = plt.subplots(nrows=1, ncols=1, figsize=(5,5), sharey=True)
        axes.bxp(bxpstats=stats, showfliers=False)
        axes.grid(True)
        axes.set_title(name)

# Exercise 8
Provide an overview over the number of trips per week day.

In [None]:
def barchart(dataRows, titleSuffix):
    positions = list(reversed(range(len(dataRows))))
    names = [str(item[titleSuffix]) + " (" + str(item['count']) + ")" for item in dataRows]
    values = [item['count'] for item in dataRows]
    plt.grid()
    plt.barh(positions,values,align="center")
    plt.yticks(positions,names)
    plt.xlabel("Number of trips")
    plt.title("Distribution of trips per " + titleSuffix)
    plt.show()

In [None]:
import datetime
help(datetime.datetime.weekday)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
import calendar

#udf stands for user defined function
@udf 
def weekdayStr(d):
    return calendar.day_name # add something here

@udf(returnType=IntegerType())
def weekday(d):
    return d.weekday()

#Replace function weekday with function weekdayStr if you want.
weekdayRows = trips.select(weekday(trips.tpep_dropoff_datetime).alias("weekday")). # add something here

barchart(weekdayRows, "weekday")

# Exercise 9
Provide an overview over the number of trips per hour.

In [None]:
@udf(returnType=IntegerType())
def hour(d):
    return d.hour

hourRows = # add something here

barchart(hourRows, "hour")

# Map Visualisations

In [None]:
import leafmap

In [None]:
def getMap():
    map_args={
        "google_map":"HYBRID",
        #center to New York at 41 degrees north and 74 degrees west ([lat, lon])
        "center":[40.702557, -74.012318],
        "zoom":12,
        "height":"450px",
        "width":"800px",
        "max_zoom":"20"
    }
    return leafmap.Map(**map_args)

In [None]:
getMap()

In [None]:
def taxizoneColorFunction(taxiZonesIntensity, maximum_intensity, taxizoneFeature):
    taxizoneId = taxizoneFeature["properties"]["LocationID"]
    taxizoneIntensity = taxiZonesIntensity[taxizoneId] if taxizoneId in taxiZonesIntensity else 0
    return {
        "color": "black",
        "fillColor": '#%02X0000' % (int(taxizoneIntensity*255/maximum_intensity))
    }
def getTaxiZoneStylingFunction(taxiZonesIntensity):
    maximum_intensity = max(taxiZonesIntensity.values())
    return lambda x: taxizoneColorFunction(taxiZonesIntensity, maximum_intensity, x)

In [None]:
import sys
print(sys.executable)

In [None]:
taxizonesFile = base_directory+"/taxizonesdata/taxi_zones.shp"

def getZoneCenters():
    zone_centers={}
    my_geojson = leafmap.shp_to_geojson(taxizonesFile)
    for feature in my_geojson["features"]:
        location = feature["properties"]["LocationID"]
        coordinates = feature["geometry"]["coordinates"]
        avg_lat = 0
        avg_lon = 0
        count = 0
        for coordinate_list in coordinates:
            for coordinate in coordinate_list:
                if type(coordinate) == tuple and len(coordinate) == 2:
                    avg_lat += coordinate[1]
                    avg_lon += coordinate[0]
                    count += 1
                elif len(coordinate) > 2:
                    for coord in coordinate:
                        avg_lat += coord[1]
                        avg_lon += coord[0]
                        count += 1
        
        avg_lat = avg_lat/count
        avg_lon = avg_lon/count
        zone_centers[location]=[avg_lat, avg_lon]
    return zone_centers

zoneCenters = getZoneCenters()

In [None]:
def getHeatCenters(taxizoneIntensityMap):
    heat_data=[]
    for key, value in zoneCenters.items():
        location = key
        (lat, lon) = value
        taxizoneIntensity = taxizoneIntensityMap[location] if location in taxizoneIntensityMap else 0
        heat_data.append([lat, lon, taxizoneIntensity])
    return heat_data

# Exercise 10
Get the number of trips which start/end in each zone.

In [None]:
pickupData = trips. #add something here
dropoffData = trips. #add something here
grouped_by_pickup_location={row["PULocationID"]:row["count"] for row in pickupData}
grouped_by_dropoff_location={row["DOLocationID"]:row["count"] for row in dropoffData}

In [None]:
m = getMap()
m.add_shp(in_shp=taxizonesFile,layer_name="taxizone",style={},hover_style={}, style_callback=getTaxiZoneStylingFunction(grouped_by_pickup_location), fill_colors=None,
              info_mode='on_hover')
m.layer_opacity('taxizone', 0.9)
m.add_heatmap(data=getHeatCenters(grouped_by_pickup_location), name='pickup_heat', radius=10)
m.layer_opacity('pickup_heat', 0.9)
m

In [None]:
m = getMap()
m.add_shp(in_shp=taxizonesFile,layer_name="taxizone",style={},hover_style={}, style_callback=getTaxiZoneStylingFunction(grouped_by_dropoff_location), fill_colors=None,
              info_mode='on_hover')
m.layer_opacity('taxizone', 0.9)
m.add_heatmap(data=getHeatCenters(grouped_by_dropoff_location), name='dropoff_heat', radius=10)
m.layer_opacity('dropoff_heat', 0.9)
m

# Exercise 11
Collect the trips with the 10 highest tips. Be careful not to use trips with zones which indicate "Unknown" values.

In [None]:
zoneLookup = spark.read.csv(base_directory + "/taxizonesdata/taxi_zone_lookup.csv", header=True, inferSchema=True)

In [None]:
zoneLookup.filter(zoneLookup.Borough == "Unknown").show()
zoneLookup.filter(zoneLookup.Borough == "N/A").show()

In [None]:
zoneLookup.dtypes

In [None]:
trips.dtypes

In [None]:
help(trips.join)

In [None]:
# add something here to filter out Unknown values
tripsWithHighestTips = temporary. # add something here to take the top 10 elements
tripsWithHighestTips

In [None]:
from geojson import FeatureCollection, Feature, LineString
def to_lon_and_lat(latLonCoordinate):
    return [latLonCoordinate[1],latLonCoordinate[0]]

def trip_to_geojson(trip):
    start_point = to_lon_and_lat(zoneCenters[trip["PULocationID"]])
    end_point = to_lon_and_lat(zoneCenters[trip["DOLocationID"]])
    props = {
        "starttime":trip["tpep_pickup_datetime"].isoformat(),
        "startzone":trip["PULocationID"],
        "endtime":trip["tpep_dropoff_datetime"].isoformat(),
        "endzone":trip["DOLocationID"],
    }
    return Feature(geometry=LineString([start_point, end_point]), properties=props)

def tripList_to_geojson(tripList):
    coll = FeatureCollection(list(map(lambda item: trip_to_geojson(item),tripList)))
    return coll

In [None]:
trip_geojson = tripList_to_geojson(tripsWithHighestTips)

In [None]:
m = getMap()
m.add_shp(in_shp=taxizonesFile,layer_name="taxizone")
m.layer_opacity('taxizone', 0.9)
m.add_geojson(in_geojson=trip_geojson,layer_name="connections", style={"color":"red"})
m.layer_opacity('connections', 1.0)
m

In [None]:
# Check status of runnning java processes
mycluster.check_status()

In [None]:
# Stopping spark context
sc.stop()
mycluster.check_status()

In [None]:
mycluster.stop_cluster()
kill_java_processes_by_name("SparkSubmit")

In [None]:
mycluster.check_status()