In [6]:
import datetime as dt
import os

import pandas as pd
import simplejson as json
from escapejson import escapejson

# SQLAlchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
# Shared declarative base; the ORM model classes in this notebook subclass it,
# and Base.metadata is what the setup cell uses to drop/create the tables.
Base = declarative_base()
# Fraction passed to DataFrame.sample(frac=...) when the trip CSVs are loaded.
sample_size = 0.01 # 1% sample size

from sqlalchemy import Column, Integer, String, Float, DateTime, null, ForeignKey, BigInteger

In [7]:
class TaxiTrip(Base):
    """ORM model for a single taxi trip, persisted in the ``taxi_trip`` table.

    Apart from the primary key, every column relies on ``Column``'s default
    ``nullable=True``, so a missing source value can be stored as NULL.
    """

    __tablename__ = 'taxi_trip'

    # Surrogate key generated by the database.
    trip_id = Column(BigInteger, primary_key=True, autoincrement=True)

    # Start and end of the trip.
    pickup_time = Column(DateTime)
    dropoff_time = Column(DateTime)

    # Kept as Float: the trip CSV previews show this column as 1.0, 2.0, ...
    passenger_count = Column(Float)
    trip_distance = Column(Float)

    # Pickup / drop-off location ids taken from the trip files.
    # NOTE(review): presumably these match Zone.location_id, but no ForeignKey
    # is declared here - confirm before joining on them.
    pickup_loc_id = Column(BigInteger)
    dropoff_loc_id = Column(BigInteger)

    total_amount = Column(Float)

    # Integer payment code; the migration cells write 5 when the source is missing.
    payment_type = Column(Integer)

    # Written by the migration cells: 1 for yellow taxis, 2 for green taxis.
    taxi_type = Column(Integer)

In [8]:
class ForHireTrip(Base):
    """ORM model for a single for-hire vehicle trip (``for_hire_trip`` table).

    Apart from the primary key, every column relies on ``Column``'s default
    ``nullable=True``, so a missing source value can be stored as NULL.
    """

    __tablename__ = 'for_hire_trip'

    # Surrogate key generated by the database.
    trip_id = Column(BigInteger, primary_key=True, autoincrement=True)

    # Short company name (at most 10 characters) chosen by the migration cell
    # from the trip's licence number.
    company = Column(String(10))

    # Start and end of the trip.
    pickup_time = Column(DateTime)
    dropoff_time = Column(DateTime)

    # Pickup / drop-off location ids taken from the trip file.
    # NOTE(review): presumably these match Zone.location_id, but no ForeignKey
    # is declared here - confirm before joining on them.
    pickup_loc_id = Column(BigInteger)
    dropoff_loc_id = Column(BigInteger)

    # 0/1 indicator written by the migration cell from the file's SR_Flag column.
    shared_flag = Column(Integer)

In [9]:
class Zone(Base):
    """ORM model for one taxi zone, persisted in the ``zone`` table.

    Rows are created from the zone lookup CSV, one per ``LocationID``.
    """
    __tablename__ = 'zone'
    # The key is supplied from the CSV's LocationID, so it must not auto-increment.
    location_id = Column(BigInteger, primary_key=True, autoincrement=False)
    # Declared exactly once. The class previously assigned `street` twice
    # (String(120), then String(100)); the second assignment silently replaced
    # the first, so String(100) is the definition that was actually in effect.
    # NOTE(review): the lookup CSV also has a "State" column that is read but
    # never stored - the duplicate may have been meant to be `state`; confirm.
    street = Column(String(100), nullable=True)
    latitude = Column(Float, nullable=True)
    longitude = Column(Float, nullable=True)
    city = Column(String(100), nullable=True)
    county = Column(String(100), nullable=True)
    # NOTE(review): stored as Integer, so a ZIP code with a leading zero would
    # lose it - confirm this is acceptable for the zones in the lookup file.
    zip = Column(Integer, nullable=True)

In [10]:
from sqlalchemy.orm import Session

# Database connection URL.
# Read from the TAXI_DATABASE_URL environment variable when it is set, so real
# credentials never have to be written into the notebook. The fallback is the
# original local development database, which keeps existing setups working.
database_url = os.environ.get(
    "TAXI_DATABASE_URL",
    'postgresql+pg8000://postgres:123@localhost:5432/Taxi',
)

# Create an engine that can get to the database
engine = create_engine(database_url)
# Kept for backward compatibility with any cell that uses `conn` directly;
# nothing in the migration cells below needs it (they go through `session`).
conn = engine.connect()

