# Workbench

## 18.05.2023

The first step was to install ChatGPT's copilot extension.

Once this was done, we initially created a simple function to return the current timestamp:

In [2]:
# Create function that returns the current time
from sqlalchemy import create_engine
from datetime import datetime

def get_current_time():
    return datetime.now()

def connect_to_local_postgres_db():
    # Create the engine to connect to the PostgreSQL database
    engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
    # Create a connection to the engine called `conn`
    conn = engine.connect()
    # Return the connection
    return conn


Once this proved successful, it was decided to replicate the classes developed in the local repository.

One of the longest and most complex steps had been successfullz copying over the data from the Blue Bikes repository itself. We therefore started by writing a function to extract the data...

In [3]:
from urllib import request
from zipfile import ZipFile

def unzip_file_to_local_csv(zip_path):
    # Download the zip file from the URL
    request.urlretrieve(zip_path, 'data.zip')
    # Unzip the file
    ZipFile('data.zip').extractall('data')
    # Return the unzipped file
    # return 'data/tripdata.csv'

...and then tested it:

In [4]:
import os
unzip_file_to_local_csv('https://s3.amazonaws.com/hubway-data/202304-bluebikes-tripdata.zip')

It ran successfully. We then inserted the resulting CSV as a table in the local PostGRES database:

In [5]:
import pandas as pd

def insert_csv_as_table_in_local_postgres(csv_path, tablename = 'journeys'):
    # Read in the DataFrame from the CSV file
    df = pd.read_csv(csv_path)
    # Create a connection to the database
    conn = connect_to_local_postgres_db()
    # Append the data to the `trips` table
    df.to_sql(tablename, conn, index=False, if_exists='append')
    # Close the connection
    conn.close()

insert_csv_as_table_in_local_postgres('data/202304-bluebikes-tripdata.csv')

Note that the cell beneath was automatically generated by typing the word "finally" in a Python cell.

Finally, let's create a function that will run all of the functions we've created in order. This will allow us to run a single function to get the data from the URL, unzip it, and insert it into our database.

In [6]:
def call_prior_functions():
    # Unzip the file
    unzip_file_to_local_csv('https://s3.amazonaws.com/hubway-data/202304-bluebikes-tripdata.zip')
    # Insert the CSV into the database
    insert_csv_as_table_in_local_postgres('data/202304-bluebikes-tripdata.csv')

However, it was not yet time to take this step. We needed to first create a function to load data from the created table:

In [7]:
def run_sql_query_on_postgres_db(query):
    # Create a connection to the database
    conn = connect_to_local_postgres_db()
    # Read the SQL query into a DataFrame
    df = pd.read_sql(query, conn)
    # Close the connection
    conn.close()
    # Return the DataFrame
    return df


This was then tested:

In [8]:
sql_query = """
SELECT * FROM journeys LIMIT 10;
"""
test_df = run_sql_query_on_postgres_db(sql_query)
test_df.head()


Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
0,0093AA5E7E3E0158,docked_bike,2023-04-13 13:49:59,2023-04-13 13:55:04,Innovation Lab - 125 Western Ave at Batten Way,A32011,Soldiers Field Park - 111 Western Ave,A32006,42.363713,-71.124598,42.364263,-71.118276,member
1,BFA8B88E063688F4,docked_bike,2023-04-25 09:44:38,2023-04-25 09:51:28,Museum of Science,M32045,One Broadway / Kendall Sq at Main St / 3rd St,M32003,42.36769,-71.071163,42.362242,-71.083111,member
2,A9C51FA200C31A81,docked_bike,2023-04-24 18:39:31,2023-04-24 18:58:05,New Balance - 20 Guest St,D32001,HMS/HSPH - Avenue Louis Pasteur at Longwood Ave,B32003,42.357329,-71.146735,42.337417,-71.102861,casual
3,0C1D451797FF0871,docked_bike,2023-04-04 19:25:31,2023-04-04 19:32:14,Museum of Science,M32045,Gore Street at Lambert Street,M32081,42.36769,-71.071163,42.37308,-71.086342,member
4,DDDCD0A2D2EE7A37,docked_bike,2023-04-11 08:36:14,2023-04-11 08:52:39,Museum of Science,M32045,Columbus Ave at W. Canton St,C32077,42.36769,-71.071163,42.344742,-71.076482,member


In [9]:
def spatial_join_to_nearest_station():
    # Create the SQL query
    sql_query = """
    SELECT
        t.*,
        s.station_id,
        s.station_name,
        s.station_geom
    FROM trips AS t
    JOIN stations AS s
    ON ST_DWithin(t.start_geom, s.station_geom, 100)
    LIMIT 10;
    """
    # Run the query and return the DataFrame
    return run_sql_query_on_postgres_db(sql_query)

In [10]:
insert_csv_as_table_in_local_postgres('data/current_bluebikes_stations.csv', 'stations')


In [11]:
from geopandas import GeoDataFrame, read_file, points_from_xy
import sqldf

# Boston neighbourhoods
polydf = read_file('data/Boston_Neighborhoods.geojson')

stations = run_sql_query_on_postgres_db("SELECT * FROM stations")
pointdf = GeoDataFrame(
    stations, geometry=points_from_xy(stations.Longitude, stations.Latitude))

pointdf.set_crs(epsg='4326', inplace=True)

# Make sure they're using the same projection reference

#https://geopandas.org/en/stable/gallery/spatial_joins.html
joined_df = pointdf.sjoin(polydf, how="left")
grab_df = joined_df[['Name_left', 'Name_right', 'District']]
matched_pairs = sqldf.run('SELECT DISTINCT Name_left as station, Name_right as neighbourhood from grab_df where District = \'Boston\'')
matched_pairs.to_csv('data/neighbourhood_stations.csv')
insert_csv_as_table_in_local_postgres('data/neighbourhood_stations.csv', 'neighbourhood_stations')


In [12]:
test_df = run_sql_query_on_postgres_db('SELECT * FROM neighbourhood_stations')
test_df

Unnamed: 0.1,Unnamed: 0,station,neighbourhood
0,0,175 N Harvard St,Allston
1,1,2 Hummingbird Lane at Olmsted Green,Mattapan
2,2,555 Metropolitan Ave,Hyde Park
3,3,606 American Legion Hwy at Canterbury St,Roslindale
4,4,645 Summer St,South Boston Waterfront
...,...,...,...
245,245,Wentworth Institute of Technology - Huntington...,Fenway
246,246,West End Park,West End
247,247,Western Ave at Richardson St,Brighton
248,248,Whittier St Health Center,Roxbury


In [13]:
#Join trips and station key using the run_sql_query_on_postgres_db function
def insert_df_as_table_in_local_postgres(df, tablename):
    # Create a connection to the database
    conn = connect_to_local_postgres_db()
    # Append the data to the `trips` table
    df.to_sql(tablename, conn, index=False, if_exists='append')
    # Close the connection
    conn.close()

def join_trips_and_station_key_tables():
    # Create the SQL query
    sql_query = """
    SELECT
        j.*,
        s1.neighbourhood as start_neighbourhood,
        s2.neighbourhood as end_neighbourhood
    FROM journeys AS j
    LEFT JOIN neighbourhood_stations AS s1
    ON j.start_station_name = s1.station
    LEFT JOIN neighbourhood_stations AS s2
    ON j.end_station_name = s2.station

    """
    # Run the query and return the DataFrame
    df = run_sql_query_on_postgres_db(sql_query)
    insert_df_as_table_in_local_postgres(df, 'journeys_enriched')

join_trips_and_station_key_tables()

In [14]:
# Write a Dash application in Python to visualise number of bike trips by neighbourhood
import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly.express as px    


def create_dash_application(group_by='start_neighbourhood'):
    # Create the Dash app
    app = dash.Dash(__name__)
    # Create a DataFrame from the Postgres table
    df = run_sql_query_on_postgres_db('SELECT {}, COUNT(*) as journeys_count FROM journeys_enriched group by 1'.format(group_by))
    # Create a bar chart of the number of trips by neighbourhood
    fig = px.bar(df, x=group_by, y='journeys_count')
    # Create the Dash app layout
    app.layout = html.Div(children=[
        html.H1(children='Hello Dash'),
        dcc.Graph(
            id='example-graph',
            figure=fig
        )
    ])
    # Return the app
    return app

# Run the application
app = create_dash_application()
app.run_server(debug=True, use_reloader=False)  # Turn off reloader if inside Jupyter



The dash_core_components package is deprecated. Please replace
`import dash_core_components as dcc` with `from dash import dcc`
  import dash_core_components as dcc
The dash_html_components package is deprecated. Please replace
`import dash_html_components as html` with `from dash import html`
  import dash_html_components as html


Dash is running on http://127.0.0.1:8050/



