#### IMPORT LIBRARIES

In [38]:
import configparser
from datetime import datetime, time
from os import getenv
from pathlib import Path

import pandas as pd
import psycopg2
import requests
import snowflake.connector
from dotenv import load_dotenv
load_dotenv()

True

#### CONFIGURATIONS

In [26]:
# All runtime settings are read from environment variables (load_dotenv() in the
# imports cell is expected to have populated them). getenv() returns None for an
# unset variable instead of raising, so unset names are reported explicitly below.

# OpenWeatherMap request parameters
API_KEY = getenv('API_KEY')
LAT = getenv('LAT')
LON = getenv('LON')
UNITS = getenv('UNITS')

# PostgreSQL connection settings
DATABASE_HOST = getenv('DATABASE_HOST')
DATABASE_USER = getenv('DATABASE_USER')
DATABASE_NAME = getenv('DATABASE_NAME')
DATABASE_PASSWORD = getenv('DATABASE_PASSWORD')

# Snowflake connection settings
SN_USERNAME = getenv('SN_USERNAME')
SN_PASSWORD = getenv('SN_PASSWORD')
SN_ACCOUNTNAME = getenv('SN_ACCOUNTNAME')
SN_WAREHOUSENAME = getenv('SN_WAREHOUSENAME')
SN_DBNAME = getenv('SN_DBNAME')
SN_SCHEMANAME = getenv('SN_SCHEMANAME')

# Each constant above shares its name with the environment variable it reads.
_REQUIRED_SETTINGS = (
  'API_KEY', 'LAT', 'LON', 'UNITS',
  'DATABASE_HOST', 'DATABASE_USER', 'DATABASE_NAME', 'DATABASE_PASSWORD',
  'SN_USERNAME', 'SN_PASSWORD', 'SN_ACCOUNTNAME', 'SN_WAREHOUSENAME',
  'SN_DBNAME', 'SN_SCHEMANAME',
)

# Report unset variables by name only -- never echo values, several are secrets.
_missing_settings = [name for name in _REQUIRED_SETTINGS if getenv(name) is None]
if _missing_settings:
  print("Missing configuration values:", ", ".join(_missing_settings))

#### CONNECT DATABASE

In [27]:
def dbConnect():
  """Open a new PostgreSQL connection using the DATABASE_* settings.

  Returns:
    A psycopg2 connection on success, or None when the connection attempt
    fails (the error is printed rather than raised).
  """
  try:
    # Keyword arguments instead of a hand-built DSN string: a value containing
    # spaces or quotes (typically the password) would need escaping in a DSN.
    return psycopg2.connect(
      host=DATABASE_HOST,
      dbname=DATABASE_NAME,
      user=DATABASE_USER,
      password=DATABASE_PASSWORD,
    )
  except psycopg2.Error as error:
    # Include the driver's message so the cause is visible to the reader.
    print("Error occurred while connecting to Database", error)
    return None

## CODE

#### Parsing Raw Data Function

In [26]:
def parsingRawData(rawData: dict) -> dict:
  """Flatten an OpenWeatherMap "current weather" response into a single record.

  Args:
    rawData: Decoded JSON response. Must contain the keys ``dt``, ``name``,
      ``id``, ``base``, ``timezone``, ``sys.country``, ``coord.lon``,
      ``coord.lat``, ``main.temp`` and ``main.feels_like``.

  Returns:
    A flat dict with the keys ``date``, ``city_name``, ``city_id``, ``country``,
    ``lon``, ``lat``, ``base``, ``temp``, ``feels_like`` and ``timezone``.
    ``date`` is a naive ``datetime`` in the machine's local time, converted
    from the epoch seconds in ``rawData["dt"]``.

  Raises:
    KeyError: If any of the expected keys is missing from ``rawData``.
  """
  return {
    # The notebook imports the class (`from datetime import datetime`), so the
    # constructor is `datetime.fromtimestamp`, not `datetime.datetime....`.
    "date": datetime.fromtimestamp(rawData["dt"]),
    "city_name": rawData["name"],
    "city_id": rawData["id"],
    "country": rawData["sys"]["country"],
    "lon": rawData["coord"]["lon"],
    "lat": rawData["coord"]["lat"],
    "base": rawData["base"],
    "temp": rawData["main"]["temp"],
    "feels_like": rawData["main"]["feels_like"],
    "timezone": rawData["timezone"]
  }

#### Insert Data Function

