In [51]:
from __future__ import absolute_import, division, print_function

# Get data into SQL database

In [52]:
import os
import json
import pandas as pd
import glob
import numpy as np
import sys
import time
import copy

## Python packages - you may have to pip install sqlalchemy, sqlalchemy_utils, and psycopg2.
from sqlalchemy import create_engine
from sqlalchemy_utils import database_exists, create_database
import psycopg2

# Open connection to server and create database 

In [53]:
#In Python: Define a database name (we're using a dataset on births, so I call it 
# birth_db), and your username and password used above. 
dbname = 'citibike_db'
#username = 'username'
#pswd = 'password'

## 'engine' is a connection to a database
## Here, we're using postgres, but sqlalchemy can connect to other things too.
#engine = create_engine('postgresql://%s:%s@localhost/%s'%(username,pswd,dbname))
engine = create_engine('postgresql://@localhost/%s'%(dbname))

print(engine.url)
# Replace localhost with IP address if accessing a remote server

## create a database (if it doesn't exist)
if not database_exists(engine.url):
    create_database(engine.url)
print(database_exists(engine.url))

# Connect to an existing database
conn = psycopg2.connect("dbname=citibike_db")

# Open a cursor to perform database operations
cur = conn.cursor()


postgresql://@localhost/citibike_db
True


# Prepare schema and create empty table

## First, assert that the columns are all the same

In [124]:
# Locate the files
data_dir = os.path.join(os.getcwd(),'..','..','data','citibike')
files = glob.glob(os.path.join(data_dir,'*trip*.csv'))
print('Number of files: '+str(len(files)))

Number of files: 38


In [55]:
headers = {}
for idx,tmp in enumerate(files):
    df = pd.read_csv(tmp,nrows=5)
    # Changes to header after first run (see 'Note' below)
    df.columns = [x.lower() for x in df.columns]
    df.columns = ['bikeid' if x == 'bike id' else x for x in df.columns]
    df.columns = ['usertype' if x == 'user type' else x for x in df.columns]
    df.columns = ['tripduration' if x == 'trip duration' else x for x in df.columns]
    df.columns = ['starttime' if x == 'start time' else x for x in df.columns]
    df.columns = ['stoptime' if x == 'stop time' else x for x in df.columns]
    # End changes
    df.columns = [x.replace(' ','_') for x in df.columns]
    headers[idx] = df.columns.tolist()
    if idx > 0:
        # Assert that column labels match
        try:
            assert(headers[idx] == headers[0])
        except AssertionError:
            print('Mismatch in header for file with index {}'.format(idx))
                        

Note:

On first run I got the following errors:
    
    Mismatch in header for file with index 35
    Mismatch in header for file with index 36
    Mismatch in header for file with index 37
    
The variations were in captitalization of some column names as well as contractions (e.g., 'bikeid' vs 'Bike ID').
Add appropriate changes to the code section that reads the data frames from the csv files.

## Use header to create schema

In [56]:
for head in headers[0]:
    print(head)

tripduration
starttime
stoptime
start_station_id
start_station_name
start_station_latitude
start_station_longitude
end_station_id
end_station_name
end_station_latitude
end_station_longitude
bikeid
usertype
birth_year
gender


In [57]:
# Hardcode format
# check here for varchar vs text
# http://stackoverflow.com/questions/4848964/postgresql-difference-between-text-and-varchar-character-varying

postgre_schema = {
    'tripduration': 'integer' ,
    'starttime': 'timestamp',
    'stoptime': 'timestamp',
    'start_station_id': 'integer' ,
    'start_station_name': 'text',
    'start_station_latitude': 'double precision',
    'start_station_longitude': 'double precision',
    'end_station_id': 'integer' ,
    'end_station_name': 'text',
    'end_station_latitude': 'double precision',
    'end_station_longitude': 'double precision',
    'bikeid': 'integer' ,
    'usertype': 'text',
    'birth_year': 'integer' ,
    'gender': 'text'}


In [58]:
# Add Primary key
postgre_schema['pk'] = 'serial PRIMARY KEY'

