In [2]:
import pandas as pd

from datetime import datetime
from pathlib import Path
import os

from snowflake.connector.pandas_tools import pd_writer
import snowflake.connector.pandas_tools
import snowflake.connector

from dotenv import load_dotenv
# Load environment variables (to get the Snowflake credentials) from working_files/dags/.env.
# override=True lets values from .env replace variables that already exist in the process
# environment. read_df_push_to_sf reads generic names such as USER, which the OS typically
# already sets (the login name) on Linux/macOS shells; by default load_dotenv keeps the
# existing value, so the OS user would silently be used instead of the one in .env
# (presumably the Snowflake user — confirm against the .env file).
# rstrip('/') avoids a double slash when the notebook runs from inside working_files.
load_dotenv(
    dotenv_path=Path().absolute().as_posix().split('working_files')[0].rstrip('/') + '/working_files/dags/.env',
    override=True,
)

True

In [3]:
# Note: This lets us change where this code runs (on my PC, in the Docker environment, or from the Airflow engine)
# In the Docker env, paths follow Linux  - e.g. /usr/local/airflow/working_files/data
# On the PC, paths follow Windows        - e.g. C:/Users/Azwan/Folder/DOSM/airflow/working_files/data/....

# In Docker
# -----------
# If we run manually from inside the notebook in Docker, the notebook runs from /usr/local/airflow/working_files
# But if run from the Airflow engine, it runs from the entrypoint.sh $AIRFLOW_HOME directory, which is /usr/local/airflow
# Workaround: keep everything before 'working_files' in the current directory and re-append
# 'working_files/data', so both starting directories resolve to the same data folder.
# rstrip('/') drops the trailing slash of that prefix so the result has no '//' in it.

data_path = Path().absolute().as_posix().split('working_files')[0].rstrip('/') + '/working_files/data'
print(data_path)

c:/Users/AzwanDesktop/OneDrive/Teaching Materials/Invoke/Codes/w4_2_airflow_ETL/Airflow//working_files/data


In [1]:
# Define a function to load one month of PriceCatcher data from the DOSM open-data storage and merge the lookup tables
def load_df(date_of_file = '2022-01', premise_code = 3178):
    """Load one month of PriceCatcher prices for a single premise.

    Downloads the monthly PriceCatcher parquet file and the premise and item
    lookup tables from storage.data.gov.my, left-joins the lookups onto the
    price rows and keeps only the rows for ``premise_code``.

    Parameters
    ----------
    date_of_file : str
        Month to load, as it appears in the file name, e.g. ``'2024-01'``.
    premise_code : int
        Premise to keep. Defaults to 3178 (AEON BIG SUBANG JAYA in the
        sample output).

    Returns
    -------
    pandas.DataFrame
        The selected rows with the lookup columns merged in and an extra
        ``etl_time`` column holding the load time as an ISO-8601 string.
        The index is the row position in the merged, unfiltered data.
    """
    # Monthly price survey file
    df = pd.read_parquet(f'https://storage.data.gov.my/pricecatcher/pricecatcher_{date_of_file}.parquet')
    # Convert the date column into datetime type (guarded in case a file lacks it)
    if 'date' in df.columns:
        df['date'] = pd.to_datetime(df['date'])
    print('Number of rows loaded...', len(df))

    # Lookup tables: premise details and item details
    premise = pd.read_parquet('https://storage.data.gov.my/pricecatcher/lookup_premise.parquet')
    print('Number of premises loaded...', len(premise))
    items = pd.read_parquet('https://storage.data.gov.my/pricecatcher/lookup_item.parquet')
    print('Number of items loaded...', len(items))

    # Left joins keep every price row even if a code is missing from a lookup table
    merged_data = (
        df
        .merge(premise, how='left', on='premise_code')
        .merge(items, how='left', on='item_code')
    )

    # Keep only the requested premise. .assign returns a new frame, so adding etl_time
    # does not write into a filtered slice of merged_data (avoids SettingWithCopyWarning).
    premise_rows = merged_data.loc[merged_data['premise_code'] == premise_code]
    return premise_rows.assign(etl_time=datetime.now().isoformat())