In [27]:
def insertData(parsedData: dict):
    """Insert one parsed weather record into the ``tbldata`` table.

    Args:
        parsedData: Mapping with the keys ``city_id``, ``lat``, ``lon``, ``temp``,
            ``feels_like``, ``city_name``, ``country``, ``timezone`` and ``date``
            (the shape returned by ``parsingRawData``).

    Returns:
        None. The outcome is reported with ``print``; errors raised while
        building or executing the insert are caught and printed, not re-raised.
    """
    conn = dbConnect()
    if conn is None:
        # dbConnect() returns None on failure; there is nothing to insert into or close.
        print("Failed to insert record into tbldata", "no database connection")
        return

    try:
        SQL = "INSERT INTO tbldata (city_id, lat, lon, temp, feels_like, city_name, country, timezone, date) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);"
        data = (
            parsedData["city_id"],
            parsedData['lat'],
            parsedData["lon"],
            parsedData["temp"],
            parsedData["feels_like"],
            parsedData["city_name"],
            parsedData["country"],
            parsedData["timezone"],
            parsedData["date"],
            )

        # The context manager closes the cursor even if execute() raises, and
        # avoids touching an unbound `cursor` name during cleanup.
        with conn.cursor() as cursor:
            cursor.execute(SQL, data)
            count = cursor.rowcount

        conn.commit()
        print(count, "Record inserted successfully into tbldata")

    except Exception as error:
        print("Failed to insert record into tbldata", error)

    finally:
        # Closing without a commit discards any uncommitted work.
        conn.close()
        print("PostgreSQL connection is closed")

#### API Call

In [28]:
# Fetch the current weather for the configured coordinates.
# - `params` lets requests build and URL-encode the query string.
# - `timeout` stops the cell from hanging forever on a stalled connection.
# - `raise_for_status` fails here on an HTTP error instead of later, when the
#   parser would hit a KeyError on an error payload.
response = requests.get(
    "https://api.openweathermap.org/data/2.5/weather",
    params={"lat": LAT, "lon": LON, "appid": API_KEY, "units": UNITS},
    timeout=30,
)
response.raise_for_status()
rawData = response.json()
rawData

{'coord': {'lon': 7.5906, 'lat': 50.3681},
 'weather': [{'id': 500,
   'main': 'Rain',
   'description': 'light rain',
   'icon': '10n'}],
 'base': 'stations',
 'main': {'temp': 12.84,
  'feels_like': 12.61,
  'temp_min': 11.92,
  'temp_max': 13.63,
  'pressure': 993,
  'humidity': 93},
 'visibility': 10000,
 'wind': {'speed': 15.65, 'deg': 22},
 'rain': {'1h': 0.21},
 'clouds': {'all': 100},
 'dt': 1698694920,
 'sys': {'type': 2,
  'id': 19193,
  'country': 'DE',
  'sunrise': 1698646557,
  'sunset': 1698682230},
 'timezone': 3600,
 'id': 2874772,
 'name': 'Lützel',
 'cod': 200}

In [29]:
# Flatten the API response into the record shape that insertData() reads.
parsedData = parsingRawData(rawData)
parsedData

{'date': datetime.datetime(2023, 10, 30, 20, 42),
 'city_name': 'Lützel',
 'city_id': 2874772,
 'country': 'DE',
 'lon': 7.5906,
 'lat': 50.3681,
 'base': 'stations',
 'temp': 12.84,
 'feels_like': 12.61,
 'timezone': 3600}

In [30]:
# Write the parsed record to the database (one row into tbldata).
insertData(parsedData)

1 Record inserted successfully into tbldata
PostgreSQL connection is closed


# Export CSV

In [47]:
# Scratch check of the date suffix used in the export filename.
# Despite its name, this variable holds today's date formatted as 'YYYY-MM-DD',
# not a datetime at midnight; exportData() computes its own day boundaries.
start_of_day = datetime.now().strftime('%Y-%m-%d')
print(start_of_day)

2024-01-07


In [56]:
def exportData():
    """Export today's rows of ``tblweatherdata`` to a CSV file.

    The file is written to the current working directory as
    ``tblweatherdata_<YYYY-MM-DD>.csv`` with a header row and ``,`` delimiter.

    Returns:
        None. Failures are printed, not raised.
    """
    # NOTE(review): insertData() writes to `tbldata` while this reads from
    # `tblweatherdata` -- confirm both names are intended.
    conn = dbConnect()
    if conn is None:
        # dbConnect() returns None on failure; there is nothing to query or close.
        print("Failed to export CSV", "no database connection")
        return

    try:
        # Take a single timestamp so the query window and the filename cannot
        # disagree if the call happens to straddle midnight.
        now = datetime.now()
        start_of_day = datetime.combine(now, time.min)
        end_of_day = datetime.combine(now, time.max)
        filename = 'tblweatherdata_' + now.strftime('%Y-%m-%d') + '.csv'

        # Both bounds are datetimes generated above, not user input.
        SQL = f"""
        COPY (select * from tblweatherdata where date between '{start_of_day}' and '{end_of_day}') TO STDOUT  WITH DELIMITER ',' CSV HEADER;
        """

        # `with` guarantees the file is flushed and closed and the cursor is
        # released, even when copy_expert() raises.
        with open(filename, 'w', encoding='utf-8') as csv_file, conn.cursor() as cursor:
            cursor.copy_expert(sql=SQL, file=csv_file)

        conn.commit()

    except Exception as error:
        print("Failed to export CSV", error)

    finally:
        conn.close()
        print("PostgreSQL connection is closed")
