# W205 Final Project

#### James Black, Ramsey Magana, Aniruddh Nautiyal, Rich Ung

GitHub repository for the project can be found here: https://github.com/RichUng/w205-final-project

## Install the packages required by the cells below

In [44]:
!pip install --upgrade gtfs-realtime-bindings
!pip install boto3
!pip install psycopg2

Requirement already up-to-date: gtfs-realtime-bindings in /usr/local/lib/python3.6/dist-packages
Requirement already up-to-date: setuptools in /usr/local/lib/python3.6/dist-packages (from gtfs-realtime-bindings)
Requirement already up-to-date: protobuf in /usr/local/lib/python3.6/dist-packages (from gtfs-realtime-bindings)
Requirement already up-to-date: six>=1.9 in /usr/local/lib/python3.6/dist-packages (from protobuf->gtfs-realtime-bindings)


## Load static datasets into Amazon S3

In [0]:
## Connect to s3

import boto3
import urllib
import pandas as pd
import json
import os

# Credentials are read from the environment instead of being hard-coded in the
# notebook (keys previously committed here should be treated as leaked and
# rotated). When these variables are unset, boto3 receives None and falls back
# to its default credential chain (shared credentials file, instance role, ...).
ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY_ID')
SECRET_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

# S3 resource used by every upload cell below.
s3 = boto3.resource(
    's3',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY
)

In [0]:
## Operator List
import boto3
import os, shutil
import pandas as pd
import json
import urllib.request

CWD = os.getcwd()

BUCKET_NAME = 'w205-redshift-intermediate'

# Kept for cells that build paths from it; this cell no longer needs a scratch
# directory because the CSV is uploaded straight from memory.
DIR_1 = CWD+'/my_directory/'

# 511.org API key, read from the environment rather than committed to the notebook.
API_KEY_511 = os.environ['API_KEY_511']

## gtfs_operator_list.csv

with urllib.request.urlopen('http://api.511.org/transit/gtfsoperators?api_key='+API_KEY_511+'&format=json') as response:
    # 'utf-8-sig' also strips a leading UTF-8 BOM if the API sends one.
    body = response.read().decode('utf-8-sig')
json_data = json.loads(body)

# pd.json_normalize is the public name since pandas 1.0 and the pd.io.json alias
# was removed in pandas 2.0; fall back to the old location only on old pandas.
json_normalize = getattr(pd, 'json_normalize', None) or pd.io.json.json_normalize
df = json_normalize(json_data)

# Upload directly from memory: no file handle to leak and no scratch directory
# left behind if the upload fails.
s3.Object(BUCKET_NAME, 'static_operators/'+'gtfs_operator_list.csv').put(Body=df.to_csv(index=False))

In [47]:
## DATAFEEDS
import boto3
import requests, zipfile, io, os
import shutil
import pandas as pd
import json
import urllib.request

# 511.org API key, read from the environment rather than committed to the notebook.
APIKEY_511 = os.environ['API_KEY_511']
URL_PREFIX =  'http://api.511.org/transit/datafeeds?api_key='

AGENCY_LIST = ['3D','AC','CC','SF','SR','VN']

BUCKET_NAME = 'w205-redshift-intermediate'

CWD = os.getcwd()

# The scratch directories are no longer used (trips.txt is read straight out of
# the in-memory zip); the names are kept for any cell that refers to them.
DIR_1 = CWD+'/my_directory/'

DIR_2 = CWD+'/tripsdir/'

## datafeed files

# For each agency: download its GTFS zip, tag trips.txt with the agency code
# and upload it to S3 as static_datafeed/trips<AGENCY>.txt.
for agency in AGENCY_LIST:
    FILENAME = 'trips'+str(agency)+'.txt'

    try:
        r = requests.get(URL_PREFIX+str(APIKEY_511)+'&operator_id='+str(agency))
        # Surface HTTP errors (bad key, rate limiting) instead of failing later
        # with a confusing BadZipFile.
        r.raise_for_status()

        # Read trips.txt directly from the downloaded archive; nothing touches
        # disk, so no scratch directory is left behind when a step fails.
        with zipfile.ZipFile(io.BytesIO(r.content)) as z, z.open('trips.txt') as trips_file:
            df = pd.read_csv(trips_file)
        df['agency'] = agency
        data = df.to_csv(index=False)

        # Write file to S3
        s3.Bucket(BUCKET_NAME).put_object(Key='static_datafeed/'+FILENAME, Body=data)
        print('Agency successfully uploaded: '+agency)

    except Exception as exc:
        # Best effort per agency: report why it failed and continue.
        print('Agency failed upload: '+agency+' ('+repr(exc)+')')
        

