# Workbench

The goal of this project is to develop a PyShiny app capable of visualising the following for a selected neighbourhood of Boston, MA:

1. The most common **destination** for cyclists renting a BlueBike in that neighbourhood
2. The most common **origin** for cyclists who rented a BlueBike elsewhere and finished their journey in the selected neighbourhood
3. Over the course of the day, the times at which the most cyclists *arrived* in the neighbourhood by bicycle
4. Over the course of the day, the times at which the most cyclists *departed* the neighbourhood by bicycle

Ideally, the following variables should be selectable as user inputs:
- The neighbourhood
- The direction (inbound vs. outbound)

The data processing will be carried out using a local instance of PySpark.

## 1. Data engineering and processing
### 1. Setting up Spark

We start by setting up a local PySpark session (a single-threaded `local[1]` master rather than a multi-node cluster).

In [1]:
# Reference: https://sparkbyexamples.com/spark/how-to-create-a-sparksession-and-spark-context/
from pyspark.sql import SparkSession

# "local[1]" runs Spark in-process with a single worker thread
spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/06 20:31:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/01/06 20:31:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


We then check that the session is running correctly:

In [2]:
print("First SparkContext:")
print("APP Name :" + spark.sparkContext.appName)
print("Master :" + spark.sparkContext.master)

First SparkContext:
APP Name :SparkByExamples.com
Master :local[1]


### 2. Developing the BlueBike data access code locally

#### 1. Ridership data
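First, we configure the environment variable holding the path to the zip file (`zip_path`).

The first approach was not successful.

It was necessary to load the variables from a `.env` file with the `load_dotenv` function from the `python-dotenv` package: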

In [3]:
# Reference: https://stackoverflow.com/questions/63019506/python-get-value-of-env-variable-from-a-specific-env-file
import os
from dotenv import load_dotenv

load_dotenv()  # copy the variables defined in .env into os.environ
zip_path = os.getenv("zip_path")
print(zip_path)

data/202211-bluebikes-tripdata.zip
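`load_dotenv()` reads `KEY=VALUE` lines from a `.env` file into `os.environ`, by default without overwriting variables that are already set. The sketch below is a simplified, hypothetical stand-in (`load_env_file`) that shows roughly what happens; it is not python-dotenv's implementation, which also handles quoting rules, `export` prefixes and variable interpolation. Only the `zip_path` value is taken from the output above.

```python
import os
import tempfile


def load_env_file(path, override=False):
    """Hypothetical, simplified stand-in for dotenv.load_dotenv: parse KEY=VALUE lines."""
    parsed = {}
    with open(path, encoding="utf-8") as fh:
        for raw in fh:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue  # skip blank lines, comments and malformed lines
            key, value = line.split("=", 1)
            key, value = key.strip(), value.strip().strip('"').strip("'")
            if override or key not in os.environ:
                os.environ[key] = value  # existing variables win unless override=True
            parsed[key] = value
    return parsed


# Example .env contents written to a temporary file for the demo.
with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as tmp:
    tmp.write("# BlueBikes paths\nzip_path=data/202211-bluebikes-tripdata.zip\n")

os.environ.pop("zip_path", None)  # make the demo independent of the current shell
load_env_file(tmp.name)
print(os.getenv("zip_path"))  # data/202211-bluebikes-tripdata.zip
os.remove(tmp.name)
```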


Next, we download and unzip the file:

In [4]:
! wget https://s3.amazonaws.com/hubway-data/202211-bluebikes-tripdata.zip -P data/


--2023-01-06 20:31:29--  https://s3.amazonaws.com/hubway-data/202211-bluebikes-tripdata.zip
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.84.27, 52.216.146.189, 52.216.250.142, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.84.27|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12145938 (12M) [application/zip]
Saving to: 'data/202211-bluebikes-tripdata.zip.5'


2023-01-06 20:31:34 (2.98 MB/s) - 'data/202211-bluebikes-tripdata.zip.5' saved [12145938/12145938]
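The file was saved as `.zip.5` because earlier copies already existed in `data/`: wget adds a numeric suffix instead of overwriting, while the unzip step below keeps reading the original `data/202211-bluebikes-tripdata.zip`, so re-running the cell only accumulates duplicates. wget's `-nc` (`--no-clobber`) option skips the download when the file is already present. The same check can be sketched in Python with the standard library; `download_once` is a hypothetical helper, and the demo pre-creates a placeholder file so that no network request is made.

```python
import os
import tempfile
import urllib.parse
import urllib.request


def download_once(url, dest_dir="data"):
    """Fetch `url` into `dest_dir` unless a file with the same name is already there.

    Returns (local path, whether a download actually happened).
    """
    os.makedirs(dest_dir, exist_ok=True)
    filename = os.path.basename(urllib.parse.urlparse(url).path)
    target = os.path.join(dest_dir, filename)
    if os.path.exists(target):
        return target, False  # reuse the existing copy instead of saving a .1, .2, ... duplicate
    urllib.request.urlretrieve(url, target)
    return target, True


URL = "https://s3.amazonaws.com/hubway-data/202211-bluebikes-tripdata.zip"

# Offline demo: create an empty placeholder first, so the download is skipped.
demo_dir = tempfile.mkdtemp()
open(os.path.join(demo_dir, "202211-bluebikes-tripdata.zip"), "wb").close()
path, downloaded = download_once(URL, demo_dir)
print(os.path.basename(path), downloaded)  # 202211-bluebikes-tripdata.zip False
```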



In [5]:
# Reference: https://stackoverflow.com/questions/3451111/unzipping-files-in-python
import os
import zipfile

# Extract the trip-data CSV from the downloaded archive into data/
with zipfile.ZipFile(os.getenv("zip_path"), 'r') as zip_ref:
    zip_ref.extractall("data/")
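The next cell hard-codes the CSV name inside the archive. A small variation, sketched below, reads the member names with `ZipFile.namelist()` and returns the paths of the extracted CSV files, so the name does not have to be guessed for other months. `extract_csv` is a hypothetical helper, and the demo builds a tiny archive on the fly instead of using the real download.

```python
import os
import tempfile
import zipfile


def extract_csv(zip_path, dest_dir="data"):
    """Extract the archive and return the paths of the CSV files it contained."""
    with zipfile.ZipFile(zip_path) as zf:
        csv_members = [name for name in zf.namelist() if name.lower().endswith(".csv")]
        zf.extractall(dest_dir)
    return [os.path.join(dest_dir, name) for name in csv_members]


# Self-contained demo with a one-file archive.
work = tempfile.mkdtemp()
archive = os.path.join(work, "demo-tripdata.zip")
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("demo-tripdata.csv", "tripduration,starttime\n803,2022-11-01 00:00:04.9940\n")

paths = extract_csv(archive, work)
print([os.path.basename(p) for p in paths])  # ['demo-tripdata.csv']
```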

Inspecting the data:

In [6]:
# Without inferSchema=True, every column is read as a string (see the schema below)
df = spark.read.csv("data/202211-bluebikes-tripdata.csv", header=True)
df.printSchema()
df.head()

                                                                                

root
 |-- tripduration: string (nullable = true)
 |-- starttime: string (nullable = true)
 |-- stoptime: string (nullable = true)
 |-- start station id: string (nullable = true)
 |-- start station name: string (nullable = true)
 |-- start station latitude: string (nullable = true)
 |-- start station longitude: string (nullable = true)
 |-- end station id: string (nullable = true)
 |-- end station name: string (nullable = true)
 |-- end station latitude: string (nullable = true)
 |-- end station longitude: string (nullable = true)
 |-- bikeid: string (nullable = true)
 |-- usertype: string (nullable = true)
 |-- postal code: string (nullable = true)



Row(tripduration='803', starttime='2022-11-01 00:00:04.9940', stoptime='2022-11-01 00:13:28.6550', start station id='46', start station name='Christian Science Plaza - Massachusetts Ave at Westland Ave', start station latitude='42.3436658245146', start station longitude='-71.08582377433777', end station id='472', end station name='MIT Hayward St at Amherst St', end station latitude='42.36085937353302', end station longitude='-71.08551858007559', bikeid='2088', usertype='Subscriber', postal code='02142')
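Because every column arrives as a string, timestamps and coordinates need converting before hours of the day can be extracted. The plain-Python sketch below parses the values of the first row shown above. It also suggests that `tripduration` is the elapsed time in seconds, truncated to an integer, though that is inferred from this one row. In Spark itself the same conversion would typically use `spark.read.csv(..., inferSchema=True)` or `pyspark.sql.functions.to_timestamp` on the time columns.

```python
from datetime import datetime

# Values copied from the first Row shown above; every field arrives as a string.
row = {
    "tripduration": "803",
    "starttime": "2022-11-01 00:00:04.9940",
    "stoptime": "2022-11-01 00:13:28.6550",
    "start station latitude": "42.3436658245146",
}

TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"  # %f accepts 1 to 6 fractional digits
start = datetime.strptime(row["starttime"], TIME_FORMAT)
stop = datetime.strptime(row["stoptime"], TIME_FORMAT)

elapsed = (stop - start).total_seconds()
print(int(row["tripduration"]), round(elapsed, 3))  # 803 803.661
print(start.hour, stop.hour)                        # 0 0
print(float(row["start station latitude"]))         # numeric latitude for mapping
```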

#### 2. Station data
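It was also necessary to match each station with the Boston neighbourhood it lies in. The station list also covers neighbouring municipalities (e.g. Brookline, Watertown, Somerville), which have no Boston neighbourhood.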


In [7]:
import pyogrio

In [8]:
import pandas as pd

# header=1: the column names are on the file's second line, so the first line is skipped
pd.read_csv(os.getenv("station_path"), header=1).head()

|   | Number | Name | Latitude | Longitude | District | Public | Total docks | Deployment Year |
|---|--------|------|----------|-----------|----------|--------|-------------|-----------------|
| 0 | K32015 | 1200 Beacon St | 42.344149 | -71.114674 | Brookline | Yes | 15 | 2021.0 |
| 1 | W32006 | 160 Arsenal | 42.364664 | -71.175694 | Watertown | Yes | 11 | 2021.0 |
| 2 | A32019 | 175 N Harvard St | 42.363796 | -71.129164 | Boston | Yes | 17 | 2014.0 |
| 3 | S32035 | 191 Beacon St | 42.380323 | -71.108786 | Somerville | Yes | 19 | 2018.0 |
| 4 | C32094 | 2 Hummingbird Lane at Olmsted Green | 42.28887 | -71.095003 | Boston | Yes | 17 | 2020.0 |


In [9]:
from geopandas import datasets, GeoDataFrame, read_file
import geopandas
import pandas as pd

# Boston neighbourhood polygons
polydf = read_file('data/Boston_Neighborhoods.geojson')

# BlueBikes stations as points built from their longitude/latitude columns
stations = pd.read_csv(os.getenv("station_path"), header=1)
pointdf = GeoDataFrame(
    stations, geometry=geopandas.points_from_xy(stations.Longitude, stations.Latitude))

# Declare the coordinates as WGS84 (EPSG:4326) so the points use the same
# coordinate reference system as the GeoJSON neighbourhood polygons
pointdf.set_crs(epsg=4326, inplace=True)


import os
os.environ['USE_PYGEOS'] = '0'
import geopandas

In a future release, GeoPandas will switch to using Shapely by default. If you are using PyGEOS directly (calling PyGEOS functions on geometries from GeoPandas), this will then stop working and you are encouraged to migrate from PyGEOS to Shapely 2.0 (https://shapely.readthedocs.io/en/latest/migration_pygeos.html).
  from geopandas import datasets, GeoDataFrame, read_file


|     | Number | Name | Latitude | Longitude | District | Public | Total docks | Deployment Year | geometry |
|-----|--------|------|----------|-----------|----------|--------|-------------|-----------------|----------|
| 0 | K32015 | 1200 Beacon St | 42.344149 | -71.114674 | Brookline | Yes | 15 | 2021.0 | POINT (-71.11467 42.34415) |
| 1 | W32006 | 160 Arsenal | 42.364664 | -71.175694 | Watertown | Yes | 11 | 2021.0 | POINT (-71.17569 42.36466) |
| 2 | A32019 | 175 N Harvard St | 42.363796 | -71.129164 | Boston | Yes | 17 | 2014.0 | POINT (-71.12916 42.36380) |
| 3 | S32035 | 191 Beacon St | 42.380323 | -71.108786 | Somerville | Yes | 19 | 2018.0 | POINT (-71.10879 42.38032) |
| 4 | C32094 | 2 Hummingbird Lane at Olmsted Green | 42.288870 | -71.095003 | Boston | Yes | 17 | 2020.0 | POINT (-71.09500 42.28887) |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 443 | N32005 | West Newton | 42.349601 | -71.226275 | Newton | Yes | 14 | 2020.0 | POINT (-71.22627 42.34960) |
| 444 | A32043 | Western Ave at Richardson St | 42.361787 | -71.143931 | Boston | Yes | 0 | 2019.0 | POINT (-71.14393 42.36179) |
| 445 | B32059 | Whittier St Health Center | 42.332863 | -71.092189 | Boston | Yes | 19 | 2019.0 | POINT (-71.09219 42.33286) |
| 446 | D32040 | Williams St at Washington St | 42.306539 | -71.107669 | Boston | Yes | 23 | 2018.0 | POINT (-71.10767 42.30654) |
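The spatial join in the next cell assigns each station point to the neighbourhood polygon it falls in (`sjoin` uses the `"intersects"` predicate by default, and with `how="left"` stations outside every polygon keep a missing neighbourhood). The idea can be sketched with even-odd ray casting in plain Python. The square "neighbourhoods" below are hypothetical stand-ins, not Boston's real boundaries, and unlike GeoPandas this sketch ignores multipolygons, holes and points lying exactly on a boundary.

```python
def point_in_polygon(x, y, ring):
    """Even-odd ray casting: is (x, y) inside the closed ring [(x0, y0), (x1, y1), ...]?"""
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        # Does a horizontal ray from (x, y) towards +x cross the edge (x1, y1)-(x2, y2)?
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


# Hypothetical axis-aligned "neighbourhoods" in (longitude, latitude); NOT real boundaries.
NEIGHBOURHOODS = {
    "Toy Allston": [(-71.14, 42.35), (-71.12, 42.35), (-71.12, 42.37), (-71.14, 42.37)],
    "Toy Mattapan": [(-71.10, 42.28), (-71.09, 42.28), (-71.09, 42.30), (-71.10, 42.30)],
}


def neighbourhood_of(lon, lat):
    """Left-join style lookup: first polygon containing the point, else None (like NaN)."""
    for name, ring in NEIGHBOURHOODS.items():
        if point_in_polygon(lon, lat, ring):
            return name
    return None


print(neighbourhood_of(-71.129164, 42.363796))  # Toy Allston (175 N Harvard St)
print(neighbourhood_of(-71.095003, 42.28887))   # Toy Mattapan (2 Hummingbird Lane)
print(neighbourhood_of(-71.114674, 42.344149))  # None (1200 Beacon St, Brookline)
```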


In [10]:
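# Reference: https://geopandas.org/en/stable/gallery/spatial_joins.html
import sqldf

# Left join: every station keeps its row; stations outside all polygons get NaN.
# Both inputs have a "Name" column, so sjoin suffixes them _left (station) and _right (neighbourhood)
joined_df = pointdf.sjoin(polydf, how="left")
grab_df = joined_df[['Name_left', 'Name_right', 'District']]

# Keep only stations in the Boston district, one row per station/neighbourhood pair
matched_pairs = sqldf.run("SELECT DISTINCT Name_left AS Station, Name_right AS Neighbourhood FROM grab_df WHERE District = 'Boston'")
matched_pairs.to_csv('data/neighbourhood_stations.csv')
matched_pairs.head()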

|   | Station | Neighbourhood |
|---|---------|---------------|
| 0 | 175 N Harvard St | Allston |
| 1 | 2 Hummingbird Lane at Olmsted Green | Mattapan |
| 2 | 555 Metropolitan Ave | Hyde Park |
| 3 | 606 American Legion Hwy at Canterbury St | Roslindale |
| 4 | 645 Summer St | South Boston Waterfront |
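Downstream steps need `data/neighbourhood_stations.csv` as a station-to-neighbourhood lookup. A minimal sketch of loading it with the standard `csv` module follows. `load_station_lookup` is a hypothetical helper, the sample text mimics the layout `to_csv` writes (an unnamed index column first), its first two rows are copied from the output above, and the third row is invented to show that stations without a neighbourhood are skipped.

```python
import csv
import io

SAMPLE = (
    ",Station,Neighbourhood\n"
    "0,175 N Harvard St,Allston\n"
    "1,2 Hummingbird Lane at Olmsted Green,Mattapan\n"
    "2,Hypothetical Station,\n"  # invented row with no neighbourhood
)


def load_station_lookup(fh):
    """Map station name -> neighbourhood, skipping rows with an empty neighbourhood."""
    return {
        row["Station"]: row["Neighbourhood"]
        for row in csv.DictReader(fh)
        if row["Neighbourhood"]
    }


lookup = load_station_lookup(io.StringIO(SAMPLE))
print(lookup.get("175 N Harvard St"))      # Allston
print(lookup.get("Hypothetical Station"))  # None
```

Against the real file, the same helper would be called as `with open('data/neighbourhood_stations.csv', newline='') as fh: lookup = load_station_lookup(fh)`.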


In [11]:
grab_df

|     | Name_left | Name_right | District |
|-----|-----------|------------|----------|
| 0 | 1200 Beacon St | | Brookline |
| 1 | 160 Arsenal | | Watertown |
| 2 | 175 N Harvard St | Allston | Boston |
| 3 | 191 Beacon St | | Somerville |
| 4 | 2 Hummingbird Lane at Olmsted Green | Mattapan | Boston |
| ... | ... | ... | ... |
| 443 | West Newton | | Newton |
| 444 | Western Ave at Richardson St | Brighton | Boston |
| 445 | Whittier St Health Center | Roxbury | Boston |
| 446 | Williams St at Washington St | Jamaica Plain | Boston |


### 3. Calling the ETL classes

In [1]:
from sparkbike.etl import ExtractTransformLoad
worker = ExtractTransformLoad()
worker.zip()
worker.geojoin()


import os
os.environ['USE_PYGEOS'] = '0'
import geopandas

In a future release, GeoPandas will switch to using Shapely by default. If you are using PyGEOS directly (calling PyGEOS functions on geometries from GeoPandas), this will then stop working and you are encouraged to migrate from PyGEOS to Shapely 2.0 (https://shapely.readthedocs.io/en/latest/migration_pygeos.html).
  from geopandas import GeoDataFrame, read_file


AttributeError: module '__main__' has no attribute 'joined_df'
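The message suggests that `sqldf.run` looks up table names (here `joined_df`) as attributes of the `__main__` module. That works for notebook globals but not for variables local to a method of `ExtractTransformLoad`; this reading is inferred from the error, not from sqldf's documentation. One way around it, sketched with the standard-library `sqlite3` module, is to register the rows explicitly before querying, so the query never depends on global names. `distinct_boston_pairs` is a hypothetical helper; the rows are copied from the `grab_df` output above, plus one deliberate duplicate, and an `ORDER BY` (not in the original query) makes the result deterministic.

```python
import sqlite3


def distinct_boston_pairs(rows):
    """Run the notebook's SELECT DISTINCT on explicitly passed rows.

    rows: iterable of (station, neighbourhood, district) tuples.
    """
    con = sqlite3.connect(":memory:")
    try:
        con.execute("CREATE TABLE grab_df (Name_left TEXT, Name_right TEXT, District TEXT)")
        con.executemany("INSERT INTO grab_df VALUES (?, ?, ?)", rows)
        return con.execute(
            "SELECT DISTINCT Name_left AS Station, Name_right AS Neighbourhood "
            "FROM grab_df WHERE District = 'Boston' ORDER BY Station"
        ).fetchall()
    finally:
        con.close()


rows = [
    ("1200 Beacon St", None, "Brookline"),
    ("175 N Harvard St", "Allston", "Boston"),
    ("175 N Harvard St", "Allston", "Boston"),  # deliberate duplicate, removed by DISTINCT
    ("2 Hummingbird Lane at Olmsted Green", "Mattapan", "Boston"),
]
print(distinct_boston_pairs(rows))
```

Inside the class, a pandas-only alternative avoids SQL altogether: `grab_df[grab_df["District"] == "Boston"][["Name_left", "Name_right"]].drop_duplicates()`, followed by renaming the columns.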

### 4. Checking retrieval of the data
## 2. PyShiny development

## 3. Airflow development
- Install Airflow
- Then change the configuration to point to the local DAGs folder (this should be part of the setup file)
- Then create and launch the Airflow database