## Citi Bike Data

In [1]:
%load_ext sql

In [2]:
from pyspark.sql import SparkSession
import os
import configparser
import pandas as pd

In [3]:
config = configparser.ConfigParser()

config.read_file(open('dwh.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']

In [4]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [5]:
import requests, zipfile, io

r_citi = requests.get('https://s3.amazonaws.com/tripdata/JC-201801-citibike-tripdata.csv.zip')
z_citi = zipfile.ZipFile(io.BytesIO(r_citi.content))
z_citi.extractall('C:/Users/David/Desktop/DE/citi_data')

In [6]:
df_citi = spark.read.csv("C:/Users/David/Desktop/DE/citi_data/JC-201801-citibike-tripdata.csv")
df_citi.printSchema()
df_citi.show(5)

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)
 |-- _c13: string (nullable = true)
 |-- _c14: string (nullable = true)

+------------+--------------------+--------------------+----------------+------------------+--------------------+--------------------+--------------+----------------+--------------------+--------------------+------+----------+----------+------+
|         _c0|                 _c1|                 _c2|             _c3|               _c4|                 _c5|                 _c6|           _c7|             _c8|                 _c9|                _c10|  _c11|    

In [7]:
df_citi = spark.read.csv("C:/Users/David/Desktop/DE/citi_data/JC-201801-citibike-tripdata.csv", inferSchema=True, header=True)
df_citi.printSchema()
df_citi.show(5)

root
 |-- tripduration: integer (nullable = true)
 |-- starttime: timestamp (nullable = true)
 |-- stoptime: timestamp (nullable = true)
 |-- start station id: integer (nullable = true)
 |-- start station name: string (nullable = true)
 |-- start station latitude: double (nullable = true)
 |-- start station longitude: double (nullable = true)
 |-- end station id: integer (nullable = true)
 |-- end station name: string (nullable = true)
 |-- end station latitude: double (nullable = true)
 |-- end station longitude: double (nullable = true)
 |-- bikeid: integer (nullable = true)
 |-- usertype: string (nullable = true)
 |-- birth year: integer (nullable = true)
 |-- gender: integer (nullable = true)

+------------+--------------------+--------------------+----------------+------------------+----------------------+-----------------------+--------------+----------------+--------------------+---------------------+------+----------+----------+------+
|tripduration|           starttime|       

In [8]:
from pyspark.sql.types import StructType as R, StructField as Fld, DoubleType as Dbl, StringType as Str, IntegerType as Int, DateType as Date
df_citiSchema = R([
    Fld("duration_sec",Int()),
    Fld("start_time",Date()),
    Fld("end_time",Date()),
    Fld("start_station_id",Int()),
    Fld("start_station_name",Str()),
    Fld("start_station_latitude",Dbl()),
    Fld("start_station_longitude",Dbl()),
    Fld("end_station_id",Int()),
    Fld("end_station_name",Str()),
    Fld("end_station_latitude",Dbl()),
    Fld("end_station_longitude",Dbl()),
    Fld("bike_id",Int()),
    Fld("user_type",Str()),
    Fld("birth_year",Int()),
    Fld("gender",Int())
])

In [9]:
df_citiwithSchema = spark.read.csv("C:/Users/David/Desktop/DE/citi_data/JC-201801-citibike-tripdata.csv", schema=df_citiSchema, header=True)
df_citiwithSchema.printSchema()
df_citiwithSchema.show(5)