In [59]:
# Turn into ordered list with fixed length columns first, leading with primary key
# http://stackoverflow.com/questions/12604744/does-the-order-of-columns-in-a-postgres-table-impact-performance
postgre_schema_list = []
# Primary key first
postgre_schema_list += ([(k, postgre_schema[k]) 
                         for k in postgre_schema.keys() if k == 'pk'])
postgre_schema_list += ([(k, postgre_schema[k]) 
                         for k in postgre_schema.keys() if postgre_schema[k] == 'integer'])
postgre_schema_list += ([(k, postgre_schema[k]) 
                         for k in postgre_schema.keys() if postgre_schema[k] == 'bigint'])
postgre_schema_list += ([(k, postgre_schema[k]) 
                         for k in postgre_schema.keys() if postgre_schema[k] == 'double precision'])
postgre_schema_list += ([(k, postgre_schema[k]) 
                         for k in postgre_schema.keys() if postgre_schema[k] == 'timestamp'])
postgre_schema_list += ([(k, postgre_schema[k]) 
                         for k in postgre_schema.keys() if postgre_schema[k] == 'text'])
# Check that everything is still there
if len(postgre_schema_list) == len(postgre_schema.keys()):
    print('All is well!')
else:
    print('You forgot something!')



All is well!


In [60]:
print(postgre_schema_list)

[('pk', 'serial PRIMARY KEY'), ('end_station_id', 'integer'), ('tripduration', 'integer'), ('bikeid', 'integer'), ('start_station_id', 'integer'), ('birth_year', 'integer'), ('start_station_latitude', 'double precision'), ('start_station_longitude', 'double precision'), ('end_station_latitude', 'double precision'), ('end_station_longitude', 'double precision'), ('stoptime', 'timestamp'), ('starttime', 'timestamp'), ('gender', 'text'), ('usertype', 'text'), ('start_station_name', 'text'), ('end_station_name', 'text')]


In [61]:
# Build 'CREATE TABLE; string
tmp_str = ''
for idx, column in enumerate(postgre_schema_list):
    tmp_str = tmp_str+column[0]+' '+column[1]
    if idx < len(postgre_schema_list)-1:
        tmp_str = tmp_str+', '
str_create_table = "CREATE TABLE citibike_all ("+tmp_str+");"
print(str_create_table)

CREATE TABLE citibike_all (pk serial PRIMARY KEY, end_station_id integer, tripduration integer, bikeid integer, start_station_id integer, birth_year integer, start_station_latitude double precision, start_station_longitude double precision, end_station_latitude double precision, end_station_longitude double precision, stoptime timestamp, starttime timestamp, gender text, usertype text, start_station_name text, end_station_name text);


In [62]:
# Execute a command: this creates a new table
cur.execute(str_create_table)



In [63]:
# Make the changes to the database persistent
conn.commit()

# Read data and write to SQL table

In [64]:
# Locate the files
files = glob.glob(os.path.join(data_dir,'*trip*.csv'))
print('Number of files: '+str(len(files)))

Number of files: 38


In [88]:
# Read data and check contents
for idx,tmp in enumerate(files):
    if idx == 0:
        df = pd.read_csv(tmp)
        # Changes to header after first run (see 'Note' below)
        df.columns = [x.lower() for x in df.columns]
        df.columns = ['bikeid' if x == 'bike id' else x for x in df.columns]
        df.columns = ['usertype' if x == 'user type' else x for x in df.columns]
        df.columns = ['tripduration' if x == 'trip duration' else x for x in df.columns]
        df.columns = ['starttime' if x == 'start time' else x for x in df.columns]
        df.columns = ['stoptime' if x == 'stop time' else x for x in df.columns]
        # End changes
        df.columns = [x.replace(' ','_') for x in df.columns]
        df['starttime'] = df['starttime'].apply(pd.to_datetime)
        df['stoptime'] = df['stoptime'].apply(pd.to_datetime)


