In [110]:
# Problem 1
import requests
from bs4 import BeautifulSoup
def fetch_trip_data():
    url = "https://s3.amazonaws.com/baywheels-data/"
    document = requests.get(url)
    soup= BeautifulSoup(document.content,"lxml-xml")
    monthlist = []
        # extract all text embedded in the key tag as that is where the file name is located and store in a list 
    for x in soup.find_all("Key"):
        #exclude the index file from the list 
        if x.text != 'index.html':
            monthlist.append(x.text)
    # create a list of the last 6 file names scraped from the websites 
    last_six_months = monthlist[-6:]



['202106-baywheels-tripdata.csv.zip', '202107-baywheels-tripdata.csv.zip', '202108-baywheels-tripdata.csv.zip', '202109-baywheels-tripdata.csv.zip', '202110-baywheels-tripdata.csv.zip', '202111-baywheels-tripdata.csv.zip']
['202112-baywheels-tripdata.csv.zip', '202201-baywheels-tripdata.csv.zip', '202202-baywheels-tripdata.csv.zip', '202203-baywheels-tripdata.csv.zip', '202204-baywheels-tripdata.csv.zip', '202205-baywheels-tripdata.csv.zip']


In [6]:
import os
# Get the current working directory
cwd = os.getcwd()
# Print the current working directory 
print("Current working directory: {0}".format(cwd))

Current working directory: /Users/luislopez/Documents


In [111]:
import zipfile
import io
for c in twelve_months:
    csv_url = 'https://s3.amazonaws.com/baywheels-data/'+ c
    r = requests.get(csv_url, stream =True)
    check = zipfile.is_zipfile(io.BytesIO(r.content))
    #check that the file is a zipfile 
    # uncompress and store zip files into local drive 
    if check:
        z = zipfile.ZipFile(io.BytesIO(r.content))
        z.extractall()
        

In [173]:
#problem 2
import pandas as pd

unzipped_last_six_months= []
    # remove .zip extension from file name
for z in last_six_months:
    
    unzipped_last_six_months.append(z.replace(".zip",""))
# create pandas dataframe with combined data from the 6 csv files 
combined_df = (pd.read_csv(f) for f in unzipped_last_six_months)
combined_df = pd.concat(combined_df)

In [174]:
combined_df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 1116107 entries, 0 to 232952
Data columns (total 13 columns):
 #   Column              Non-Null Count    Dtype  
---  ------              --------------    -----  
 0   ride_id             1116107 non-null  object 
 1   rideable_type       1116107 non-null  object 
 2   started_at          1116107 non-null  object 
 3   ended_at            1116107 non-null  object 
 4   start_station_name  937971 non-null   object 
 5   start_station_id    937971 non-null   object 
 6   end_station_name    913544 non-null   object 
 7   end_station_id      913544 non-null   object 
 8   start_lat           1116107 non-null  float64
 9   start_lng           1116107 non-null  float64
 10  end_lat             1114821 non-null  float64
 11  end_lng             1114821 non-null  float64
 12  member_casual       1116107 non-null  object 
dtypes: float64(4), object(9)
memory usage: 119.2+ MB


In [175]:
# convert the started at and ended at fields to datetime types 
combined_df.started_at = pd.to_datetime(combined_df.started_at)
combined_df.ended_at = pd.to_datetime(combined_df.ended_at)

In [176]:
combined_df.nunique()
#check for any red flags pertaining to unique counts of fields
# discrepancy between the start/end station and the start/end station id 

ride_id               1116107
rideable_type               3
started_at            1050175
ended_at              1049374
start_station_name        505
start_station_id          499
end_station_name          506
end_station_id            500
start_lat              209987
start_lng              232130
end_lat                 24639
end_lng                 25124
member_casual               2
dtype: int64

In [27]:
# check for any null value count of fields 
combined_df.isnull().sum()

ride_id                    0
rideable_type              0
started_at                 0
ended_at                   0
start_station_name    178136
start_station_id      178136
end_station_name      202563
end_station_id        202563
start_lat                  0
start_lng                  0
end_lat                 1286
end_lng                 1286
member_casual              0
dtype: int64

In [178]:
#remove entries that have null values for start station or end station
combined_df = combined_df[combined_df['start_station_id'].notnull()]
combined_df = combined_df[combined_df['end_station_id'].notnull()]
#check that rows were removed 
combined_df.isnull().sum()
# appears that rows with null values in ending latitude and longitude also all had null values for either start station
# or end station 

ride_id               0
rideable_type         0
started_at            0
ended_at              0
start_station_name    0
start_station_id      0
end_station_name      0
end_station_id        0
start_lat             0
start_lng             0
end_lat               0
end_lng               0
member_casual         0
dtype: int64

In [230]:
#check for duplicates 
combined_df.duplicated().sum()

