In [1]:
import pandas as pd

In [2]:
import psycopg2

In [16]:
# Read the raw sales CSV (decoded as latin1) into the notebook-level `df`.
# This is the same read that extract() performs; the later `transform(df)` cell
# consumes this global.
df = pd.read_csv("sales.csv", encoding="latin1")


### Extract

In [None]:
# Extracting the file
def extract(path="sales.csv", encoding="latin1"):
    """Read the raw sales CSV into a DataFrame.

    Parameters
    ----------
    path : str or path-like, default "sales.csv"
        Location of the CSV file. The default keeps the original behaviour of
        reading ``sales.csv`` from the current working directory.
    encoding : str, default "latin1"
        Text encoding passed to ``pd.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The file contents exactly as parsed by ``pd.read_csv`` (no cleaning).
    """
    return pd.read_csv(path, encoding=encoding)

### Transform

In [29]:
# Transforming and cleaning the dataset
def transform(df, year=2004):
    """Clean the raw sales frame and keep the orders of a single year.

    Steps, in order:
      1. drop the columns listed in ``columns_to_drop`` (missing ones are ignored),
      2. drop exact duplicate rows,
      3. drop rows with a missing value in any required field,
      4. parse ``ORDERDATE`` to datetime and drop rows that fail to parse,
      5. title-case ``PRODUCTLINE`` and ``CUSTOMERNAME``,
      6. add ``TOTALREVENUE`` = ``QUANTITYORDERED`` * ``PRICEEACH``,
      7. keep only rows whose ``ORDERDATE`` falls in ``year``.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw sales data. It is not modified; every step works on a new frame.
    year : int, default 2004
        Calendar year to keep. The default preserves the original 2004 filter.

    Returns
    -------
    pandas.DataFrame
        A new, independent frame with the cleaned rows for ``year``.
    """
    columns_to_drop = ["ORDERLINENUMBER", "STATUS", "QTR_ID", "MONTH_ID", "YEAR_ID", "MSRP",
        "PHONE", "ADDRESSLINE1", "ADDRESSLINE2", "CITY", "STATE", "POSTALCODE",
        "COUNTRY", "TERRITORY", "CONTACTLASTNAME", "CONTACTFIRSTNAME", "DEALSIZE"]
    required_columns = ["ORDERNUMBER", "ORDERDATE", "PRODUCTCODE", "CUSTOMERNAME",
        "PRICEEACH", "QUANTITYORDERED"]

    # Explicit .copy() after each row filter: assigning columns on a filtered
    # frame can otherwise raise SettingWithCopyWarning on pandas < 3.
    cleaned = (
        df.drop(columns=columns_to_drop, errors="ignore")
        .drop_duplicates()
        .dropna(subset=required_columns)
        .copy()
    )

    # Unparseable dates become NaT and are then removed.
    cleaned["ORDERDATE"] = pd.to_datetime(cleaned["ORDERDATE"], errors="coerce")
    cleaned = cleaned[cleaned["ORDERDATE"].notna()].copy()

    # Standardize text
    cleaned["PRODUCTLINE"] = cleaned["PRODUCTLINE"].str.title()
    cleaned["CUSTOMERNAME"] = cleaned["CUSTOMERNAME"].str.title()

    # create total revenue column
    cleaned["TOTALREVENUE"] = cleaned["QUANTITYORDERED"] * cleaned["PRICEEACH"]

    # Copy so callers can add columns to the result without chained-assignment warnings.
    return cleaned[cleaned["ORDERDATE"].dt.year == year].copy()


In [42]:
# Clean the notebook-level raw frame and print the result's (rows, columns)
# as a quick sanity check.
clean_data = transform(df)
print(clean_data.shape)

(1345, 9)


### Load