# WARNING: destructive. Drops every table registered on Base before recreating
# it, so each run of this cell starts from an empty database.
Base.metadata.drop_all(engine)

# Create the tables for every model registered on Base
# ----------------------------------
Base.metadata.create_all(engine)

# ORM session used by all migration cells below
session = Session(bind=engine)

In [11]:
# Load the taxi zone lookup CSV into a DataFrame.
file_movie = "raw-data/zones/taxi_zone_lookup.csv"

# Only these columns are read from the file.
col_list = [
    "LocationID",
    "Street",
    "Latitude",
    "Longitude",
    "City",
    "State",
    "County",
    "Zip",
]
zones_df = pd.read_csv(
    file_movie,
    low_memory=False,
    usecols=col_list,
)

# Preview the first five rows.
zones_df.head(n=5)

Unnamed: 0,LocationID,Street,Latitude,Longitude,City,State,County,Zip
0,1,"Newark Airport, EWR",40.68863,-74.018244,New York,NY,New York County,10004
1,2,"Jamaica Bay, Queens",40.556245,-73.919644,Breezy Point,NY,Queens County,11697
2,3,"Allerton/Pelham Gardens, Bronx",40.68863,-74.018244,New York,NY,New York County,10004
3,4,"Alphabet City, Manhattan",40.68863,-74.018244,New York,NY,New York County,10004
4,5,"Arden Heights, Staten Island",40.863679,-73.927964,New York,NY,New York County,10040


In [12]:
# Turn every row of the zones DataFrame into a Zone record and queue it on the
# session; a single commit at the end writes them all.
print("Starting Zones Migration")
for index, row in zones_df.iterrows():
    session.add(
        Zone(
            location_id=row["LocationID"],
            street=row["Street"],
            latitude=row["Latitude"],
            longitude=row["Longitude"],
            city=row["City"],
            county=row["County"],
            zip=row["Zip"],
        )
    )

# Persist all queued zones in one transaction.
session.commit()
print("Finished Zones Migration")

Starting Zones Migration
Finished Zones Migration


In [13]:
# Yellow taxi trips (stored later with taxi_type = 1).
# Load the yellow taxi CSV into a DataFrame.
file_movie = "raw-data/taxis/yellow_tripdata_2019-12.csv"

# Only these columns are read from the file.
col_list = ["tpep_pickup_datetime","tpep_dropoff_datetime", "passenger_count","trip_distance","PULocationID","DOLocationID","payment_type","total_amount"]
yellow_taxi_df = pd.read_csv(file_movie,usecols=col_list, low_memory=False)

# Keep only a `sample_size` fraction of the rows. The fixed random_state makes
# the sample reproducible: a fresh Restart & Run All now loads the same trips
# every time instead of a different random subset on each run.
yellow_taxi_df = yellow_taxi_df.sample(frac=sample_size, random_state=42)
yellow_taxi_df.head(5)

Unnamed: 0,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,PULocationID,DOLocationID,payment_type,total_amount
6771762,2019-12-31 16:46:48,2019-12-31 17:10:31,3.0,2.7,186,45,1.0,22.8
5571402,2019-12-23 21:03:36,2019-12-23 21:20:50,1.0,1.61,230,237,1.0,18.96
468031,2019-12-03 11:02:46,2019-12-03 11:14:54,2.0,1.5,163,90,1.0,14.75
567884,2019-12-03 19:21:50,2019-12-03 19:32:01,1.0,1.21,162,234,1.0,14.76
3964664,2019-12-17 08:28:30,2019-12-17 08:34:51,1.0,0.7,140,229,1.0,10.55


