Potential workflow:
- Save the raw JSON data in an S3 bucket.
- Then process it and push the result to SQL.
- After a week, remove the processed data from SQL.

The raw JSON is already saved to S3 in the first step, so there is no need to push data to S3 weekly.

In [1]:
import pandas as pd
import os
import requests as req
import json
import csv
from datetime import datetime, timedelta
from datetime import date

import asyncio
import aiohttp

In [2]:
# Load the store table; later cells read its TRT_ID, Latitude and Longitude columns.
# header=0 takes the column names from the first row of the file.
trt_data = pd.read_csv('dimTRTstore.csv', header=0)

In [3]:
# Settings shared by the request-URL builders.
options = {
    # Both endpoints use HTTPS: the API key travels in the query string, so
    # plain HTTP would expose it on the wire.
    "aqi_url": "https://api.openweathermap.org/data/2.5/air_pollution",
    "weather_url": "https://api.openweathermap.org/data/2.5/weather",
    "units": "metric",
    # Prefer a key supplied through the OPENWEATHER_API_KEY environment
    # variable. The literal is kept only as a backward-compatible fallback.
    # NOTE(review): a key committed to source should be rotated and removed.
    "api_key": os.environ.get(
        "OPENWEATHER_API_KEY", 'c5d22ed423af74ba40fc97b49c023304'
    ),
}

In [4]:
def request_weather_url(lat,lon):
    """Build the OpenWeather weather request URL for one location.

    Args:
        lat: latitude of the desired location.
        lon: longitude of the desired location.

    Returns:
        ``options["weather_url"]`` followed by the ``lat``, ``lon``,
        ``appid`` and ``units`` query parameters, in that order.
    """
    endpoint = options["weather_url"]
    query = "&".join([
        f"lat={lat!s}",
        f"lon={lon!s}",
        "appid=" + options["api_key"],
        "units=" + options["units"],
    ])
    return endpoint + "?" + query

In [5]:
def request_aqi_url(lat,lon):
    """Build the OpenWeather AQI request URL for one location.

    Args:
        lat: latitude of the desired location.
        lon: longitude of the desired location.

    Returns:
        ``options["aqi_url"]`` followed by the ``lat``, ``lon`` and
        ``appid`` query parameters, in that order.
    """
    endpoint = options["aqi_url"]
    query = "&".join([
        f"lat={lat!s}",
        f"lon={lon!s}",
        "appid=" + options["api_key"],
    ])
    return endpoint + "?" + query

In [6]:
# One (TRT_ID, Latitude, Longitude) tuple per row of the store table.
_store_rows = list(
    zip(trt_data["TRT_ID"], trt_data["Latitude"], trt_data["Longitude"])
)

# Each entry pairs a request URL with the store id it was built for, so the
# response can be tagged with that id after it is fetched.
WEATHER_URL_LIST = [
    {'url': request_weather_url(lat, lon), 'TRT_ID': store_id}
    for store_id, lat, lon in _store_rows
]
AQI_URL_LIST = [
    {'url': request_aqi_url(lat, lon), 'TRT_ID': store_id}
    for store_id, lat, lon in _store_rows
]

In [7]:
async def fetch(session, url, trt):
    """GET ``url`` and return its JSON body tagged with the store id.

    Args:
        session: object whose ``get(url)`` returns an async context manager
            yielding a response with an awaitable ``json()`` method.
        url: request URL to fetch.
        trt: store id stored in the result under the ``'TRT_ID'`` key.

    Returns:
        The decoded JSON object with ``'TRT_ID'`` added (or overwritten).
    """
    async with session.get(url) as resp:
        # content_type=None is passed through to the response's json() call.
        # NOTE(review): presumably this skips the Content-Type check in
        # aiohttp; confirm against the client reference.
        payload = await resp.json(content_type=None)
    payload.update(TRT_ID=trt)
    return payload

In [8]:
async def main_weather(urls):
    """Fetch every weather request in ``urls`` concurrently.

    Args:
        urls: iterable of dicts, each with a ``'url'`` key (the request URL)
            and a ``'TRT_ID'`` key (the store id that URL was built for).

    Returns:
        A list with one ``fetch`` result per entry of ``urls``, in the same
        order as ``urls``.
    """
    async with aiohttp.ClientSession() as session:
        # Iterate the entries themselves: each one carries both the URL and
        # the store id that the response must be tagged with.
        tasks = [fetch(session, entry['url'], entry['TRT_ID']) for entry in urls]
        return await asyncio.gather(*tasks)