In [None]:
# Load the cleaned data
def load(df):
    """Insert the cleaned sales rows into the ``Sales_data`` PostgreSQL table.

    All rows are sent in one transaction that is committed once, after the
    last insert. Rows whose (ORDERNUMBER, PRODUCTCODE) pair conflicts with an
    existing row are skipped by the ``ON CONFLICT ... DO NOTHING`` clause.

    Connection settings are read from the ``PGHOST``, ``PGDATABASE``,
    ``PGUSER`` and ``PGPASSWORD`` environment variables, falling back to the
    values this notebook originally used.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain every column named in ``insert_columns``; extra columns
        and column order are irrelevant.

    Raises
    ------
    KeyError
        If ``df`` lacks one of the required columns.
    Exception
        Any error raised by psycopg2 propagates to the caller; the connection
        is closed first, which discards the uncommitted transaction.
    """
    import os  # local import: only this function needs it

    insert_columns = [
        "ORDERNUMBER", "QUANTITYORDERED", "PRICEEACH", "SALES",
        "ORDERDATE", "PRODUCTLINE", "PRODUCTCODE", "CUSTOMERNAME", "TOTALREVENUE",
    ]
    insert_sql = """
        INSERT INTO Sales_data(
        ORDERNUMBER, QUANTITYORDERED, PRICEEACH, SALES,
        ORDERDATE, PRODUCTLINE, PRODUCTCODE, CUSTOMERNAME, TOTALREVENUE)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
        ON CONFLICT (ORDERNUMBER, PRODUCTCODE) DO NOTHING """

    # Select columns by name so the parameters always line up with the INSERT
    # column list, whatever order the DataFrame's columns are in.
    rows = list(df[insert_columns].itertuples(index=False, name=None))

    # NOTE(review): the literal fallbacks (including the password) are kept only
    # for backward compatibility; prefer setting the environment variables.
    connection = psycopg2.connect(
        host=os.environ.get("PGHOST", "localhost"),
        dbname=os.environ.get("PGDATABASE", "Sales_ETL"),
        user=os.environ.get("PGUSER", "postgres"),
        password=os.environ.get("PGPASSWORD", "xyz"),
    )
    try:
        with connection.cursor() as cursor:
            cursor.executemany(insert_sql, rows)
        # Commit once, after every row was sent (previously this ran inside
        # the loop together with close(), breaking the second iteration).
        connection.commit()
    finally:
        connection.close()

### Combined 

In [None]:
# Putting each process in its own function
def extract(path="sales.csv", encoding="latin1"):
    """Read the raw sales CSV into a DataFrame.

    Parameters
    ----------
    path : str or path-like, default "sales.csv"
        Location of the CSV file. The default keeps the original behaviour of
        reading ``sales.csv`` from the current working directory.
    encoding : str, default "latin1"
        Text encoding passed to ``pd.read_csv``.

    Returns
    -------
    pandas.DataFrame
        The file contents exactly as parsed by ``pd.read_csv`` (no cleaning).
    """
    return pd.read_csv(path, encoding=encoding)

def transform(df, year=2004):
    """Clean the raw sales frame and keep the orders of a single year.

    Steps, in order:
      1. drop the columns listed in ``columns_to_drop`` (missing ones are ignored),
      2. drop exact duplicate rows,
      3. drop rows with a missing value in any required field,
      4. parse ``ORDERDATE`` to datetime and drop rows that fail to parse,
      5. title-case ``PRODUCTLINE`` and ``CUSTOMERNAME``,
      6. add ``TOTALREVENUE`` = ``QUANTITYORDERED`` * ``PRICEEACH``,
      7. keep only rows whose ``ORDERDATE`` falls in ``year``.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw sales data. It is not modified; every step works on a new frame.
    year : int, default 2004
        Calendar year to keep. The default preserves the original 2004 filter.

    Returns
    -------
    pandas.DataFrame
        A new, independent frame with the cleaned rows for ``year``.
    """
    columns_to_drop = ["ORDERLINENUMBER", "STATUS", "QTR_ID", "MONTH_ID", "YEAR_ID", "MSRP",
        "PHONE", "ADDRESSLINE1", "ADDRESSLINE2", "CITY", "STATE", "POSTALCODE",
        "COUNTRY", "TERRITORY", "CONTACTLASTNAME", "CONTACTFIRSTNAME", "DEALSIZE"]
    required_columns = ["ORDERNUMBER", "ORDERDATE", "PRODUCTCODE", "CUSTOMERNAME",
        "PRICEEACH", "QUANTITYORDERED"]

    # Explicit .copy() after each row filter: assigning columns on a filtered
    # frame can otherwise raise SettingWithCopyWarning on pandas < 3.
    cleaned = (
        df.drop(columns=columns_to_drop, errors="ignore")
        .drop_duplicates()
        .dropna(subset=required_columns)
        .copy()
    )

    # Unparseable dates become NaT and are then removed.
    cleaned["ORDERDATE"] = pd.to_datetime(cleaned["ORDERDATE"], errors="coerce")
    cleaned = cleaned[cleaned["ORDERDATE"].notna()].copy()

    # Standardize text
    cleaned["PRODUCTLINE"] = cleaned["PRODUCTLINE"].str.title()
    cleaned["CUSTOMERNAME"] = cleaned["CUSTOMERNAME"].str.title()

    # create total revenue column
    cleaned["TOTALREVENUE"] = cleaned["QUANTITYORDERED"] * cleaned["PRICEEACH"]

    # Copy so callers can add columns to the result without chained-assignment warnings.
    return cleaned[cleaned["ORDERDATE"].dt.year == year].copy()

