<a href="https://colab.research.google.com/github/KNVIDYASHREE/Powerbi-desktop/blob/main/Untitled19.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install/upgrade the libraries used below (pandas, numpy, SQLAlchemy, pydantic,
# requests, Airflow) into the notebook environment.
# NOTE(review): `-U` with no version pins makes re-runs non-reproducible; Airflow's
# installation docs recommend installing it against a constraints file — consider
# pinning exact versions here.
!pip install -U pandas numpy sqlalchemy pydantic requests apache-airflow

Collecting sqlalchemy
  Downloading sqlalchemy-2.0.43-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.6 kB)


In [2]:
# import requests library
import requests

# API endpoint to fetch data from. This must be a bare URL: an HTML anchor tag
# (`<a href=...>`) is not something `requests` can send a request to.
url = 'https://api.hevoexampleapi.com/data'

# Function that calls the API, fetches the data, and returns the decoded JSON payload
def fetch_data_from_api(url, timeout=30):
    """Fetch ``url`` with an HTTP GET and return the decoded JSON body.

    Args:
        url: Endpoint to request.
        timeout: Seconds to wait for the server before giving up. ``requests``
            applies no timeout by default, so without one a stalled server
            would block the caller indefinitely.

    Returns:
        Whatever ``response.json()`` decodes (e.g. a dict or list).

    Raises:
        Exception: if the server answers with any status code other than 200.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f"API request failed with status code {response.status_code}")
    return response.json()

In [3]:
# importing pandas and numpy used for data transformation
import pandas as pd
import numpy as np

# Function definition for data transformation logic
def transform_data(df, fill_value=0):
    """Return a transformed copy of ``df``.

    Steps:
      1. Replace missing values in every column with ``fill_value``.
      2. Add ``adjusted_value = value * ln(quantity + 1)``.

    Args:
        df: DataFrame containing numeric ``value`` and ``quantity`` columns.
        fill_value: Replacement for missing values (default 0).

    Returns:
        A new DataFrame with missing values filled and an ``adjusted_value``
        column added. The input frame is not modified.
    """
    # fillna without inplace=True returns a new frame, so the caller's data is
    # left intact and re-running the cell gives the same result.
    result = df.fillna(fill_value)
    # log1p(q) equals log(q + 1) but is more accurate for q close to 0.
    # NOTE(review): quantity < -1 gives NaN and quantity == -1 gives -inf —
    # confirm quantities are expected to be non-negative.
    result['adjusted_value'] = result['value'] * np.log1p(result['quantity'])
    return result

In [4]:
# importing BaseModel from the pydantic package
from pydantic import BaseModel, ValidationError, conint

# Pydantic model describing the schema each record must satisfy.
# All three fields have no default, so pydantic treats them as required.
# NOTE(review): transform_data works on `value`/`quantity` columns, which this
# model does not declare (pydantic ignores undeclared keys by default) — confirm
# the intended record schema.
# NOTE(review): pydantic v2 discourages conint in favour of
# Annotated[int, Field(gt=0)].
class DataModel(BaseModel):
    name: str            # person's name
    age: conint(gt=0)    # integer that must be strictly greater than 0
    city: str            # city name

# Function definition for data validation against the data model.
def validate_data(data):
    """Validate one record against ``DataModel``.

    Args:
        data: The record to check; expected to be a mapping with ``name``,
            ``age`` and ``city`` keys.

    Returns:
        The validated ``DataModel`` instance, or ``None`` if validation failed.
        In the failure case the validation errors are printed as JSON.
    """
    try:
        # model_validate reports non-mapping input (e.g. None or a list) as a
        # ValidationError. DataModel(**data) would instead escape with TypeError.
        return DataModel.model_validate(data)
    except ValidationError as e:
        print(e.json())
        return None

In [5]:
# importing create_engine function from sqlalchemy package
from sqlalchemy import create_engine

# Function definition to write transformed data to the database
def store_data_in_db(df, connection_string, table_name, if_exists='replace'):
    """Write ``df`` to ``table_name`` in the database at ``connection_string``.

    Args:
        df: DataFrame to persist; its index is not written.
        connection_string: SQLAlchemy database URL.
        table_name: Destination table.
        if_exists: Forwarded to ``DataFrame.to_sql``. The default ``'replace'``
            drops an existing table before inserting; ``'append'`` keeps it.
    """
    engine = create_engine(connection_string)
    try:
        df.to_sql(table_name, con=engine, if_exists=if_exists, index=False)
    finally:
        # The engine owns a connection pool. Dispose of it so repeated calls
        # (e.g. one per scheduled run) don't leave pooled connections open.
        engine.dispose()

In [9]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import pandas as pd
import numpy as np
from pydantic import BaseModel, ValidationError, conint
from sqlalchemy import create_engine
import requests

# API endpoint to fetch data from
url = 'https://api.hevoexampleapi.com/data'

# Function that calls the API, fetches the data, and returns the decoded JSON payload
def fetch_data_from_api(url, timeout=30):
    """Fetch ``url`` with an HTTP GET and return the decoded JSON body.

    Args:
        url: Endpoint to request.
        timeout: Seconds to wait for the server before giving up. ``requests``
            applies no timeout by default, so without one a stalled server
            would block the task indefinitely.

    Returns:
        Whatever ``response.json()`` decodes (e.g. a dict or list).

    Raises:
        Exception: if the server answers with any status code other than 200.
    """
    response = requests.get(url, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f"API request failed with status code {response.status_code}")
    return response.json()

# Function definition for data transformation logic
def transform_data(df, fill_value=0):
    """Return a transformed copy of ``df``.

    Steps:
      1. Replace missing values in every column with ``fill_value``.
      2. Add ``adjusted_value = value * ln(quantity + 1)``.

    Args:
        df: DataFrame containing numeric ``value`` and ``quantity`` columns.
        fill_value: Replacement for missing values (default 0).

    Returns:
        A new DataFrame with missing values filled and an ``adjusted_value``
        column added. The input frame is not modified.
    """
    # fillna without inplace=True returns a new frame, so the caller's data is
    # left intact and re-running the step gives the same result.
    result = df.fillna(fill_value)
    # log1p(q) equals log(q + 1) but is more accurate for q close to 0.
    # NOTE(review): quantity < -1 gives NaN and quantity == -1 gives -inf —
    # confirm quantities are expected to be non-negative.
    result['adjusted_value'] = result['value'] * np.log1p(result['quantity'])
    return result

# Pydantic model describing the schema each record must satisfy.
# All three fields have no default, so pydantic treats them as required.
# NOTE(review): transform_data works on `value`/`quantity` columns, which this
# model does not declare (pydantic ignores undeclared keys by default) — confirm
# the intended record schema.
# NOTE(review): pydantic v2 discourages conint in favour of
# Annotated[int, Field(gt=0)].
class DataModel(BaseModel):
    name: str            # person's name
    age: conint(gt=0)    # integer that must be strictly greater than 0
    city: str            # city name

# Function definition for data validation against the data model.
def validate_data(data):
    """Validate one record against ``DataModel``.

    Args:
        data: The record to check; expected to be a mapping with ``name``,
            ``age`` and ``city`` keys.

    Returns:
        The validated ``DataModel`` instance, or ``None`` if validation failed.
        In the failure case the validation errors are printed as JSON.
    """
    try:
        # model_validate reports non-mapping input (e.g. None or a list) as a
        # ValidationError. DataModel(**data) would instead escape with TypeError.
        return DataModel.model_validate(data)
    except ValidationError as e:
        print(e.json())
        return None

# Function definition to write transformed data to the database
def store_data_in_db(df, connection_string, table_name, if_exists='replace'):
    """Write ``df`` to ``table_name`` in the database at ``connection_string``.

    Args:
        df: DataFrame to persist; its index is not written.
        connection_string: SQLAlchemy database URL.
        table_name: Destination table.
        if_exists: Forwarded to ``DataFrame.to_sql``. The default ``'replace'``
            drops an existing table before inserting; ``'append'`` keeps it.
    """
    engine = create_engine(connection_string)
    try:
        df.to_sql(table_name, con=engine, if_exists=if_exists, index=False)
    finally:
        # The engine owns a connection pool. Dispose of it so every scheduled
        # run doesn't leave pooled connections open.
        engine.dispose()

import os

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 9, 7),
    'retries': 1,
}

# Load target for the pipeline.
# NOTE(review): the notebook never defined where the data should be written;
# these are placeholders — set PIPELINE_DB_URL and adjust the table name.
DB_CONNECTION_STRING = os.environ.get('PIPELINE_DB_URL', 'sqlite:///pipeline.db')
TARGET_TABLE = 'pipeline_data'

# Defining a DAG
dag = DAG(
    'simple_data_pipeline',
    default_args=default_args,
    description='A simple data pipeline using Airflow',
    schedule='@daily',
    # Don't backfill one run per day since start_date when the DAG is first
    # enabled (catchup defaults to True in Airflow 2.x).
    catchup=False,
)

# Without op_args/op_kwargs, PythonOperator calls its callable with no regular
# arguments. It passes only the context variables named in the signature, such
# as `ti`. Calling the pipeline functions directly therefore fails with
# TypeError, so each step below pulls its input from the previous task's XCom
# (a task's return value).


def _transform_step(ti):
    """Turn the extracted payload into a DataFrame, transform it, and return records."""
    # NOTE(review): assumes the API payload is something pd.DataFrame accepts
    # (a list of records or a dict of columns) — confirm against the real API.
    raw = ti.xcom_pull(task_ids='extract')
    transformed = transform_data(pd.DataFrame(raw))
    # Return plain records rather than a DataFrame so the XCom payload stays
    # JSON-friendly.
    return transformed.to_dict(orient='records')


def _validate_step(ti):
    """Return only the transformed records that pass DataModel validation."""
    records = ti.xcom_pull(task_ids='transform') or []
    # validate_data prints the errors and returns None for an invalid record.
    return [record for record in records if validate_data(record) is not None]


def _load_step(ti):
    """Write the validated records to TARGET_TABLE."""
    records = ti.xcom_pull(task_ids='data-validation') or []
    if not records:
        print('No valid records to load; skipping database write.')
        return
    store_data_in_db(pd.DataFrame(records), DB_CONNECTION_STRING, TARGET_TABLE)


# Extraction: url is passed explicitly; the returned JSON is pushed to XCom
extract_task = PythonOperator(
    task_id='extract',
    python_callable=fetch_data_from_api,
    op_kwargs={'url': url},
    dag=dag,
)

# Transformation of the extracted payload
transform_task = PythonOperator(
    task_id='transform',
    python_callable=_transform_step,
    dag=dag,
)

# Schema validation of the transformed records
validation_task = PythonOperator(
    task_id='data-validation',
    python_callable=_validate_step,
    dag=dag,
)

# Load of the validated records into the database
load_task = PythonOperator(
    task_id='load',
    python_callable=_load_step,
    dag=dag,
)

# Airflow task dependency
extract_task >> transform_task >> validation_task >> load_task

<Task(PythonOperator): load>