# In [9]:
import os

import pandas as pd
import mysql.connector as sql

# 1. Extract Data from CSV

def extract_data(FilePath):
    """Read the CSV source at *FilePath* and hand it back as a DataFrame.

    *FilePath* is passed straight to ``pd.read_csv``, so anything that
    function accepts (a path string or a file-like object) works here.
    """
    return pd.read_csv(FilePath)

# 2. Transform the data

def transform_data(df):
    """Clean the raw employee DataFrame and return a new, cleaned frame.

    Steps, in order:
      1. Drop rows missing ``emp_id`` or ``email``.
      2. Fill missing ``last_name`` values with ``'unknown'``.
      3. Keep only rows whose ``email`` contains ``'@'``.
      4. Fill missing ``salary`` values with the mean salary of the rows
         remaining after steps 1-3.
      5. Upper-case the ``department`` values.
      6. Drop fully duplicated rows (the first occurrence is kept).

    The input DataFrame is not modified.
    """
    # Remove rows without an id or email. .copy() makes the result an
    # independent frame, so the column assignments below never act on a
    # slice of the caller's data (avoids pandas' SettingWithCopyWarning).
    df = df.dropna(subset=['emp_id', 'email']).copy()

    # Fill missing last names with 'unknown'.
    df['last_name'] = df['last_name'].fillna('unknown')

    # Drop rows whose email has no '@'. regex=False makes this a literal
    # substring test. astype(str) keeps the .str accessor from raising if
    # the column was inferred as non-string. The boolean filter yields a
    # slice, so copy again before assigning more columns.
    has_at = df['email'].astype(str).str.contains('@', regex=False)
    df = df[has_at].copy()

    # Fill missing salary with the average salary of the remaining rows.
    df['salary'] = df['salary'].fillna(df['salary'].mean())

    # Normalise department names to upper case.
    df['department'] = df['department'].str.upper()

    # Remove exact duplicate rows.
    return df.drop_duplicates()

# 3. Load the data into the MySQL database

def connection(host,user,password,database):
    """Open a MySQL connection with ``mysql.connector.connect`` and return it.

    The four arguments are forwarded unchanged as the ``host``, ``user``,
    ``password`` and ``database`` keyword arguments.
    """
    settings = dict(host=host, user=user, password=password, database=database)
    return sql.connect(**settings)

def _to_db_value(value):
    """Map missing values (NaN/None/NaT) to ``None`` so they are sent as SQL NULL."""
    return None if pd.isna(value) else value


def load_data_db(df,con):
    """Upsert every row of *df* into the ``employee_details`` table.

    For each row, the table is queried for the row's ``emp_id``. If no
    matching record exists, the row is inserted; otherwise the existing
    record is updated. All writes share one transaction. It is committed
    once after every row succeeds, and rolled back (with the exception
    re-raised) if any statement fails. Missing cell values are written as
    NULL. A summary line with the number of rows written is printed.

    Args:
        df: DataFrame with columns ``emp_id``, ``first_name``, ``last_name``,
            ``email``, ``salary`` and ``department``.
        con: An open DB-API connection whose driver uses the ``%s``
            placeholder style (as ``mysql.connector`` does).
    """
    # One table name, spelled one way: MySQL table names are case-sensitive
    # on case-sensitive filesystems, so mixing EMPLOYEE_DETAILS and
    # employee_details can address two different tables.
    exists_query = "SELECT count(1) FROM employee_details WHERE emp_id = %s"
    insert_query = (
        "INSERT INTO employee_details "
        "(emp_id, first_name, last_name, email, salary, department) "
        "VALUES (%s, %s, %s, %s, %s, %s)"
    )
    update_query = (
        "UPDATE employee_details SET first_name = %s, last_name = %s, "
        "email = %s, salary = %s, department = %s WHERE emp_id = %s"
    )

    count = 0
    cursor = con.cursor()
    try:
        for row in df.to_dict("records"):
            emp_id = int(row["emp_id"])
            salary = row["salary"]
            # Column values shared by INSERT and UPDATE, in table order.
            fields = (
                _to_db_value(row["first_name"]),
                _to_db_value(row["last_name"]),
                _to_db_value(row["email"]),
                None if pd.isna(salary) else float(salary),
                _to_db_value(row["department"]),
            )

            # Parameterized lookup: emp_id comes from the CSV and must not
            # be formatted into the SQL text.
            cursor.execute(exists_query, (emp_id,))
            (matches,) = cursor.fetchone()

            if matches == 0:
                cursor.execute(insert_query, (emp_id, *fields))
            else:
                cursor.execute(update_query, (*fields, emp_id))
            count += 1

        con.commit()
    except Exception:
        # Undo the partial load, then let the caller see the original error.
        con.rollback()
        raise
    finally:
        cursor.close()

    print(f'{count} records inserted/updated')
    
# 4. Main ETL function

def run_etl():
    """Run the full pipeline: extract the CSV, transform it, load it into MySQL.

    The database password is taken from the ``MYSQL_PASSWORD`` environment
    variable when it is set; otherwise the legacy hard-coded value is used.
    The database connection is always closed when loading finishes or fails.
    """
    host = "localhost"
    user = "root"
    # NOTE(review): a credential is committed to source. Set MYSQL_PASSWORD
    # in the environment and remove this fallback.
    password = os.environ.get("MYSQL_PASSWORD", "Ankit@123")
    database = "employee"

    # Raw string: the previous non-raw literal only worked because invalid
    # escape sequences (\A, \P, \F) are passed through, which Python warns about.
    data_dir = r"D:\Ankit\Projects\Python\Files"
    file_name = "employees_data.csv"
    file_path = os.path.join(data_dir, file_name)

    # Extract and transform first, so a missing or malformed CSV fails
    # before any database connection is opened.
    raw_data = extract_data(file_path)
    clean_data = transform_data(raw_data)

    conn = connection(host, user, password, database)
    try:
        load_data_db(clean_data, conn)
    finally:
        conn.close()
    print("ETL pipeline completed successfully!")
    
# Run the pipeline only when this file is executed directly, not on import.
if __name__ == "__main__":
    run_etl()

# Output:
#  1491 records inserted/updated
# ETL pipeline completed successfully!