In [5]:
def read_df_push_to_sf():
    """Read the saved AEON Subang parquet file and write it to the Snowflake table AEON.

    Connection settings come from the environment variables USER, PASSWORD,
    ACCOUNT, WH, DB, SCHEMA and ROLE (e.g. loaded with load_dotenv at the top
    of the notebook).

    Returns
    -------
    The result of ``snowflake.connector.pandas_tools.write_pandas``. Per the
    Snowflake connector documentation this is a
    ``(success, num_chunks, num_rows, output)`` tuple. The original version
    returned None, so callers that ignore the return value are unaffected.

    Raises
    ------
    KeyError
        If one of the required environment variables is not set.
    """
    # Recomputed locally rather than using the notebook-level data_path, presumably so the
    # function also works when called from Airflow, whose working directory differs.
    # rstrip('/') avoids a double slash when running from inside working_files.
    data_path = Path().absolute().as_posix().split('working_files')[0].rstrip('/') + '/working_files/data'

    # Read AEON Subang data from the parquet file that was saved previously
    df_raw = pd.read_parquet(data_path + '/price_catcher_raw/aeon.parquet')

    # Connect to Snowflake via its Python connector
    ctx = snowflake.connector.connect(  user=       os.environ['USER'],
                                        password=   os.environ['PASSWORD'],
                                        account=    os.environ['ACCOUNT'],
                                        warehouse=  os.environ['WH'],
                                        database=   os.environ['DB'],
                                        schema=     os.environ['SCHEMA'],
                                        role=       os.environ['ROLE'])
    try:
        # Write the rows into Snowflake using the same database/schema as the connection
        return snowflake.connector.pandas_tools.write_pandas(
            conn = ctx,
            df = df_raw,
            table_name ='AEON',
            database = os.environ['DB'],
            schema = os.environ['SCHEMA'],
            quote_identifiers=False
        )
    finally:
        # Always release the Snowflake session, even if the upload fails
        ctx.close()

In [6]:
# Instruction: Write a script to load all data from 2022-01 up until 2024-05 (source: https://open.dosm.gov.my/data-catalogue at PriceCatcher section)
# You may use a for-loop, manual code, or be creative to load all of the data


# For example, you can save the AEON Subang data to a parquet file and then push it to Snowflake.
# to_parquet does not create missing parent folders, so make sure price_catcher_raw exists
# first (e.g. on a fresh checkout where the data folder is still empty).
Path(data_path, 'price_catcher_raw').mkdir(parents=True, exist_ok=True)
df = load_df(date_of_file = '2024-01')
df.to_parquet(data_path + '/price_catcher_raw/aeon.parquet')
read_df_push_to_sf()

Number of rows loaded... 2664434
2897
757


Unnamed: 0,date,premise_code,item_code,price,premise,address,premise_type,state,district,item,unit,item_group,item_category,etl_time
654641,2024-01-08,3178,24,8.9,AEON BIG SUBANG JAYA,"NO 3 JALAN SS16/1,47500 SUBANG JAYA, SELANGOR ...",Hypermarket,Selangor,Petaling,TEMBIKAI SUSU,1kg,BARANGAN SEGAR,BUAH-BUAHAN,2024-05-30T15:47:01.509803
2370845,2024-01-29,3178,1491,6.05,AEON BIG SUBANG JAYA,"NO 3 JALAN SS16/1,47500 SUBANG JAYA, SELANGOR ...",Hypermarket,Selangor,Petaling,BERAS PULUT THAILAND (SUSU) PELBAGAI JENAMA,1kg,BARANGAN BERBUNGKUS,BERAS,2024-05-30T15:47:01.509803
2083256,2024-01-24,3178,68,35.9,AEON BIG SUBANG JAYA,"NO 3 JALAN SS16/1,47500 SUBANG JAYA, SELANGOR ...",Hypermarket,Selangor,Petaling,IKAN PARI (KEPINGAN),1kg,BARANGAN SEGAR,BAHAN LAUT,2024-05-30T15:47:01.509803
495278,2024-01-06,3178,257,33.1,AEON BIG SUBANG JAYA,"NO 3 JALAN SS16/1,47500 SUBANG JAYA, SELANGOR ...",Hypermarket,Selangor,Petaling,MINYAK MASAK SEBATIAN CAP NEPTUNE,5 kg,BARANGAN BERBUNGKUS,MINYAK DAN LEMAK,2024-05-30T15:47:01.509803
2284726,2024-01-28,3178,1930,9.05,AEON BIG SUBANG JAYA,"NO 3 JALAN SS16/1,47500 SUBANG JAYA, SELANGOR ...",Hypermarket,Selangor,Petaling,TELUR AYAM KAMPUNG,10 biji,BARANGAN SEGAR,TELUR,2024-05-30T15:47:01.509803


  snowflake.connector.pandas_tools.write_pandas(