# Run the export; it writes tblweatherdata_<YYYY-MM-DD>.csv for today's date.
exportData()

PostgreSQL connection is closed


# Snowflake

#### snowflake connection

In [23]:
# Open the Snowflake session shared by the cells below, using the SN_*
# settings from the configuration cell.
try:
    SF_Conn = snowflake.connector.connect(
        user=SN_USERNAME,
        password=SN_PASSWORD,
        account=SN_ACCOUNTNAME,
        warehouse=SN_WAREHOUSENAME,
        database=SN_DBNAME,
        schema=SN_SCHEMANAME
    )
except Exception as error:
    # Report and re-raise: every later Snowflake cell needs SF_Conn, so
    # swallowing the failure would only resurface as an unrelated NameError.
    print("Failed to connect to Snowflake:", error)
    raise

#### Snowflake: create the stage

In [24]:
# (Re)create the named stage that receives the CSV export. Its default file
# format is comma-delimited CSV with one header row and optional double quotes.
CREATE_STAGE_SQL = """
create or replace stage my_postgres_stage
copy_options = (on_error='skip_file')
file_format = (type = 'CSV' field_delimiter = ',' skip_header = 1 field_optionally_enclosed_by='"');
"""
stage_cursor = SF_Conn.cursor()
stage_cursor.execute(CREATE_STAGE_SQL)

<snowflake.connector.cursor.SnowflakeCursor at 0x7f35171f3640>

#### Snowflake: PUT the CSV export into the stage

In [None]:
# Upload the file that exportData() wrote: tblweatherdata_<YYYY-MM-DD>.csv in
# the working directory. The later COPY INTO only picks up staged files whose
# names match 'tblweatherdata_.*[.]csv[.]gz', so the date suffix must be kept.
# Resolving against the working directory also removes the machine-specific
# absolute path.
export_path = Path('tblweatherdata_' + datetime.now().strftime('%Y-%m-%d') + '.csv').resolve()

# The URI is single-quoted so a path containing spaces is still accepted.
SF_Conn.cursor().execute(f"""
PUT 'file://{export_path.as_posix()}' @my_postgres_stage;
""")

#### Snowflake: COPY staged files into the intermediate table (`weather_data_clone`)

In [None]:
# Load every staged export matching the filename pattern into the clone table.
# FIELD_OPTIONALLY_ENCLOSED_BY is '"' to agree with the stage definition and
# with PostgreSQL's CSV output, which double-quotes values containing the
# delimiter; with NONE such rows are split at the embedded comma.
# ON_ERROR=CONTINUE skips rows that fail to parse, and PURGE=TRUE removes the
# staged files after loading, so check the load result if row counts matter.
SF_Conn.cursor().execute("""
COPY INTO "SF_OPENWEATHER"."PUBLIC"."weather_data_clone"
FROM '@"SF_OPENWEATHER"."PUBLIC"."MY_POSTGRES_STAGE"'
PATTERN = '.*tblweatherdata_.*[.]csv[.]gz'
FILE_FORMAT = (
    TYPE=CSV,
    SKIP_HEADER=1,
    FIELD_DELIMITER=',',
    TRIM_SPACE=FALSE,
    FIELD_OPTIONALLY_ENCLOSED_BY='"',
    REPLACE_INVALID_CHARACTERS=TRUE,
    DATE_FORMAT='YYYY-MM-DD',
    TIME_FORMAT=AUTO,
    TIMESTAMP_FORMAT='YYYY-MM-DD HH24:MI:SS'
)
ON_ERROR=CONTINUE
PURGE=TRUE;
""")

#### Snowflake: merge the source table (`weather_data_clone`) into the target table (`weather_data`)
<!-- Transient || Temporary Table to Target||Final Table -->



In [None]:
# Insert into "weather_data" every row of "weather_data_clone" whose "id" is
# not already present. Rows that already match on "id" are left untouched.
MERGE_SQL = """
MERGE INTO "weather_data" AS target
  USING (SELECT * FROM "weather_data_clone") AS source 
ON target."id" = source."id"
WHEN NOT MATCHED THEN INSERT (target."id", target."city_id", target."lon", target."lat", target."temp", target."feels_like", target."city_name", target."country", target."timezone", target."date") VALUES (source."id", source."city_id", source."lon", source."lat", source."temp", source."feels_like", source."city_name", source."country", source."timezone", source."date");
"""
merge_cursor = SF_Conn.cursor()
merge_cursor.execute(MERGE_SQL)