Load all requisite packages to build this pipeline

In [1]:
import csv
import json
import os
import sys
from datetime import date, timedelta

import boto3
import mysql.connector
import numpy as np
import pandas as pd
import pymysql
import requests
import s3fs
from botocore.client import Config

# Part I: Extract

Let's pull yesterday's US PM2.5 measurements from the OpenAQ API

In [2]:
# url for openaq api
url = "https://api.openaq.org/v1/measurements"

# specify time parameters
yesterday = date.today() - timedelta(days=1)  # since data will be collected on following day
yesterday = yesterday.strftime('%Y-%m-%d')
time_steps = [("00", "11"), ("12", "23")]  # necessary since too much data cannot be scraped at once
MAX_RESULTS = 10000  # 10000 is enough to collect data for half a day

# one DataFrame per half-day window; concatenated once after the loop
frames = []

# Let's scrape!
for start_hour, end_hour in time_steps:

    print(start_hour, end_hour)

    parameters = {
        "country": "US",
        "limit": MAX_RESULTS,
        "has_geo": True,
        "parameter": "pm25",  # for multiple parameters --> "parameter[]": ["pm25", "o3"]
        "date_from": yesterday + "T" + start_hour + ":00:00",
        "date_to": yesterday + "T" + end_hour + ":59:00",
    }

    response = requests.get(url, params=parameters, timeout=60)
    # fail loudly on HTTP errors instead of a confusing KeyError on 'results' below
    response.raise_for_status()

    results_list = response.json()['results']

    # the API returns at most `limit` rows; if we hit the cap this window is probably truncated
    if len(results_list) >= MAX_RESULTS:
        print("Warning:", start_hour, "-", end_hour, "returned", len(results_list),
              "rows; results may be truncated")

    frames.append(pd.DataFrame(results_list))

df = pd.concat(frames, ignore_index=True)

df.head()

00 11
12 23


