## Designing the Database

Each Citi Bike file records information about every trip taken during a single month. There is one file per month, starting from June 2013, and every file has the same format. The order and description of the columns are as follows:
- Trip Duration (seconds): The length of the trip in seconds
- Start Date & Time: The start time of the trip MM-DD-YYYY HH:MM:SS
- End Date & Time: The end time of the trip MM-DD-YYYY HH:MM:SS
- Start Station ID: The ID for the station where the trip started
- Start Station Name: The name of the station where the trip started
- Start Station Latitude: The latitude of the station where the trip started
- Start Station Longitude: The longitude of the station where the trip started
- End Station ID: The ID for the station where the trip ended
- End Station Name: The name of the station where the trip ended
- End Station Latitude: The latitude of the station where the trip ended
- End Station Longitude: The longitude of the station where the trip ended
- Bike ID: The ID for the bike that was used in the trip
- User Type: What type of user took the trip (Subscriber or Customer)
- Gender: The gender of the user (Male - 1, Female - 2, None - 0)
- Year of Birth: The year that the user was born

<img src="./Data/Images/DatabaseDiagramW.png" width="600" height="800" align="center"/>

*Note: If you cannot see the label names, edit the markdown (double-click the diagram) and change the `src` from `DatabaseDiagramW.png` to `DatabaseDiagramB.png`.*

## Connecting to the Database

In [None]:
pip install psycopg2-binary;

In [None]:
import psycopg2

In [None]:
# Connection settings for the PostgreSQL server.
# The password is read from the PGPASSWORD environment variable so that the secret
# is never stored in the notebook; export it before running this cell.
import os

PGHOST = 'tripdatabase2.cmaaautpgbsf.us-east-2.rds.amazonaws.com'
PGDATABASE = ''
PGUSER = 'postgres'
PGPASSWORD = os.environ.get('PGPASSWORD', '')

In [None]:
# Open the connection and cursor that the remaining cells reuse.
# Any failure is reported with a printed message instead of a traceback.
try:
    conn = psycopg2.connect(host=PGHOST,
                            port="5432",
                            database=PGDATABASE,
                            user=PGUSER,
                            password=PGPASSWORD)
    cursor = conn.cursor()

    # Round trip to the server: ask for its version string and show it
    cursor.execute("SELECT version();")
    record = cursor.fetchone()
    print("Connection Success:", record, "\n")

except (Exception, psycopg2.Error) as error:
    print("Error while connecting to PostgreSQL", error)

## Database Construction I - Creating the BayWheels Staging Table

In [None]:
pip install s3fs;

In [None]:
import pandas as pd
import numpy as np
import s3fs
import os
from io import StringIO
import Queries

In [None]:
# The S3 bucket that will be used to store the data should be created beforehand.
# Credentials are read from the standard AWS environment variables instead of being
# committed to the notebook. When a variable is unset, None is passed through;
# NOTE(review): s3fs is then expected to fall back to its default credential
# lookup - confirm against the s3fs documentation.
ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
ACCESS_SECRET_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

fs = s3fs.S3FileSystem(anon=False, key=ACCESS_KEY_ID, secret=ACCESS_SECRET_KEY)

In [None]:
def upload_data(conn, data: pd.DataFrame, table: str):
    """Bulk-load a DataFrame into a database table with PostgreSQL COPY.

    Parameters
    ----------
    conn :
        An open psycopg2 connection.
    data : pd.DataFrame
        Rows to upload. The frame is serialised without index or header, so its
        columns must be in the same order as the columns of ``table``.
    table : str
        Destination table, optionally schema-qualified (e.g. ``staging.bay_trip``).
        The name is interpolated into the COPY statement, so it must come from
        trusted code and never from user input.

    Returns
    -------
    None:
        The rows are committed to the database before returning.

    Raises
    ------
    Exception
        Whatever the COPY raises; the transaction is rolled back first so the
        connection stays usable.
    """
    datastream = StringIO()
    data.to_csv(datastream, index=False, header=False)
    datastream.seek(0)

    # Discard any transaction left open (or aborted) by an earlier statement
    conn.rollback()

    # copy_expert replaces copy_from because psycopg2 >= 2.9 quotes the table name
    # given to copy_from, which rejects schema-qualified names. FORMAT csv matches
    # the quoting DataFrame.to_csv produces; NULL '\N' keeps empty fields as empty
    # strings, which is how copy_from treated them.
    copy_statement = f"COPY {table} FROM STDIN WITH (FORMAT csv, NULL '\\N')"
    with conn.cursor() as cursor:
        try:
            cursor.copy_expert(copy_statement, datastream)
        except Exception:
            conn.rollback()
            raise
    conn.commit()

    return None

In [None]:
# Create the schema that holds the per-operator staging tables.
# IF NOT EXISTS keeps this cell re-runnable once the schema has been created.
staging_schema_query = """CREATE SCHEMA IF NOT EXISTS staging;"""
cursor.execute("rollback;")
cursor.execute(staging_schema_query)

In [None]:
# List the BayWheels trip files under the TripData folder of the S3 bucket
bay_filenames = fs.ls("s3://williams-citibike/TripData/BayWheels")

In [None]:
# Staging table for BayWheels trips; station IDs are stored as VARCHAR here.
# TODO (carried over from the original note): move table creation into a tables
# module with one function for all the tables.
bay_staging_query = """
               CREATE TABLE IF NOT EXISTS staging.bay_trip (
                   starttime TIMESTAMP,
                   endtime TIMESTAMP,
                   startID VARCHAR,
                   startname VARCHAR(128),
                   start_lat REAL,
                   start_long REAL,
                   endID VARCHAR,
                   endname VARCHAR(128),
                   end_lat REAL,
                   end_long REAL             
              );
              """
# Roll back whatever transaction is open on the shared cursor before running the DDL
cursor.execute("rollback;")
cursor.execute(bay_staging_query)
conn.commit()

In [None]:
def populate_bay_staging(datafile: str) -> None:
    """Load one BayWheels trip file from S3 into ``staging.bay_trip``.

    The file is read with the primary header names first; if pandas reports that
    those columns are missing, the alternate header names are used instead.

    Parameters
    ----------
    datafile : str
        The name of a file in the s3 bucket without the s3:// prefix

    Returns
    -------
    None:
        If executed properly the database should now have rows corresponding to the rows in the data
    """
    columns = ['start_time', 'end_time',
               'start_station_id', 'start_station_name',
               'start_station_latitude', 'start_station_longitude',
               'end_station_id', 'end_station_name',
               'end_station_latitude', 'end_station_longitude']

    altcols = ['started_at', 'ended_at',
               'start_station_id', 'start_station_name',
               'start_lat', 'start_lng',
               'end_station_id', 'end_station_name',
               'end_lat', 'end_lng']

    # Placeholder written in place of missing coordinates (alternate layout only)
    na_fills = {'start_lat': -1, 'start_lng': -1,
                'end_lat': -1, 'end_lng': -1}

    with fs.open("s3://"+datafile, 'r') as file:
        try:
            data = pd.read_csv(file, usecols=columns, na_values="")[columns]
        except ValueError:
            # read_csv raises ValueError when the requested usecols are not in the
            # header; rewind and retry with the alternate column names.
            file.seek(0)
            data = pd.read_csv(file, usecols=altcols, na_values="")[altcols]
            data.fillna(value=na_fills, inplace=True)

        # Some stations have commas in their name causing the copy to register extra data fields
        for name_col in (3, 7):
            data.iloc[:, name_col] = data.iloc[:, name_col].str.replace(',', '_')

        upload_data(conn, data, 'staging.bay_trip')

    print(f"Finished Uploading to Bay Staging Table: {datafile}")
    return None

In [None]:
# Upload every listed BayWheels file to the staging table, one file at a time
for file in bay_filenames:
    populate_bay_staging(file)

## Database Construction II - Creating the BlueBike Staging Table

In [None]:
# List the BlueBike trip files under the TripData folder of the S3 bucket
blue_filenames = fs.ls("s3://williams-citibike/TripData/BlueBike")

In [None]:
# Staging table for BlueBike trips; station IDs are stored as NUMERIC here.
# TODO (carried over from the original note): move table creation into a tables
# module with one function for all the tables.
blue_staging_query = """
               CREATE TABLE IF NOT EXISTS staging.blue_trip (
                   starttime TIMESTAMP,
                   endtime TIMESTAMP,
                   startID NUMERIC,
                   startname VARCHAR(128),
                   start_lat REAL,
                   start_long REAL,
                   endID NUMERIC,
                   endname VARCHAR(128),
                   end_lat REAL,
                   end_long REAL              
              );
              """
# Roll back whatever transaction is open on the shared cursor before running the DDL
cursor.execute("rollback;")
cursor.execute(blue_staging_query)
conn.commit()

In [None]:
def populate_blue_staging(datafile: str) -> None:
    """Read one BlueBike trip file from S3 and copy its rows into ``staging.blue_trip``.

    Parameters
    ----------
    datafile : str
        The name of a file in the s3 bucket without the s3:// prefix

    Returns
    -------
    None:
        On success the staging table holds one row per row of the file
    """
    # Header names to keep, listed in the column order of the staging table
    wanted = [
        'starttime', 'stoptime',
        'start station id', 'start station name',
        'start station latitude', 'start station longitude',
        'end station id', 'end station name',
        'end station latitude', 'end station longitude',
    ]
    s3_path = "s3://" + datafile

    with fs.open(s3_path, 'r') as handle:
        trips = pd.read_csv(handle, usecols=wanted, na_values="")
        trips = trips[wanted]

        # Positions 3 and 7 are the station names; commas in them would be read
        # as extra field separators by the upload, so swap them for underscores.
        for position in (3, 7):
            trips.iloc[:, position] = trips.iloc[:, position].str.replace(',', '_')

        upload_data(conn, trips, 'staging.blue_trip')

    print(f"Finished Uploading to Blue Staging Table: {datafile}")
    return None

In [None]:
# Skip the first five files: per the original note, usable data starts in 2015
# and the earlier files have no location data.
for file in blue_filenames[5:]:
    populate_blue_staging(file)

## Database Construction III - Creating the Capital Staging Table

In [None]:
# List the Capital Bikeshare trip files in the S3 bucket.
# NOTE(review): the second assignment overwrites the first, so the listing that is
# actually used comes from the misspelled "CaptialBike" folder.
capital_filenames = fs.ls("s3://williams-citibike/TripData/CapitalBike")
capital_filenames = fs.ls("s3://williams-citibike/TripData/CaptialBike") # Temporary: delete this line after re-running notebook 0

In [None]:
# Staging table for Capital Bikeshare trips; station IDs are stored as NUMERIC here.
# TODO (carried over from the original note): move table creation into a tables
# module with one function for all the tables.
capital_staging_query = """
               CREATE TABLE IF NOT EXISTS staging.capital_trip (
                   starttime TIMESTAMP,
                   endtime TIMESTAMP,
                   startID NUMERIC,
                   startname VARCHAR(128),
                   start_lat REAL,
                   start_long REAL,
                   endID NUMERIC,
                   endname VARCHAR(128),
                   end_lat REAL,
                   end_long REAL              
              );
              """
# Roll back whatever transaction is open on the shared cursor before running the DDL
cursor.execute("rollback;")
cursor.execute(capital_staging_query)
conn.commit()

In [None]:
def populate_capital_staging(datafile: str) -> None:
    """Load one Capital Bikeshare trip file from S3 into ``staging.capital_trip``.

    Two file layouts are handled. The primary layout has no coordinate columns, so
    -1 placeholders are inserted for them; the alternate layout has coordinates and
    only its missing values are replaced with -1.

    Parameters
    ----------
    datafile : str
        The name of a file in the s3 bucket without the s3:// prefix

    Returns
    -------
    None:
        If executed properly the database should now have rows corresponding to the rows in the data
    """

    columns = ['Start date', 'End date',
               'Start station number', 'Start station',
               'End station number', 'End station']

    altcolumns = ['started_at', 'ended_at',
                  'start_station_id', 'start_station_name',
                  'start_lat', 'start_lng',
                  'end_station_id', 'end_station_name',
                  'end_lat', 'end_lng']

    with fs.open("s3://"+datafile, 'r') as file:
        try:
            data = pd.read_csv(file, usecols=columns, na_values="")[columns]
        except ValueError:
            # read_csv raises ValueError when the requested usecols are not in the
            # header; rewind and retry with the alternate column names.
            file.seek(0)
            data = pd.read_csv(file, usecols=altcolumns, na_values="")[altcolumns]
            data.fillna({'start_station_id': -1, 'end_station_id': -1,
                         'start_lat': -1, 'start_lng': -1,
                         'end_lat': -1, 'end_lng': -1}, inplace=True)
        else:
            # Insert placeholder coordinates so the frame has the table's ten
            # columns in the table's order.
            data.insert(4, 'start_lat', -1)
            data.insert(5, 'start_lng', -1)

            data.insert(8, 'end_lat', -1)
            data.insert(9, 'end_lng', -1)

        # Commas in station names would be read as extra field separators by the upload
        for name_col in (3, 7):
            data.iloc[:, name_col] = data.iloc[:, name_col].str.replace(',', '_')

        upload_data(conn, data, 'staging.capital_trip')

    # The original message said "Blue" here, which mislabelled the progress output
    print(f"Finished Uploading to Capital Staging Table: {datafile}")
    return None

In [None]:
# Upload every listed Capital Bikeshare file to the staging table, one file at a time
for file in capital_filenames:
    populate_capital_staging(file)


## Database Construction IV - Creating the CitiBike Staging Table

In [None]:
# List the CitiBike trip files under the TripData folder of the S3 bucket
citi_filenames = fs.ls("s3://williams-citibike/TripData/CitiBike")

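Note: the columns from bike ID through gender (`bikeID:gender`) are not kept. Only the first 11 columns of each file (trip duration through end station longitude) are loaded into the staging table.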

In [None]:
# Staging table for CitiBike trips; unlike the other staging tables visible here
# it also has a tripduration column.
# TODO (carried over from the original note): move table creation into a tables
# module with one function for all the tables.
citi_staging_query = """
               CREATE TABLE IF NOT EXISTS staging.citi_trip (
                   tripduration NUMERIC, 
                   starttime TIMESTAMP,
                   endtime TIMESTAMP,
                   startID NUMERIC,
                   startname VARCHAR(128),
                   start_lat REAL,
                   start_long REAL,
                   endID NUMERIC,
                   endname VARCHAR(128),
                   end_lat REAL,
                   end_long REAL              
              );
              """
# Roll back whatever transaction is open on the shared cursor before running the DDL
cursor.execute("rollback;")
cursor.execute(citi_staging_query)
conn.commit()

In [None]:
def populate_citi_staging(datafile: str) -> None:
    """Read one CitiBike trip file from S3 and copy its rows into ``staging.citi_trip``.

    Only the first 11 columns of the file are read.

    Parameters
    ----------
    datafile : str
        The name of a file in the s3 bucket without the s3:// prefix

    Returns
    -------
    None:
        On success the staging table holds one row per row of the file
    """
    s3_path = "s3://" + datafile

    with fs.open(s3_path, 'r') as handle:
        # NOTE(review): the original author remarked that the C engine can't be used to speed this up
        trips = pd.read_csv(handle, usecols=list(range(11)), na_values="")

        # Missing values become -1 so that no blank field reaches the upload
        trips.fillna(-1, inplace=True)

        # Positions 4 and 8 are the station names; commas in them would be read
        # as extra field separators by the upload, so swap them for underscores.
        for position in (4, 8):
            trips.iloc[:, position] = trips.iloc[:, position].str.replace(',', '_')

        # Positions 3 and 7 are the station ids, cast to 32-bit integers
        for position in (3, 7):
            trips.iloc[:, position] = trips.iloc[:, position].astype('int32')

        upload_data(conn, trips, 'staging.citi_trip')

    print(f"Finished Uploading to Citi Staging Table: {datafile}")
    return None

In [None]:
# Upload every listed CitiBike file to the staging table, one file at a time
for file in citi_filenames:
    populate_citi_staging(file)

## Database Construction V - Creating the Divvy Staging Table

In [None]:
# List the Divvy trip files under the TripData folder of the S3 bucket
divvy_filenames = fs.ls("s3://williams-citibike/TripData/DivvyBike")

In [None]:
# Staging table for Divvy trips; station IDs are stored as NUMERIC here.
# TODO (carried over from the original note): move table creation into a tables
# module with one function for all the tables.
divvy_staging_query = """
               CREATE TABLE IF NOT EXISTS staging.divvy_trip (
                   starttime TIMESTAMP,
                   endtime TIMESTAMP,
                   startID NUMERIC,
                   startname VARCHAR(128),
                   start_lat REAL,
                   start_long REAL,
                   endID NUMERIC,
                   endname VARCHAR(128),
                   end_lat REAL,
                   end_long REAL             
              );
              """
# Roll back whatever transaction is open on the shared cursor before running the DDL
cursor.execute("rollback;")
cursor.execute(divvy_staging_query)
conn.commit()

In [None]:
def populate_divvy_staging(datafile: str) -> None:
    """Load one Divvy trip file from S3 into ``staging.divvy_trip``.

    Several file layouts are handled. The primary layout has coordinates and only
    its missing values are replaced with -1. If its header names are not found,
    two alternate sets of header names are tried in turn, then a purely positional
    read; those layouts have no coordinates, so -1 placeholders are inserted.

    Parameters
    ----------
    datafile : str
        The name of a file in the s3 bucket without the s3:// prefix

    Returns
    -------
    None:
        If executed properly the database should now have rows corresponding to the rows in the data
    """

    columns = ['started_at', 'ended_at',
               'start_station_id', 'start_station_name',
               'start_lat', 'start_lng',
               'end_station_id', 'end_station_name',
               'end_lat', 'end_lng']

    altcolumns = ['starttime', 'stoptime',
                  'from_station_id', 'from_station_name',
                  'to_station_id', 'to_station_name']

    alt3 = ['start_time', 'end_time',
            'from_station_id', 'from_station_name',
            'to_station_id', 'to_station_name']

    # Common names given to the six columns of every alternate layout
    names = ['starttime', 'endtime', 'startid', 'startname', 'endid', 'endname']

    with fs.open("s3://"+datafile, 'r') as file:
        try:
            data = pd.read_csv(file, usecols=columns, na_values="", parse_dates=[0,1])[columns]
        except ValueError:
            # read_csv raises ValueError when the requested usecols are not in the
            # header; try each alternate set of header names in turn.
            data = None
            for alternate in (altcolumns, alt3):
                file.seek(0)
                try:
                    data = pd.read_csv(file, usecols=alternate, na_values="", parse_dates=[0,1])[alternate]
                except ValueError:
                    continue
                break

            if data is None:
                # Last resort: select the six fields by position instead of by name
                file.seek(0)
                data = pd.read_csv(file, usecols=[1,2,5,6,7,8], na_values="", parse_dates=[0,1])

            data.columns = names

            # Insert placeholder coordinates so the frame has the table's ten
            # columns in the table's order.
            data.insert(4, 'start_lat', -1)
            data.insert(5, 'start_lng', -1)

            data.insert(8, 'end_lat', -1)
            data.insert(9, 'end_lng', -1)

            # The key used to be misspelled 'endidd', so missing end ids were never filled
            data.fillna({'startid': -1, 'endid': -1}, inplace=True)
        else:
            data.fillna({'start_station_id': -1, 'end_station_id': -1,
                         'start_lat': -1, 'start_lng': -1,
                         'end_lat': -1, 'end_lng': -1}, inplace=True)

        # Commas in station names would be read as extra field separators by the upload
        for name_col in (3, 7):
            data.iloc[:, name_col] = data.iloc[:, name_col].str.replace(',', '_')

        upload_data(conn, data, 'staging.divvy_trip')

    print(f"Finished Uploading to Divvy Staging Table: {datafile}")
    return None

In [None]:
# Upload every listed Divvy file to the staging table, one file at a time
for file in divvy_filenames:
    populate_divvy_staging(file)

## Preparing the Station Table I - Querying from the Database

In [15]:
pip install geopandas

Collecting geopandas
  Downloading geopandas-0.8.2-py2.py3-none-any.whl (962 kB)
[K     |████████████████████████████████| 962 kB 6.2 MB/s eta 0:00:01
[?25hCollecting fiona
  Downloading Fiona-1.8.18-cp37-cp37m-manylinux1_x86_64.whl (14.8 MB)
[K     |████████████████████████████████| 14.8 MB 8.6 MB/s eta 0:00:01
[?25hCollecting pyproj>=2.2.0
  Downloading pyproj-3.0.0.post1-cp37-cp37m-manylinux2010_x86_64.whl (6.4 MB)
[K     |████████████████████████████████| 6.4 MB 60.2 MB/s eta 0:00:01
[?25hCollecting shapely
  Downloading Shapely-1.7.1-cp37-cp37m-manylinux1_x86_64.whl (1.0 MB)
[K     |████████████████████████████████| 1.0 MB 63.8 MB/s eta 0:00:01
Collecting munch
  Downloading munch-2.5.0-py2.py3-none-any.whl (10 kB)
Collecting cligj>=0.5
  Downloading cligj-0.7.1-py3-none-any.whl (7.1 kB)
Collecting click-plugins>=1.0
  Downloading click_plugins-1.1.1-py2.py3-none-any.whl (7.5 kB)
Installing collected packages: munch, cligj, click-plugins, fiona, pyproj, shapely, geopandas
S

In [16]:
import geopandas as gpd
import shapely

In [101]:
# Select one row (id, name, latitude, longitude) per distinct end-station id.
# End stations are used because, per the original note, endid has more distinct
# values than startid.
# TODO (carried over from the original note): move into a tables module.
# NOTE(review): the name bay_station_query is reassigned later in the notebook
# to the CREATE TABLE statement for stations.bay_station.
bay_station_query = """
        SELECT DISTINCT ON(endid) endid, endname, end_lat, end_long 
          FROM staging.bay_trip 
         ORDER BY endid;
        """

In [102]:
# Run the station query on the shared connection and load the result into a DataFrame
bay_station = pd.read_sql(bay_station_query, conn) # Expect long execution times

In [103]:
def drop_decimal(x):
    """Return ``x`` without a trailing '.0'; any other string comes back unchanged."""
    has_suffix = x.endswith('.0')
    return x[:-2] if has_suffix else x

In [104]:
# Strip a trailing '.0' from every station id
bay_station['endid'] = bay_station.endid.apply(drop_decimal)

In [None]:
# Remove exact duplicate rows, then drop four station ids by hand.
# NOTE(review): the reason these four ids are excluded is not visible here - confirm and document.
bay_station.drop_duplicates(inplace=True)
bay_station = bay_station.set_index('endid').drop(['449','420', '408','484']).reset_index()

In [120]:
# Build point geometries from (longitude, latitude) and wrap the table in a GeoDataFrame with CRS EPSG:4326
bay_spatial = gpd.GeoDataFrame(bay_station, geometry=gpd.points_from_xy(bay_station.end_long, bay_station.end_lat), crs="EPSG:4326")

In [121]:
# Create the schema for the station tables; IF NOT EXISTS makes the cell safe to re-run.
# No commit is issued in this cell.
stations_schema_query = """CREATE SCHEMA IF NOT EXISTS stations;"""
cursor.execute("rollback;")
cursor.execute(stations_schema_query)

In [122]:
# Table of BayWheels stations, populated from the distinct end stations of
# staging.bay_trip.
# name is VARCHAR(128) to agree with the staging startname/endname columns it is
# loaded from; a narrower column would reject any longer station name on upload.
# TODO (carried over from the original note): move into a tables module.
# NOTE(review): this reuses the name bay_station_query, replacing the SELECT
# statement assigned to it earlier in the notebook.
bay_station_query = """
               CREATE TABLE IF NOT EXISTS stations.bay_station (
                   stationID VARCHAR,
                   name VARCHAR(128) NOT NULL,
                   latitude REAL,
                   longitude REAL,
                   geometry GEOGRAPHY(POINT,4326) NOT NULL
                );
                """
# Roll back whatever transaction is open on the shared cursor before running the DDL
cursor.execute("rollback;")
cursor.execute(bay_station_query)
conn.commit()

In [123]:
def database_upload(conn, geodf, table):
    """Bulk-load a (Geo)DataFrame into a database table as tab-separated text.

    Parameters
    ----------
    conn :
        An open psycopg2 connection.
    geodf :
        Frame to upload. It is serialised without index or header, so its columns
        must be in the same order as the columns of ``table``.
    table : str
        Destination table, optionally schema-qualified (e.g.
        ``stations.bay_station``). The name is interpolated into the COPY
        statement, so it must come from trusted code and never from user input.

    Returns
    -------
    None:
        The rows are committed to the database before returning.
    """
    stream = StringIO()
    geodf.to_csv(stream, sep='\t', index=False, header=False)
    stream.seek(0)

    # copy_expert replaces copy_from because psycopg2 >= 2.9 quotes the table name
    # given to copy_from, which rejects schema-qualified names. COPY's default text
    # format is tab-delimited, the same separator copy_from was called with.
    with conn.cursor() as cursor:
        cursor.copy_expert(f"COPY {table} FROM STDIN", stream)
    conn.commit()

    return None

In [124]:
# Upload the BayWheels station GeoDataFrame into stations.bay_station
database_upload(conn, bay_spatial, 'stations.bay_station')