<a href="https://colab.research.google.com/github/Chayansp/Data-Engineer/blob/main/LoadtoBigQuery.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Import Module
from datetime import datetime, timedelta

import pandas as pd
import requests
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.utils.dates import days_ago

In [None]:
#Default Arguments
# Default keyword arguments intended for the operators of a DAG.
# This must be a plain dict: a trailing comma after the closing brace would
# silently turn it into a one-element tuple.
default_args = {
    "owner": "DataTH",
    "start_date": datetime(2015, 12, 1),
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # NOTE(review): the schedule is normally a DAG-level argument rather than an
    # operator default; kept for compatibility - confirm whether it is needed.
    "schedule_interval": "@once",
}

In [None]:
# Id of the Airflow connection used to reach the MySQL database.
MYSQL_CONNECTION = "mysql_default"

# HTTP endpoint queried for the USD -> THB conversion rates.
CONVERSION_RATE_URL = (
    "https://r2de2-workshop-vmftiryt6q-ts.a.run.app/usd_thb_conversion_rate"
)

# CSV files exchanged between the tasks: one per extract step, plus the
# final merged result.
mysql_output_path = "/home/airflow/gcs/data/audible_data_merged.csv"
conversion_rate_output_path = "/home/airflow/gcs/data/conversion_rate.csv"
final_output_path = "/home/airflow/gcs/data/output.csv"


In [None]:
def get_data_from_mysql(transaction_path):
    """Join the Audible transactions with their book details and save as CSV.

    Both tables are read through the Airflow connection named by
    ``MYSQL_CONNECTION``. The left join keeps every transaction row and matches
    ``book_id`` (transaction side) against ``Book_ID`` (book side).

    Args:
        transaction_path: Destination path of the merged CSV file.
    """
    hook = MySqlHook(MYSQL_CONNECTION)

    books = hook.get_pandas_df(sql="SELECT * FROM audible_data")
    transactions = hook.get_pandas_df(sql="SELECT * FROM audible_transaction")

    merged = pd.merge(
        transactions,
        books,
        how="left",
        left_on="book_id",
        right_on="Book_ID",
    )
    merged.to_csv(transaction_path, index=False)
    print(f"Output to {transaction_path}")

def get_conversion_rate(conversion_rate_path):
    """Download the conversion rates from ``CONVERSION_RATE_URL`` and save as CSV.

    The JSON response is loaded into a DataFrame whose index is moved into a
    regular column named ``date`` before the frame is written out.
    (Presumably the payload is keyed by date - confirm against the API.)

    Args:
        conversion_rate_path: Destination path of the CSV file.

    Raises:
        requests.exceptions.RequestException: If the request times out, the
            connection fails, or the server answers with a 4xx/5xx status.
    """
    # Without a timeout a stalled server would block the task indefinitely.
    response = requests.get(CONVERSION_RATE_URL, timeout=30)
    # Fail here on an HTTP error instead of trying to parse an error body.
    response.raise_for_status()

    df = pd.DataFrame(response.json())
    df = df.reset_index().rename(columns={"index": "date"})

    df.to_csv(conversion_rate_path, index=False)
    print(f"Output to {conversion_rate_path}")

def merge_data(transaction_path, conversion_rate_path, output_path):
    """Combine transactions with daily conversion rates and add a THB price.

    Reads the two CSV files, joins them on the calendar date (taken from the
    transaction ``timestamp`` column and the conversion-rate ``date`` column),
    converts ``Price`` from a ``$``-prefixed string to a float, and stores
    ``Price * conversion_rate`` in a new ``THBPrice`` column. The helper
    ``date`` column and ``book_id`` are dropped before writing the result.

    Args:
        transaction_path: CSV with at least ``timestamp``, ``book_id`` and
            ``Price`` columns.
        conversion_rate_path: CSV with ``date`` and ``conversion_rate`` columns.
        output_path: Destination path of the merged CSV file.
    """
    transaction = pd.read_csv(transaction_path)
    conversion_rate = pd.read_csv(conversion_rate_path)

    # Reduce both sides to plain dates so rows match regardless of time of day.
    transaction["date"] = pd.to_datetime(transaction["timestamp"]).dt.date
    conversion_rate["date"] = pd.to_datetime(conversion_rate["date"]).dt.date

    final_df = transaction.merge(conversion_rate, how="left", on="date")

    # Vectorised and tolerant of missing prices: a row-wise ``apply`` calling
    # ``.replace`` on each value raises on NaN and misbehaves on empty frames.
    final_df["Price"] = (
        final_df["Price"]
        .astype(str)
        .str.replace("$", "", regex=False)
        .astype(float)
    )

    final_df["THBPrice"] = final_df["Price"] * final_df["conversion_rate"]
    final_df = final_df.drop(["date", "book_id"], axis=1)

    final_df.to_csv(output_path, index=False)
    print(f"Output to {output_path}")



In [None]:
#Instantiate as DAG
# Open the DAG context; the tasks created inside this `with` block belong to
# this DAG.
# NOTE(review): `default_args` defined earlier in this file is not passed
# here - confirm whether that is intended.
with DAG(
    "workshop5_bq_load_dag",  # DAG id
    start_date=days_ago(1),
    schedule_interval="@once",  # run a single time rather than on a recurring schedule
    tags=["workshop"]
) as dag:


In [None]:
#Tasks
    # Extract: dump the joined MySQL tables to a CSV file.
    t1 = PythonOperator(
        task_id="get_data_from_mysql",
        python_callable=get_data_from_mysql,
        op_kwargs=dict(transaction_path=mysql_output_path),
    )

    # Extract: fetch the conversion rates from the HTTP API into a CSV file.
    t2 = PythonOperator(
        task_id="get_conversion_rate",
        python_callable=get_conversion_rate,
        op_kwargs=dict(conversion_rate_path=conversion_rate_output_path),
    )

    # Transform: combine both CSV files into the final output file.
    t3 = PythonOperator(
        task_id="merge_data",
        python_callable=merge_data,
        op_kwargs=dict(
            transaction_path=mysql_output_path,
            conversion_rate_path=conversion_rate_output_path,
            output_path=final_output_path,
        ),
    )
    # Load: import the CSV from Cloud Storage into the BigQuery table
    # `workshop.audible_data1`, letting BigQuery infer the schema.
    # NOTE: `[GCS_bucket]` and `[filename]` are unfilled placeholders; replace
    # them with the real bucket and object name before deploying.
    t4 = BashOperator(
        task_id="bq_load",
        # Adjacent string literals are concatenated into one command line.
        # The flag is `--autodetect`; the previous spelling `--autoodetect`
        # is not a valid `bq load` option.
        bash_command=(
            "bq load "
            "--source_format=CSV --autodetect "
            "workshop.audible_data1 "
            "gs://[GCS_bucket]/data/[filename].csv"
        ),
    )


In [None]:
#Setting up Dependencies
    # Both extract tasks must finish before the merge; the BigQuery load
    # runs after the merge.
    [t1, t2] >> t3
    t3 >> t4