Unnamed: 0,city,coordinates,country,date,location,parameter,unit,value
0,MATANUSKA-SUSITNA,"{'latitude': 61.534163, 'longitude': -149.031655}",US,"{'utc': '2020-06-28T11:00:00.000Z', 'local': '...",Butte (Harrison Cour,pm25,µg/m³,5.0
1,LA PAZ,"{'latitude': 34.243889, 'longitude': -113.558611}",US,"{'utc': '2020-06-28T11:00:00.000Z', 'local': '...",Alamo Lake,pm25,µg/m³,5.0
2,Riverside-San Bernardino-Ontario,"{'latitude': 33.999449, 'longitude': -117.415831}",US,"{'utc': '2020-06-28T11:00:00.000Z', 'local': '...",Riverside - Rubidoux,pm25,µg/m³,8.0
3,Phoenix-Mesa-Scottsdale,"{'latitude': 33.503601, 'longitude': -112.095001}",US,"{'utc': '2020-06-28T11:00:00.000Z', 'local': '...",Phoenix JLG Supersit,pm25,µg/m³,6.0
4,PIMA,"{'latitude': 32.322573, 'longitude': -111.037709}",US,"{'utc': '2020-06-28T11:00:00.000Z', 'local': '...",Orange Grove,pm25,µg/m³,4.0


We can see that the OpenAQ API does not return state names. This is a problem because many US cities share the same name. To obtain state names, I use Google's Geocoding API; that step is in the Collect_States file.

# Part II: Transform

Clean the city and location columns and replace missing values with None (so they are stored as NULL in MySQL); no rows are removed

In [3]:
# only select first part of hyphenated city to align with geocode API data
df['city'] = df['city'].str.split('-').str[0]

# make everything capital for consistency
df['city'] = df['city'].str.upper()
df['location'] = df['location'].str.upper()

# Replace missing values with None (rows are NOT dropped) so they become SQL NULL
# on insert. Casting to object first makes this work on every pandas version:
# depending on the version, where(..., None) leaves NaN in float columns.
df = df.astype(object).where(pd.notnull(df), None)

Split date and coordinates columns

In [4]:
## Date
# `date` holds dicts such as {'utc': '2020-06-28T11:00:00.000Z', 'local': '...'}
# (see the output of the extract cell). Read the keys directly instead of parsing
# the dict's string repr; missing values stay NaN instead of raising in a lambda.
# Keep only "YYYY-MM-DDTHH" (everything before the first ':'), as before.
df["utc"] = df["date"].str.get("utc").str.split(":").str[0]
df["local"] = df["date"].str.get("local").str.split(":").str[0]


## Coordinates
# `coordinates` holds dicts such as {'latitude': 61.534163, 'longitude': -149.031655}
df["latitude"] = pd.to_numeric(df["coordinates"].str.get("latitude"))
df["longitude"] = pd.to_numeric(df["coordinates"].str.get("longitude"))


# drop old columns
df = df.drop(columns=["date", "coordinates"])
df.head()

Unnamed: 0,city,country,location,parameter,unit,value,utc,local,latitude,longitude
0,MATANUSKA,US,BUTTE (HARRISON COUR,pm25,µg/m³,5.0,2020-06-28T11,2020-06-28T02,61.534163,-149.031655
1,LA PAZ,US,ALAMO LAKE,pm25,µg/m³,5.0,2020-06-28T11,2020-06-28T04,34.243889,-113.558611
2,RIVERSIDE,US,RIVERSIDE - RUBIDOUX,pm25,µg/m³,8.0,2020-06-28T11,2020-06-28T03,33.999449,-117.415831
3,PHOENIX,US,PHOENIX JLG SUPERSIT,pm25,µg/m³,6.0,2020-06-28T11,2020-06-28T04,33.503601,-112.095001
4,PIMA,US,ORANGE GROVE,pm25,µg/m³,4.0,2020-06-28T11,2020-06-28T04,32.322573,-111.037709


The hour is still attached to the date (e.g. `2020-06-28T11`), so let's split it into its own column

In [5]:
# Split the "YYYY-MM-DDTHH" strings on "T": the part after "T" becomes a new
# hour_utc / hour_local column, and the part before replaces the original value.
for time_col in ("utc", "local"):
    date_and_hour = df[time_col].str.split("T", n=1, expand=True)
    df["hour_" + time_col] = date_and_hour[1]
    df[time_col] = date_and_hour[0]

df.head()

Unnamed: 0,city,country,location,parameter,unit,value,utc,local,latitude,longitude,hour_utc,hour_local
0,MATANUSKA,US,BUTTE (HARRISON COUR,pm25,µg/m³,5.0,2020-06-28,2020-06-28,61.534163,-149.031655,11,2
1,LA PAZ,US,ALAMO LAKE,pm25,µg/m³,5.0,2020-06-28,2020-06-28,34.243889,-113.558611,11,4
2,RIVERSIDE,US,RIVERSIDE - RUBIDOUX,pm25,µg/m³,8.0,2020-06-28,2020-06-28,33.999449,-117.415831,11,3
3,PHOENIX,US,PHOENIX JLG SUPERSIT,pm25,µg/m³,6.0,2020-06-28,2020-06-28,33.503601,-112.095001,11,4
4,PIMA,US,ORANGE GROVE,pm25,µg/m³,4.0,2020-06-28,2020-06-28,32.322573,-111.037709,11,4


# Part III: Load

I previously created a MySQL database and loaded tables with historical data (collected using Amazon Athena)

In [6]:
## create openaq database
# One-time setup, kept commented out: the database already exists (created
# before the historical load), so re-running CREATE DATABASE would fail.
# Uncomment only when setting up a fresh MySQL server.

# mydb = mysql.connector.connect (
#     host = "localhost",
#     user = "root",
#     passwd = "*********")

# cur = mydb.cursor()

# cur.execute("CREATE DATABASE openaq")

# print("Database created successfully")

Now that the historical data is loaded, I can update the tables daily with new pollution data 

In [7]:
# connect to mysql openaq database
# Credentials come from the environment rather than being hard-coded in the
# notebook; host/user fall back to the previous local defaults.
mydb = mysql.connector.connect(
    host=os.environ.get("MYSQL_HOST", "localhost"),
    user=os.environ.get("MYSQL_USER", "root"),
    passwd=os.environ["MYSQL_PASSWORD"],
    database="openaq")

print("Database opened successfully")

Database opened successfully


Drop the pollution table, since it contains the previous run's OpenAQ data

In [8]:
# drop pollution table
# IF EXISTS keeps the very first run (or a rerun after a failed CREATE below)
# from erroring out on a missing table.
cur = mydb.cursor()
cur.execute("DROP TABLE IF EXISTS pollution")
print("Table dropped... ")

Table dropped... 


Recreate the 'pollution' table

Note: 'state' table contains corresponding states for cities in pollution table (collected using Google's geocode API)

In [9]:
cur = mydb.cursor()

# Recreate `pollution` with one column per field of the transformed DataFrame.
# NOTE(review): MySQL FLOAT(10) is single precision (~7 significant digits); this is
# likely why coordinates read back rounded in the merge output (33.999449 -> 33.9995).
# Consider DOUBLE here, but only together with the `state`/`merged` tables.
pollution_ddl = (
    "CREATE TABLE pollution ("
    "city VARCHAR(50), country VARCHAR(50), location VARCHAR(50), "
    "parameter VARCHAR(50), unit VARCHAR(50), value FLOAT(10), "
    "utc DATE, local DATE, "
    "latitude FLOAT(10), longitude FLOAT(10), "
    "hour_utc INT, hour_local INT)"
)
cur.execute(pollution_ddl)

print("Table created successfully")

# One-time setup for the `state` lookup table (already created):
# cur.execute("CREATE TABLE state (state VARCHAR(50), city VARCHAR(50), min_lat FLOAT(10), max_lat FLOAT(10), min_long FLOAT(10), max_long FLOAT(10))")

# print("Table created successfully")

Table created successfully


Populate 'pollution' table with the most recent pollution data

Note: 'state' table does not need to be updated since state names are nontemporal 

In [10]:
# pollution
# Columns of the `pollution` table, as declared in its CREATE TABLE statement.
# Inserting this explicit list (rather than every DataFrame column) keeps the
# INSERT valid even if the API starts returning extra fields.
pollution_cols = ["city", "country", "location", "parameter", "unit", "value",
                  "utc", "local", "latitude", "longitude", "hour_utc", "hour_local"]

# Replace missing values with None so the driver sends SQL NULL. Cast to object
# first: depending on the pandas version, where(..., None) leaves NaN in float
# columns, and NaN cannot be inserted into MySQL.
df = df.astype(object).where(pd.notnull(df), None)
cur = mydb.cursor()

# build the parameterised INSERT once instead of once per row
placeholders = ", ".join(["%s"] * len(pollution_cols))
sql = "INSERT INTO pollution (" + ", ".join(pollution_cols) + ") VALUES (" + placeholders + ")"
rows = list(df[pollution_cols].itertuples(index=False, name=None))

# insert in batches: executemany sends each batch as a multi-row INSERT, and the
# batch size keeps every statement well under MySQL's packet-size limit
batch_size = 1000
for start in range(0, len(rows), batch_size):
    cur.executemany(sql, rows[start:start + batch_size])

mydb.commit()
cur.close()


#state

# cur = mydb.cursor()

# with open("/Users/halabanz/Desktop/openaq_project/state/state_final_update.csv", 'r', encoding='utf-8',
#                  errors='ignore') as f:
#     reader = csv.reader(f)
#     next(reader) # Skip the header row.
#     for row in reader:
#         cur.execute("INSERT INTO state (state, city, min_lat, max_lat, min_long, max_long) VALUES (%s, %s, %s, %s, %s, %s)", row)
        
# mydb.commit()

print("Done")

Done


Merge 'pollution' and 'state' tables to get the corresponding state for each city

In [11]:
# Attach a state to each measurement: a pollution row matches a `state` row when
# the city names agree and its coordinates fall inside that row's lat/long bounds.
# This is an inner join, so measurements with no matching state row are dropped.
query = (
    "SELECT p.location, p.city, s.state, p.country, p.value, p.unit, "
    "p.parameter, p.utc, p.local, p.latitude, p.longitude, "
    "p.hour_utc, p.hour_local "
    "FROM pollution p join state s "
    "on p.latitude >= s.min_lat and p.latitude <= s.max_lat "
    "and p.longitude >= s.min_long and p.longitude <= s.max_long "
    "and s.city = p.city"
)
merged = pd.read_sql(query, mydb)
merged.head()

Unnamed: 0,location,city,state,country,value,unit,parameter,utc,local,latitude,longitude,hour_utc,hour_local
0,RIVERSIDE - RUBIDOUX,RIVERSIDE,CA,US,8.0,µg/m³,pm25,2020-06-28,2020-06-28,33.9995,-117.416,11,3
1,PHOENIX JLG SUPERSIT,PHOENIX,AZ,US,6.0,µg/m³,pm25,2020-06-28,2020-06-28,33.5036,-112.095,11,4
2,CHILDREN'S PARK SITE,TUCSON,AZ,US,6.0,µg/m³,pm25,2020-06-28,2020-06-28,32.2953,-110.982,11,4
3,ROSE ELEMENTARY,TUCSON,AZ,US,10.0,µg/m³,pm25,2020-06-28,2020-06-28,32.1731,-110.98,11,4
4,GERONIMO,TUCSON,AZ,US,12.0,µg/m³,pm25,2020-06-28,2020-06-28,32.25,-110.967,11,4


Create 'merged' table in MySQL

In [12]:
# One-time setup, kept commented out: unlike `pollution`, the `merged` table is
# never dropped, so each daily run appends to it; uncomment only for a fresh database.
# cur = mydb.cursor()

# cur.execute("CREATE TABLE merged (location VARCHAR(50), city VARCHAR(50), state VARCHAR(50), country VARCHAR(50), value FLOAT(10), unit VARCHAR(50), parameter VARCHAR(50), utc DATE, local DATE, latitude FLOAT(10), longitude FLOAT(10), hour_utc INT, hour_local INT)")

# print("Table created successfully")

Populate the 'merged' table with the 'merged' DataFrame created above

In [13]:
cur = mydb.cursor()

# creating column list for insertion (read_sql returned the columns selected by the merge query)
cols = ", ".join(str(c) for c in merged.columns)
sql = "INSERT INTO merged (" + cols + ") VALUES (" + ", ".join(["%s"] * len(merged.columns)) + ")"

# Missing values -> None (SQL NULL). Done on a copy so `merged` itself is unchanged.
# Cast to object first because where(..., None) may leave NaN in float columns.
merged_rows = list(
    merged.astype(object)
    .where(pd.notnull(merged), None)
    .itertuples(index=False, name=None)
)

# Insert DataFrame records in batches; each executemany call sends one batch.
batch_size = 1000
for start in range(0, len(merged_rows), batch_size):
    cur.executemany(sql, merged_rows[start:start + batch_size])

mydb.commit()
cur.close()

True

Let's save both the unmerged and merged data to an AWS S3 bucket as a backup

Starting with unmerged

In [14]:
# First export to disk (can be deleted after)
path_unmerged = "/Users/halabanz/Desktop/openaq_project/unmerged/" + yesterday
df.to_csv(path_unmerged, index=False, header=True)

# Export to the bucket under a dated key so every day's backup is kept
# (a fixed "unmerged/" key would overwrite the previous day's upload).
aws_key_unmerged = "unmerged/" + yesterday

BUCKET_NAME = 'openaqhajime'  # created a bucket in aws

# No keys in the notebook: boto3 resolves credentials from its standard chain
# (AWS_* environment variables, ~/.aws/credentials, instance role, ...).
s3 = boto3.resource('s3', config=Config(signature_version='s3v4'))

# `with` closes the local file once the upload has finished
with open(path_unmerged, 'rb') as data:
    s3.Bucket(BUCKET_NAME).put_object(Key=aws_key_unmerged, Body=data)

print("Done")

Done


Then merged

In [15]:
# First export to disk (can be deleted after)
path_merged = "/Users/halabanz/Desktop/openaq_project/merged/" + yesterday
merged.to_csv(path_merged, index=False, header=True)

# Export to the bucket under a dated key so every day's backup is kept
# (a fixed "merged/" key would overwrite the previous day's upload).
aws_key_merged = "merged/" + yesterday

BUCKET_NAME = 'openaqhajime'  # created a bucket in aws

# No keys in the notebook: boto3 resolves credentials from its standard chain
# (AWS_* environment variables, ~/.aws/credentials, instance role, ...).
s3 = boto3.resource('s3', config=Config(signature_version='s3v4'))

# `with` closes the local file once the upload has finished
with open(path_merged, 'rb') as data:
    s3.Bucket(BUCKET_NAME).put_object(Key=aws_key_merged, Body=data)

print("Done")

Done


# 'merged' table can now be accessed in Tableau to produce real time visuals!!