In [89]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 843416 entries, 0 to 843415
Data columns (total 15 columns):
tripduration               843416 non-null int64
starttime                  843416 non-null datetime64[ns]
stoptime                   843416 non-null datetime64[ns]
start_station_id           843416 non-null int64
start_station_name         843416 non-null object
start_station_latitude     843416 non-null float64
start_station_longitude    843416 non-null float64
end_station_id             843416 non-null int64
end_station_name           843416 non-null object
end_station_latitude       843416 non-null float64
end_station_longitude      843416 non-null float64
bikeid                     843416 non-null int64
usertype                   843416 non-null object
birth_year                 843416 non-null object
gender                     843416 non-null int64
dtypes: datetime64[ns](2), float64(4), int64(5), object(4)
memory usage: 96.5+ MB


In [94]:
# Check for non-integer strings in 'birth_year' column
# (This caused an error during SQL commit below)
for i in range(df.shape[0]):
    try:
        int(df['birth_year'][i])
    except:
        print(df['birth_year'][i])
        break


\N


In [95]:
# Replace string with numeric string, assuming it is alwys the same
df['birth_year'] = df['birth_year'].replace('\\N', '9999')


In [96]:
# Check !
for i in range(df.shape[0]):
    tmp = df['birth_year'][i]
    try:
        int(tmp)
    except:
        print(tmp)
        break


In [97]:
# Convert to numeric
df['birth_year'] = df['birth_year'].apply(pd.to_numeric)

In [122]:
# Commit data to SQL data base

start_time_overall = time.time()

for idx,tmp in enumerate(files):
    if idx > 22:
        df = pd.read_csv(tmp)
        # Changes to header after first run (see 'Note' below)
        df.columns = [x.lower() for x in df.columns]
        df.columns = ['bikeid' if x == 'bike id' else x for x in df.columns]
        df.columns = ['usertype' if x == 'user type' else x for x in df.columns]
        df.columns = ['tripduration' if x == 'trip duration' else x for x in df.columns]
        df.columns = ['starttime' if x == 'start time' else x for x in df.columns]
        df.columns = ['stoptime' if x == 'stop time' else x for x in df.columns]
        # End changes
        df.columns = [x.replace(' ','_') for x in df.columns]
        # For a sizeable number of entries no birth year is available
        df['birth_year'] = df['birth_year'].replace('\\N', '9999') 
        df['birth_year'] = df['birth_year'].apply(pd.to_numeric)
        print("Time to read table and edit data frame.")
        print("--- %s seconds ---" % (time.time() - start_time_overall))
        # Write to sql table
        start_time_sql = time.time()
        df.to_sql('citibike_all', engine, if_exists='append', index=False)
        print("Time to write data to SQL.")
        print("--- %s seconds ---" % (time.time() - start_time_sql))

print("Time overall.")
print("--- %s seconds ---" % (time.time() - start_time_overall))    

Time to read table and edit data frame.
--- 9.15958881378 seconds ---
Time to write data to SQL.
--- 457.856747866 seconds ---
Time to read table and edit data frame.
--- 473.324411869 seconds ---
Time to write data to SQL.
--- 338.643971205 seconds ---
Time to read table and edit data frame.
--- 817.442117929 seconds ---
Time to write data to SQL.
--- 287.003496885 seconds ---
Time to read table and edit data frame.
--- 1107.85331702 seconds ---
Time to write data to SQL.
--- 166.339803934 seconds ---
Time to read table and edit data frame.
--- 1277.90901804 seconds ---
Time to write data to SQL.
--- 186.093530178 seconds ---
Time to read table and edit data frame.
--- 1469.97522092 seconds ---
Time to write data to SQL.
--- 304.94830513 seconds ---
Time to read table and edit data frame.
--- 1781.44838691 seconds ---
Time to write data to SQL.
--- 336.165132999 seconds ---
Time to read table and edit data frame.
--- 2125.41932797 seconds ---
Time to write data to SQL.
--- 401.6403079

# 

# More commands for interacting with the database

In [None]:
# rollback transaction that created errors
# conn.rollback()

In [None]:
# Delete entire table from db
# cur.execute("DROP TABLE tablename;")

In [None]:
# Make the changes to the database persistent
# conn.commit()