0

In [181]:
# create a trip_duration field that stores the calculated duration of trip in order to find any inconsistencies
combined_df["trip_duration"] = combined_df["ended_at"] - combined_df["started_at"]

In [183]:
#filter out entries that have a trip duration of under a minute as they are likely user error and their inclusion could
# affect calculations such as avg trip duration 
combined_df = combined_df[combined_df["trip_duration"]> pd.Timedelta(1,'m')]


In [186]:
#select start station name for ids with multiple names depending on what entry is the most recent 
# this will resolve the start/end station name and start/end station id unique count discrepancy
consolidated_start_station= combined_df.sort_values('started_at').groupby('start_station_id').tail(1)[['start_station_id','start_station_name']]
# do same for end stations
consolidated_end_station= combined_df.sort_values('ended_at').groupby('end_station_id').tail(1)[['end_station_id','end_station_name']]



In [187]:
# drop columns that were used to assist in data cleaning and were not in original dataset
#also remove end station name and start station name as they will not be in main sql table 
cleaned_combined_df = combined_df.drop(['start_station_name', 'end_station_name',"trip_duration"], axis = 1)

In [189]:
# save cleaned data to csv files that will be used to be pushed to their respective SQL tables
consolidated_end_station.to_csv('combined_end_station.csv', index=False)
consolidated_start_station.to_csv('combined_start_station.csv', index=False)
cleaned_combined_df.to_csv('combined_trip_data.csv', index=False)

In [195]:
# Problem 4
import psycopg2
conn = psycopg2.connect(
   database="postgres", user='newuser', password='password', host='localhost', port= '5432'
)
cur = conn.cursor()
def load_start_station():
    try:
        
        with open('combined_start_station.csv', 'r') as f:
            next(f) # Skip the header row.
            cur.copy_from(f, 'start_station', sep=',')
            conn.commit()
            conn.close()
            
    except (Exception, psycopg2.Error) as error:
        print("Error pushing data to start_station table", error)

    finally:
        # closing database connection.
        if conn:
            cur.close()
            conn.close()
            print("PostgreSQL connection is closed")
        
def load_end_station():
   
    try:
        with open('combined_end_station.csv', 'r') as f:     
            next(f) # Skip the header row.
            cur.copy_from(f, 'end_station', sep=',')
            conn.commit()  
            conn.close()
            
    except (Exception, psycopg2.Error) as error:
        print("Error pushing data to end_station table", error)

    finally:
        # closing database connection.
        if conn:
            cur.close()
            conn.close()
            print("PostgreSQL connection is closed")
def trip_data():
    
    try:
        with open('combined_trip_data.csv', 'r') as f:
        
            next(f) # Skip the header row.
            cur.copy_from(f, 'trip_data', sep=',')
            conn.commit()  
            conn.close()
    except (Exception, psycopg2.Error) as error:
        print("Error pushing data to trip_data table", error)

    finally:
        # closing database connection.
        if conn:
            cur.close()
            conn.close()
            print("PostgreSQL connection is closed")
        

In [235]:
# code used to create tables 
import psycopg2
#Establishing the connection
conn = psycopg2.connect(
   database="postgres", user='newuser', password='password', host='localhost', port= '5432'
)
#Creating a cursor object using the cursor() method
cursor = conn.cursor()

#Doping EMPLOYEE table if already exists.
cursor.execute("DROP TABLE IF EXISTS EMPLOYEE")

#Creating table as per requirement
sql ='''
CREATE TABLE start_station (
    start_station_id text PRIMARY KEY,
    start_station_name TEXT

);
CREATE TABLE end_station (
    end_station_id text PRIMARY KEY,
    end_station_name TEXT

);
CREATE TABLE trip_data (
  ride_id char(16) PRIMARY KEY,
  rideable_type TEXT,
  started_at TIMESTAMP ,
  ended_at TIMESTAMP ,
  start_station_id TEXT REFERENCES start_station (start_station_id),
  end_station_id TEXT REFERENCES end_station (end_station_id),
  start_lat NUMERIC ,
  start_lng NUMERIC,
  end_lat NUMERIC,
  end_lng NUMERIC,
  member_casual TEXT
  
);
CREATE INDEX index_date_range ON trip_data(started_at,ended_at);
CREATE INDEX index_start_station ON trip_data(start_station_id);
CREATE INDEX index_end_station ON trip_data(end_station_id);
'''
cursor.execute(sql)
print("Table created successfully........")
conn.commit()
#Closing the connection
conn.close()

DuplicateTable: relation "start_station" already exists


Design and implement an ETL process that watches the source website for new data and
automatically downloads, cleans and loads the new data into the Postgres database.
Show how you handle errors (e.g., connection drops while downloading the data,
download is corrupted) and any re-try logic. (A, C)


