In [1]:
# Imports
import psycopg2
import pandas as pd
# Pipeline
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

In [2]:
# Pipeline: shared task defaults and the DAG definition.
default_args = {
    'owner': 'Safaricom',
    'depends_on_past': False,
    'start_date': datetime(2023, 4, 10),
    'email': ['newton.kipngeno@student.moringaschool.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'equipment_pipeline',
    default_args=default_args,
    description='Pipeline for equipment data',
    schedule_interval=timedelta(days=1),
    # Nothing in these tasks depends on the run's logical date, and the load
    # steps append rows (SERIAL ids, no uniqueness constraint). Replaying every
    # missed daily interval since start_date would therefore only insert
    # duplicate rows, so disable catch-up and run only the latest interval.
    catchup=False,
)

# Extract: read raw CSV input.
def extract_csv(filename):
    """Load the CSV at *filename* into a pandas DataFrame.

    *filename* is passed straight to ``pandas.read_csv``, so any path or
    file-like object that function accepts works here as well.
    """
    return pd.read_csv(filename)

# Transform: clean the extracted data.
def transform_data(dataframe):
    """Return a cleaned copy of *dataframe*.

    Exact duplicate rows are dropped first (the first occurrence is kept),
    then every row that still contains a null in any column is removed.
    Surviving rows keep their original index labels.
    """
    deduplicated = dataframe.drop_duplicates()
    return deduplicated.dropna()

def merge_equiment_reading(df1, df2):
    """Inner-join *df1* and *df2* on every column name they have in common.

    No explicit join keys are given, so pandas joins on the intersection of
    the two frames' columns. Only rows whose values match in all of those
    columns are kept.
    """
    return pd.merge(left=df1, right=df2, how="inner")

# Load equipment readings data into Postgres.
def load_equipment_readings(df):
    """Insert every row of *df* into the EQUIPMENT_READINGS table.

    The table is created first if it does not exist. Table creation and all
    inserts run in one transaction: committed on success, rolled back on any
    error. The connection is always closed.

    Args:
        df: DataFrame providing the columns equipment_id, date, time,
            network_sensor_reading and equipment_sensor_reading.

    Raises:
        psycopg2.Error: if connecting, creating the table, or inserting fails.
        KeyError: if *df* lacks one of the required columns.

    Errors are raised rather than returned as strings so that the calling
    task fails visibly (retries / failure e-mail) instead of looking successful.
    """
    columns = [
        "equipment_id",
        "date",
        "time",
        "network_sensor_reading",
        "equipment_sensor_reading",
    ]
    # NOTE(review): credentials are hard-coded; consider an Airflow connection
    # or environment variables instead.
    conn = psycopg2.connect(
        host="localhost",
        port=5432,
        database="data_engineering",
        user="newton",
        password="newton123",
    )
    try:
        # `with conn` commits on success and rolls back on exception. It does
        # not close the connection, hence the explicit close() in `finally`.
        with conn, conn.cursor() as cur:
            cur.execute(
                "CREATE TABLE IF NOT EXISTS EQUIPMENT_READINGS (ID SERIAL PRIMARY KEY, equipment_id integer, date text, time text, network_sensor_reading text, equipment_sensor_reading text)"
            )
            # astype(object) turns numpy scalars into plain Python values,
            # which psycopg2 knows how to adapt.
            rows = df[columns].astype(object).itertuples(index=False, name=None)
            # Parameters must be passed as ONE sequence per row; the original
            # passed them as separate positional args, which raised TypeError.
            cur.executemany(
                "INSERT INTO EQUIPMENT_READINGS (equipment_id, date, time, network_sensor_reading, equipment_sensor_reading) VALUES(%s, %s, %s, %s, %s)",
                rows,
            )
    finally:
        conn.close()
    
# Load equipment maintenance data into Postgres.
def load_equipment_maintenance(df):
    """Insert every row of *df* into the EQUIPMENT_MAINTENANCE table.

    The table is created first if it does not exist. Table creation and all
    inserts run in one transaction: committed on success, rolled back on any
    error. The connection is always closed.

    Args:
        df: DataFrame providing the columns equipment_id, date, time and
            maintenance_type.

    Raises:
        psycopg2.Error: if connecting, creating the table, or inserting fails.
        KeyError: if *df* lacks one of the required columns.

    Errors are raised rather than returned as strings so that the calling
    task fails visibly (retries / failure e-mail) instead of looking successful.
    """
    columns = ["equipment_id", "date", "time", "maintenance_type"]
    # NOTE(review): credentials are hard-coded; consider an Airflow connection
    # or environment variables instead.
    conn = psycopg2.connect(
        host="localhost",
        port=5432,
        database="data_engineering",
        user="newton",
        password="newton123",
    )
    try:
        # `with conn` commits on success and rolls back on exception. It does
        # not close the connection, hence the explicit close() in `finally`.
        with conn, conn.cursor() as cur:
            cur.execute(
                "CREATE TABLE IF NOT EXISTS EQUIPMENT_MAINTENANCE (ID SERIAL PRIMARY KEY, equipment_id integer, date text, time text, maintenance_type text)"
            )
            # astype(object) turns numpy scalars into plain Python values,
            # which psycopg2 knows how to adapt.
            rows = df[columns].astype(object).itertuples(index=False, name=None)
            # Parameters must be passed as ONE sequence per row; the original
            # passed them as separate positional args, which raised TypeError.
            cur.executemany(
                "INSERT INTO EQUIPMENT_MAINTENANCE (equipment_id, date, time, maintenance_type) VALUES(%s, %s, %s, %s)",
                rows,
            )
    finally:
        conn.close()
    
# Task definitions: one PythonOperator per pipeline step, created in order.
#
# NOTE(review): no op_args/op_kwargs are supplied here, yet every callable
# above requires positional arguments (filename, dataframe, df1/df2, df).
# Airflow does not feed one task's return value into the next task's
# parameters. As written, each task therefore presumably fails with a
# TypeError when it runs. Confirm, then wire the inputs explicitly
# (e.g. CSV paths via op_kwargs).
extract, transform, merge, load1, load2 = (
    PythonOperator(task_id=task_name, python_callable=step, dag=dag)
    for task_name, step in (
        ('extract_csv', extract_csv),
        ('transform_data', transform_data),
        ('merge_equiment_reading', merge_equiment_reading),
        ('load_equipment_readings', load_equipment_readings),
        ('load_equipment_maintenance', load_equipment_maintenance),
    )
)

# Run the steps strictly one after another, in the order declared above.
extract >> transform >> merge >> load1 >> load2