In [14]:
# Read each sampled yellow-taxi row and store it as a TaxiTrip (taxi_type = 1).
print("Starting Yellow Taxi Migration")
count = 0
for index, row in yellow_taxi_df.iterrows():
    # Parse both timestamps the same way. A missing or malformed value is
    # stored as SQL NULL rather than aborting the whole migration.
    parsed_times = {}
    for column in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
        raw_value = row[column]
        parsed_times[column] = null()
        if pd.isna(raw_value):
            continue
        try:
            # Expected source format, e.g. 2019-12-01 00:05:27
            parsed_times[column] = dt.datetime.strptime(raw_value, '%Y-%m-%d %H:%M:%S')
        except ValueError:
            # Report the unparseable value and its DataFrame index; keep NULL.
            print(f'{raw_value}, {index}')

    # A missing payment type is recorded as code 5.
    payment_type = 5 if pd.isna(row["payment_type"]) else int(row["payment_type"])

    # Convert a missing passenger count to None so the nullable column gets
    # NULL instead of a floating-point NaN.
    passenger_count = None if pd.isna(row["passenger_count"]) else row["passenger_count"]

    # Create the TaxiTrip object and add it to the db session.
    trip = TaxiTrip(pickup_time=parsed_times["tpep_pickup_datetime"],
                    dropoff_time=parsed_times["tpep_dropoff_datetime"],
                    passenger_count=passenger_count,
                    trip_distance=row["trip_distance"], pickup_loc_id=row["PULocationID"],
                    dropoff_loc_id=row["DOLocationID"], total_amount=row["total_amount"],
                    payment_type=payment_type, taxi_type=1)
    session.add(trip)
    count = count + 1
    # Commit in batches of 10,000 rows and show progress on a single line.
    if count % 10000 == 0:
        print(f'{count}', end="\r")
        session.commit()

# Commit whatever is left from the final partial batch. The previous guard,
# `if session.is_modified:`, tested a bound method (always truthy) and so never
# actually skipped this commit; committing unconditionally says what happens.
session.commit()
print("Finished Yellow Taxi Migration")

Starting Yellow Taxi Migration
Finished Yellow Taxi Migration


In [15]:
# Green taxi trips (stored later with taxi_type = 2).
# Load the green taxi CSV into a DataFrame.
file_movie = "raw-data/taxis/green_tripdata_2019-12.csv"

# Only these columns are read from the file.
col_list = ["lpep_pickup_datetime","lpep_dropoff_datetime", "passenger_count","trip_distance","PULocationID","DOLocationID","payment_type","total_amount"]
green_taxi_df = pd.read_csv(file_movie,usecols=col_list, low_memory=False)

# Keep only a `sample_size` fraction of the rows. The fixed random_state makes
# the sample reproducible: a fresh Restart & Run All now loads the same trips
# every time instead of a different random subset on each run.
green_taxi_df = green_taxi_df.sample(frac=sample_size, random_state=42)
green_taxi_df.head(5)

Unnamed: 0,lpep_pickup_datetime,lpep_dropoff_datetime,PULocationID,DOLocationID,passenger_count,trip_distance,total_amount,payment_type
447718,2019-12-30 15:12:00,2019-12-30 15:32:00,146,168,,6.15,33.06,
26597,2019-12-03 15:55:53,2019-12-03 16:00:06,41,42,1.0,0.79,7.7,1.0
289737,2019-12-24 18:41:15,2019-12-24 18:46:36,43,238,1.0,1.2,8.8,1.0
258875,2019-12-21 19:54:26,2019-12-21 20:04:53,65,189,1.0,1.83,10.3,2.0
418507,2019-12-18 16:01:00,2019-12-18 17:38:00,237,205,,16.72,57.58,


In [16]:
# Read each sampled green-taxi row and store it as a TaxiTrip (taxi_type = 2).
print("Starting Green Taxi Migration")
count = 0
for index, row in green_taxi_df.iterrows():
    # Parse both timestamps the same way. A missing or malformed value is
    # stored as SQL NULL rather than aborting the whole migration.
    parsed_times = {}
    for column in ("lpep_pickup_datetime", "lpep_dropoff_datetime"):
        raw_value = row[column]
        parsed_times[column] = null()
        if pd.isna(raw_value):
            continue
        try:
            # Expected source format, e.g. 2019-12-01 00:05:27
            parsed_times[column] = dt.datetime.strptime(raw_value, '%Y-%m-%d %H:%M:%S')
        except ValueError:
            # Report the unparseable value and its DataFrame index; keep NULL.
            # The handlers used to print row["tpep_..."], a yellow-taxi column
            # name that does not exist in this frame, so a malformed date
            # raised KeyError instead of being reported.
            print(f'{raw_value}, {index}')

    # A missing payment type is recorded as code 5.
    payment_type = 5 if pd.isna(row["payment_type"]) else int(row["payment_type"])

    # Convert a missing passenger count to None so the nullable column gets
    # NULL instead of a floating-point NaN.
    passenger_count = None if pd.isna(row["passenger_count"]) else row["passenger_count"]

    # Create the TaxiTrip object and add it to the db session.
    trip = TaxiTrip(pickup_time=parsed_times["lpep_pickup_datetime"],
                    dropoff_time=parsed_times["lpep_dropoff_datetime"],
                    passenger_count=passenger_count,
                    trip_distance=row["trip_distance"], pickup_loc_id=row["PULocationID"],
                    dropoff_loc_id=row["DOLocationID"], total_amount=row["total_amount"],
                    payment_type=payment_type, taxi_type=2)
    session.add(trip)
    count = count + 1
    # Commit in batches of 10,000 rows and show progress on a single line.
    if count % 10000 == 0:
        print(f'{count}', end="\r")
        session.commit()