In [None]:
import requests
try:
    # try to request to url with a timeout limit added as a parameter 
    etl_process = requests.get(csv_url,timeout=.05)
except requests.ConnectionError as e:
    print("Connection Error. ")
    print(str(e))            
    
except requests.Timeout as e:
    print("Timeout Error")
    print(str(e))
    
except requests.RequestException as e:
    print("General Error")
    print(str(e))
    

 the main portion of the etl process is below(NOTE: the exception handling and retry logic has not been implemented.)

In [124]:
import zipfile
import io
import pandas as pd
import psycopg2

# the list of existing files downloaded from the website
global_list= []
def get_new_trip_data_file_names(existing):
    newfiles= []
    url = "https://s3.amazonaws.com/baywheels-data/"
    document = requests.get(url)
    soup= BeautifulSoup(document.content,"lxml-xml")
    monthlist = []
        # extract all text embedded in the key tag as that is where the file name is located and store in a list 
    for x in soup.find_all("Key"):
        #exclude the index file from the list and check if there is any new file in the website
        if x.text != 'index.html' and x.text not in existing:
            # make the global_list variable global so it can add any new files that will be processed into the list of
            # existing files 
            global global_list
            global_list.append(x.text)
            newfiles.append(x.text)
            
        # if there are newfile, proceed to downloading and processing them 
    if len(newfiles)>0:
        print("new data found")
        #process_zip_files(newfiles)
    else:
        print("no new data file found")
        
    # download and unzip the new files that were found into local drive 
def process_zip_files(zip_file_list):
    unzipped_file_list= []
    #create list of files that are successfully unzipped and extracted that
    for c in zip_file_list:
        url = 'https://s3.amazonaws.com/baywheels-data/'+ c
        r = requests.get(url, stream =True)
        check = zipfile.is_zipfile(io.BytesIO(r.content))
        #check that the file is a zipfile 
        if check:
            z = zipfile.ZipFile(io.BytesIO(r.content))
            # unzip the zip file 
            z.extractall()
            unzipped_file_list.append(c)
    
    #remove zip extension from file name
    for f in unzipped_file_list:
            unzipped_file_list.append(f.replace(".zip",""))
    clean_new_data(unzipped_file_list)
    
def get_end_station():
   
    try:
        
        conn = psycopg2.connect(database="postgres", user='newuser', password='password', host='localhost', port= '5432')
        cur = conn.cursor()
        postgreSQL_select_Query = "select * from end_station"
        cur.execute(postgreSQL_select_Query) 
        end_station_query = cur.fetchall()      
    except (Exception, psycopg2.Error) as error:
        print("Error pushing data to end_station table", error)

    finally:
        # closing database connection.
        if conn:
            cur.close()
            conn.close()
            
            return end_station_query

def get_start_station():
   
    try:
        
        conn = psycopg2.connect(database="postgres", user='newuser', password='password', host='localhost', port= '5432')
        cur = conn.cursor()
        postgreSQL_select_Query = "select * from start_station"
        cur.execute(postgreSQL_select_Query) 
        start_station_query = cur.fetchall()      
    except (Exception, psycopg2.Error) as error:
        print("Error pushing data to start_station table", error)

    finally:
        # closing database connection.
        if conn:
            cur.close()
            conn.close()
            return start_station_query 
    
    