root
 |-- duration_sec: integer (nullable = true)
 |-- start_time: date (nullable = true)
 |-- end_time: date (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- start_station_latitude: double (nullable = true)
 |-- start_station_longitude: double (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- end_station_latitude: double (nullable = true)
 |-- end_station_longitude: double (nullable = true)
 |-- bike_id: integer (nullable = true)
 |-- user_type: string (nullable = true)
 |-- birth_year: integer (nullable = true)
 |-- gender: integer (nullable = true)

+------------+----------+----------+----------------+------------------+----------------------+-----------------------+--------------+----------------+--------------------+---------------------+-------+----------+----------+------+
|duration_sec|start_time|  end_time|start_station_id|start_station_n

In [11]:
df_citiwithSchema.select("end_station_id").distinct().count()

56

check for missing data

extract only nyc data

## Uber Data

In [11]:
z_uber = zipfile.ZipFile("C:/Users/David/Desktop/DE/uber_data/movement-speeds-hourly-new-york-2018-1.csv.zip", 'r')
z_uber.extractall('C:/Users/David/Desktop/DE/uber_data')

## make unzipping faster
## https://stackoverflow.com/questions/16158648/faster-alternative-to-pythons-zipfile-module

In [12]:
df_uber = spark.read.csv("C:/Users/David/Desktop/DE/uber_data/movement-speeds-hourly-new-york-2018-1.csv", inferSchema=True, header=True)
df_uber.printSchema()
df_uber.show(5)

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- utc_timestamp: timestamp (nullable = true)
 |-- segment_id: string (nullable = true)
 |-- start_junction_id: string (nullable = true)
 |-- end_junction_id: string (nullable = true)
 |-- osm_way_id: integer (nullable = true)
 |-- osm_start_node_id: long (nullable = true)
 |-- osm_end_node_id: long (nullable = true)
 |-- speed_mph_mean: double (nullable = true)
 |-- speed_mph_stddev: double (nullable = true)

+----+-----+---+----+-------------------+--------------------+--------------------+--------------------+----------+-----------------+---------------+--------------+----------------+
|year|month|day|hour|      utc_timestamp|          segment_id|   start_junction_id|     end_junction_id|osm_way_id|osm_start_node_id|osm_end_node_id|speed_mph_mean|speed_mph_stddev|
+----+-----+---+----+-------------------+--------------------+-----

In [13]:
df_uber.count()

21647009

In [20]:
df_uber.select("osm_way_id","osm_start_node_id","osm_end_node_id").distinct().count()

95034

translate osm node and route data to street names and latitude and altitudes

https://nominatim.org/release-docs/develop/api/Lookup/

remove year month day hour and rename utc_timestamp to time

## NYC Bicycle Routes Data

In [26]:
import json

r_nycbike = requests.get('https://data.cityofnewyork.us/resource/cc5c-sm6z.json')
df_nycbike = pd.read_json(r_nycbike.text)
df_nycbike = pd.DataFrame(df_nycbike)
df_nycbike = df_nycbike.join(pd.json_normalize(df_nycbike.pop('the_geom')))

df_cleaning = df_nycbike.coordinates.apply(pd.Series)
df_cleaning.columns = ['iter1']

num_nodes = []
start_street_latitude = []
start_street_longitude = []
end_street_latitude = []
end_street_longitude = []
for i in df_cleaning.iter1:
    num_nodes.append(len(i))
    start_street_latitude.append(float(i[0][1]))
    start_street_longitude.append(float(i[0][0]))
    end_street_latitude.append(float(i[len(i)-1][1]))
    end_street_longitude.append(float(i[len(i)-1][0]))

d = {'num_nodes' : num_nodes, 'start_street_latitude' : start_street_latitude,
    'start_street_longitude' : start_street_longitude, 'end_street_latitude' : end_street_latitude,
    'end_street_longitude' : end_street_longitude}
df_clean = pd.DataFrame(d)

df_nycbike = df_nycbike.drop(['coordinates'], axis=1)
df_nycbike = df_nycbike.join(df_clean)

df_nycbike

Unnamed: 0,street,boro,segmentid,facilitycl,fromstreet,tostreet,onoffst,allclasses,instdate,moddate,...,lanecount,tf_facilit,ft_facilit,comments,type,num_nodes,start_street_latitude,start_street_longitude,end_street_latitude,end_street_longitude
0,63 AVE,4,150483,III,WOODHAVEN BLVD,82 PLACE,ON,III,2016-11-25T00:00:00,2016-11-25T00:00:00,...,1,Sharrows,,,MultiLineString,2,40.723159,-73.872182,40.723523,-73.871378
1,NEPTUNE AV,3,9009151,II,W 37 ST,BRIGHTON 8 ST,ON,II,2005-08-01T00:00:00,2005-08-01T00:00:00,...,1,Standard,,,MultiLineString,2,40.577172,-74.000667,40.577121,-74.001105
2,84 ST,4,252570,II,SHORE PKWY SR,157 AV,ON,II,2008-04-01T00:00:00,2008-04-01T00:00:00,...,1,,Standard,,MultiLineString,2,40.662348,-73.849378,40.662175,-73.849319
3,P P BARTEL PRITCHARD SQ APPR,3,253073,I,PROSPECT PARK W,WEST DR,OFF,I,1980-07-01T00:00:00,1980-07-01T00:00:00,...,2,Greenway,Greenway,Prospect Park Auto-Free Hours: Closed to Cars,MultiLineString,4,40.661046,-73.979510,40.661156,-73.978835
4,GREENWICH ST,1,313087,II,CANAL ST,GANSEVOORT ST,ON,II,2008-04-01T00:00:00,2008-04-01T00:00:00,...,1,,Standard,,MultiLineString,2,40.725297,-74.009214,40.725410,-74.009194
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
995,CARLTON AV,3,163863,II,ATLANTIC AV,FLUSHING AV,ON,II,2007-05-01T00:00:00,2007-05-01T00:00:00,...,1,Standard,,,MultiLineString,2,40.696240,-73.973463,40.696142,-73.973453
996,PACIFIC ST,3,43013,II,BROOKLYN AV,HOPKINSON AV,ON,II,2003-06-01T00:00:00,2003-06-01T00:00:00,...,1,Standard,,,MultiLineString,2,40.677199,-73.941463,40.677051,-73.938695
997,FT WASHINGTON PARK BICYCLE TRAIL,1,238472,I,W 145 ST,W 181 ST,OFF,I,1999-07-01T00:00:00,1999-07-01T00:00:00,...,2,Greenway,Greenway,,MultiLineString,2,40.850268,-73.945939,40.850328,-73.945897
998,PARSONS BLVD,4,9005876,II,65 AVE,71 AVE,ON,II,2017-12-15T00:00:00,2017-12-15T00:00:00,...,1,,Standard,,MultiLineString,2,40.730886,-73.810928,40.730766,-73.810935


In [30]:
df_nycbike.columns

Index(['street', 'boro', 'segmentid', 'facilitycl', 'fromstreet', 'tostreet',
       'onoffst', 'allclasses', 'instdate', 'moddate', 'bikedir', 'lanecount',
       'tf_facilit', 'ft_facilit', 'comments', 'type', 'num_nodes',
       'start_street_latitude', 'start_street_longitude',
       'end_street_latitude', 'end_street_longitude'],
      dtype='object')

drop irrelevant columns

## NYC OpenData Bicycle Parking

In [22]:
z_nyc = zipfile.ZipFile("C:/Users/David/Desktop/DE/NYC_data/2013-cityracks-shp.zip", 'r')
z_nyc.extractall('C:/Users/David/Desktop/DE/NYC_data')

## https://mygeodata.cloud/conversion use this to convert

In [23]:
z_nyc_translated = zipfile.ZipFile("C:/Users/David/Desktop/DE/NYC_data/mygeodata.zip", 'r')
z_nyc_translated.extractall('C:/Users/David/Desktop/DE/NYC_data')

In [24]:
df_nycparking = pd.read_csv("C:/Users/David/Desktop/DE/NYC_data/city_racks_2013_06_28.csv")
df_nycparking

Unnamed: 0,X,Y,Name,small,large,circular,mini_hoop,total_rack
0,9.829036e+05,205129.998582,1 7 AV S,5,0,0,0,5
1,9.873304e+05,191302.730305,1 BOERUM PL,1,0,0,0,1
2,9.832110e+05,199016.513434,1 CENTRE ST,10,0,0,0,10
3,9.858978e+05,207157.885275,1 E 13 ST,1,0,0,0,1
4,1.010994e+06,252137.339607,1 E 183 ST,0,0,2,0,2
...,...,...,...,...,...,...,...,...
11729,9.941604e+05,215021.044745,997 1 AV,0,1,0,0,1
11730,9.945118e+05,188057.800772,997 FULTON ST,1,0,0,0,1
11731,9.941257e+05,231703.877849,998 AMSTERDAM AV,1,0,0,0,1
11732,9.949362e+05,231279.406913,998 COLUMBUS AV,0,0,0,1,1


In [25]:
from pyproj import Proj, transform

inProj = Proj(proj='lcc',datum='NAD83', lat_1=40.66666666666666, lat_2=41.03333333333333,
                  lat_0=40.16666666666666, lon_0=-74.0, x_0=983308.79, y_0=142601.67)
longitude = []
latitude = []

for i, j in zip(df_nycparking.X, df_nycparking.Y):
    lg, lt = inProj(i, j, inverse=True)
    longitude.append(lg)
    latitude.append(lt)

d = {'longitude' : longitude, 'latitude' : latitude}
df_coor = pd.DataFrame(d)

df_nycparking = df_nycparking.join(df_coor)
df_nycparking

Unnamed: 0,X,Y,Name,small,large,circular,mini_hoop,total_rack,longitude,latitude
0,9.829036e+05,205129.998582,1 7 AV S,5,0,0,0,5,-74.004797,40.729752
1,9.873304e+05,191302.730305,1 BOERUM PL,1,0,0,0,1,-73.952483,40.605226
2,9.832110e+05,199016.513434,1 CENTRE ST,10,0,0,0,10,-74.001157,40.674700
3,9.858978e+05,207157.885275,1 E 13 ST,1,0,0,0,1,-73.969344,40.748010
4,1.010994e+06,252137.339607,1 E 183 ST,0,0,2,0,2,-73.670185,41.152572
...,...,...,...,...,...,...,...,...,...,...
11729,9.941604e+05,215021.044745,997 1 AV,0,1,0,0,1,-73.871372,40.818750
11730,9.945118e+05,188057.800772,997 FULTON ST,1,0,0,0,1,-73.867691,40.575938
11731,9.941257e+05,231703.877849,998 AMSTERDAM AV,1,0,0,0,1,-73.871495,40.968976
11732,9.949362e+05,231279.406913,998 COLUMBUS AV,0,0,0,1,1,-73.861873,40.965143


drop X Y ??
check other things