Agency successfully uploaded: 3D
Agency successfully uploaded: AC
Agency successfully uploaded: CC
Agency successfully uploaded: SF
Agency successfully uploaded: SR
Agency successfully uploaded: VN


In [0]:
##### CRIME DATA
import boto3

import urllib.request
import os, shutil, zipfile, io

import pandas as pd
import json
import csv

# Defined here rather than relying on values left over from earlier cells, so
# the cell runs on its own; `s3` still comes from the "Connect to s3" cell.
BUCKET_NAME = 'w205-redshift-intermediate'
DIR_1 = os.getcwd()+'/my_directory/'


##### CRIME DATA

with urllib.request.urlopen('https://data.sfgov.org/api/views/9v2m-8wqu/rows.json?accessType=DOWNLOAD') as response:
    body = response.read().decode('utf-8-sig')
json_data = json.loads(body)

# Column names come from the view metadata; each element of json_data['data']
# is written as one CSV row under these headers (assumed to be in column order).
crime_headers = [i['fieldName'] for i in json_data['meta']['view']['columns']]

os.makedirs(DIR_1, exist_ok=True)
crime_path = DIR_1+'sf_crime.csv'
try:
    # Explicit UTF-8 so non-ASCII values do not depend on the platform locale.
    with open(crime_path, 'w', newline='', encoding='utf-8') as csvfile:
        # LF line endings (csv defaults to CRLF); a trailing carriage return may
        # otherwise be loaded into the last column by Redshift COPY.
        spamwriter = csv.writer(csvfile, delimiter=',',
                                quotechar='"', quoting=csv.QUOTE_MINIMAL,
                                lineterminator='\n')

        spamwriter.writerow(crime_headers)
        # Each row is already a list of values, so hand it to the writer as-is.
        # The previous writerow(','.join(str(line))) passed a single string,
        # and writerow emitted every character of it as its own field.
        spamwriter.writerows(json_data['data'])

    with open(crime_path, 'rb') as upload_body:
        s3.Object(BUCKET_NAME, 'static_crime/'+'sf_crime.csv').put(Body=upload_body)
finally:
    # Remove the scratch directory even when the write or upload fails.
    shutil.rmtree(DIR_1, ignore_errors=True)


In [0]:
##### INCOME DATA
import boto3

import urllib.request
import os, shutil, zipfile, io

import pandas as pd
import json
import csv

# Defined here rather than relying on values left over from earlier cells, so
# the cell runs on its own; `s3` still comes from the "Connect to s3" cell.
BUCKET_NAME = 'w205-redshift-intermediate'
DIR_1 = os.getcwd()+'/my_directory/'

with urllib.request.urlopen('https://www.irs.gov/pub/irs-soi/15zpallagi.csv') as response:
    string = response.read().decode('utf-8-sig')

# The download is already CSV, so parse it into rows. The previous code passed
# raw text lines to writerow(), which iterates its argument and therefore
# wrote every character as a separate field. newline='' lets csv.reader handle
# CRLF endings and quoted fields that span lines.
body = csv.reader(io.StringIO(string, newline=''))

os.makedirs(DIR_1, exist_ok=True)
income_path = DIR_1+'irs_income.csv'
try:
    # Explicit UTF-8 so the output does not depend on the platform locale.
    with open(income_path, 'w', newline='', encoding='utf-8') as csvfile:
        # LF line endings (csv defaults to CRLF); a trailing carriage return may
        # otherwise be loaded into the last column by Redshift COPY.
        spamwriter = csv.writer(csvfile, delimiter=',',
                                quotechar='"', quoting=csv.QUOTE_MINIMAL,
                                lineterminator='\n')

        # Skip blank lines (csv.reader yields [] for them, e.g. after the
        # final newline).
        spamwriter.writerows(row for row in body if row)

    with open(income_path, 'rb') as upload_body:
        s3.Object(BUCKET_NAME, 'static_income/'+'irs_income.csv').put(Body=upload_body)
finally:
    # Remove the scratch directory even when the write or upload fails.
    shutil.rmtree(DIR_1, ignore_errors=True)
    


## Load S3 static dataset objects into Amazon Redshift

In [0]:
# To push static data from S3 bucket to Amazon redshift database

import os
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

# Connection fields (the password is read from the environment instead of
# being committed to the notebook)
thisdb            = "dev"
thisuser          = "w205"
thispassword      = os.environ['REDSHIFT_PASSWORD']
thishost          = "w205-final-project.cspk3mfgs5hv.us-east-1.redshift.amazonaws.com"
thisport          = "5439"

