In [1]:
import csv
import os
import timeit
import sys
from IPython.display import clear_output

from datetime import datetime

from sqlalchemy import Column, Integer, Boolean, Float, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from geoalchemy2 import Geometry

In [2]:
engine = create_engine('postgresql+psycopg2://postgres:organon@localhost/nyc_taxi')

Base = declarative_base()
class Trip(Base):
    __tablename__ = 'trip'
    trip_id = Column(Integer, primary_key=True) # Trip ID, sqlalchemy will map it to postgresql.SERIAL
    is_yellow = Column(Boolean) # True for yellow, False for green
    pickup_datetime = Column(DateTime) # Pickup date and time
    dropoff_datetime = Column(DateTime) # Dropoff date and time
    distance = Column(Float) # Trip distance
    
    pickup_lonlat = Column(Geometry(geometry_type='POINT', srid=4269)) # Start longitude and latitude (postgis)
    dropoff_lonlat = Column(Geometry(geometry_type='POINT', srid=4269)) # End longitude and latitude (postgis)
    
    net_amt = Column(Float) # Net amount, namely the total amount minus the toll amount
    
Trip.__table__.create(engine)

In [None]:
def insert_from_file(engine, directory, filename, is_yellow, batch_size=1e6):
    session = sessionmaker()
    session.configure(bind=engine)
    s = session()
    
    filename_full = directory + filename
    count_row = 0
    # Minor differences in the file format between the yellow and green cab
    # The column indices corresponds to pickup_datatime, dropoff_datetime, distance,
    # pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude, 
    # toll_amount, total_amount
    if is_yellow:
        idx_cols = [1, 2, 4, 5, 6, 9, 10, 16, 17]
    else:
        idx_cols = [1, 2, 10, 5, 6, 7, 8, 15, 17]
        
    try:
        with open(filename_full, newline='') as csvfile:
            reader = csv.reader(csvfile, delimiter=',')
            next(reader, None) # Skip the header
            for line in reader:
                if line and all(map(bool, [line[col] for col in idx_cols])) and -75 < float(line[idx_cols[3]]) < -70 and -75 < float(line[idx_cols[5]]) < -70:
                    trip = Trip(**{
                            'is_yellow': is_yellow,
                            'pickup_datetime': datetime.strptime(line[idx_cols[0]], '%Y-%m-%d %H:%M:%S'),
                            'dropoff_datetime': datetime.strptime(line[idx_cols[1]], '%Y-%m-%d %H:%M:%S'),
                            'distance': float(line[idx_cols[2]]),
                            'pickup_lonlat': 'SRID=4269;POINT({0} {1})'.format(line[idx_cols[3]], line[idx_cols[4]]),
                            'dropoff_lonlat': 'SRID=4269;POINT({0} {1})'.format(line[idx_cols[5]], line[idx_cols[6]]),
                            'net_amt': float(line[idx_cols[8]]) - float(line[idx_cols[7]])
                            })
                    s.add(trip) #Add all the records
                    count_row = count_row + 1
                
                if count_row > 0 and count_row % batch_size == 0:
                    s.commit()
                    #clear_output()
                    print(' - {0} rows committed!'.format(count_row))
                    sys.stdout.flush()
                    
            if count_row % batch_size != 0:
                s.commit()
    except:
        s.rollback() #Rollback the changes on error
    finally:
        s.close() #Close the connection
        
    return count_row

In [3]:
dir_trip_green = "./datasets/green_2014/"

for idx, filename in enumerate(os.listdir(dir_trip_green)): # "." means current directory
    start_time = timeit.default_timer()
    nrows = insert_from_file(engine, dir_trip_green, filename, False)
    print("{0}: {1} rows from {2} inserted in {3} secs".format(idx, nrows, filename, timeit.default_timer()-start_time))
    sys.stdout.flush()

 - 1000000 rows committed!
0: 1270185 rows from green_tripdata_2014-07.csv inserted in 651.7393154399997 secs
 - 1000000 rows committed!
1: 1417220 rows from green_tripdata_2014-05.csv inserted in 691.7019021550004 secs
 - 1000000 rows committed!
2: 1305266 rows from green_tripdata_2014-04.csv inserted in 724.2230242559999 secs
 - 1000000 rows committed!
3: 1340949 rows from green_tripdata_2014-08.csv inserted in 761.3165986990025 secs
 - 1000000 rows committed!
4: 1002225 rows from green_tripdata_2014-02.csv inserted in 583.4499504420019 secs
 - 1000000 rows committed!
5: 1289976 rows from green_tripdata_2014-03.csv inserted in 742.6310740890003 secs
 - 1000000 rows committed!
6: 1641665 rows from green_tripdata_2014-12.csv inserted in 945.3414683209994 secs
 - 1000000 rows committed!
7: 1487823 rows from green_tripdata_2014-10.csv inserted in 871.0925675079998 secs
 - 1000000 rows committed!
8: 1544341 rows from green_tripdata_2014-11.csv inserted in 901.4707674310011 secs
 - 1000000

In [5]:
dir_trip_yellow = "./datasets/yellow_2014/"

for idx, filename in enumerate(os.listdir(dir_trip_yellow)): # "." means current directory
    start_time = timeit.default_timer()
    nrows = insert_from_file(engine, dir_trip_yellow, filename, True)
    print("{0}: {1} rows from {2} inserted in {3} secs".format(idx, nrows, filename, timeit.default_timer()-start_time))
    sys.stdout.flush()

 - 1000000 rows committed!
 - 2000000 rows committed!
 - 3000000 rows committed!
 - 4000000 rows committed!
 - 5000000 rows committed!
 - 6000000 rows committed!
 - 7000000 rows committed!
 - 8000000 rows committed!
 - 9000000 rows committed!
 - 10000000 rows committed!
 - 11000000 rows committed!
 - 12000000 rows committed!
0: 12967361 rows from yellow_tripdata_2014-11.csv inserted in 7189.542198682 secs
 - 1000000 rows committed!
 - 2000000 rows committed!
 - 3000000 rows committed!
 - 4000000 rows committed!
 - 5000000 rows committed!
 - 6000000 rows committed!
 - 7000000 rows committed!
 - 8000000 rows committed!
 - 9000000 rows committed!
 - 10000000 rows committed!
 - 11000000 rows committed!
 - 12000000 rows committed!
 - 13000000 rows committed!
 - 14000000 rows committed!
 - 15000000 rows committed!
1: 15147865 rows from yellow_tripdata_2014-03.csv inserted in 8723.442068528002 secs
 - 1000000 rows committed!
 - 2000000 rows committed!
 - 3000000 rows committed!
 - 4000000 row