In [1]:
import pandas as pd
import pyodbc
from sqlalchemy import create_engine


In [2]:
pip install pymysql


Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install pyodbc

Note: you may need to restart the kernel to use updated packages.


# ETL Using Pandas

In [4]:
from sqlalchemy import create_engine
import pandas as pd

# MySQL connection details
username = "root"
password = "root"
host = "localhost"
port = "3306"
database = "companydb"

# Create a connection to MySQL database
engine = create_engine(f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}")

# Read data from the 'Employee' table into a Pandas DataFrame
df = pd.read_sql('SELECT * FROM Employee', con=engine)

# Convert date columns to datetime format
df['JoiningDate'] = pd.to_datetime(df['JoiningDate'])
df['Leaving_Org_Date'] = pd.to_datetime(df['Leaving_Org_Date'])

# Rename columns
df.rename(columns={'Leaving_Org_Date': 'Leaving_Org_Date'}, inplace=True)

# Add new columns
df['Annual_CTC'] = df['Salary'] * 12

df['Total_Months_Worked'] = ((df['Leaving_Org_Date'] - df['JoiningDate']).dt.days // 30)

df.to_sql('employee', con=engine, if_exists='replace', index=False)
df.to_csv('cleaned_data.csv', index=False)


In [5]:
df

Unnamed: 0,EmployeeID,Name,DepID,Salary,Age,JoiningDate,Leaving_Org_Date,manager_id,Annual_CTC,Total_Months_Worked
0,5,Emma,1,52000.0,32,2021-03-12,2025-06-15,1.0,624000.0,51
1,6,Frank,2,63000.0,29,2018-11-20,2023-02-28,2.0,756000.0,52
2,7,Grace,3,58000.0,33,2017-07-05,2022-09-10,3.0,696000.0,63
3,8,Hannah,1,49000.0,27,2022-01-30,2026-05-01,5.0,588000.0,51
4,9,Isaac,2,75000.0,42,2014-06-22,2019-10-15,,900000.0,64
5,10,Jack,3,57000.0,31,2019-02-15,2023-07-20,7.0,684000.0,53
6,11,Karen,1,51000.0,29,2020-09-10,2024-03-18,5.0,612000.0,42
7,12,Leo,2,72000.0,38,2016-12-05,2022-02-10,9.0,864000.0,63
8,13,Mia,3,75000.0,36,2015-04-25,2020-08-30,,900000.0,65
9,14,Noah,1,53000.0,30,2021-07-14,2024-12-01,11.0,636000.0,41


In [6]:
# Remove missing values
#df.dropna(inplace=True)

# Convert date column to datetime format
df['JoiningDate'] = pd.to_datetime(df['JoiningDate'])

df['Leaving_Org_Date'] = pd.to_datetime(df['Leaving_Org_Date'])

# Rename columns
df.rename(columns={'Leaving_Org_Date': 'Employee_Org_Leaving_Date'}, inplace=True)

# Add a new column
df['Annual_CTC'] = df['Salary'] * 12


In [7]:
df

Unnamed: 0,EmployeeID,Name,DepID,Salary,Age,JoiningDate,Employee_Org_Leaving_Date,manager_id,Annual_CTC,Total_Months_Worked
0,5,Emma,1,52000.0,32,2021-03-12,2025-06-15,1.0,624000.0,51
1,6,Frank,2,63000.0,29,2018-11-20,2023-02-28,2.0,756000.0,52
2,7,Grace,3,58000.0,33,2017-07-05,2022-09-10,3.0,696000.0,63
3,8,Hannah,1,49000.0,27,2022-01-30,2026-05-01,5.0,588000.0,51
4,9,Isaac,2,75000.0,42,2014-06-22,2019-10-15,,900000.0,64
5,10,Jack,3,57000.0,31,2019-02-15,2023-07-20,7.0,684000.0,53
6,11,Karen,1,51000.0,29,2020-09-10,2024-03-18,5.0,612000.0,42
7,12,Leo,2,72000.0,38,2016-12-05,2022-02-10,9.0,864000.0,63
8,13,Mia,3,75000.0,36,2015-04-25,2020-08-30,,900000.0,65
9,14,Noah,1,53000.0,30,2021-07-14,2024-12-01,11.0,636000.0,41


In [8]:
# Load data into MySQL database
df.to_sql('employee', con=engine, if_exists='replace', index=False)


20

In [9]:
df.to_csv('cleaned_data.csv', index=False)


In [10]:
pip install apache-airflow


Note: you may need to restart the kernel to use updated packages.


from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def extract():
    df = pd.read_csv('cleaned_data.csv')
    df.to_csv('extracted_data.csv', index=False)

def transform():
    df = pd.read_csv('extracted_data.csv')
    df.dropna(inplace=True)
    df.to_csv('transformed_data.csv', index=False)

def load():
    df = pd.read_csv('transformed_data.csv')
    df.to_sql('table_name', con=engine, if_exists='replace', index=False)

dag = DAG('etl_pipeline', schedule_interval='@daily', start_date=datetime(2024, 1, 1))

extract_task = PythonOperator(task_id='extract', python_callable=extract, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load, dag=dag)

extract_task >> transform_task >> load_task


In [12]:
import os
os.environ["NUMEXPR_MAX_THREADS"] = "16"  # Set to your CPU core count


In [13]:
from airflow import DAG
from airflow.operators.python import PythonOperator  # ✅ Corrected import
from datetime import datetime
import pandas as pd
from sqlalchemy import create_engine


# Define your MySQL connection details
username = "root"  # Your MySQL username
password = "root"  # Your MySQL password
host = "localhost"  # MySQL is running locally
port = "3306"  # Your MySQL port (default is 3306, but you are using 3307)
database = "companydb"  # Your database name

# Create a connection to MySQL database
engine = create_engine(f"mysql+pymysql://{username}:{password}@{host}:{port}/{database}")


def extract():
    df = pd.read_sql('SELECT * FROM Employee', con=engine)
    df.to_csv('extracted_data.csv', index=False)

def transform():
    # Convert date column to datetime format
    df['JoiningDate'] = pd.to_datetime(df['JoiningDate'])

    df['Leaving_Org_Date'] = pd.to_datetime(df['Leaving_Org_Date'])

    # Rename columns
    df.rename(columns={'Leaving_Org_Date': 'Employee_Org_Leaving_Date'}, inplace=True)

    # Add a new column
    df['Annual_CTC'] = df['Salary'] * 12
    
    df.to_csv('transformed_data.csv', index=False)

def load():
    # Load data into MySQL database
    df.to_sql('employee', con=engine, if_exists='replace', index=False)
    df.to_csv('cleaned_data.csv', index=False)

# ✅ Updated 'schedule_interval' to 'schedule'
dag = DAG(
    "etl_pipeline",
    schedule="@daily",  # ✅ Updated
    start_date=datetime(2024, 1, 1),
    catchup=False  # Optional: prevents backfilling old DAG runs
)

extract_task = PythonOperator(task_id="extract", python_callable=extract, dag=dag)
transform_task = PythonOperator(task_id="transform", python_callable=transform, dag=dag)
load_task = PythonOperator(task_id="load", python_callable=load, dag=dag)

extract_task >> transform_task >> load_task  # Set task dependencies


<Task(PythonOperator): load>