# Commit whatever is left from the final partial batch. The previous guard,
# `if session.is_modified:`, tested a bound method (always truthy) and so never
# actually skipped this commit; committing unconditionally says what happens.
session.commit()
print("Finished Green Taxi Migration")

Starting Green Taxi Migration
Finished Green Taxi Migration


In [17]:
# Load the high-volume for-hire vehicle CSV into a DataFrame.
file_movie = "raw-data/taxis/fhvhv_tripdata_2019-12.csv"

# Only these columns are read from the file.
col_list = ["hvfhs_license_num","pickup_datetime","dropoff_datetime", "PULocationID","DOLocationID","SR_Flag"]
for_hire_df = pd.read_csv(file_movie,usecols=col_list, low_memory=False)

# Keep only a `sample_size` fraction of the rows. The fixed random_state makes
# the sample reproducible: a fresh Restart & Run All now loads the same trips
# every time instead of a different random subset on each run.
for_hire_df = for_hire_df.sample(frac=sample_size, random_state=42)
for_hire_df.head(5)

Unnamed: 0,hvfhs_license_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
3857495,HV0003,2019-12-06 14:41:25,2019-12-06 14:46:28,217,256,
12105533,HV0003,2019-12-16 19:19:58,2019-12-16 19:24:55,61,61,
6458731,HV0003,2019-12-09 17:34:15,2019-12-09 17:58:22,205,216,
15872527,HV0003,2019-12-21 15:35:12,2019-12-21 15:43:14,50,239,
4633217,HV0003,2019-12-07 12:56:35,2019-12-07 13:13:53,68,113,


In [18]:
# Read each sampled for-hire row and store it as a ForHireTrip.
print("Starting For Hire Migration")

# Licence number -> company name; any other licence is stored as "Other".
company_by_license = {
    "HV0002": "Juno",
    "HV0003": "Uber",
    "HV0004": "Via",
    "HV0005": "Lyft",
}

count = 0
for index, row in for_hire_df.iterrows():
    # Parse both timestamps the same way. A missing or malformed value is
    # stored as SQL NULL rather than aborting the whole migration.
    parsed_times = {}
    for column in ("pickup_datetime", "dropoff_datetime"):
        raw_value = row[column]
        parsed_times[column] = null()
        if pd.isna(raw_value):
            continue
        try:
            # Expected source format, e.g. 2019-12-01 00:05:27
            parsed_times[column] = dt.datetime.strptime(raw_value, '%Y-%m-%d %H:%M:%S')
        except ValueError:
            # Report the unparseable value and its DataFrame index; keep NULL.
            print(f'{raw_value}, {index}')

    company = company_by_license.get(row["hvfhs_license_num"], "Other")

    # In the TLC high-volume FHV files SR_Flag is 1 for a shared ride and
    # empty otherwise, so a missing value means "not shared" (0). The flag was
    # previously inverted (missing -> 1), which marked almost every trip shared.
    shared_flag = 0 if pd.isna(row["SR_Flag"]) else 1

    # Create the ForHireTrip object and add it to the db session.
    trip = ForHireTrip(company=company,
                       pickup_time=parsed_times["pickup_datetime"],
                       dropoff_time=parsed_times["dropoff_datetime"],
                       pickup_loc_id=row["PULocationID"],dropoff_loc_id=row["DOLocationID"],
                       shared_flag=shared_flag)
    session.add(trip)
    count = count + 1
    # Commit in batches of 10,000 rows and show progress on a single line.
    if count % 10000 == 0:
        print(f'{count}', end="\r")
        session.commit()

# Commit whatever is left from the final partial batch. The previous guard,
# `if session.is_modified:`, tested a bound method (always truthy) and so never
# actually skipped this commit; committing unconditionally says what happens.
session.commit()
print("Finished For Hire Migration")

Starting For Hire Migration
Finished For Hire Migration
