# NYC T&LC Data

## Size
The size of the data is summarized in the following table, with a grand total shown in the lower right (1.2 billion):

Year|Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec|Total Result
-|-:|-:|-:|-:|-:|-:|-:|-:|-:|-:|-:|-:|-:
2009|1.41E+7|1.34E+7|1.44E+7|1.43E+7|1.48E+7|1.42E+7|1.36E+7|1.37E+7|1.40E+7|1.56E+7|1.43E+7|1.46E+7|1.71E+8
2010|1.49E+7|1.11E+7|1.29E+7|1.51E+7|1.55E+7|1.48E+7|1.47E+7|1.25E+7|1.55E+7|1.42E+7|1.39E+7|1.38E+7|1.69E+8
2011|1.35E+7|1.42E+7|1.61E+7|1.47E+7|1.56E+7|1.51E+7|1.47E+7|1.33E+7|1.46E+7|1.57E+7|1.45E+7|1.49E+7|1.77E+8
2012|1.50E+7|1.50E+7|1.61E+7|1.55E+7|1.56E+7|1.51E+7|1.44E+7|1.44E+7|1.45E+7|1.45E+7|5.16E+6|1.47E+7|1.70E+8
2013|1.48E+7|1.40E+7|1.57E+7|1.51E+7|1.53E+7|1.44E+7|1.38E+7|1.26E+7|1.42E+7|1.52E+7|1.48E+7|1.46E+7|1.74E+8
2014|1.46E+7|1.41E+7|1.67E+7|1.59E+7|1.62E+7|1.52E+7|1.44E+7|1.40E+7|1.47E+7|1.57E+7|1.48E+7|1.47E+7|1.81E+8
2015|1.43E+7|1.40E+7|1.51E+7|1.47E+7|1.49E+7|1.40E+7|1.31E+7|1.27E+7|1.27E+7|1.39E+7|1.28E+7|1.31E+7|1.65E+8
Total Result|1.01E+8|9.58E+7|1.07E+8|1.05E+8|1.08E+8|1.03E+8|9.87E+7|9.32E+7|1.00E+8|1.05E+8|9.03E+7|1.00E+8|**1.21E+9**
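
As a quick consistency check, the yearly totals can be summed and compared with the grand total. The sketch below copies the values from the Total Result column, so it is only accurate to the three significant figures shown in the table.

```python
# Yearly totals copied from the "Total Result" column above (3 significant figures).
yearly_totals = {
    2009: 1.71e8, 2010: 1.69e8, 2011: 1.77e8, 2012: 1.70e8,
    2013: 1.74e8, 2014: 1.81e8, 2015: 1.65e8,
}

grand_total = sum(yearly_totals.values())
print('{:.2E}'.format(grand_total))
```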

## Parameters


# Bulk Loading

## Parameter Comparison
The raw data is CSV with DOS (CRLF) line terminators. The table below lists the column number and parameter name for each of the two data formats, yellow and green; columns that hold a different parameter in the two formats are shown in bold. Such a table is useful when configuring awk or other data readers.

|Column|Yellow|Green|
|-|-|-|
|1|VendorID              |VendorID
|2|tpep_pickup_datetime  |lpep_pickup_datetime
|3|tpep_dropoff_datetime |Lpep_dropoff_datetime
|4|**passenger_count**   |**Store_and_fwd_flag**
|5|**trip_distance**     |**RateCodeID**
|6|pickup_longitude      |Pickup_longitude
|7|pickup_latitude       |Pickup_latitude
|8|**RateCodeID**        |**Dropoff_longitude**
|9|**store_and_fwd_flag**|**Dropoff_latitude**
|10|**dropoff_longitude**|**Passenger_count**
|11|**dropoff_latitude** |**Trip_distance**
|12|**payment_type**     |**Fare_amount**
|13|**fare_amount**      |**Extra**
|14|**extra**            |**MTA_tax**
|15|**mta_tax**          |**Tip_amount**
|16|**tip_amount**       |**Tolls_amount**
|17|**tolls_amount**     |**Ehail_fee**
|18|improvement_surcharge|improvement_surcharge
|19|total_amount         |Total_amount
|20|                     |**Payment_type**
|21|                     |**Trip_type**
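
One way to use the table is to keep a per-format map from a common field name to its column number. The sketch below is only an illustration: the dictionary names, the `normalize` helper, and the sample row are hypothetical, and only a subset of the columns is mapped.

```python
import csv
import io

# 1-based column numbers taken from the table above (subset of the columns).
YELLOW = {'pickup_datetime': 2, 'dropoff_datetime': 3, 'passenger_count': 4,
          'trip_distance': 5, 'pickup_longitude': 6, 'pickup_latitude': 7,
          'dropoff_longitude': 10, 'dropoff_latitude': 11}
GREEN = {'pickup_datetime': 2, 'dropoff_datetime': 3, 'passenger_count': 10,
         'trip_distance': 11, 'pickup_longitude': 6, 'pickup_latitude': 7,
         'dropoff_longitude': 8, 'dropoff_latitude': 9}


def normalize(row, column_map):
    """Return a dict keyed by common field names for one CSV row (a list of strings)."""
    return {name: row[number - 1] for name, number in column_map.items()}


# Hypothetical sample row in the yellow layout, with a DOS line terminator.
sample = ('2,2015-01-01 00:00:00,2015-01-01 00:10:00,1,2.5,-73.99,40.75,'
          '1,N,-73.98,40.76,1,10,0.5,0.5,2,0,0.3,13.3\r\n')
reader = csv.reader(io.StringIO(sample, newline=''))
record = normalize(next(reader), YELLOW)
print(record['trip_distance'], record['dropoff_longitude'])
```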

The script below performs an initial translation of the data into the schema used for trips. Its output is much larger than the input because of the JSON metadata, but the resulting file can be consumed directly by the PostgreSQL COPY command, which makes the load orders of magnitude faster than inserting the rows one at a time. The script performs the following:

1. Expand the given input data file to an absolute path (for traceability)
1. Strip the header row from the top of the file via tail -n +2
1. Remove the carriage return via dos2unix
1. Convert the original file format to the desired format using awk
1. Remove any records where a pickup or dropoff coordinate is 0
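
The steps above can be sketched in plain Python. This is a rough illustration rather than the script used for the load: the `clean_rows` name, its `position_columns` parameter (defaulting to the yellow layout), and the sample data are all assumptions.

```python
import os
import tempfile


def clean_rows(path, position_columns=(6, 7, 10, 11)):
    """Yield (absolute_path, line_number, fields) for each usable row.

    position_columns are 1-based; the default is the yellow layout.
    """
    path = os.path.abspath(path)                      # 1. absolute path
    with open(path, newline='') as f:
        next(f, None)                                 # 2. skip the header row
        for line_number, line in enumerate(f, start=2):
            line = line.rstrip('\r\n')                # 3. drop the carriage return
            fields = line.split(',')                  # 4. split for conversion
            if len(fields) < max(position_columns):
                continue                              # blank or truncated line
            if any(fields[c - 1] == '0' for c in position_columns):
                continue                              # 5. zero position
            yield path, line_number, fields


# Hypothetical sample: header, one good row, one row with a zero longitude.
sample = ('h1,h2,h3,h4,h5,h6,h7,h8,h9,h10,h11\r\n'
          '2,t1,t2,1,2.5,-73.99,40.75,1,N,-73.98,40.76\r\n'
          '2,t1,t2,1,2.5,0,40.75,1,N,-73.98,40.76\r\n')
with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False, newline='') as tmp:
    tmp.write(sample)
rows = list(clean_rows(tmp.name))
os.remove(tmp.name)
```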

The fields of a trip in the database are described below:

1. **id**: Skipping this in the COPY FROM command results in the database assigning the default, which is what is desired.
1. **entity**: This will be NULL for NYC T&LC data
1. **start_datetime** = models.DateTimeField()
1. **duration** = models.DurationField()
1. **geometry** = models.LineStringField(dim=3, null=True)
1. **metadata** = JsonBField()
1. **archive_uri** = models.CharField(max_length=1024)

If the COPY command does not receive an input for a column, it uses the column's default if one is defined and fails if the column is required and has no default. This works to our benefit in the case of the id, which is defined to always get a unique identifier. The code below bulk loads a single file into the database.
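
To make the role of the column list concrete, the sketch below builds a COPY statement that names only the columns supplied by the input stream, leaving id out so the database default applies. The `build_copy_sql` helper is hypothetical; the table and column names are the ones used later in this document.

```python
def build_copy_sql(table, columns, delimiter='|'):
    """Build a COPY ... FROM STDIN statement for the given column list."""
    return "COPY {}({}) FROM STDIN DELIMITER '{}'".format(
        table, ', '.join(columns), delimiter)


# Columns supplied by the input stream; id is omitted on purpose.
copy_sql = build_copy_sql(
    'entity_trip',
    ['start_datetime', 'duration', 'geometry', 'archive_uri', 'metadata'])
print(copy_sql)
```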

## Method 1: Use shell commands to convert the data

In [36]:
nyc_parse_config = {
    'yellow_v1': {
        'pattern': 'yellow_tripdata_(2009|201[0-4])',
        'columns': {
            'passenger_count': '$4',
            'trip_dist': '$5',
            'pu_lon': '$6',
            'pu_lat': '$7',
            'do_lon': '$10',
            'do_lat': '$11',
            'rate_code': '$8',
            'store_fwd_flag': '$9',
            'pay_type': '$12',
            'fare_amount': '$13',
            'extra': '$14',
            'mta_tax': '$15',
            'imp_charge': '0',
            'tip_amount': '$16',
            'tolls_amount': '$17',
            'ehail_fee': 0,
            'total_amount': '$18',
            'trip_type': '"Unknown"',  # quoted so awk sees a string literal, not an unset variable
            'taxi_type': 'yellow',
            'fields': 18
        },
    },
    'yellow_v2': {
        'pattern': 'yellow_tripdata_(2015|2016-(0[1-6]))',
        'columns': {
            'passenger_count': '$4',
            'trip_dist': '$5',
            'pu_lon': '$6',
            'pu_lat': '$7',
            'do_lon': '$10',
            'do_lat': '$11',
            'rate_code': '$8',
            'store_fwd_flag': '$9',
            'pay_type': '$12',
            'fare_amount': '$13',
            'extra': '$14',
            'mta_tax': '$15',
            'imp_charge': '$18',
            'tip_amount': '$16',
            'tolls_amount': '$17',
            'ehail_fee': 0,
            'total_amount': '$19',
            'trip_type': '"Unknown"',  # quoted so awk sees a string literal, not an unset variable
            'taxi_type': 'yellow',
            'fields': 19
        },
    },
    # 'yellow_v3': {
    #     'pattern': 'yellow_tripdata_2016-(0[7-9]|1[0-2])',
    #     'columns': {
    #         # Pickup and Dropoff locations change to areas, requires revision of data model
    #     },
    # },
    'green_v1': {
        'pattern': 'green_tripdata_(2009|201[0-4])',
        'columns': {
            'passenger_count': '$10',
            'trip_dist': '$11',
            'pu_lon': '$6',
            'pu_lat': '$7',
            'do_lon': '$8',
            'do_lat': '$9',
            'rate_code': '$5',
            'store_fwd_flag': '$4',
            'pay_type': '$19',
            'fare_amount': '$12',
            'extra': '$13',
            'mta_tax': '$14',
            'imp_charge': 0,
            'tip_amount': '$15',
            'tolls_amount': '$16',
            'ehail_fee': '$17',
            'total_amount': '$18',
            'trip_type': '$20',
            'taxi_type': 'green',
            'fields': 20
        },
    },
    'green_v2': {
        'pattern': 'green_tripdata_(2015|2016-(0[1-6]))',
        'columns': {
            'passenger_count': '$10',
            'trip_dist': '$11',
            'pu_lon': '$6',
            'pu_lat': '$7',
            'do_lon': '$8',
            'do_lat': '$9',
            'rate_code': '$5',
            'store_fwd_flag': '$4',
            'pay_type': '$20',
            'fare_amount': '$12',
            'extra': '$13',
            'mta_tax': '$14',
            'imp_charge': '$18',
            'tip_amount': '$15',
            'tolls_amount': '$16',
            'ehail_fee': '$17',
            'total_amount': '$19',
            'trip_type': '$21',
            'taxi_type': 'green',
            'fields': 21
        },
    },
    # 'green_v3': {
    #     'pattern': 'green_tripdata_2016-(0[7-9]|1[0-2])',
    #     'columns': {
    #         # Pickup and Dropoff locations change to areas, requires revision of data model
    #     },
    # },
}
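
Each configuration carries a `pattern` that can be matched against a file name to pick the right column mapping. A minimal sketch of that lookup follows; the `config_name_for` helper is hypothetical and the patterns are copied from the dictionary above.

```python
import re

# Patterns copied from nyc_parse_config above; only the 'pattern' entries are needed here.
patterns = {
    'yellow_v1': 'yellow_tripdata_(2009|201[0-4])',
    'yellow_v2': 'yellow_tripdata_(2015|2016-(0[1-6]))',
    'green_v1': 'green_tripdata_(2009|201[0-4])',
    'green_v2': 'green_tripdata_(2015|2016-(0[1-6]))',
}


def config_name_for(filename):
    """Return the name of the first configuration whose pattern matches, else None."""
    for name, pattern in patterns.items():
        if re.search(pattern, filename):
            return name
    return None


print(config_name_for('yellow_tripdata_2015-01.csv'))
```

Files from the second half of 2016 match none of the active patterns, which is consistent with the commented-out v3 entries.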

In [37]:
from textwrap import dedent
awk_template=dedent("""\
            BEGIN {{ FS = ","; OFS = "|"}}
            {{
                if ( NR == 1 ) {{
                    print $0 > "/dev/stderr"
                    next
                }}
                if (/^\\s*$/ ) {{ next; }}
                if ( {pu_lon} == "0" || {pu_lat} == "0" || {do_lon} == "0" || {do_lat} == "0" ) {{
                    print "Invalid position " FILENAME "(" NR "): " $0 > "/dev/stderr"
                    next
                }}
                match($2, /^([0-9]{{4}})-([0-9]{{2}}).*/, ymd)
                t1 = gensub(/[-:]/," ","g",$2);
                t2 = gensub(/[-:]/," ","g",$3);
                d1=mktime(t1);
                d2=mktime(t2);
                if ( d2-d1 > 1000000 ) {{
                    print "Invalid timestamp " FILENAME "(" NR "): " $0 > "/dev/stderr"
                    next
                }}
                gsub(/\\r/, "", $NF);

                if( $1==1 || $1=="CMT" ) vendor="Creative Mobile Technologies, LLC";
                else if( $1==2 || $1=="VTS" ) vendor="VeriFone Inc.";
                else if( $1=="DDS" ) vendor="Digital Dispatch Systems Inc.";
                else vendor=$1;

                py = toupper({pay_type})
                if( py=="2" || py=="CASH" || py=="CAS" || py=="CSH" ) payment="Cash"
                else if( py=="1" || py=="CRD" || py=="CRE" || py=="CREDIT" ) payment="Credit"
                else if( py=="3" ) payment="No Charge"
                else if( py=="4" ) payment="Dispute"
                else if( py=="5" ) payment="Unknown"
                else if( py=="6" ) payment="Voided Trip"
                else payment={pay_type}

                if( {rate_code}==1 ) rate="Standard rate"
                else if( {rate_code}==2 ) rate="JFK"
                else if( {rate_code}==3 ) rate="Newark"
                else if( {rate_code}==4 ) rate="Nassau or Westchester"
                else if( {rate_code}==5 ) rate="Negotiated fare"
                else if( {rate_code}==6 ) rate="Group ride"
                else rate={rate_code}

                if( {trip_type}==1 ) tt="Street-hail"
                else if( {trip_type}==2 ) tt="Dispatch"
                else tt={trip_type}

                if( {store_fwd_flag}=="Y" ) sf="true";
                else sf="false";

                row_id=sprintf("%d", ((ymd[1]-2000)*100+ymd[2])) sprintf("%015d", FNR)

                print row_id, $2, d2-d1, "SRID=4326;LINESTRING(" {pu_lon} " " {pu_lat} " 0," {do_lon} " " {do_lat} " " d2-d1 ")", FILENAME"?line="NR, \\
                    "{{"\\
                        "\\"vendor\\":" "\\""vendor"\\"" \\
                        ",\\"trip_distance\\":" sprintf("%g", {trip_dist}) \\
                        ",\\"passenger_count\\":" {passenger_count} \\
                        ",\\"rate\\":"  "\\""rate"\\"" \\
                        ",\\"fare_amount\\":" sprintf("%.2f", {fare_amount}) \\
                        ",\\"mta_tax\\":" sprintf("%.2f", {mta_tax}) \\
                        ",\\"tip_amount\\":" sprintf("%.2f", {tip_amount}) \\
                        ",\\"tolls_amount\\":" sprintf("%.2f", {tolls_amount}) \\
                        ",\\"ehail_fee\\":" sprintf("%.2f", {ehail_fee}) \\
                        ",\\"extra\\":" sprintf("%.2f", {extra}) \\
                        ",\\"improvement_surcharge\\":" sprintf("%.2f", {imp_charge}) \\
                        ",\\"total_amount\\":" sprintf("%.2f", {total_amount}) \\
                        ",\\"payment_type\\":" "\\""payment"\\"" \\
                        ",\\"store_and_fwd_flag\\":" sf \\
                        ",\\"trip_type\\":" "\\""tt"\\"" \\
                        ",\\"taxi_type\\": \\"{taxi_type}\\"" \\
                    "}}"
            }}
        """)

In [41]:
awk_script = awk_template.format(**nyc_parse_config['green_v2']['columns'])
with open('/home/dingbat/data/taxi/NYCTLC/taxi.awk', 'w') as f:
    f.write(awk_script)
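
The `row_id` built in the awk template combines a year and month prefix with a zero-padded line number. The Python function below is a hypothetical equivalent, included only to make the arithmetic explicit.

```python
import re


def row_id(pickup_datetime, line_number):
    """Mirror the awk row_id: (year - 2000) * 100 + month, then a 15-digit line number."""
    match = re.match(r'^([0-9]{4})-([0-9]{2})', pickup_datetime)
    year, month = int(match.group(1)), int(match.group(2))
    return '{:d}{:015d}'.format((year - 2000) * 100 + month, line_number)


print(row_id('2015-01-01 00:00:00', 42))
```

For a January 2015 file the prefix is 1501, so ids from different months do not collide as long as each month comes from a single file.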

# Calling awk from Python and streaming results

In [42]:
from datetime import datetime
from django.db import connections


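if True:
    ifilename = '/home/dingbat/data/taxi/NYCTLC/yellow_tripdata_2009-01-sample.csv'
    config_name = 'yellow_v1'  # 2009-2014 yellow layout (18 fields)
else:
    ifilename = '/home/dingbat/data/taxi/NYC/DataNYCTLC/2015/yellow/yellow_tripdata_2015-01.csv'
    config_name = 'yellow_v2'  # 2015 yellow layout (19 fields)

# Use the parse configuration that matches the selected input file.
awk_script = awk_template.format(**nyc_parse_config[config_name]['columns'])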
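
The awk template relies on `str.format`, so literal awk braces are doubled and the column references are filled in from the configuration. A small stand-alone illustration, using a shortened hypothetical template:

```python
# Doubled braces survive formatting as single braces; {pu_lon} and {pu_lat} are substituted.
template = 'BEGIN {{ FS = ","; OFS = "|" }} {{ print {pu_lon}, {pu_lat} }}'
script = template.format(pu_lon='$6', pu_lat='$7')
print(script)
```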

In [43]:
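with open('id_split.py', 'w') as f:
    f.write(dedent("""\
        import sys
        ids = []
        for line in sys.stdin:
            # The id is everything before the first '|'.
            split_line = line.split('|', 1)
            ids.append(split_line[0])
            # line already ends with a newline, so write it back unchanged.
            sys.stdout.write(line)
    """))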

In [95]:
from django.db import transaction
from entity.models import Trip
from contextlib import closing
from django.db import connection
import subprocess
import os
import io

start = datetime.now()

# Create a stream from the shell commands for use with copy_expert.
ifilename = os.path.abspath(ifilename)
ps_awk = subprocess.Popen(
    ['awk', '-v', 'f={}'.format(ifilename), awk_script, ifilename],
    stdout=subprocess.PIPE
)

# Read in the stream to the database.
# NOTE: the column list must match, in order, the fields printed by the awk script.
# The template above also prints a leading row_id, so either add id to this list
# or drop row_id from the awk print statement.
cursor = connections['default'].cursor()
cursor.copy_expert(
    "COPY entity_trip(start_datetime, duration, geometry, archive_uri, metadata) FROM STDIN DELIMITER '|'",
    file=ps_awk.stdout
)
# Release the pipe and reap the awk process once the copy has finished.
ps_awk.stdout.close()
ps_awk.wait()

end = datetime.now()

print('Database load: Start={}; End={}'.format(start, end))
print('Created {:,} records in {}s'.format(cursor.rowcount, end - start))

Database load: Start=2016-03-27 00:57:03.537268; End=2016-03-27 00:57:03.559222
Created 34 records in 0:00:00.021954s
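
The streaming pattern itself can be tried without a database. In the sketch below a Python one-liner stands in for awk (an assumption made so the example runs anywhere), and the pipe is read line by line, much as `copy_expert` reads from the file-like object it is given.

```python
import subprocess
import sys

# Stand-in producer: a Python one-liner instead of awk.
producer = subprocess.Popen(
    [sys.executable, '-c', "print('a|1'); print('b|2')"],
    stdout=subprocess.PIPE,
)

# producer.stdout is a file-like object that yields bytes lines.
rows = [line.decode().rstrip('\r\n').split('|') for line in producer.stdout]
producer.stdout.close()
return_code = producer.wait()
print(rows, return_code)
```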


In [81]:
# Remove trajectories that are outside of the valid region or invalid for some reason (usually same start and end point)
start = datetime.now()
cursor.execute("""
    DELETE FROM entity_trip
    WHERE
        NOT ST_GeomFromText('POLYGON((-180 -90, 180 -90, 180 90, -180 90, -180 -90))') ~ geometry
        OR
        NOT ST_IsValid(geometry)
    """.replace('\n', ' '))
'Removed {} invalid rows in {}'.format(cursor.rowcount, datetime.now()-start)

'Removed 71502 invalid rows in 0:01:12.633380'
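
The same kind of filter can be approximated before loading. The function below is a rough stand-in, not an equivalent of the SQL: it checks that both endpoints lie within valid longitude and latitude ranges and that the trip does not start and end at the same point. The `is_plausible_trip` name is hypothetical.

```python
def is_plausible_trip(pickup, dropoff):
    """pickup and dropoff are (longitude, latitude) tuples in degrees."""
    for lon, lat in (pickup, dropoff):
        if not (-180 <= lon <= 180 and -90 <= lat <= 90):
            return False
    # A line whose two points coincide has no length.
    return pickup != dropoff


print(is_plausible_trip((-73.99, 40.75), (-73.98, 40.76)))
```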

In [77]:
cursor.execute("DROP FUNCTION IF EXISTS osm_roadmatch_line(geometry, int);")
cursor.execute("DROP FUNCTION IF EXISTS osm_roadmatch_point(geometry, int);")

cursor.execute("CREATE EXTENSION IF NOT EXISTS hstore;")
cursor.execute("""
CREATE OR REPLACE FUNCTION osm_roadmatch_point(
    point geometry,
    initial_results integer DEFAULT 10)
RETURNS bigint AS $BODY$
DECLARE
    the_match bigint;
BEGIN
    point := ST_Transform(point, 3857);
    SELECT l.gid, ST_Distance(l.way, point) as dist
    INTO the_match
    FROM planet_osm_line as l 
    WHERE
        ST_DWithin(point, l.way, 20)
        AND
        l.highway IN (
            'motorway',
            'trunk',
            'primary',
            'secondary',
            'tertiary',
            'unclassified',
            'residential',
            'service',
            'motorway_link',
            'trunk_link',
            'primary_link',
            'secondary_link',
            'tertiary_link',
            'living_street',
            'road',
            'turning_circle'
        )
    ORDER BY dist
    LIMIT initial_results;
    RETURN the_match;
END $BODY$
LANGUAGE plpgsql VOLATILE;
""")

cursor.execute("""
CREATE FUNCTION osm_roadmatch_line(
    IN geometry,
    initial_results int DEFAULT 10)
RETURNS bigint[] AS $BODY$
DECLARE
    roadmatch bigint[];
BEGIN
    SELECT array_agg(osm_roadmatch_point((dp).geom, initial_results)) INTO roadmatch
    FROM (SELECT ST_DumpPoints($1) AS dp) As foo;
    RETURN roadmatch;
END $BODY$
LANGUAGE plpgsql VOLATILE;
""")

In [90]:
# Execute the road matching query.
start = datetime.now()
cursor.execute("""
WITH roadmatch_metadata AS (
    WITH expanded_json_keys AS (
        WITH roadmatch_all AS (
            SELECT
                id
                , osm_roadmatch_line(geometry) as matches
                , metadata
            FROM entity_trip
            WHERE
                NOT metadata ? 'roadmatch'
                AND
                id BETWEEN 0 AND 1000
        )
        SELECT id, j1.key, j1.value FROM roadmatch_all, jsonb_each(metadata) as j1
        UNION
        SELECT id, 'roadmatch', to_json(roadmatch_all.matches)::jsonb FROM roadmatch_all
    )
    SELECT
        id,
        json_object_agg(key, value)::jsonb as metadata
    FROM expanded_json_keys
    GROUP BY id
)
SELECT * FROM roadmatch_metadata
""".replace('\n', ' '))
print(cursor.fetchone())
'Found {} rows in {}'.format(cursor.rowcount, datetime.now()-start)

None


'Found 0 rows in 0:00:00.002384'
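
The per-point matching done by `osm_roadmatch_point` amounts to keeping candidates of an accepted road class within a distance limit and taking the closest. The sketch below shows that selection on an in-memory list; the `nearest_road` helper and the candidate tuples are hypothetical, and distances are in the units of the projected coordinate system.

```python
# Road classes copied from the osm_roadmatch_point definition.
ROAD_CLASSES = {
    'motorway', 'trunk', 'primary', 'secondary', 'tertiary', 'unclassified',
    'residential', 'service', 'motorway_link', 'trunk_link', 'primary_link',
    'secondary_link', 'tertiary_link', 'living_street', 'road', 'turning_circle',
}


def nearest_road(candidates, max_distance=20.0):
    """candidates: iterable of (gid, highway, distance). Return the closest gid or None."""
    usable = [(distance, gid) for gid, highway, distance in candidates
              if highway in ROAD_CLASSES and distance <= max_distance]
    return min(usable)[1] if usable else None


candidates = [(1, 'residential', 12.0), (2, 'footway', 3.0),
              (3, 'primary', 8.5), (4, 'service', 25.0)]
print(nearest_road(candidates))
```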

In [91]:
# Run the same road matching query and write the merged metadata back to entity_trip.
start = datetime.now()
cursor.execute("""
WITH roadmatch_metadata AS (
    WITH expanded_json_keys AS (
        WITH roadmatch_all AS (
            SELECT
                id
                , osm_roadmatch_line(geometry) as matches
                , metadata
            FROM entity_trip
            WHERE
                NOT metadata ? 'roadmatch'
                AND
                id BETWEEN 20000 AND 21000
        )
        SELECT id, j1.key, j1.value FROM roadmatch_all, jsonb_each(metadata) as j1
        UNION
        SELECT id, 'roadmatch', to_json(roadmatch_all.matches)::jsonb FROM roadmatch_all
    )
    SELECT
        id,
        json_object_agg(key, value)::jsonb as metadata
    FROM expanded_json_keys
    GROUP BY id
)
UPDATE entity_trip
SET metadata=roadmatch_metadata.metadata
FROM roadmatch_metadata
WHERE entity_trip.id=roadmatch_metadata.id
""".replace('\n', ' '))
'Roadmatched {} rows in {}'.format(cursor.rowcount, datetime.now()-start)

'Roadmatched 0 rows in 0:00:00.002572'
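
In plain terms, the query expands each trip's metadata into key/value pairs, adds a `roadmatch` key, and aggregates the pairs back into one object per trip, skipping trips that already have the key. A hypothetical in-memory equivalent, with `add_roadmatch` and the `matcher` callable as assumptions:

```python
def add_roadmatch(trips, matcher):
    """trips: dict of id -> metadata dict; matcher: callable id -> list of road ids.

    Return the updated metadata for trips that did not have a 'roadmatch' key yet.
    """
    updated = {}
    for trip_id, metadata in trips.items():
        if 'roadmatch' in metadata:
            continue  # same role as NOT metadata ? 'roadmatch'
        merged = dict(metadata)
        merged['roadmatch'] = matcher(trip_id)
        updated[trip_id] = merged
    return updated


trips = {
    20000: {'vendor': 'VeriFone Inc.'},
    20001: {'vendor': 'VeriFone Inc.', 'roadmatch': [7, 8]},
}
result = add_roadmatch(trips, lambda trip_id: [101, 102])
print(result)
```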

* http://www.postgresql.org/docs/9.4/interactive/populate.html
* http://stefano.dissegna.me/django-pg-bulk-insert.html
* https://wiki.postgresql.org/wiki/COPY
* http://initd.org/psycopg/docs/cursor.html#cursor.copy_expert
* http://adpgtech.blogspot.com/2014/09/importing-json-data.html