In [9]:
async def main_aqi(urls):
    """Fetch every AQI request in ``urls`` concurrently.

    Args:
        urls: iterable of dicts, each with a ``'url'`` key (the request URL)
            and a ``'TRT_ID'`` key (the store id that URL was built for).

    Returns:
        A list with one ``fetch`` result per entry of ``urls``, in the same
        order as ``urls``.
    """
    async with aiohttp.ClientSession() as session:
        # Use the argument rather than the module-level list so the function
        # fetches exactly what the caller passed in.
        tasks = [fetch(session, entry['url'], entry['TRT_ID']) for entry in urls]
        return await asyncio.gather(*tasks)

In [10]:
# Run the weather fetch coroutine and keep what it returns in `result`,
# which weather_to_csv() reads.
# NOTE: a top-level `await` needs an already-running event loop (e.g. a
# notebook cell); it is not valid in a plain script.
result = await main_weather(WEATHER_URL_LIST)

In [10]:
# Run the AQI fetch coroutine and keep what it returns in `result_aqi`,
# which aqi_to_csv() reads.
# NOTE: a top-level `await` needs an already-running event loop (e.g. a
# notebook cell); it is not valid in a plain script.
result_aqi = await main_aqi(AQI_URL_LIST)

In [12]:
def hour_rounder(t):
    """Return ``t`` truncated to the start of its hour.

    Minutes, seconds and microseconds are set to zero; every other field of
    ``t`` is left unchanged.
    """
    return t.replace(minute=0, second=0, microsecond=0)

In [19]:
def weather_to_csv():
    """Flatten the fetched weather responses and write them to a CSV file.

    Reads the module-level ``result`` list (one decoded JSON object per
    request). Each response becomes one row: nested objects are flattened
    into dotted column names and the first entry of the response's
    ``'weather'`` list is added as ``weather.*`` columns. Rows without a
    ``base`` value are dropped. Time fields are written unconverted.

    The file is written to the current working directory as
    ``Weather_<YYYYmmddHHMMSS>.csv``, stamped with the local time.

    Raises:
        KeyError: if no response at all contains a ``base`` field.
    """
    dn = pd.json_normalize(result)

    # Take exactly one weather entry per response so that row i of the
    # weather columns always belongs to row i of the response columns.
    # Exploding the 'weather' lists and joining by position would shift every
    # later row as soon as one response carried more than one entry, and
    # would fail on a response that has no 'weather' list at all.
    primary_weather = [(rec.get('weather') or [{}])[0] for rec in result]
    dw = pd.json_normalize(primary_weather).add_prefix('weather.')

    # The raw list column is replaced by the flattened weather.* columns.
    dn = dn.drop(columns='weather', errors='ignore')
    df_update = pd.concat([dn, dw], axis=1)

    # Keep only rows that carry a 'base' value.
    df_update = df_update[df_update['base'].notna()]

    weather_file_name = "Weather" + datetime.now().strftime("_%Y%m%d%H%M%S.csv")
    df_update.to_csv(weather_file_name, index=False)

In [20]:
# Write the fetched weather results to a timestamped "Weather_*.csv" file.
weather_to_csv()

In [11]:
def aqi_to_csv():
    """Flatten the fetched AQI responses and write them to a CSV file.

    Reads the module-level ``result_aqi`` list. The records under each
    response's ``'list'`` key are expanded into ``aqi.*`` columns and placed
    next to the remaining top-level fields of the responses.

    The file is written to the current working directory as
    ``AQI_<YYYYmmddHHMMSS>.csv``, stamped with the local time.
    """
    readings = pd.json_normalize(result_aqi, 'list', record_prefix='aqi.')
    top_level = pd.json_normalize(result_aqi).drop(columns='list')

    # The two frames are joined side by side on their row index.
    # NOTE(review): this lines up only if every response holds exactly one
    # 'list' record - confirm against the API response format.
    combined = pd.concat([top_level, readings], axis=1)

    aqi_file_name = "AQI" + datetime.now().strftime("_%Y%m%d%H%M%S.csv")
    combined.to_csv(aqi_file_name, index=False)

In [12]:
# Write the fetched AQI results to a timestamped "AQI_*.csv" file.
aqi_to_csv()