def clean_new_data(unzipped_file_list):    
    new_combined_df = (pd.read_csv(f) for f in unzipped_file_list)
    new_combined_df = pd.concat(new_combined_df)
    
    # convert the started at and ended at fields to datetime types 
    new_combined_df.started_at = pd.to_datetime(new_combined_df.started_at)
    new_combined_df.ended_at = pd.to_datetime(new_combined_df.ended_at)
    
    #remove entries that have null values for start station or end station
    new_combined_df = new_combined_df[new_combined_df['start_station_id'].notnull()]
    new_combined_df = new_combined_df[new_combined_df['end_station_id'].notnull()]
    
    # create a trip_duration field that stores the calculated duration of trip in order to find any inconsistencies
    new_combined_df["trip_duration"] = new_combined_df["ended_at"] - new_combined_df["started_at"]
    
    #filter out entries that have a trip duration of under a minute as they are likely user error and their inclusion could
    # effect calculations such as avg trip duration 
    new_combined_df = new_combined_df[new_combined_df["trip_duration"]> pd.Timedelta(1,'m')]
    cleaned_new_combined_df = new_combined_df.drop(['start_station_name', 'end_station_name',"trip_duration"], axis = 1)
    
    #select start station name for ids with multiple names depending on what entry is the most recent 
    new_start_station= new_combined_df.sort_values('started_at').groupby('start_station_id').tail(1)[['start_station_id','start_station_name']]
    # do same for end stations
    new_end_station= new_combined_df.sort_values('ended_at').groupby('end_station_id').tail(1)[['end_station_id','end_station_name']]
    
    
    # read existing list of unique end station names/ids and right join with list obtained from new data 
      
    check_df_end = pd.DataFrame(get_end_station())
    df_end_station_join = check_df_end.merge(new_end_station.drop_duplicates(), on='end_station_id', 
                   how='right', indicator=True)

    df_end_station_join= df_end_station_join[df_end_station_join['_merge'] == 'right_only']
    # check if there any right join values, as those would be values that would need to be pushed to end/start station table
    if not df_end_station_join.empty:
        df_end_station_join = df_end_station_join.drop(['_merge'], axis = 1)
        df_end_station_join = df_end_station_join.filter(['end_station_id', 'end_station_name_y'])
        df_end_station_join.columns = ['end_station_id','end_station_name']
        # add right join data to new csv file to be added to end station table 
        df_end_station_join.to_csv('new_end_station_data.csv', index=False)
        load_new_end_station_data()
        
    check_df_start = pd.DataFrame(get_start_station())
    df_start_station_join = check_df_start.merge(new_start_station.drop_duplicates(), on='start_station_id', 
                   how='right', indicator=True)
    
    df_start_station_join= df_start_station_join[df_start_station_join['_merge'] == 'right_only']
    #check if there any right join values
    if not df_start_station_join.empty:
        df_start_station_join = df_start_station_join.drop(['_merge'], axis = 1)
        df_start_station_join = df_start_station_join.filter(['start_station_id', 'start_station_name_y'])
        df_start_station_join.columns = ['start_station_id','start_station_name']
        # add right join data to new csv file to be added to start station table 
        df_start_station_join.to_csv('new_start_station_data.csv', index=False)
        load_new_start_station_data()
    # check that the main dataframe is not empty 
    if not cleaned_new_combined_df.empty:
        # push cleaned data to local hard drive to later be loaded to the main trip_data table 
        cleaned_new_combined_df.to_csv('new_main_data.csv', index=False)
        load_new_trip_data()
    # push new data to trip data table
def load_new_trip_data():
    try:
        conn = psycopg2.connect(database="postgres", user='newuser', password='password', host='localhost', port= '5432')
        cur = conn.cursor()
        with open('new_main_data.csv', 'r') as f:
            next(f) 
            cur.copy_from(f, 'trip_data', sep=',')
            conn.commit()  
            
            
    except (Exception, psycopg2.Error) as error:
        print("Error pushing data to trip_data table", error)

    finally:
        # closing database connection.
        if conn:
            cur.close()
            conn.close()
            print("new trip data pushed successfully!")
            
    # push new data to end station table
def load_new_end_station_data():
    try:
        conn = psycopg2.connect(database="postgres", user='newuser', password='password', host='localhost', port= '5432')
        cur = conn.cursor()
        with open('new_end_station_data.csv', 'r') as f:
            next(f) 
            cur.copy_from(f, 'end_station', sep=',')
            conn.commit()  
            print("new end station  data pushed successfully!")
    except (Exception, psycopg2.Error) as error:
        print("Error pushing data to end_station table", error)

    finally:
        # closing database connection.
        if conn:
            cur.close()
            conn.close()
            print("new end_station data pushed successfully!")
        # push new data to start station table
def load_new_start_station_data():
    try:
        conn = psycopg2.connect(database="postgres", user='newuser', password='password', host='localhost', port= '5432')
        cur = conn.cursor()
        with open('new_start_station_data.csv', 'r') as f:
            next(f) 
            cur.copy_from(f, 'start_station', sep=',')
            conn.commit()  
            print("new start station  data pushed successfully!") 
    except (Exception, psycopg2.Error) as error:
        print("Error pushing data to start_station table", error)

    finally:
        # closing database connection.
        if conn:
            cur.close()
            conn.close()
            print("new start_station data pushed successfully!")
    


    

<class 'pandas.core.frame.DataFrame'>
Int64Index: 826339 entries, 0 to 183675
Data columns (total 11 columns):
 #   Column            Non-Null Count   Dtype         
---  ------            --------------   -----         
 0   ride_id           826339 non-null  object        
 1   rideable_type     826339 non-null  object        
 2   started_at        826339 non-null  datetime64[ns]
 3   ended_at          826339 non-null  datetime64[ns]
 4   start_station_id  826339 non-null  object        
 5   end_station_id    826339 non-null  object        
 6   start_lat         826339 non-null  float64       
 7   start_lng         826339 non-null  float64       
 8   end_lat           826339 non-null  float64       
 9   end_lng           826339 non-null  float64       
 10  member_casual     826339 non-null  object        
dtypes: datetime64[ns](2), float64(4), object(5)
memory usage: 75.7+ MB
