# HiveOperator

Cu task-ul precedent ne-am mutat fișierul în HDFS. Datele însă de cele mai multe ori o să le stocăm în cadrul unui table, într-o bază de date. Cu ajutorul operatorului Hive o să ne creem un tabel în HDFS pentru a stoca datele. Ca să putem face acest lucru avem nevoie de următoarele:

- hql = codul HQl care să fie executat

- hive_cli_conn_id = conexiunea către hive 

Conexiunea către Hive arată așa:

<img src="../../ss/airflow-section-03/section-03-ss-09.png">

De asemena, Hive a fost instalat în momentul în care am rulat acel docker-compose file.

Acum că avem și conexiunea, putem să ne creem task-ul prin care să se creeze acest fișiser în HDFS. Codul o să arate așa:

In [None]:
from ariflow.providers.apache.hive.operators.hive import HiveOperator

create_forex_rates_table_hdfs = HiveOperator(
    task_id="create_forex_rates_table_hdfs",
    hive_cli_conn_id="hive_conn",
    hql="""
        CREATE EXTERNAL TABLE IF NOT EXISTS forex_rates(
            base STRING,
            last_update DATE,
            eur DOUBLE,
            usd DOUBLE,
            nzd DOUBLE,
            gbp DOUBLE,
            jpy DOUBLE,
            cad DOUBLE
            )
        ROW FORMAT DELIMITED
        FIELDS TERMINATED BY ','
        STORED AS TEXTFILE
    """
)

Codul final este următorul:

In [None]:
from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.hive.operators.hive import HiveOperator

import json
import csv
import requests
from datetime import datetime, timedelta


# Define the default_args
default_args = {
    "owner": "airflow",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": False,
    "email": "georgiuandrei05@gmail.com"
}

# Define the function that donwloads the data from the API
def download_rates():
    BASE_URL = "https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/"
    ENDPOINTS = {
        'USD': 'api_forex_exchange_usd.json',
        'EUR': 'api_forex_exchange_eur.json'
    }
    with open('/opt/airflow/dags/files/forex_currencies.csv') as forex_currencies:
        reader = csv.DictReader(forex_currencies, delimiter=';')
        for idx, row in enumerate(reader):
            base = row['base']
            with_pairs = row['with_pairs'].split(' ')
            indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
            outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
            for pair in with_pairs:
                outdata['rates'][pair] = indata['rates'][pair]
            with open('/opt/airflow/dags/files/forex_rates.json', 'a') as outfile:
                json.dump(outdata, outfile)
                outfile.write('\n')

# Define the DAG
with DAG(
    dag_id = 'test_forex_data_pipeline',
    description='Testing the Data Forex Pipeline from Udemy Course',
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False
    ) as dag:
    
    # Define the HttpSensor task
    is_forex_rates_available = HttpSensor(
        task_id="is_forex_rates_available",
        http_conn_id="forex_api",
        endpoint="marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b",
        response_check=lambda response: "rates" in response.text,
        poke_interval=5,
        timeout=25
    )

    # Define the FileSensor task
    is_forex_file_available = FileSensor(
        task_id="is_forex_file_available",
        fs_conn_id="forex_path",
        filepath="forex_currencies.csv",
        poke_interval=5,
        timeout=20
    )

    # Define the PythonOperator task
    download_forex_rates = PythonOperator(
        task_id="download_forex_rates",
        python_callable=download_rates
    )

    # Define the BashOperator task
    save_forex_rates_to_hdfs = BashOperator(
        task_id="save_forex_rates_to_hdfs",
        bash_command="""
            hdfs dfs -mkdir -p /forex && \
            hdfs dfs -put -f $AIRFLOW_HOME/dags/files/forex_rates.json /forex
        """
    )

    # Define the HiveOperator task
    create_forex_rates_table_hdfs = HiveOperator(
        task_id="create_forex_rates_table_hdfs",
        hive_cli_conn_id="hive_conn",
        hql="""
            CREATE EXTERNAL TABLE IF NOT EXISTS forex_rates(
                base STRING,
                last_update DATE,
                eur DOUBLE,
                usd DOUBLE,
                nzd DOUBLE,
                gbp DOUBLE,
                jpy DOUBLE,
                cad DOUBLE
                )
            ROW FORMAT DELIMITED
            FIELDS TERMINATED BY ','
            STORED AS TEXTFILE
        """
    )



Acest table poate fi văzut în HUE la secțiunea de tabele

<img src="../../ss/airflow-section-03/section-03-ss-10.png">