In [7]:
from airflow import DAG
from datetime import timedelta,datetime
from airflow.operators.python import PythonOperator

import pandas as pd
import requests
from sqlalchemy import create_engine

In [None]:
# DECLARATION OF VARIABLES

# Endpoint that every API request in this notebook is sent to.
api_url = "https://randomuser.me/api/"

# Query-string parameters sent along with each request to `api_url`.
params = dict(
    results=100,  # Number of random user records to fetch (minimum: 100)
    nat="au",     # Specify a nationality (e.g., "au" for Australia)
)

# SQLAlchemy engine handed to the load step as its database connection.
# NOTE(review): the connection URL embeds a username and password; consider
# sourcing it from the environment or an Airflow connection instead.
engine = create_engine('postgresql://airflow:airflow@host.docker.internal:5436/postgres')

In [None]:
# Make an HTTP GET request to the API
# This statement sits at module level, so it executes whenever this cell/module
# is run (outside of any task callable) and binds the global name `response`.
response = requests.get(api_url, params=params)

def get_json_data(api_url, timeout=30):
    """Fetch the API payload and return it as decoded JSON.

    Args:
        api_url: URL to send the GET request to. The module-level ``params``
            mapping is sent as the query string.
        timeout: Seconds to wait for the server before giving up, so that a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within ``timeout`` seconds.
    """
    response = requests.get(api_url, params=params, timeout=timeout)
    # Fail loudly on an error status instead of passing an error body downstream.
    response.raise_for_status()
    return response.json()

In [None]:
def extract_relevant_data(data):
    """Turn a decoded API payload into one flat record per user.

    Args:
        data: Decoded JSON payload; expected to be a mapping whose "results"
            entry is a list of user objects.

    Returns:
        A list with one dictionary per user, keyed by "First_Name",
        "Last_Name", "Gender", "Email", "Date_of_Birth", "Country",
        "Street_Address", "City", "State", "Postcode" and "Phone_Cell".
        Returns None (after printing a message) when ``data`` has no
        "results" entry.
    """
    # Operate on the payload that was passed in; a payload without "results"
    # means there is nothing to extract.
    if not isinstance(data, dict) or "results" not in data:
        print("Failed to fetch data from the API. Response:", data)
        return None

    user_data = []
    for user in data["results"]:
        name = user["name"]
        location = user["location"]
        # The record is built and appended inside the loop so that every user
        # in the payload ends up in the output.
        user_data.append({
            "First_Name": name["first"],
            "Last_Name": name["last"],
            "Gender": user["gender"],
            "Email": user["email"],
            "Date_of_Birth": user["dob"]["date"][:10],  # Extract YYYY-MM-DD from the full date
            "Country": user["nat"],
            "Street_Address": location["street"],
            "City": location["city"],
            "State": location["state"],
            "Postcode": location["postcode"],
            "Phone_Cell": user["cell"],
        })

    return user_data

In [None]:
def create_dataframe(random_user_dict):
    """Build a DataFrame with one row per user record and serialise it to JSON.

    Args:
        random_user_dict: A list of per-user dictionaries. Nested dictionaries
            inside a record are flattened into dotted column names
            (e.g. "Street_Address.name").

    Returns:
        The DataFrame serialised with ``DataFrame.to_json()``.

    Raises:
        ValueError: If ``random_user_dict`` is empty or None, so that an
            empty table is never produced silently.
    """
    if not random_user_dict:
        raise ValueError("No user records were supplied to create_dataframe")

    # json_normalize yields one row per record and copes with both all-scalar
    # records and records that contain nested dictionaries.
    df = pd.json_normalize(random_user_dict)

    return df.to_json()

In [None]:
def load_datawarehouse(df,table_name,connection):
    """Write a JSON-serialised DataFrame to a database table.

    Args:
        df: JSON string as produced by ``DataFrame.to_json()``.
        table_name: Name of the destination table; an existing table with
            this name is replaced.
        connection: Database connection or engine passed to
            ``DataFrame.to_sql`` as ``con``.
    """
    from io import StringIO

    # Wrap the JSON text in a file-like object: recent pandas versions
    # deprecate passing literal JSON strings to read_json.
    df_json = pd.read_json(StringIO(df))
    df_json.to_sql(table_name, con=connection,if_exists='replace')

In [None]:
# Arguments shared by every task declared in the DAG below.
default_args = dict(
    owner='Oluseyi',
    retries=2,
    start_date=datetime(2023, 9, 15),
    retry_delay=timedelta(minutes=2),
)

with DAG(
    dag_id='random_user_data_v3',
    description='list of citizens and their details',
    schedule_interval='@daily',  # cron preset
    default_args=default_args,
) as dag:

    # Step 1: call the API at `api_url`.
    task1 = PythonOperator(
        task_id='get_json_data',
        python_callable=get_json_data,
        op_args=[api_url],
    )

    # Step 2: receives the output of task1.
    task2 = PythonOperator(
        task_id='extract_relevant_data',
        python_callable=extract_relevant_data,
        op_args=[task1.output],
    )

    # Step 3: receives the output of task2.
    task3 = PythonOperator(
        task_id='create_dataframe',
        python_callable=create_dataframe,
        op_args=[task2.output],
    )

    # Step 4: receives the output of task3 plus the target table name and engine.
    task4 = PythonOperator(
        task_id='load_datawarehouse',
        python_callable=load_datawarehouse,
        op_args=[task3.output, 'random_user_data', engine],
    )

    # Run the four steps strictly in sequence.
    task1 >> task2 >> task3 >> task4