####**Define ETL functions**


In [23]:
import psycopg2

# Redshift connection function
def get_Redshift_connection():
    host = "ssde.cnqux5xggmn5.us-east-2.redshift.amazonaws.com"
    redshift_user = "choyoura"
    redshift_pass = "password"
    port = 5439
    dbname = "dev"

    conn = psycopg2.connect("dbname={dbname} user={user} host={host} password={password} port={port}".format(
        dbname=dbname,
        user=redshift_user,
        password=redshift_pass,
        host=host,
        port=port
    ))

    conn.set_session(autocommit=True)
    return conn.cursor()

In [24]:
import requests
import json

def extract(url):
    response = requests.get(url)
    print(response)
    data = response.json()['daily']
    print("Extract Done")
    print(data)
    return data

In [25]:
from datetime import datetime

def transform(data):
  ret = []

  for d in data[1:]:
    date = datetime.fromtimestamp(d["dt"]).strftime('%Y-%m-%d')
    avg_temp = d["temp"]["day"]
    min_temp = d["temp"]["min"]
    max_temp = d["temp"]["max"]

    ret.append([date, avg_temp, min_temp, max_temp])
  for row in ret:
    print(row)

  print("Transform Done")
  return ret

In [20]:
from datetime import datetime

def load(data):

  cur = get_Redshift_connection()

  try:
    # copy origin Table to temp Table 
    sql = """BEGIN;
            CREATE TABLE IF NOT EXISTS choyoura.weather_forecast (
              date date primary key,
              temp float,
              min_temp float,
              max_temp float,
              updated_date timestamp default GETDATE()
           );

            DROP TABLE IF EXISTS choyoura.temp_weather_forecast;

            CREATE TABLE IF NOT EXISTS choyoura.temp_weather_forecast (
              date date primary key,
              temp float,
              min_temp float,
              max_temp float,
              updated_date timestamp default GETDATE()
           );

            INSERT INTO choyoura.temp_weather_forecast 
            SELECT * FROM choyoura.weather_forecast;"""

    cur.execute(sql)

    cur.execute("SELECT count(1) FROM choyoura.weather_forecast;")
    org_count = cur.fetchone()[0]

    # ETL to temp Table
    for date, temp, min_temp, max_temp in data:
      sql = f"""
            INSERT INTO choyoura.temp_weather_forecast (date, temp, min_temp, max_temp)
            VALUES ('{date}', '{temp}', '{min_temp}', '{max_temp}');
            """
      cur.execute(sql)

    cur.execute("SELECT count(1) FROM choyoura.temp_weather_forecast;")
    temp_count = cur.fetchone()[0]


    # temp Table input validation    
    if int(org_count) + 7 != int(temp_count):
      raise ValueError('Some Data is missing!')

    # Temp table -> original data, removed duplicates
    sql = """DELETE FROM choyoura.weather_forecast;

            INSERT INTO choyoura.weather_forecast 
            SELECT date, temp, min_temp, max_temp, updated_date
              FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY date ORDER BY updated_date DESC) seq
                    FROM choyoura.temp_weather_forecast)
              WHERE seq = 1
              ORDER BY date;"""
    
    cur.execute(sql)

    cur.execute("END")
    print("Load Done: Incrementally Updated")

  except:
    print("Error! ROLLBACK")
    cur.execute("ROLLBACK")

  else:
    cur.execute("SELECT count(1) FROM choyoura.weather_forecast;")
    count = cur.fetchone()[0]
    print("\n Number of data inserted to org_table : ", count)         

#### **Run the ETL Process**




https://colab.research.google.com/github/apolitical/colab-env/blob/master/colab_env_testbed.ipynb

Seoul/Coordinates: 37.5665° N, 126.9780° E

In [42]:
!pip install colab-env --upgrade

In [8]:
import colab_env
import os 
# !more gdrive/My\ Drive/vars.env

API_key = os.getenv("API_key")

Mounted at /content/gdrive


In [26]:
params = {'lat':37.57, 'lon':126.98, 'part':'current,minutely,hourly', 'unit':'metric', 'key': API_key}
API_url = 'https://api.openweathermap.org/data/2.5/onecall?lat={lat}&lon={lon}&exclude={part}&units={unit}&appid={key}'.format(**params)

In [27]:
json_data = extract(API_url) # Temperature in Celsius 

<Response [200]>
Extract Done
[{'dt': 1637550000, 'sunrise': 1637533144, 'sunset': 1637569054, 'moonrise': 1637575920, 'moonset': 1637542080, 'moon_phase': 0.58, 'temp': {'day': 5.61, 'min': 1.14, 'max': 8.99, 'night': 1.14, 'eve': 3.58, 'morn': 4.5}, 'feels_like': {'day': 1.03, 'night': -3.97, 'eve': -0.97, 'morn': 0.69}, 'pressure': 1014, 'humidity': 43, 'dew_point': -5.3, 'wind_speed': 8.49, 'wind_deg': 300, 'wind_gust': 14.79, 'weather': [{'id': 616, 'main': 'Snow', 'description': 'rain and snow', 'icon': '13d'}], 'clouds': 82, 'pop': 1, 'rain': 4.28, 'snow': 0.43, 'uvi': 1.91}, {'dt': 1637636400, 'sunrise': 1637619607, 'sunset': 1637655425, 'moonrise': 1637665380, 'moonset': 1637631600, 'moon_phase': 0.61, 'temp': {'day': 3.2, 'min': -0.16, 'max': 4.73, 'night': 2.1, 'eve': 2.99, 'morn': -0.09}, 'feels_like': {'day': -1.91, 'night': -2.09, 'eve': -1.4, 'morn': -4.88}, 'pressure': 1018, 'humidity': 39, 'dew_point': -9.6, 'wind_speed': 7.58, 'wind_deg': 296, 'wind_gust': 14.57, 'wea

In [28]:
week_data = transform(json_data)

['2021-11-23', 3.2, -0.16, 4.73]
['2021-11-24', 5.44, 1.25, 6.84]
['2021-11-25', 7.22, 2.93, 8.64]
['2021-11-26', 6.97, 3.01, 8.42]
['2021-11-27', 6.21, 2.73, 8.07]
['2021-11-28', 8.12, 3.13, 10.11]
['2021-11-29', 9.53, 6.32, 10.98]
Transform Done


In [41]:
!pip install sqlalchemy==1.3.2

In [32]:
import sqlalchemy

In [38]:
load(week_data)

In [39]:
%load_ext sql

%sql postgresql://choyoura:Choyoura!1@ssde.cnqux5xggmn5.us-east-2.redshift.amazonaws.com:5439/dev

In [40]:
%%sql

select * from choyoura.weather_forecast order by date;