[33m * Tip: There are .env or .flaskenv files present. Do "pip install python-dotenv" to use them.[0m


 * Serving Flask app '__main__'
 * Debug mode: on


In [None]:
# Create a class that defines the functions defined above as methods, executes them, creates a Dash app, and runs the app

import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly.express as px   
from sqlalchemy import create_engine
from datetime import datetime
from urllib import request
from zipfile import ZipFile
import pandas as pd


class BlueBikesDataPipeline:
    def __init__(self, group_by='start_neighbourhood'):
        self.group_by = group_by

    def get_current_time(self):
        return datetime.now()
    def connect_to_local_postgres_db(self):
        # Create the engine to connect to the PostgreSQL database
        engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        conn = engine.connect()
        # Return the connection
        return conn
    def unzip_file_to_local_csv(self, zip_path):
        # Download the zip file from the URL
        request.urlretrieve(zip_path, 'data.zip')
        # Unzip the file
        ZipFile('data.zip').extractall('data')
        # Return the unzipped file
        # return 'data/tripdata.csv'
    def insert_csv_as_table_in_local_postgres(self, csv_path, tablename = 'journeys'):
        # Read in the DataFrame from the CSV file
        df = pd.read_csv(csv_path)
        # Create a connection to the database
        conn = connect_to_local_postgres_db()
        # Append the data to the `trips` table
        df.to_sql(tablename, conn, index=False, if_exists='append')
        # Close the connection
        conn.close()
    def call_prior_functions(self):
        # Unzip the file
        unzip_file_to_local_csv('https://s3.amazonaws.com/hubway-data/202304-bluebikes-tripdata.zip')
        # Insert the CSV into the database
        self.insert_csv_as_table_in_local_postgres('data/202304-bluebikes-tripdata.csv')
    def run_sql_query_on_postgres_db(self, query):
        # Create a connection to the database
        conn = self.connect_to_local_postgres_db()
        # Read the SQL query into a DataFrame
        df = pd.read_sql(query, conn)
        # Close the connection
        conn.close()
        # Return the DataFrame
        return df
    def insert_df_as_table_in_local_postgres(self, df, tablename):
        # Create a connection to the database
        conn = self.connect_to_local_postgres_db()
        # Append the data to the `trips` table
        df.to_sql(tablename, conn, index=False, if_exists='append')
        # Close the connection
        conn.close()
    def join_trips_and_station_key_tables(self):
        # Create the SQL query
        sql_query = """
        SELECT
            j.*,
            s1.neighbourhood as start_neighbourhood,
            s2.neighbourhood as end_neighbourhood
        FROM journeys AS j
        LEFT JOIN neighbourhood_stations AS s1
        ON j.start_station_name = s1.station
        LEFT JOIN neighbourhood_stations AS s2
        ON j.end_station_name = s2.station

        """
        # Run the query and return the DataFrame
        df = self.run_sql_query_on_postgres_db(sql_query)
        self.insert_df_as_table_in_local_postgres(df, 'journeys_enriched')

    def select_all_from_journeys_enriched(self):
        return self.run_sql_query_on_postgres_db('SELECT * FROM journeys_enriched')
    
    def create_dash_application(self):
        # Create the Dash app
        app = dash.Dash(__name__)
        # Create a DataFrame from the Postgres table
        df = self.run_sql_query_on_postgres_db('SELECT {}, COUNT(*) as journeys_count FROM journeys_enriched group by 1'.format(self.group_by))
        # Create a bar chart of the number of trips by neighbourhood
        fig = px.bar(df, x='start_neighbourhood', y='journeys_count')
        # Create the Dash app layout
        app.layout = html.Div(children=[
            html.H1(children='Hello Dash'),
            dcc.Graph(
                id='example-graph',
                figure=fig
            )
        ])
        # Return the app
        return app


The dash_core_components package is deprecated. Please replace
`import dash_core_components as dcc` with `from dash import dcc`
  import dash_core_components as dcc
The dash_html_components package is deprecated. Please replace
`import dash_html_components as html` with `from dash import html`
  import dash_html_components as html


In [None]:
main = BlueBikesDataPipeline()
main.select_all_from_journeys_enriched()
#main.create_dash_application().run_server(debug=True, use_reloader=False)

Unnamed: 0,ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual,start_neighbourhood,end_neighbourhood
0,0093AA5E7E3E0158,docked_bike,2023-04-13 13:49:59,2023-04-13 13:55:04,Innovation Lab - 125 Western Ave at Batten Way,A32011,Soldiers Field Park - 111 Western Ave,A32006,42.363713,-71.124598,42.364263,-71.118276,member,,
1,BFA8B88E063688F4,docked_bike,2023-04-25 09:44:38,2023-04-25 09:51:28,Museum of Science,M32045,One Broadway / Kendall Sq at Main St / 3rd St,M32003,42.367690,-71.071163,42.362242,-71.083111,member,,
2,A9C51FA200C31A81,docked_bike,2023-04-24 18:39:31,2023-04-24 18:58:05,New Balance - 20 Guest St,D32001,HMS/HSPH - Avenue Louis Pasteur at Longwood Ave,B32003,42.357329,-71.146735,42.337417,-71.102861,casual,,
3,0C1D451797FF0871,docked_bike,2023-04-04 19:25:31,2023-04-04 19:32:14,Museum of Science,M32045,Gore Street at Lambert Street,M32081,42.367690,-71.071163,42.373080,-71.086342,member,,
4,DDDCD0A2D2EE7A37,docked_bike,2023-04-11 08:36:14,2023-04-11 08:52:39,Museum of Science,M32045,Columbus Ave at W. Canton St,C32077,42.367690,-71.071163,42.344742,-71.076482,member,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
888868,F4ECEA82D2C4B806,docked_bike,2023-04-18 09:10:33,2023-04-18 09:14:04,MIT at Mass Ave / Amherst St,M32006,Galileo Galilei Way at Main Street,M32072,42.358100,-71.093198,42.363004,-71.089740,member,,
888869,2667902845A5247B,docked_bike,2023-04-18 08:18:38,2023-04-18 08:23:07,MIT at Mass Ave / Amherst St,M32006,Galileo Galilei Way at Main Street,M32072,42.358100,-71.093198,42.363004,-71.089740,member,,
888870,D524CCC2E75AD37D,docked_bike,2023-04-17 11:34:35,2023-04-17 11:49:10,MIT at Mass Ave / Amherst St,M32006,Beacon St at Washington / Kirkland,S32003,42.358100,-71.093198,42.378754,-71.107072,member,,
888871,A41CB259754F6EF2,docked_bike,2023-04-06 17:26:11,2023-04-06 17:34:44,Central Sq Post Office / Cambridge City Hall a...,M32012,Beacon St at Washington / Kirkland,S32003,42.366426,-71.105495,42.378754,-71.107072,member,,


In [None]:
# Create a class that executes the functions defined above, creates a Dash app, and runs the app
class BlueBikesDashboard:
    def __init__(self):
        pass

    def run(self):
        # Call the functions
        call_prior_functions()
        spatial_join_to_nearest_station()
        join_trips_and_station_key_tables()
        # Create the Dash app
        app = create_dash_application()
        # Run the app
        app.run_server(debug=True, use_reloader=False)

In [None]:
# Write a class that creates a Dash app comparing the number of trips by start and end neighbourhood from the journeys_enriched table
class BlueBikesDashboard:
    def __init__(self):
        pass

    def run(self):
        # Create the Dash app
        app = create_dash_application()
        # Run the app
        app.run_server(debug=True, use_reloader=False)

In [None]:
board = BlueBikesDashboard()
board.run()

NameError: name 'create_dash_application' is not defined

In [None]:
# Create class with the following methods:
# - init: create a connection to the local postgres database
# - run_sql_query_on_postgres_db: run a SQL query on the database and return a DataFrame
# - create_dash_application: create a Dash app that displays the number of trips by start and end neighbourhood
# - run: call the create_dash_application method and run the app
from sqlalchemy import create_engine
import pandas as pd
import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly.express as px

class BlueBikesDashboard:

    def __init__(self):
        engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = engine.connect()
    
    def run_sql_query_on_postgres_db(self, query):
        # Read the SQL query into a DataFrame
        df = pd.read_sql(query, self.conn)
        # Return the DataFrame
        return df
    def join_trips_and_station_key_tables(self):
        # Create the SQL query
        sql_query = """
        SELECT
            j.*,
            s1.neighbourhood as start_neighbourhood,
            s2.neighbourhood as end_neighbourhood
        FROM journeys AS j
        LEFT JOIN neighbourhood_stations AS s1
        ON j.start_station_name = s1.station
        LEFT JOIN neighbourhood_stations AS s2
        ON j.end_station_name = s2.station

        """
        # Run the query and return the DataFrame
        df = self.run_sql_query_on_postgres_db(sql_query)
        self.insert_df_as_table_in_local_postgres(df, 'journeys_enriched')
        
    def create_dash_application(self, group_by='start_neighbourhood'):
        # Create the Dash app
        app = dash.Dash(__name__)
        # Create a DataFrame from the Postgres table
        df = self.run_sql_query_on_postgres_db('SELECT {}, COUNT(*) as journeys_count FROM journeys_enriched group by 1'.format(group_by))
        # Create a bar chart of the number of trips by neighbourhood
        fig = px.bar(df, x='start_neighbourhood', y='journeys_count')
        # Create the Dash app layout
        app.layout = html.Div(children=[
            html.H1(children='Hello Dash'),
            dcc.Graph(
                id='example-graph',
                figure=fig
            )
        ])
        # Return the app
        return app
    def run(self):
        # Create the Dash app
        app = self.create_dash_application()
        # Run the app
        app.run_server(debug=True, use_reloader=False)
        

The dash_core_components package is deprecated. Please replace
`import dash_core_components as dcc` with `from dash import dcc`
  import dash_core_components as dcc
The dash_html_components package is deprecated. Please replace
`import dash_html_components as html` with `from dash import html`
  import dash_html_components as html


In [None]:
# Create an instance of BlueBikesDashboard and run the app  
board = BlueBikesDashboard()
board.join_trips_and_station_key_tables()
board.run()

: 

: 

In [3]:
%%writefile bikeshareai/bluebikes.py
# Create two classes:
# The first one should include the following methods:
# __init__: Takes a URL of a zip file as argument, and unzips the file into a CSV in the data folder
# connect_to_db: Creates and returns a connection to the local PostgreSQL database using the sqlalchemy package.
# csv_to_db: Takes the arguments table_name and csv_path. If no table with the name table_name exists in the local PostgreSQL, then insert the csv located at csv_path as a table with the name table_name
# geojoin: Takes the argument geojson_path. Runs the csv_to_db method on 'data/current_bluebikes_stations.csv', with stations as the table_name. Loads the data from geojson path as polydf, loads the table stations as a Geopandas GeoDataFrame with epsg='4326', and then performs a spatial join between polydf and the stations GeoDataFrame. The resulting Dataframe should be written to the local PostgreSQL database as the table neighbourhood_stations
# enrich_journeys: This method joins the journeys and neighbourhood_stations tables using journeys.start_station_name = neighbourhood_stations.station twice, producing a table with all of the columns from journeys, plus the start and end neighbourhoods and stations for each journey, creating a table in the PostgreSQL database called journeys_enriched
# The second one should include the following methods:
# - create_dash_application: create a Dash app that displays the number of trips by start and end neighbourhood, from the journeys_enriched table
# - run: call the create_dash_application method and run the app
from sqlalchemy import create_engine
import pandas as pd
from urllib import request
from zipfile import ZipFile
from geopandas import GeoDataFrame, read_file, points_from_xy
import sqldf
import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly.express as px

class BlueBikesDataPipeline:
    def __init__(self, url):
        self.url = url
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()
    def unzip_file_to_local_csv(self):
        # Download the zip file from the URL
        request.urlretrieve(self.url, 'data.zip')
        # Unzip the file
        ZipFile('data.zip').extractall('data')
        # Return the unzipped file
        # return 'data/tripdata.csv'
    def csv_to_db(self, table_name, csv_path):
        # Read in the DataFrame from the CSV file
        df = pd.read_csv(csv_path)
        # Append the data to the `trips` table
        df.to_sql(table_name, self.conn, index=False, if_exists='append')
    def geojoin(self, geojson_path):
        # Read in the DataFrame from the CSV file
        df = pd.read_csv('data/current_bluebikes_stations.csv')
        # Append the data to the `trips` table
        df.to_sql('stations', self.conn, index=False, if_exists='append')
        # Boston neighbourhoods
        polydf = read_file(geojson_path)
        stations = pd.read_sql('SELECT * FROM stations', self.conn)
        pointdf = GeoDataFrame(
            stations, geometry=points_from_xy(stations.Longitude, stations.Latitude))
        pointdf.set_crs(epsg='4326', inplace=True)
        joined_df = pointdf.sjoin(polydf, how="left")
        grab_df = joined_df[['Name_left', 'Name_right', 'District']]
        matched_pairs = sqldf.run('SELECT DISTINCT Name_left as station, Name_right as neighbourhood from grab_df where District = \'Boston\'')
        matched_pairs.to_sql('neighbourhood_stations', self.conn, index=False, if_exists='append')

    def enrich_journeys(self):
        # Create the SQL query
        sql_query = """
        SELECT
            j.*,
            s1.neighbourhood as start_neighbourhood,
            s2.neighbourhood as end_neighbourhood
        FROM journeys AS j
        LEFT JOIN neighbourhood_stations AS s1
        ON j.start_station_name = s1.station
        LEFT JOIN neighbourhood_stations AS s2
        ON j.end_station_name = s2.station

        """
        # Run the query and return the DataFrame
        df = pd.read_sql(sql_query, self.conn)
        df.to_sql('journeys_enriched', self.conn, index=False, if_exists='append')

class BlueBikesDashboard:

    def __init__(self):
        pass

    def create_dash_application(self, group_by='start_neighbourhood'):
        # Create the Dash app
        app = dash.Dash(__name__)
        # Create a DataFrame from the Postgres table
        df = pd.read_sql('SELECT {}, COUNT(*) as journeys_count FROM journeys_enriched group by 1'.format(group_by), self.conn)
        # Create a bar chart of the number of trips by neighbourhood
        fig = px.bar(df, x='start_neighbourhood', y='journeys_count')
        # Create the Dash app layout
        app.layout = html.Div(children=[
            html.H1(children='Hello Dash'),
            dcc.Graph(
                id='example-graph',
                figure=fig
            )
        ])
        # Return the app
        return app
    def run(self):
        # Create the Dash app
        app = self.create_dash_application()
        # Run the app
        app.run_server(debug=True, use_reloader=False)


Writing bikeshareai/bluebikes.py


In [1]:
from bikeshareai.bluebikes import BlueBikesDataPipeline, BlueBikesDashboard
# Create an instance of BlueBikesDataPipeline and run the methods
pipeline = BlueBikesDataPipeline('https://s3.amazonaws.com/hubway-data/202304-bluebikes-tripdata.zip')
pipeline.unzip_file_to_local_csv()
pipeline.csv_to_db('journeys', 'data/202304-bluebikes-tripdata.csv')
pipeline.geojoin('data/Boston_Neighborhoods.geojson')
pipeline.enrich_journeys()
# Create an instance of BlueBikesDashboard and run the app
board = BlueBikesDashboard()
board.run()


The dash_core_components package is deprecated. Please replace
`import dash_core_components as dcc` with `from dash import dcc`
  import dash_core_components as dcc
The dash_html_components package is deprecated. Please replace
`import dash_html_components as html` with `from dash import html`
  import dash_html_components as html


AttributeError: module '__main__' has no attribute 'grab_df'

The geojoin class had to be modified by hand, as sqldf cannot be run within a function. We therefore removed it from the imported packages to see if this would induce Copilot to choose a different joining method

In [1]:
%%writefile bikeshareai/bluebikes.py
# Create two classes:
# The first one should include the following methods:
# __init__: Takes a URL of a zip file as argument, and unzips the file into a CSV in the data folder
# connect_to_db: Creates and returns a connection to the local PostgreSQL database using the sqlalchemy package.
# csv_to_db: Takes the arguments table_name and csv_path. If no table with the name table_name exists in the local PostgreSQL, then insert the csv located at csv_path as a table with the name table_name
# geojoin: Takes the argument geojson_path. Runs the csv_to_db method on 'data/current_bluebikes_stations.csv', with stations as the table_name. Loads the data from geojson path as polydf, loads the table stations as a Geopandas GeoDataFrame with epsg='4326', and then performs a spatial join between polydf and the stations GeoDataFrame, using pandas to select the stations and neighbourhood for stations in Boston. The resulting Dataframe should be written to the local PostgreSQL database as the table neighbourhood_stations
# enrich_journeys: This method joins the journeys and neighbourhood_stations tables using journeys.start_station_name = neighbourhood_stations.station twice, producing a table with all of the columns from journeys, plus the start and end neighbourhoods and stations for each journey, creating a table in the PostgreSQL database called journeys_enriched
# The second one should include the following methods:
# - create_dash_application: create a Dash app that displays the number of trips by start and end neighbourhood, from the journeys_enriched table
# - run: call the create_dash_application method and run the app
from sqlalchemy import create_engine
import pandas as pd
from urllib import request
from zipfile import ZipFile
from geopandas import GeoDataFrame, read_file, points_from_xy
import dash
#import dash_core_components as dcc
from dash import dcc
#import dash_html_components as html
from dash import html
import plotly.express as px

class BlueBikesDataPipeline:

    def __init__(self, url):
        self.url = url
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()
    
    def unzip_file_to_local_csv(self):
        # Download the zip file from the URL
        request.urlretrieve(self.url, 'data.zip')
        # Unzip the file
        ZipFile('data.zip').extractall('data')
        # Return the unzipped file
        # return 'data/tripdata.csv'
    
    def csv_to_db(self, table_name, csv_path):
        # Read in the DataFrame from the CSV file
        df = pd.read_csv(csv_path)
        # Append the data to the `trips` table
        df.to_sql(table_name, self.conn, index=False, if_exists='append')
    
    def geojoin(self, geojson_path):
        # Read in the DataFrame from the CSV file
        df = pd.read_csv('data/current_bluebikes_stations.csv')
        # Append the data to the `trips` table
        df.to_sql('stations', self.conn, index=False, if_exists='append')
        # Boston neighbourhoods
        polydf = read_file(geojson_path)
        stations = pd.read_sql('SELECT * FROM stations', self.conn)
        pointdf = GeoDataFrame(
            stations, geometry=points_from_xy(stations.Longitude, stations.Latitude))
        pointdf.set_crs(epsg='4326', inplace=True)
        joined_df = pointdf.sjoin(polydf, how="left")
        grab_df = joined_df[['Name_left', 'Name_right', 'District']]
        matched_pairs_with_pandas = grab_df[grab_df['District'] == 'Boston']
        matched_pairs_with_pandas.columns = ['station', 'neighbourhood', 'District']
        #matched_pairs = sqldf.run('SELECT DISTINCT Name_left as station, Name_right as neighbourhood from grab_df where District = \'Boston\'')
        matched_pairs_with_pandas.to_sql('neighbourhood_stations', self.conn, index=False, if_exists='append')

    def enrich_journeys(self):
        # Create the SQL query
        sql_query = """
        SELECT
            j.*,
            s1.neighbourhood as start_neighbourhood,
            s2.neighbourhood as end_neighbourhood
        FROM journeys AS j
        LEFT JOIN neighbourhood_stations AS s1
        ON j.start_station_name = s1.station
        LEFT JOIN neighbourhood_stations AS s2
        ON j.end_station_name = s2.station

        """
        # Run the query and return the DataFrame
        df = pd.read_sql(sql_query, self.conn)
        df.to_sql('journeys_enriched', self.conn, index=False, if_exists='append')
    
class BlueBikesDashboard:
    def __init__(self):
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()
    def create_dash_application(self, group_by='start_neighbourhood'):
        # Create the Dash app
        app = dash.Dash(__name__)
        # Create a DataFrame from the Postgres table
        df = pd.read_sql('SELECT {}, COUNT(*) as journeys_count FROM journeys_enriched group by 1'.format(group_by), self.conn)
        # Create a bar chart of the number of trips by neighbourhood
        fig = px.bar(df, x='start_neighbourhood', y='journeys_count')
        # Create the Dash app layout
        app.layout = html.Div(children=[
            html.H1(children='Hello Dash'),
            dcc.Graph(
                id='example-graph',
                figure=fig
            )
        ])
        # Return the app
        return app
    def run(self):
        # Create the Dash app
        app = self.create_dash_application()
        # Run the app
        app.run_server(debug=True, use_reloader=False)

Overwriting bikeshareai/bluebikes.py


Calling for "matched pairs with pandas" enabled Copilot to skip the hurdle

In [None]:
from bikeshareai.bluebikes import BlueBikesDataPipeline, BlueBikesDashboard
# Create an instance of BlueBikesDataPipeline and run the methods
pipeline = BlueBikesDataPipeline('https://s3.amazonaws.com/hubway-data/202304-bluebikes-tripdata.zip')
pipeline.unzip_file_to_local_csv()
pipeline.csv_to_db('journeys', 'data/202304-bluebikes-tripdata.csv')
pipeline.geojoin('data/Boston_Neighborhoods.geojson')
pipeline.enrich_journeys()



In [4]:
# Create an instance of BlueBikesDashboard and run the app
board = BlueBikesDashboard()
board.run()

Dash is running on http://127.0.0.1:8050/



[33m * Tip: There are .env or .flaskenv files present. Do "pip install python-dotenv" to use them.[0m


 * Serving Flask app '__main__'
 * Debug mode: on


## 21.05.2023

The design objectives are as follows:
1. Create a DAG that performs the DataPipeline step if table journeys_enriched is not present, and then proceeds to BlueBikesDashboard
2. Use Spark to parallelise the steps performed with DataPipeline. This will be the most difficult step, as I am less familiar with the pyspark library, and had not reached this step in the original project yet. It may be necessary simply to dive in here and see what Copilot produces as inspiration.
    1. On the other hand, we can also examine which steps are most Python - intensive. Geojoin is mostly performed locally in Python, which makes it a potentially fruitful target for PySparkification
3. Make the Dash app more interactive, with the following features:
    1. Enable user to look at either number of journeys or journey length
    2. Add a select option that enables them to group by either start or end neighbourhood, day of the week, hour of the day or month of the year
4. Create a README for the miniproject

The first (relatively lightweight) set of instructions is beneath:

In [None]:
# Create two classes:
# The first one should include the following methods:
# __init__: Takes a URL of a zip file as argument, and unzips the file into a CSV in the data folder
# connect_to_db: Creates and returns a connection to the local PostgreSQL database using the sqlalchemy package.
# csv_to_db: Takes the arguments table_name and csv_path. If no table with the name table_name exists in the local PostgreSQL, then insert the csv located at csv_path as a table with the name table_name
# main_join: Takes the arguments geojson_path, station_path, and journeys_path, and creates a PySpark context to carry out the following tasks using Spark SQL:
# Loads the csv at station_path as a dataframe called stations, and the csv at journeys_path as journeys. Loads the data from geojson path as polydf, loads the table stations as a Geopandas GeoDataFrame with epsg='4326', and then performs a spatial join between polydf and the stations GeoDataFrame. 
# The resulting Dataframe should be joined to the journey dataframe using journeys.start_station_name = neighbourhood_stations.station twice, producing a table with all of the columns from journeys, plus the start and end neighbourhoods and stations for each journey. Finally, the resulting DataFrame should be written to the PostgreSQL database as a table called journeys_enriched
# The second one should include the following methods:
# - create_dash_application: create a Dash application. In this application, 
#it should be possible to select the number of journeys or the average duration as response_variable, and then summarise response_variable by any of the following variables:
# start_neighbourhood
# end_neighbourhood
# day_of_week
# hour_of_day
# month_of_year
# - run: call the create_dash_application method and run the app

from sqlalchemy import create_engine
import pandas as pd
from urllib import request
from zipfile import ZipFile
from geopandas import GeoDataFrame, read_file, points_from_xy
import dash
#import dash_core_components as dcc
from dash import dcc
#import dash_html_components as html
from dash import html
import plotly.express as px
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType, TimestampType
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
from pyspark.sql.functions import rank

class BlueBikesDataPipeline:
    def __init__(self, url):
        self.url = url
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()

    def unzip_file_to_local_csv(self):
        # Download the zip file from the URL
        request.urlretrieve(self.url, 'data.zip')
        # Unzip the file
        ZipFile('data.zip').extractall('data')
        # Return the unzipped file
        # return 'data/tripdata.csv'

    def csv_to_db(self, table_name, csv_path):
        # Read in the DataFrame from the CSV file
        df = pd.read_csv(csv_path)
        # Append the data to the `trips` table
        df.to_sql(table_name, self.conn, index=False, if_exists='append')
    
    def main_join(self, geojson_path, station_path, journeys_path):
        poly_df = read_file(geojson_path)
        poly_df = poly_df[poly_df['District'] == 'Boston']
        poly_df = poly_df[['Name', 'geometry']]
        poly_df.columns = ['neighbourhood', 'geometry']
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.lower()
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(' ', '_')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('-', '_')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('\'', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('(', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(')', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('.', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('&', 'and')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('é', 'e')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('ö', 'o')



Copilot got stuck in a loop of suggesting character replacements (see above). To break out of this, it was necessary to add code for a next step manually.

In [18]:
# Create two classes:
# The first one should include the following methods:
# __init__: Takes a URL of a zip file as argument, and unzips the file into a CSV in the data folder
# connect_to_db: Creates and returns a connection to the local PostgreSQL database using the sqlalchemy package.
# df_to_db: Takes the arguments table_name and dataframe. If no table with the name table_name exists in the local PostgreSQL, then insert the csv located at csv_path as a table with the name table_name
# main_join: Takes the arguments geojson_path, station_path, and journeys_path, and creates a PySpark context to carry out the following tasks using Spark SQL:
# Loads the csv at station_path as a dataframe called stations, and the csv at journeys_path as journeys. Loads the data from geojson path as polydf, loads the table stations as a Geopandas GeoDataFrame with epsg='4326', and then performs a spatial join between polydf and the stations GeoDataFrame. 
# The resulting Dataframe should be joined to the journey dataframe using journeys.start_station_name = neighbourhood_stations.station twice, producing a table with all of the columns from journeys, plus the start and end neighbourhoods and stations for each journey. Finally, the resulting DataFrame should be written to the PostgreSQL database as a table called journeys_enriched
# The second one should include the following methods:
# - create_dash_application: create a Dash application. In this application, 
#it should be possible to select the number of journeys or the average duration as response_variable, and then summarise response_variable by any of the following variables:
# start_neighbourhood
# end_neighbourhood
# day_of_week
# hour_of_day
# month_of_year
# - run: call the create_dash_application method and run the app

from sqlalchemy import create_engine
import pandas as pd
from urllib import request
from zipfile import ZipFile
from geopandas import GeoDataFrame, read_file, points_from_xy
import dash
#import dash_core_components as dcc
from dash import dcc
#import dash_html_components as html
from dash import html
import plotly.express as px
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType, TimestampType
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
from pyspark.sql.functions import rank

class BlueBikesDataPipeline:
    def __init__(self, url):
        self.url = url
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()

    def unzip_file_to_local_csv(self):
        # Download the zip file from the URL
        request.urlretrieve(self.url, 'data.zip')
        # Unzip the file
        ZipFile('data.zip').extractall('data')
        # Return the unzipped file
        # return 'data/tripdata.csv'
    
    def df_to_db(self, table_name, dataframe):
        # Append the data to the `trips` table
        dataframe.to_sql(table_name, self.conn, index=False, if_exists='append')


    def csv_to_db(self, table_name, csv_path):
        # Read in the DataFrame from the CSV file
        df = pd.read_csv(csv_path)
        # Append the data to the `trips` table
        df.to_sql(table_name, self.conn, index=False, if_exists='append')
    
    def main_join(self, geojson_path, station_path, journeys_path):
        poly_df = read_file(geojson_path)
        poly_df = poly_df[poly_df['District'] == 'Boston']
        poly_df = poly_df[['Name', 'geometry']]
        poly_df.columns = ['neighbourhood', 'geometry']
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.lower()
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(' ', '_')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('-', '_')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('\'', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('(', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(')', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('.', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('&', 'and')
        # Here is where manual was added
        stations = pd.read_csv(station_path)
        stations = stations[['Station ID', 'Station Name', 'Latitude', 'Longitude']]
        stations.columns = ['station_id', 'station_name', 'latitude', 'longitude']
        stations['station_name'] = stations['station_name'].str.lower()
        stations['station_name'] = stations['station_name'].str.replace(' ', '_')
        stations['station_name'] = stations['station_name'].str.replace('-', '_')
        stations['station_name'] = stations['station_name'].str.replace('\'', '')
        stations['station_name'] = stations['station_name'].str.replace('(', '')
        stations['station_name'] = stations['station_name'].str.replace(')', '')
        stations['station_name'] = stations['station_name'].str.replace('.', '')
        stations['station_name'] = stations['station_name'].str.replace('&', 'and')
        # Here, the loop began again, forcing manual changes
        stations_geo = GeoDataFrame(stations, geometry=points_from_xy(stations.longitude, stations.latitude))
        stations_geo.set_crs(epsg='4326', inplace=True)
        joined_df = stations_geo.sjoin(poly_df, how="left")
        grab_df = joined_df[['station_name', 'neighbourhood']]
        matched_pairs_with_pandas = grab_df[grab_df['neighbourhood'] == 'boston']
        matched_pairs_with_pandas.columns = ['station', 'neighbourhood']
        matched_pairs_with_pandas.to_sql('neighbourhood_stations', self.conn, index=False, if_exists='append')
        # Here, the loop began again, forcing manual changes
        spark = SparkSession.builder.appName("BlueBikes").getOrCreate()
        schema = StructType([])
        journeys = spark.read.csv(journeys_path, header=True, schema=schema)
        journeys = journeys.withColumnRenamed('starttime', 'start_time')
        journeys = journeys.withColumnRenamed('stoptime', 'end_time')
        #journeys = journeys.withColumnRenamed('start station id', 'start_station_id')
        #journeys = journeys.withColumnRenamed('end station id', 'end_station_id')
        #journeys = journeys.withColumnRenamed('start station name', 'start_station_name')
        #journeys = journeys.withColumnRenamed('end station name', 'end_station_name')
        #journeys = journeys.withColumnRenamed('usertype', 'user_type')
        #journeys = journeys.withColumnRenamed('birth year', 'birth_year')
        #Suugestions died here
        journeys = journeys.withColumn('start_time', to_timestamp(journeys.start_time, 'MM/dd/yyyy HH:mm'))
        journeys = journeys.withColumn('end_time', to_timestamp(journeys.end_time, 'MM/dd/yyyy HH:mm'))
        journeys = journeys.withColumn('start_station_id', journeys.start_station_id.cast(IntegerType()))
        journeys = journeys.withColumn('end_station_id', journeys.end_station_id.cast(IntegerType()))
        journeys = journeys.withColumn('start_station_name', journeys.start_station_name.cast(StringType()))
        journeys = journeys.withColumn('end_station_name', journeys.end_station_name.cast(StringType()))
        journeys = journeys.withColumn('user_type', journeys.user_type.cast(StringType()))
        journeys = journeys.withColumn('birth_year', journeys.birth_year.cast(IntegerType()))
        #Manual changes
        journeys_enriched = journeys.join(matched_pairs_with_pandas, journeys.start_station_name == matched_pairs_with_pandas.station, how='left')
        journeys_enriched = journeys_enriched.drop('station')
        journeys_enriched = journeys_enriched.join(matched_pairs_with_pandas, journeys_enriched.end_station_name == matched_pairs_with_pandas.station, how='left')
        journeys_enriched = journeys_enriched.drop('station')
        journeys_enriched = journeys_enriched.withColumnRenamed('neighbourhood', 'start_neighbourhood')
        journeys_enriched = journeys_enriched.withColumnRenamed('neighbourhood', 'end_neighbourhood')
        journeys_enriched = journeys_enriched.withColumn('journey_id', monotonically_increasing_id())
        journeys_enriched = journeys_enriched.withColumn('start_date', to_date(journeys_enriched.start_time))
        journeys_enriched = journeys_enriched.withColumn('end_date', to_date(journeys_enriched.end_time))
        journeys_enriched = journeys_enriched.withColumn('day_of_week', dayofweek(journeys_enriched.start_time))
        journeys_enriched = journeys_enriched.withColumn('hour_of_day', hour(journeys_enriched.start_time))
        journeys_enriched = journeys_enriched.withColumn('month_of_year', month(journeys_enriched.start_time))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', (journeys_enriched.end_time.cast(LongType()) - journeys_enriched.start_time.cast(LongType())))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration / 60)
        # Here, another loop began
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration / 60)
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration / 60)
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration / 60)
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        self.df_to_db('journeys_enriched', journeys_enriched.toPandas())

class BlueBikesDashboard:

    def __init__(self) -> None:
        
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()
    
    def create_dash_application(self, response_variable='journeys_count', group_by='start_neighbourhood'):
        # Create the Dash app
        app = dash.Dash(__name__)
        # Create a DataFrame from the Postgres table
        df = pd.read_sql('SELECT {}, COUNT(*) as journeys_count FROM journeys_enriched group by 1'.format(group_by), self.conn)
        # Create a bar chart of the number of trips by neighbourhood
        fig = px.bar(df, x='start_neighbourhood', y='journeys_count')
        # Create the Dash app layout
        app.layout = html.Div(children=[
            html.H1(children='Hello Dash'),
            dcc.Graph(
                id='example-graph',
                figure=fig
            )
        ])
        # Return the app
        return app
    
    def create_dash_application_manual(self, response_variable='journeys_count', group_by='start_neighbourhood'):
        # Create the Dash app
        app = dash.Dash(__name__)
        # Create a DataFrame from the Postgres table
        assert response_variable in ['journeys_count', 'journey_duration']
        if response_variable == 'journeys_count':
            df = pd.read_sql('SELECT {}, COUNT(*) as journeys_count FROM journeys_enriched group by 1'.format(group_by), self.conn)
        else:
            df = pd.read_sql('SELECT {}, AVG(journey_duration) as journey_duration FROM journeys_enriched group by 1'.format(group_by), self.conn)
        df = pd.read_sql('SELECT {}, COUNT(*) as journeys_count FROM journeys_enriched group by 1'.format(group_by), self.conn)
        # Create a bar chart of the number of trips by neighbourhood
        fig = px.bar(df, x=group_by, y=response_variable)
        # Create the Dash app layout
        app.layout = html.Div(children=[
            html.H1(children='Hello Dash'),
            dcc.Graph(
                id='example-graph',
                figure=fig
            )
        ])
        # Return the app
        return app


In [19]:
#from bikeshareai.bluebikes import BlueBikesDataPipeline, BlueBikesDashboard
# Create an instance of BlueBikesDataPipeline and run the methods
pipeline = BlueBikesDataPipeline('https://s3.amazonaws.com/hubway-data/202304-bluebikes-tripdata.zip')
pipeline.unzip_file_to_local_csv()
pipeline.csv_to_db('journeys', 'data/202304-bluebikes-tripdata.csv')
#Arguments were automatically suggested
pipeline.main_join('data/Boston_Neighborhoods.geojson', 'data/current_bluebikes_stations.csv', 'data/202304-bluebikes-tripdata.csv')
pipeline.enrich_journeys()



In [7]:
#        grab_df = joined_df[['Name_left', 'Name_right', 'District']]
# Create two classes:
# The first one should include the following methods:
# __init__: Takes a URL of a zip file as argument, and unzips the file into a CSV in the data folder
# connect_to_db: Creates and returns a connection to the local PostgreSQL database using the sqlalchemy package.
# df_to_db: Takes the arguments table_name and dataframe. If no table with the name table_name exists in the local PostgreSQL, then insert the csv located at csv_path as a table with the name table_name
# main_join: Takes the arguments geojson_path, station_path, and journeys_path, and creates a PySpark context to carry out the following tasks using Spark SQL:
# Loads the csv at station_path as a dataframe called stations, and the csv at journeys_path as journeys. Loads the data from geojson path as polydf, loads the table stations as a Geopandas GeoDataFrame with epsg='4326', and then performs a spatial join between polydf and the stations GeoDataFrame. 
# The resulting Dataframe should be joined to the journey dataframe using journeys.start_station_name = neighbourhood_stations.station twice, producing a table with all of the columns from journeys, plus the start and end neighbourhoods and stations for each journey. Finally, the resulting DataFrame should be written to the PostgreSQL database as a table called journeys_enriched
# The second one should include the following methods:
# - create_dash_application: create a Dash application. In this application, 
#it should be possible to select the number of journeys or the average duration as response_variable, and then summarise response_variable by any of the following variables:
# start_neighbourhood
# end_neighbourhood
# day_of_week
# hour_of_day
# month_of_year
# - run: call the create_dash_application method and run the app

from sqlalchemy import create_engine
import pandas as pd
from urllib import request
from zipfile import ZipFile
from geopandas import GeoDataFrame, read_file, points_from_xy
import dash
#import dash_core_components as dcc
from dash import dcc
#import dash_html_components as html
from dash import html
import plotly.express as px
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType, TimestampType
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
from pyspark.sql.functions import rank

class BlueBikesDataPipeline:
    def __init__(self, url):
        self.url = url
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()

    def unzip_file_to_local_csv(self):
        # Download the zip file from the URL
        request.urlretrieve(self.url, 'data.zip')
        # Unzip the file
        ZipFile('data.zip').extractall('data')
        # Return the unzipped file
        # return 'data/tripdata.csv'
    
    def df_to_db(self, table_name, dataframe):
        # Append the data to the `trips` table
        dataframe.to_sql(table_name, self.conn, index=False, if_exists='append')


    def csv_to_db(self, table_name, csv_path):
        # Read in the DataFrame from the CSV file
        df = pd.read_csv(csv_path)
        # Append the data to the `trips` table
        df.to_sql(table_name, self.conn, index=False, if_exists='append')
    
    def main_join(self, geojson_path, station_path, journeys_path):
        poly_df = read_file(geojson_path)
        poly_df = poly_df[poly_df['District'] == 'Boston']
        poly_df = poly_df[['Name', 'geometry']]
        poly_df.columns = ['neighbourhood', 'geometry']
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.lower()
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(' ', '_')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('-', '_')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('\'', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('(', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(')', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('.', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('&', 'and')
        # Here is where manual was added
        stations = pd.read_csv(station_path)
        stations = stations[['Station ID', 'Station Name', 'Latitude', 'Longitude']]
        stations.columns = ['station_id', 'station_name', 'latitude', 'longitude']
        stations['station_name'] = stations['station_name'].str.lower()
        stations['station_name'] = stations['station_name'].str.replace(' ', '_')
        stations['station_name'] = stations['station_name'].str.replace('-', '_')
        stations['station_name'] = stations['station_name'].str.replace('\'', '')
        stations['station_name'] = stations['station_name'].str.replace('(', '')
        stations['station_name'] = stations['station_name'].str.replace(')', '')
        stations['station_name'] = stations['station_name'].str.replace('.', '')
        stations['station_name'] = stations['station_name'].str.replace('&', 'and')
        # Here, the loop began again, forcing manual changes
        stations_geo = GeoDataFrame(stations, geometry=points_from_xy(stations.longitude, stations.latitude))
        stations_geo.set_crs(epsg='4326', inplace=True)
        joined_df = stations_geo.sjoin(poly_df, how="left")
        grab_df = joined_df[['Name_left', 'Name_right', 'District']]
        matched_pairs_with_pandas = grab_df[grab_df['District'] == 'Boston']
        matched_pairs_with_pandas.columns = ['station', 'neighbourhood']
        spark = SparkSession.builder.appName("BlueBikes").getOrCreate()
        schema = StructType([])
        journeys = spark.read.csv(journeys_path, header=True, schema=schema)
        journeys = journeys.withColumnRenamed('starttime', 'start_time')
        journeys = journeys.withColumnRenamed('stoptime', 'end_time')
        journeys = journeys.withColumnRenamed('start station id', 'start_station_id')
        journeys = journeys.withColumnRenamed('end station id', 'end_station_id')
        journeys = journeys.withColumnRenamed('start station name', 'start_station_name')
        journeys = journeys.withColumnRenamed('end station name', 'end_station_name')
        journeys = journeys.withColumnRenamed('usertype', 'user_type')
        journeys = journeys.withColumnRenamed('birth year', 'birth_year')
        #Suugestions died here
        journeys = journeys.withColumn('start_time', to_timestamp(journeys.start_time, 'MM/dd/yyyy HH:mm'))
        journeys = journeys.withColumn('end_time', to_timestamp(journeys.end_time, 'MM/dd/yyyy HH:mm'))
        journeys = journeys.withColumn('start_station_id', journeys.start_station_id.cast(IntegerType()))
        journeys = journeys.withColumn('end_station_id', journeys.end_station_id.cast(IntegerType()))
        journeys = journeys.withColumn('start_station_name', journeys.start_station_name.cast(StringType()))
        journeys = journeys.withColumn('end_station_name', journeys.end_station_name.cast(StringType()))
        journeys = journeys.withColumn('user_type', journeys.user_type.cast(StringType()))
        journeys = journeys.withColumn('birth_year', journeys.birth_year.cast(IntegerType()))
        #Manual changes
        journeys_enriched = journeys.join(matched_pairs_with_pandas, journeys.start_station_name == matched_pairs_with_pandas.station, how='left')
        journeys_enriched = journeys_enriched.drop('station')
        journeys_enriched = journeys_enriched.join(matched_pairs_with_pandas, journeys_enriched.end_station_name == matched_pairs_with_pandas.station, how='left')
        journeys_enriched = journeys_enriched.drop('station')
        journeys_enriched = journeys_enriched.withColumnRenamed('neighbourhood', 'start_neighbourhood')
        journeys_enriched = journeys_enriched.withColumnRenamed('neighbourhood', 'end_neighbourhood')
        journeys_enriched = journeys_enriched.withColumn('journey_id', monotonically_increasing_id())
        journeys_enriched = journeys_enriched.withColumn('start_date', to_date(journeys_enriched.start_time))
        journeys_enriched = journeys_enriched.withColumn('end_date', to_date(journeys_enriched.end_time))
        journeys_enriched = journeys_enriched.withColumn('day_of_week', dayofweek(journeys_enriched.start_time))
        journeys_enriched = journeys_enriched.withColumn('hour_of_day', hour(journeys_enriched.start_time))
        journeys_enriched = journeys_enriched.withColumn('month_of_year', month(journeys_enriched.start_time))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', (journeys_enriched.end_time.cast(LongType()) - journeys_enriched.start_time.cast(LongType())))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration / 60)
        # Here, another loop began
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration / 60)
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration / 60)
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration / 60)
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        self.df_to_db('journeys_enriched', journeys_enriched.toPandas())

class BlueBikesDashboard:

    def __init__(self) -> None:
        
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()
    
    def create_dash_application_manual(self, response_variable='journeys_count', group_by='start_neighbourhood'):
        # Create the Dash app
        app = dash.Dash(__name__)
        # Create a DataFrame from the Postgres table
        assert response_variable in ['journeys_count', 'journey_duration']
        if response_variable == 'journeys_count':
            df = pd.read_sql('SELECT {}, COUNT(*) as journeys_count FROM journeys_enriched group by 1'.format(group_by), self.conn)
        else:
            df = pd.read_sql('SELECT {}, AVG(journey_duration) as journey_duration FROM journeys_enriched group by 1'.format(group_by), self.conn)
        df = pd.read_sql('SELECT {}, COUNT(*) as journeys_count FROM journeys_enriched group by 1'.format(group_by), self.conn)
        # Create a bar chart of the number of trips by neighbourhood
        fig = px.bar(df, x=group_by, y=response_variable)
        # Create the Dash app layout
        app.layout = html.Div(children=[
            html.H1(children='Hello Dash'),
            dcc.Graph(
                id='example-graph',
                figure=fig
            )
        ])
        # Return the app
        return app


In [8]:
#from bikeshareai.bluebikes import BlueBikesDataPipeline, BlueBikesDashboard
# Create an instance of BlueBikesDataPipeline and run the methods
pipeline = BlueBikesDataPipeline('https://s3.amazonaws.com/hubway-data/202304-bluebikes-tripdata.zip')
pipeline.unzip_file_to_local_csv()
pipeline.csv_to_db('journeys', 'data/202304-bluebikes-tripdata.csv')
#Arguments were automatically suggested
pipeline.main_join('data/Boston_Neighborhoods.geojson', 'data/current_bluebikes_stations.csv', 'data/202304-bluebikes-tripdata.csv')
pipeline.enrich_journeys()



KeyError: 'District'

In [None]:
grab_df = joined_df[['Name_left', 'Name_right', 'District']]
        matched_pairs_with_pandas = grab_df[grab_df['District'] == 'Boston']
        matched_pairs_with_pandas.columns = ['station', 'neighbourhood', 'District']

In [1]:
#        grab_df = joined_df[['Name_left', 'Name_right', 'District']]
# Create two classes:
# The first one should include the following methods:
# __init__: Takes a URL of a zip file as argument, and unzips the file into a CSV in the data folder
# connect_to_db: Creates and returns a connection to the local PostgreSQL database using the sqlalchemy package.
# df_to_db: Takes the arguments table_name and dataframe. If no table with the name table_name exists in the local PostgreSQL, then insert the csv located at csv_path as a table with the name table_name
# main_join: Takes the arguments geojson_path, station_path, and journeys_path, and creates a PySpark context to carry out the following tasks using Spark SQL:
# Loads the csv at station_path as a dataframe called stations, and the csv at journeys_path as journeys. Loads the data from geojson path as polydf, loads the table stations as a Geopandas GeoDataFrame with epsg='4326', and then performs a spatial join between polydf and the stations GeoDataFrame. 
# The resulting Dataframe should be joined to the journey dataframe using journeys.start_station_name = neighbourhood_stations.station twice, producing a table with all of the columns from journeys, plus the start and end neighbourhoods and stations for each journey. Finally, the resulting DataFrame should be written to the PostgreSQL database as a table called journeys_enriched
# The second one should include the following methods:
# - create_dash_application: create a Dash application. In this application, 
#it should be possible to select the number of journeys or the average duration as response_variable, and then summarise response_variable by any of the following variables:
# start_neighbourhood
# end_neighbourhood
# day_of_week
# hour_of_day
# month_of_year
# - run: call the create_dash_application method and run the app

from sqlalchemy import create_engine
import pandas as pd
from urllib import request
from zipfile import ZipFile
from geopandas import GeoDataFrame, read_file, points_from_xy
import dash
#import dash_core_components as dcc
from dash import dcc
#import dash_html_components as html
from dash import html
import plotly.express as px
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DateType, TimestampType
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, dayofweek
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
from pyspark.sql.functions import rank

class BlueBikesDataPipeline:
    def __init__(self, url):
        self.url = url
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()

    def unzip_file_to_local_csv(self):
        # Download the zip file from the URL
        request.urlretrieve(self.url, 'data.zip')
        # Unzip the file
        ZipFile('data.zip').extractall('data')
        # Return the unzipped file
        # return 'data/tripdata.csv'
    
    def df_to_db(self, table_name, dataframe):
        # Append the data to the `trips` table
        dataframe.to_sql(table_name, self.conn, index=False, if_exists='append')


    def csv_to_db(self, table_name, csv_path):
        # Read in the DataFrame from the CSV file
        df = pd.read_csv(csv_path)
        # Append the data to the `trips` table
        df.to_sql(table_name, self.conn, index=False, if_exists='append')
    
    def main_join(self, geojson_path, station_path, journeys_path):
        poly_df = read_file(geojson_path)
#        poly_df = poly_df[poly_df['District'] == 'Boston']
        poly_df = poly_df[['Name', 'geometry']]
        poly_df.columns = ['neighbourhood', 'geometry']
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.lower()
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(' ', '_')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('-', '_')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('\'', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('(', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(')', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('.', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('&', 'and')
        # Here is where manual was added
        stations = pd.read_csv(station_path)
        #stations = stations[['Station ID', 'Station Name', 'Latitude', 'Longitude']]
        stations = stations[['Number', 'Name', 'Latitude', 'Longitude']]
        stations.columns = ['station_id', 'station_name', 'latitude', 'longitude']
        stations['station_name'] = stations['station_name'].str.lower()
        stations['station_name'] = stations['station_name'].str.replace(' ', '_')
        stations['station_name'] = stations['station_name'].str.replace('-', '_')
        stations['station_name'] = stations['station_name'].str.replace('\'', '')
        stations['station_name'] = stations['station_name'].str.replace('(', '')
        stations['station_name'] = stations['station_name'].str.replace(')', '')
        stations['station_name'] = stations['station_name'].str.replace('.', '')
        stations['station_name'] = stations['station_name'].str.replace('&', 'and')
        # Here, the loop began again, forcing manual changes
        stations_geo = GeoDataFrame(stations, geometry=points_from_xy(stations.longitude, stations.latitude))
        stations_geo.set_crs(epsg='4326', inplace=True)
        joined_df = stations_geo.sjoin(poly_df, how="left")
        spark = SparkSession.builder.appName("BlueBikes").getOrCreate()
        schema = StructType([])
        journeys = spark.read.csv(journeys_path, header=True, schema=schema)
        #started_at was actual name
        #journeys = journeys.withColumnRenamed('started_at', 'start_time')
# This line was automatic
        #journeys = journeys.withColumnRenamed('ended_at', 'end_time')

        #Suugestions died here
        journeys = journeys.withColumn('start_time', to_timestamp(journeys.started_at, 'MM/dd/yyyy HH:mm'))
        journeys = journeys.withColumn('end_time', to_timestamp(journeys.ended_at, 'MM/dd/yyyy HH:mm'))
        journeys = journeys.withColumn('start_station_id', journeys.start_station_id.cast(IntegerType()))
        journeys = journeys.withColumn('end_station_id', journeys.end_station_id.cast(IntegerType()))
        journeys = journeys.withColumn('start_station_name', journeys.start_station_name.cast(StringType()))
        journeys = journeys.withColumn('end_station_name', journeys.end_station_name.cast(StringType()))
        #Manual changes
        journeys_enriched = journeys.join(grab_df, journeys.start_station_name == grab_df.station, how='left')
        journeys_enriched = journeys_enriched.drop('station')
        journeys_enriched = journeys_enriched.join(grab_df, journeys_enriched.end_station_name == grab_df.station, how='left')
        journeys_enriched = journeys_enriched.drop('station')
        journeys_enriched = journeys_enriched.withColumnRenamed('neighbourhood', 'start_neighbourhood')
        journeys_enriched = journeys_enriched.withColumnRenamed('neighbourhood', 'end_neighbourhood')
        journeys_enriched = journeys_enriched.withColumn('journey_id', monotonically_increasing_id())
        journeys_enriched = journeys_enriched.withColumn('start_date', to_date(journeys_enriched.start_time))
        journeys_enriched = journeys_enriched.withColumn('end_date', to_date(journeys_enriched.end_time))
        journeys_enriched = journeys_enriched.withColumn('day_of_week', dayofweek(journeys_enriched.start_time))
        journeys_enriched = journeys_enriched.withColumn('hour_of_day', hour(journeys_enriched.start_time))
        journeys_enriched = journeys_enriched.withColumn('month_of_year', month(journeys_enriched.start_time))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', (journeys_enriched.end_time.cast(LongType()) - journeys_enriched.start_time.cast(LongType())))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration / 60)
        # Here, another loop began
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration / 60)
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration / 60)
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration / 60)
        journeys_enriched = journeys_enriched.withColumn('journey_duration', journeys_enriched.journey_duration.cast(IntegerType()))
        self.df_to_db('journeys_enriched', journeys_enriched.toPandas())

class BlueBikesDashboard:

    def __init__(self) -> None:
        
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()
    
    def create_dash_application_manual(self, response_variable='journeys_count', group_by='start_neighbourhood'):
        # Create the Dash app
        app = dash.Dash(__name__)
        # Create a DataFrame from the Postgres table
        assert response_variable in ['journeys_count', 'journey_duration']
        if response_variable == 'journeys_count':
            df = pd.read_sql('SELECT {}, COUNT(*) as journeys_count FROM journeys_enriched group by 1'.format(group_by), self.conn)
        else:
            df = pd.read_sql('SELECT {}, AVG(journey_duration) as journey_duration FROM journeys_enriched group by 1'.format(group_by), self.conn)
        df = pd.read_sql('SELECT {}, COUNT(*) as journeys_count FROM journeys_enriched group by 1'.format(group_by), self.conn)
        # Create a bar chart of the number of trips by neighbourhood
        fig = px.bar(df, x=group_by, y=response_variable)
        # Create the Dash app layout
        app.layout = html.Div(children=[
            html.H1(children='Hello Dash'),
            dcc.Graph(
                id='example-graph',
                figure=fig
            )
        ])
        # Return the app
        return app


In [2]:
#from bikeshareai.bluebikes import BlueBikesDataPipeline, BlueBikesDashboard
# Create an instance of BlueBikesDataPipeline and run the methods
pipeline = BlueBikesDataPipeline('https://s3.amazonaws.com/hubway-data/202304-bluebikes-tripdata.zip')
pipeline.unzip_file_to_local_csv()
#Arguments were automatically suggested
pipeline.main_join('data/Boston_Neighborhoods.geojson', 'data/current_bluebikes_stations.csv', 'data/202304-bluebikes-tripdata.csv')
pipeline.enrich_journeys()



  poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('(', '')
  poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(')', '')
  poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('.', '')
  stations['station_name'] = stations['station_name'].str.replace('(', '')
  stations['station_name'] = stations['station_name'].str.replace(')', '')
  stations['station_name'] = stations['station_name'].str.replace('.', '')
23/05/21 18:30:46 WARN Utils: Your hostname, Desmonds-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.3 instead (on interface en0)
23/05/21 18:30:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/21 18:30:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


AttributeError: 'DataFrame' object has no attribute 'started_at'

In [6]:
spark = SparkSession.builder.appName("BlueBikes").getOrCreate()
schema = StructType([])
journeys = spark.read.csv('data/202304-bluebikes-tripdata.csv', header=True, schema=schema)
journeys_pandas = journeys.toPandas()


23/05/21 18:34:50 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 13, schema size: 0
CSV file: file:///Users/desmondmolloy/Documents/GitHub/BikeShareAI/data/202304-bluebikes-tripdata.csv
                                                                                

In [9]:
journeys_pandas.columns

Index([], dtype='object')

PySpark was dropped as an option

In [None]:
# Create two classes:
# The first one should include the following methods:
# __init__: Takes a URL of a zip file as argument, and unzips the file into a CSV in the data folder
# connect_to_db: Creates and returns a connection to the local PostgreSQL database using the sqlalchemy package.
# df_to_db: Takes the arguments table_name and dataframe. If no table with the name table_name exists in the local PostgreSQL, then insert the csv located at csv_path as a table with the name table_name
# main_join: Takes the arguments geojson_path, station_path, and journeys_path. Loads the csv at station_path as a dataframe called stations, and the csv at journeys_path as journeys. Loads the data from geojson path as polydf, loads the table stations as a Geopandas GeoDataFrame with epsg='4326', and then performs a spatial join between polydf and the stations GeoDataFrame. 
# The resulting Dataframe should be joined to the journey dataframe using journeys.start_station_name = neighbourhood_stations.station twice, producing a table with all of the columns from journeys, plus the start and end neighbourhoods and stations for each journey. Finally, the resulting DataFrame should be written to the PostgreSQL database as a table called journeys_enriched
# The second one should include the following methods:
# - create_dash_application: create a Dash application. In this application, 
#it should be possible to select the number of journeys or the average duration as response_variable, and then summarise response_variable by any of the following variables:
# start_neighbourhood
# end_neighbourhood
# day_of_week
# hour_of_day
# month_of_year
# - run: call the create_dash_application method and run the app

from sqlalchemy import create_engine
import pandas as pd
from urllib import request
from zipfile import ZipFile
from geopandas import GeoDataFrame, read_file, points_from_xy
import dash
#import dash_core_components as dcc
from dash import dcc
#import dash_html_components as html
from dash import html
import plotly.express as px

class BlueBikesDataPipeline:

    def __init__(self, url):
        self.url = url
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()
    
    def unzip_file_to_local_csv(self):
        # Download the zip file from the URL
        request.urlretrieve(self.url, 'data.zip')
        # Unzip the file
        ZipFile('data.zip').extractall('data')
        # Return the unzipped file
        # return 'data/tripdata.csv'
    
    def df_to_db(self, table_name, dataframe):
        # Append the data to the `trips` table
        dataframe.to_sql(table_name, self.conn, index=False, if_exists='append')
    
    def main_join(self, geojson_path, station_path, journeys_path):
        poly_df = read_file(geojson_path)
#        poly_df = poly_df[poly_df['District'] == 'Boston']
        poly_df = poly_df[['Name', 'geometry']]
        poly_df.columns = ['neighbourhood', 'geometry']
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.lower()
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(' ', '_')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('-', '_')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('\'', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('(', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(')', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('.', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('&', 'and')
        # Here is where manual was added
        stations = pd.read_csv(station_path)
        #stations = stations[['Station ID', 'Station Name', 'Latitude', 'Longitude']]
        stations = stations[['Number', 'Name', 'Latitude', 'Longitude']]
        stations.columns = ['station_id', 'station_name', 'latitude', 'longitude']
        stations['station_name'] = stations['station_name'].str.lower()
        stations['station_name'] = stations['station_name'].str.replace(' ', '_')
        stations['station_name'] = stations['station_name'].str.replace('-', '_')
        stations['station_name'] = stations['station_name'].str.replace('\'', '')
        stations['station_name'] = stations['station_name'].str.replace('(', '')
        stations['station_name'] = stations['station_name'].str.replace(')', '')
        stations['station_name'] = stations['station_name'].str.replace('.', '')
        stations['station_name'] = stations['station_name'].str.replace('&', 'and')
        # Here, the loop began again, forcing manual changes
        stations_geo = GeoDataFrame(stations, geometry=points_from_xy(stations.longitude, stations.latitude))
        stations_geo.set_crs(epsg='4326', inplace=True)
        joined_df = stations_geo.sjoin(poly_df, how="left")
        #grab_df = joined_df[['Name_left', 'Name_right', 'District']]
        #matched_pairs_with_pandas = grab_df[grab_df['District'] == 'Boston']
        #matched_pairs_with_pandas.columns = ['station', 'neighbourhood', 'District']
        # Create a SparkSession
        spark = SparkSession.builder.appName("BlueBikes").getOrCreate()
        # Create a schema for the dataframe
        schema = StructType([])
        # Load the CSV file into a dataframe
        #journeys = spark.read.csv(journeys_path, header=True, schema=schema)
        journeys_pandas_df = pd.read_csv(journeys_path)
                

A more specific version of the visualisation class prompt (describing specific SQL steps):

In [1]:
# Create two classes:
# The first one should include the following methods:
# __init__: Takes a URL of a zip file as argument, and unzips the file into a CSV in the data folder
# connect_to_db: Creates and returns a connection to the local PostgreSQL database using the sqlalchemy package.
# df_to_db: Takes the arguments table_name and dataframe. If no table with the name table_name exists in the local PostgreSQL, then insert the csv located at csv_path as a table with the name table_name
# main_join: Takes the arguments geojson_path, station_path, and journeys_path. Loads the csv at station_path as a dataframe called stations, and the csv at journeys_path as journeys. Loads the data from geojson path as polydf, loads the table stations as a Geopandas GeoDataFrame with epsg='4326', and then performs a spatial join between polydf and the stations GeoDataFrame. 
# The resulting Dataframe should be joined to the journey dataframe using journeys.start_station_name = neighbourhood_stations.station twice, producing a table with all of the columns from journeys, plus the start and end neighbourhoods and stations for each journey. Finally, the resulting DataFrame should be written to the PostgreSQL database as a table called journeys_enriched
# the second one should be called BlueBikesVisual and have the following methods
# - __init__: Connect to the local PostgreSQL instance
# - create_dash_application: create a Dash application. It should have two dropdown menus, with the variables response_variable and grouping_variable
# response_variable should be in ["journeys","duration"]. If response_variable = journeys, then pass a SQL statement that counts all records by grouping_variable
# If response_variable is duration, then pass a SQL statement that selects the mean duration froup by grouping variable
# Grouping variable should be in the following:
# start_neighbourhood
# end_neighbourhood
# day_of_week
# hour_of_day
# month_of_year
# The application should have a bar graph showing the response variable by grouping variable
# - run: call the create_dash_application method and run the app
from sqlalchemy import create_engine
import pandas as pd
from urllib import request
from zipfile import ZipFile
from geopandas import GeoDataFrame, read_file, points_from_xy
import dash
#import dash_core_components as dcc
from dash import dcc
#import dash_html_components as html
from dash import html
import plotly.express as px

class BlueBikesDataPipeline:

    def __init__(self, url):
        self.url = url
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()
    
    def unzip_file_to_local_csv(self):
        # Download the zip file from the URL
        request.urlretrieve(self.url, 'data.zip')
        # Unzip the file
        ZipFile('data.zip').extractall('data')
        # Return the unzipped file
        # return 'data/tripdata.csv'
    
    def df_to_db(self, table_name, dataframe):
        # Append the data to the `trips` table
        dataframe.to_sql(table_name, self.conn, index=False, if_exists='append')
    
    def main_join(self, geojson_path, station_path, journeys_path):
        poly_df = read_file(geojson_path)
#        poly_df = poly_df[poly_df['District'] == 'Boston']
        poly_df = poly_df[['Name', 'geometry']]
        poly_df.columns = ['neighbourhood', 'geometry']
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.lower()
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(' ', '_')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('-', '_')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('\'', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('(', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(')', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('.', '')
        poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('&', 'and')
        # Here is where manual was added
        stations = pd.read_csv(station_path)
        #stations = stations[['Station ID', 'Station Name', 'Latitude', 'Longitude']]
        stations = stations[['Number', 'Name', 'Latitude', 'Longitude']]
        stations.columns = ['station_id', 'station_name', 'latitude', 'longitude']
        stations['station_name'] = stations['station_name'].str.lower()
        stations['station_name'] = stations['station_name'].str.replace(' ', '_')
        stations['station_name'] = stations['station_name'].str.replace('-', '_')
        stations['station_name'] = stations['station_name'].str.replace('\'', '')
        stations['station_name'] = stations['station_name'].str.replace('(', '')
        stations['station_name'] = stations['station_name'].str.replace(')', '')
        stations['station_name'] = stations['station_name'].str.replace('.', '')
        stations['station_name'] = stations['station_name'].str.replace('&', 'and')
        # Here, the loop began again, forcing manual changes
        stations_geo = GeoDataFrame(stations, geometry=points_from_xy(stations.longitude, stations.latitude))
        stations_geo.set_crs(epsg='4326', inplace=True)
        joined_df = stations_geo.sjoin(poly_df, how="left")
        journeys_pandas_df = pd.read_csv(journeys_path)
        journeys_enriched = journeys_pandas_df.merge(joined_df, left_on='start_station_name', right_on='station_name', how='left')
        journeys_enriched = journeys_enriched.merge(joined_df, left_on='end_station_name', right_on='station_name', how='left')
        journeys_enriched = journeys_enriched.drop(['station_name_x', 'station_name_y'], axis=1)
        journeys_enriched = journeys_enriched.rename(columns={'Name_x': 'start_neighbourhood', 'Name_y': 'end_neighbourhood'})
        journeys_enriched = journeys_enriched.dropna()
        journeys_enriched['duration'] = pd.to_datetime(journeys_enriched['ended_at']) - pd.to_datetime(journeys_enriched['started_at'])
        journeys_enriched['duration'] = journeys_enriched['duration'].dt.total_seconds()
        journeys_enriched['duration'] = journeys_enriched['duration'] / 60
        journeys_enriched['duration'] = journeys_enriched['duration'].astype(int)
        journeys_enriched['journey_id'] = journeys_enriched.index
        #journeys_enriched = journeys_enriched[['journey_id', 'started_at', 'ended_at', 'duration', 'start_neighbourhood', 'end_neighbourhood', 'day_of_week', 'hour_of_day', 'month_of_year']]
        self.df_to_db('journeys_enriched', journeys_enriched)

class BlueBikesVisual:

    def _init__(self):
        pass

    def create_dash_application(self, response_variable='journeys', grouping_variable='start_neighbourhood'):
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()
        # Create the Dash app
        app = dash.Dash(__name__)
        # Create a DataFrame from the Postgres table
        assert response_variable in ['journeys', 'duration']
        if response_variable == 'journeys':
            df = pd.read_sql('SELECT {}, COUNT(*) as journeys FROM journeys_enriched group by 1'.format(grouping_variable), self.conn)
        else:
            df = pd.read_sql('SELECT {}, AVG(duration) as duration FROM journeys_enriched group by 1'.format(grouping_variable), self.conn)
        # Create a bar chart of the number of trips by neighbourhood
        fig = px.bar(df, x=grouping_variable, y=response_variable)
        # Create the Dash app layout
        app.layout = html.Div(children=[
            html.H1(children='Hello Dash'),
            dcc.Graph(
                id='example-graph',
                figure=fig
            )
        ])
        # Return the app
        return app
    
    def run(self):
        app = self.create_dash_application()
        app.run_server(debug=True)


In [2]:
# Run the pipeline
pipeline = BlueBikesDataPipeline('https://s3.amazonaws.com/hubway-data/202304-bluebikes-tripdata.zip')
pipeline.unzip_file_to_local_csv()
pipeline.main_join('data/Boston_Neighborhoods.geojson', 'data/current_bluebikes_stations.csv', 'data/202304-bluebikes-tripdata.csv')
visual = BlueBikesVisual()
visual.run()

  poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('(', '')
  poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace(')', '')
  poly_df['neighbourhood'] = poly_df['neighbourhood'].str.replace('.', '')
  stations['station_name'] = stations['station_name'].str.replace('(', '')
  stations['station_name'] = stations['station_name'].str.replace(')', '')
  stations['station_name'] = stations['station_name'].str.replace('.', '')


Dash is running on http://127.0.0.1:8050/



[33m * Tip: There are .env or .flaskenv files present. Do "pip install python-dotenv" to use them.[0m


 * Serving Flask app '__main__'
 * Debug mode: on


0.00s - make the debugger miss breakpoints. Please pass -Xfrozen_modules=off
0.00s - to python to disable frozen modules.
0.00s - Note: Debugging will proceed. Set PYDEVD_DISABLE_FILE_VALIDATION=1 to disable this validation.
  warn(
  warn(
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/Users/desmondmolloy/Documents/GitHub/BikeShareAI/.conda/lib/python3.11/site-packages/ipykernel_launcher.py", line 17, in <module>
    app.launch_new_instance()
  File "/Users/desmondmolloy/Documents/GitHub/BikeShareAI/.conda/lib/python3.11/site-packages/traitlets/config/application.py", line 1042, in launch_instance
    app.initialize(argv)
  File "/Users/desmondmolloy/Documents/GitHub/BikeShareAI/.conda/lib/python3.11/site-packages/traitlets/config/application.py", line 113, in inner
    return method(app, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/desmondmolloy/Documents/G

SystemExit: 1


To exit: use 'exit', 'quit', or Ctrl-D.



## 28.05.2023

When work was picked back up on 28-05-2023, a different approach to writing design specifications was adapted. To keep from being hemmed in or limited by the visible, existing code, a separate textfile was used to write a specification for the dashboard script, at dashboard_20230528.txt. This was then copied into the notebook, and the code was written to match the specification. Please note that Copilot wrote the last sentence.

In [None]:
# Write a class called DashboardBikes with the following methods:

# create_dash_application: Creates a Dash application with one bar graph, and two instances of dcc.Dropdown. 
# The first instance of dcc.Dropdown should be called response_variable
# The second instance should be called grouping_variable

# If response_variable = journeys, then pass a SQL statement that counts all records by grouping_variable
# If response_variable = duration, then pass a SQL statement that selects the mean duration group by grouping variable

# Grouping variable should be in the following:
# start_neighbourhood
# end_neighbourhood
# day_of_week
# hour_of_day
# month_of_year
# The application should have a bar graph showing the response variable by grouping variable

class DashboardBikes:

    def __init__(self):
        pass
    
    def create_dash_application(self, response_variable='journeys', grouping_variable='start_neighbourhood'):
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        # Create a connection to the engine called `conn`
        self.conn = self.engine.connect()
        # Create the Dash app
        app = dash.Dash(__name__)
        # Create a DataFrame from the Postgres table
        assert response_variable in ['journeys', 'duration']
        if response_variable == 'journeys':
            df = pd.read_sql('SELECT {}, COUNT(*) as journeys FROM journeys_enriched group by 1'.format(grouping_variable), self.conn)
        else:
            df = pd.read_sql('SELECT {}, AVG(duration) as duration FROM journeys_enriched group by 1'.format(grouping_variable), self.conn)
        # Create a bar chart of the number of trips by neighbourhood
        fig = px.bar(df, x=grouping_variable, y=response_variable)
        # Create the Dash app layout
        app.layout = html.Div(children=[
            html.H1(children='Hello Dash'),
            dcc.Graph(
                id='example-graph',
                figure=fig
            )
        ])
        # Return the app
        return app

Unfortunately, Copilot simply copied the earlier code, including its flaws (such as a lack of dropdown). We therefore had to paste the prompt into a different notebook, to escape the local bias.

This approach was successful, returning the following code:

```
# Write a class called DashboardBikes with the following methods:

# create_dash_application: Creates a Dash application with one bar graph, and two instances of dcc.Dropdown. 
# The first instance of dcc.Dropdown should be called response_variable
# The second instance should be called grouping_variable

# If response_variable = journeys, then pass a SQL statement that counts all records by grouping_variable
# If response_variable = duration, then pass a SQL statement that selects the mean duration group by grouping variable

# Grouping variable should be in the following:
# start_neighbourhood
# end_neighbourhood
# day_of_week
# hour_of_day
# month_of_year
# The application should have a bar graph showing the response variable by grouping variable

# The application should have a title called "Bike Dashboard"

# The application should have a subtitle called "Created by Desmond Molloy"

class DashboardBikes:
    
    def __init__(self, df):
        self.df = df
        
    def create_dash_application(self):
        
        app = dash.Dash(__name__)
        
        app.layout = html.Div([
            html.H1('Bike Dashboard'),
            html.H2('Created by Desmond Molloy'),
            dcc.Dropdown(
                id='response_variable',
                options=[
                    {'label': 'Journeys', 'value': 'journeys'},
                    {'label': 'Duration', 'value': 'duration'}
                ],
                value='journeys'
            ),
            dcc.Dropdown(
                id='grouping_variable',
                options=[
                    {'label': 'Start Neighbourhood', 'value': 'start_neighbourhood'},
                    {'label': 'End Neighbourhood', 'value': 'end_neighbourhood'},
                    {'label': 'Day of Week', 'value': 'day_of_week'},
                    {'label': 'Hour of Day', 'value': 'hour_of_day'},
                    {'label': 'Month of Year', 'value': 'month_of_year'}
                ],
                value='start_neighbourhood'
            ),
            dcc.Graph(id='bike_graph')
        ])
        
        @app.callback(
            Output('bike_graph', 'figure'),
            Input('response_variable', 'value'),
            Input('grouping_variable', 'value'))
        def update_graph(response_variable, grouping_variable):
            if response_variable == 'journeys':
                df = self.df.groupby(grouping_variable).count().reset_index()
                fig = px.bar(df, x=grouping_variable, y='journey_id')
            elif response_variable == 'duration':
                df = self.df.groupby(grouping_variable).mean().reset_index()
                fig = px.bar(df, x=grouping_variable, y='duration')
            return fig
        
        return app
    
    def run_dash_application(self, app):
        app.run_server(debug=True, use_reloader=False)
    
# Path: sub_benches/outbench_20230528.ipynb
```

In [1]:
from bikeshareai.bluebikes import BlueBikesDataPipeline, DashboardBikes
#Launch the dashboard
dashboard = DashboardBikes()
dashboard.create_dash_application()
dashboard.run()


TypeError: DashboardBikes.__init__() missing 1 required positional argument: 'df'

It was necessary to specify that there should be no df argument

In [5]:
# Write a class called DashboardBike with the following methods:

# - __init__: Takes no arguments. Creates a Dash application with one bar graph, and two instances of dcc.Dropdown.

# The first instance of dcc.Dropdown should be called response_variable
# The second instance should be called grouping_variable

# If response_variable = journeys, then pass a SQL statement that counts all records by grouping_variable
# If response_variable = duration, then pass a SQL statement that selects the mean duration group by grouping variable

# Grouping variable should be in the following:
# start_neighbourhood
# end_neighbourhood
# day_of_week
# hour_of_day
# month_of_year
# The application should have a bar graph showing the response variable by grouping variable

# - run: Calls create_dash_application and runs the Dash application on the local server

# - get_sql: Takes no arguments. Returns a SQL statement that counts all records by grouping_variable

# - get_sql2: Takes no arguments. Returns a SQL statement that selects the mean duration group by grouping variable

class DashboardBike:

    def __init__(self):
        self.app = dash.Dash(__name__)
        self.app.layout = html.Div([
            html.H1('Bike Dashboard'),
            html.H2('Created by Desmond Molloy'),
            dcc.Dropdown(
                id='response_variable',
                options=[
                    {'label': 'Journeys', 'value': 'journeys'},
                    {'label': 'Duration', 'value': 'duration'}
                ],
                value='journeys'
            ),
            dcc.Dropdown(
                id='grouping_variable',
                options=[
                    {'label': 'Start Neighbourhood', 'value': 'start_neighbourhood'},
                    {'label': 'End Neighbourhood', 'value': 'end_neighbourhood'},
                    {'label': 'Day of Week', 'value': 'day_of_week'},
                    {'label': 'Hour of Day', 'value': 'hour_of_day'},
                    {'label': 'Month of Year', 'value': 'month_of_year'}
                ],
                value='start_neighbourhood'
            ),
            dcc.Graph(id='bike_graph')
        ])
            
        @self.app.callback(
            Output('bike_graph', 'figure'),
            Input('response_variable', 'value'),
            Input('grouping_variable', 'value'))
        def update_graph(response_variable, grouping_variable):
            if response_variable == 'journeys':
                sql = self.get_sql(grouping_variable)
            else:
                sql = self.get_sql2(grouping_variable)
            df = pd.read_sql(sql, con=engine)
            fig = px.bar(df, x=grouping_variable, y='count')
            return fig

    def run(self):
        self.app.run_server(debug=True, use_reloader=False)
        


In [6]:
#Create an instance of the DashboardBike class and run the run method
dashboard = DashboardBike()
dashboard.run()


NameError: name 'dash' is not defined

Needed imports had to be added to the design specification

In [7]:
# Write a class called DashboardBike with the following methods:

# - __init__: Takes no arguments. Creates a Dash application with one bar graph, and two instances of dcc.Dropdown.

# The first instance of dcc.Dropdown should be called response_variable
# The second instance should be called grouping_variable

# If response_variable = journeys, then pass a SQL statement that counts all records by grouping_variable
# If response_variable = duration, then pass a SQL statement that selects the mean duration group by grouping variable

# Grouping variable should be in the following:
# start_neighbourhood
# end_neighbourhood
# day_of_week
# hour_of_day
# month_of_year
# The application should have a bar graph showing the response variable by grouping variable

# - run: Calls create_dash_application and runs the Dash application on the local server

# - get_sql: Takes no arguments. Returns a SQL statement that counts all records by grouping_variable

# - get_sql2: Takes no arguments. Returns a SQL statement that selects the mean duration group by grouping variable

#First import all needed libraries

import pandas as pd
import plotly.express as px
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output

#Then create the class

class DashboardBike:
    
        def __init__(self):
            self.app = dash.Dash(__name__)
            self.app.layout = html.Div([
                html.H1('Bike Dashboard'),
                html.H2('Created by Desmond Molloy'),
                dcc.Dropdown(
                    id='response_variable',
                    options=[
                        {'label': 'Journeys', 'value': 'journeys'},
                        {'label': 'Duration', 'value': 'duration'}
                    ],
                    value='journeys'
                ),
                dcc.Dropdown(
                    id='grouping_variable',
                    options=[
                        {'label': 'Start Neighbourhood', 'value': 'start_neighbourhood'},
                        {'label': 'End Neighbourhood', 'value': 'end_neighbourhood'},
                        {'label': 'Day of Week', 'value': 'day_of_week'},
                        {'label': 'Hour of Day', 'value': 'hour_of_day'},
                        {'label': 'Month of Year', 'value': 'month_of_year'}
                    ],
                    value='start_neighbourhood'
                ),
                dcc.Graph(id='bike_graph')
            ])
                
            @self.app.callback(
                Output('bike_graph', 'figure'),
                Input('response_variable', 'value'),
                Input('grouping_variable', 'value'))
            def update_graph(response_variable, grouping_variable):
                if response_variable == 'journeys':
                    sql = self.get_sql(grouping_variable)
                else:
                    sql = self.get_sql2(grouping_variable)
                df = pd.read_sql(sql, con=engine)
                fig = px.bar(df, x=grouping_variable, y='count')
                return fig
    
        def run(self):
            self.app.run_server(debug=True, use_reloader=False)


The dash_core_components package is deprecated. Please replace
`import dash_core_components as dcc` with `from dash import dcc`
  import dash_core_components as dcc
The dash_html_components package is deprecated. Please replace
`import dash_html_components as html` with `from dash import html`
  import dash_html_components as html


In [8]:
#Call the class and run the run method

dashboard = DashboardBike()
dashboard.run()


Dash is running on http://127.0.0.1:8050/



[33m * Tip: There are .env or .flaskenv files present. Do "pip install python-dotenv" to use them.[0m


 * Serving Flask app '__main__'
 * Debug mode: on


The design had to be rewritten due to the absence of the get_sql and get_sql2 methods. The following code was used instead:


In [3]:
import pandas as pd
import plotly.express as px
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
from sqlalchemy import create_engine


class DashboardBike:

    def __init__(self):
        # The line below was added manually
        self.engine = create_engine('postgresql://postgres:postgres@localhost:5432/desmondmolloy')
        self.app = dash.Dash(__name__)
        self.app.layout = html.Div([
            html.H1('Bike Dashboard'),
            html.H2('Created by Desmond Molloy'),
            dcc.Dropdown(
                id='response_variable',
                options=[
                    {'label': 'Journeys', 'value': 'journeys'},
                    {'label': 'Duration', 'value': 'duration'}
                ],
                value='journeys'
            ),
            dcc.Dropdown(
                id='grouping_variable',
                options=[
                    {'label': 'Start Neighbourhood', 'value': 'start_neighbourhood'},
                    {'label': 'End Neighbourhood', 'value': 'end_neighbourhood'},
                    {'label': 'Day of Week', 'value': 'day_of_week'},
                    {'label': 'Hour of Day', 'value': 'hour_of_day'},
                    {'label': 'Month of Year', 'value': 'month_of_year'}
                ],
                value='start_neighbourhood'
            ),
            dcc.Graph(id='bike_graph')
        ])

        @self.app.callback(
            Output('bike_graph', 'figure'),
            Input('response_variable', 'value'),
            Input('grouping_variable', 'value'))
        def update_graph(response_variable, grouping_variable):
            if response_variable == 'journeys':
                sql = 'SELECT {}, COUNT(*) as count FROM journeys_enriched group by 1'.format(grouping_variable)
            else:
                sql = 'SELECT {}, AVG(duration) as count FROM journeys_enriched group by 1'.format(grouping_variable)
            df = pd.read_sql(sql, con=self.engine)
            fig = px.bar(df, x=grouping_variable, y='count')
            return fig
        
    def run(self):
        self.app.run_server(debug=True, use_reloader=False)


The dash_core_components package is deprecated. Please replace
`import dash_core_components as dcc` with `from dash import dcc`
  import dash_core_components as dcc
The dash_html_components package is deprecated. Please replace
`import dash_html_components as html` with `from dash import html`
  import dash_html_components as html


In [4]:
#Call the class and run the run method

dashboard = DashboardBike()
dashboard.run()


Dash is running on http://127.0.0.1:8050/



[33m * Tip: There are .env or .flaskenv files present. Do "pip install python-dotenv" to use them.[0m


 * Serving Flask app '__main__'
 * Debug mode: on


On this occasion, the dashboard ran exactly as desired

In [2]:
from bikeshareai.bluebikes import BlueBikesDataPipeline, DashboardBike
# Create an instance of BlueBikesDataPipeline and run the methods
pipeline = BlueBikesDataPipeline('https://s3.amazonaws.com/hubway-data/202304-bluebikes-tripdata.zip')
pipeline.unzip_file_to_local_csv()
pipeline.main_join('data/Boston_Neighborhoods.geojson', 'data/current_bluebikes_stations.csv', 'data/202304-bluebikes-tripdata.csv')
pipeline.enrich_journeys()
# Create an instance of DashboardBike and run the run method
dashboard = DashboardBike()
dashboard.run()



AttributeError: 'BlueBikesDataPipeline' object has no attribute 'main_join'

In [3]:
# Write a README text string for this repository
readme_text = """
# Blue Bikes Data Pipeline

This repository contains a data pipeline for the Blue Bikes bike sharing system in Boston. The pipeline downloads the data from the Blue Bikes website, joins it to a shapefile of Boston neighbourhoods, and then loads it into a PostgreSQL database. It also contains a dashboard that allows the user to visualise the data.

## Installation

To install the package, run the following command in the terminal:

```
pip install git+
```

## Usage

To use the package, run the following commands in the terminal:

```
from bikeshareai.bluebikes import BlueBikesDataPipeline, DashboardBike
# Create an instance of BlueBikesDataPipeline and run the methods
pipeline = BlueBikesDataPipeline('https://s3.amazonaws.com/hubway-data/202304-bluebikes-tripdata.zip')
pipeline.unzip_file_to_local_csv()
pipeline.main_join('data/Boston_Neighborhoods.geojson', 'data/current_bluebikes_stations.csv', 'data/202304-bluebikes-tripdata.csv')
pipeline.enrich_journeys()
# Create an instance of DashboardBike and run the run method
dashboard = DashboardBike()
dashboard.run()
```

## License

This package is licensed under the MIT License. See the LICENSE file for details.

## Credits

This package was created by Desmond Molloy.

"""

# Write the README string to a file
with open('README.md', 'w') as f:
    f.write(readme_text)




In [1]:
# Import all functions from bikeshareai.bluebikes module, call the pipeline and create the Dash app
from bikeshareai.bluebikes import BlueBikesDataPipeline, DashboardBike

# Create an instance of the BlueBikesDataPipeline class
pipeline = BlueBikesDataPipeline('https://s3.amazonaws.com/hubway-data/201705-bluebikes-tripdata.zip')
# Connect to the database
pipeline.connect_to_db()
# Unzip the file and save the CSV locally
pipeline.unzip_file_to_local_csv()
# Create the `stations` table in the database
pipeline.csv_to_db('stations', 'data/current_bluebikes_stations.csv')
pipeline.enrich_journeys('data/Boston_Neighborhoods.geojson','data/202304-bluebikes-tripdata.csv')

# Create an instance of the DashboardBike class
app = DashboardBike()
# Run the Dash app
app.run_server(debug=True)

# Path: outbench_20230528.ipynb



: 

: 