# S3 static buckets URLs here

s3_load_buckets   = ['s3://w205-redshift-intermediate/static_operators',
                     's3://w205-redshift-intermediate/static_datafeed',
                     's3://w205-redshift-intermediate/static_crime',
                     's3://w205-redshift-intermediate/static_income']

# AWS keys Redshift uses to read the S3 objects. They are taken from the
# environment and bound as a query parameter instead of being pasted into
# every SQL string.
copy_credentials = 'aws_access_key_id={};aws_secret_access_key={}'.format(
    os.environ['AWS_ACCESS_KEY_ID'], os.environ['AWS_SECRET_ACCESS_KEY'])

# (target table, S3 prefix, table-specific COPY options); the per-table options
# are the same ones the four hand-written statements used.
copy_jobs = [
    ('operator_list', s3_load_buckets[0], ''),
    ('data_feed',     s3_load_buckets[1], ''),
    ('crime_data',    s3_load_buckets[2], 'ESCAPE'),
    ('income_data',   s3_load_buckets[3], 'IGNOREBLANKLINES'),
]

# Table names and options are code constants (not user input), so they are
# formatted in; the S3 path and credentials go through psycopg2 parameters.
COPY_TEMPLATE = '''COPY {table}
               FROM %s
               CREDENTIALS %s
               DELIMITER ','
               REMOVEQUOTES
               IGNOREHEADER 1
               {extra}
               REGION 'us-east-1'
               ;'''

# Connection to the database
conn = psycopg2.connect(database=thisdb, user=thisuser, password=thispassword, host=thishost, port=thisport)
try:
    cur = conn.cursor()
    for table, s3_path, extra_options in copy_jobs:
        cur.execute(COPY_TEMPLATE.format(table=table, extra=extra_options),
                    (s3_path, copy_credentials))
        # Commit after each table so earlier loads persist if a later one fails.
        conn.commit()
finally:
    # Always release the connection, even when a COPY raises.
    conn.close()

## Ad-hoc script to push streaming data into Amazon Kinesis

This script runs once per minute as an AWS Lambda function; each run sends the current vehicle positions of one agency, chosen by the current minute, to Kinesis.

In [51]:
import boto3
import os
import time
import urllib.request
from google.transit import gtfs_realtime_pb2

# All agencies
# agencies = ['3D','AC','AM','AT','AY','BA','BG','CC','CE','CM','CT','DE','EM','FS','GF','GG','HF','MA','MS','PE','RV','SA','SB','SC','SF','SM','SO','SR','ST','UC','VC','VN','WC','WH']

# All successful API call agencies
# agencies = ['3D','AC','BA','CC','CT','DE','GG','MA','SC','SF','SM','SR','VN','WC']

# All successful API call agencies that return vehicle positions
agencies = ['3D','AC','CC','SF','SR','VN']

# No explicit keys: boto3 uses its default credential chain (environment
# variables locally, the execution role when this runs in AWS Lambda).
kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# 511.org API key, read from the environment rather than committed to the notebook.
API_KEY_511 = os.environ['API_KEY_511']

# Pick a specific agency depending on the minute, so once-a-minute runs cycle
# through all agencies.
minute = int(time.strftime("%M"))
agency = agencies[minute % len(agencies)]
print("Loading Agency:",agency)

# Load latest vehicle positions for agency and send data to Kinesis
try:
    feed = gtfs_realtime_pb2.FeedMessage()
    with urllib.request.urlopen('http://api.511.org/Transit/VehiclePositions?api_key='+API_KEY_511+'&agency='+agency) as response:
        feed.ParseFromString(response.read())

    print("Sending", len(feed.entity), agency, "rows to Kinesis")

    for entity in feed.entity:
        vehicle = entity.vehicle
        # One comma-separated line per vehicle:
        # vehicle id, timestamp, latitude, longitude, trip id, agency.
        # NOTE(review): a constant PartitionKey routes every record to a single
        # shard; fine for a one-shard stream, revisit if the stream grows.
        kinesis_client.put_record(
            StreamName = 'W205',
            Data = "{}, {}, {}, {}, {}, {} \n".format(
                    vehicle.vehicle.id,
                    vehicle.timestamp,
                    vehicle.position.latitude,
                    vehicle.position.longitude,
                    vehicle.trip.trip_id,
                    agency),
            PartitionKey = "abc123")
    print(agency,"Successful")
except Exception as exc:
    # Best effort: report the failure together with its cause instead of hiding it.
    print(agency,"Failed:",repr(exc))

Loading Agency: VN
Sending 0 VN rows to Kinesis
VN Successful