def load(df):
    """Insert the cleaned sales rows into the ``Sales_data`` PostgreSQL table.

    All rows are sent in one transaction that is committed once at the end.
    Rows whose (ORDERNUMBER, PRODUCTCODE) pair conflicts with an existing row
    are skipped by the ``ON CONFLICT ... DO NOTHING`` clause.

    Connection settings are read from the ``PGHOST``, ``PGDATABASE``,
    ``PGUSER`` and ``PGPASSWORD`` environment variables, falling back to the
    values this notebook originally used.

    Errors are not raised to the caller: any exception is reported with
    ``print("Error", e)`` and the function returns normally. On success a
    confirmation message is printed instead. In both cases the connection is
    closed before returning; closing without a commit discards the
    transaction, so a failed load leaves no partial insert.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain every column named in ``insert_columns``; extra columns
        and column order are irrelevant.
    """
    import os  # local import: only this function needs it

    insert_columns = [
        "ORDERNUMBER", "QUANTITYORDERED", "PRICEEACH", "SALES",
        "ORDERDATE", "PRODUCTLINE", "PRODUCTCODE", "CUSTOMERNAME", "TOTALREVENUE",
    ]
    insert_sql = """
        INSERT INTO Sales_data(
        ORDERNUMBER, QUANTITYORDERED, PRICEEACH, SALES,
        ORDERDATE, PRODUCTLINE, PRODUCTCODE, CUSTOMERNAME, TOTALREVENUE)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
        ON CONFLICT (ORDERNUMBER, PRODUCTCODE) DO NOTHING """

    connection = None
    try:
        # Built inside the try block so a missing column is reported through
        # the same "Error" path as before instead of raising.
        rows = list(df[insert_columns].itertuples(index=False, name=None))

        # NOTE(review): the literal fallbacks (including the password) are kept
        # only for backward compatibility; prefer the environment variables.
        connection = psycopg2.connect(
            host=os.environ.get("PGHOST", "localhost"),
            dbname=os.environ.get("PGDATABASE", "Sales_ETL"),
            user=os.environ.get("PGUSER", "postgres"),
            password=os.environ.get("PGPASSWORD", "xyz"),
        )

        with connection.cursor() as cursor:
            cursor.executemany(insert_sql, rows)
        connection.commit()

        print("Data loaded successfully into Sales_data table.")

    except Exception as e:
        print("Error", e)

    finally:
        # Previously the connection leaked whenever an insert failed.
        if connection is not None:
            connection.close()
    
# Run the whole pipeline (extract -> transform -> load) when this code is
# executed directly rather than imported as a module.
if __name__ == "__main__":
    raw_data = extract()
    clean_data = transform(raw_data)
    load(clean_data)



Data loaded successfully into Sales_data table.
