In [120]:
# ---- Read the sales_data.csv file ----- #

import pandas as pd
import sqlalchemy as db


sales_data = pd.read_csv('sales_data.csv') # provide actual path for the csv file


# Display the first few rows of the dataframe
print(sales_data.head())


# Transform before loading to database
sales_data.columns = [x.lower() for x in sales_data.columns]
sales_data['transaction_date'] = pd.to_datetime(sales_data['transaction_date'], format='mixed')


print(sales_data.dtypes)
print(sales_data.head(80))


   Transaction_ID  Product_ID  Quantity   Price Transaction_Date
0               1        1003         2    4.85       05/05/2022
1               2        1002         3   66.37       2022-12-06
2               3        1001        10  142.40       2022-08-06
3               4        1001        10   14.22       2022-12-29
4               5        1001         7  110.56       2022-10-20
transaction_id               int64
product_id                   int64
quantity                     int64
price                      float64
transaction_date    datetime64[ns]
dtype: object
    transaction_id  product_id  quantity   price transaction_date
0                1        1003         2    4.85       2022-05-05
1                2        1002         3   66.37       2022-12-06
2                3        1001        10  142.40       2022-08-06
3                4        1001        10   14.22       2022-12-29
4                5        1001         7  110.56       2022-10-20
..             ...       

In [96]:
# ---- Read the exchange_rates using API ----- #

import requests

url = 'https://postman.com/api/exchange_rates'# Replace with actual endpoint
headers = {'Authorization': 'YOUR_ACCESS_TOKEN'}  # Replace with actual token


response = requests.get(url, headers=headers)

if response.status_code == 200:
    exchange_rates = response.json()
    exchange_rates_df = pd.DataFrame(exchange_rates)
    print(exchange_rates_df.head())
else:
    print(f"Error: {response.status_code}")

    

# Transform before loading to database
exchange_rates_df.columns = [x.lower() for x in exchange_rates_df.columns]
exchange_rates_df['date'] = pd.to_datetime(exchange_rates_df['date'], format='%Y-%m-%d')

print(exchange_rates_df.head())



  Currency_Code  Exchange_Rate        Date
0           AUD         1.0467  2022-10-10
1           EUR         0.8946  2022-01-21
2           EUR         0.1472  2022-04-07
3           JPY         0.6749  2022-08-25
4           AUD         0.5259  2022-04-18
  currency_code  exchange_rate       date
0           AUD         1.0467 2022-10-10
1           EUR         0.8946 2022-01-21
2           EUR         0.1472 2022-04-07
3           JPY         0.6749 2022-08-25
4           AUD         0.5259 2022-04-18


In [97]:
# ---- Read the customer_data using API ----- #

import requests

url = 'https://postman.com/api/customer_data'  # Replace with actual endpoint
headers = {'Authorization': 'YOUR_ACCESS_TOKEN'}  # Replace with actual token

response = requests.get(url, headers=headers)

if response.status_code == 200:
    customer_data = response.json()
    customer_data_df = pd.DataFrame(customer_data)
    print(customer_data_df.head())

else:
    print(f"Error: {response.status_code}")
    

# Transform before loading to database
customer_data_df.columns = [x.lower() for x in customer_data_df.columns]
customer_data_df['date_joined'] = pd.to_datetime(customer_data_df['date_joined'], format='%Y-%m-%d')


print(customer_data_df.head())





   Customer_ID  Customer_Name   Age      Gender    Location Date_Joined
0            1       John Doe  95.0                 Unknown  2020-07-04
1            2  Charlie Brown  43.0                   Texas  2020-02-04
2            3  Alice Johnson  64.0      Female  California  2020-11-29
3            4       John Doe  89.0                 Florida  2020-01-01
4            5  Alice Johnson  18.0  Non-binary    New York  2020-05-22
   customer_id  customer_name   age      gender    location date_joined
0            1       John Doe  95.0                 Unknown  2020-07-04
1            2  Charlie Brown  43.0                   Texas  2020-02-04
2            3  Alice Johnson  64.0      Female  California  2020-11-29
3            4       John Doe  89.0                 Florida  2020-01-01
4            5  Alice Johnson  18.0  Non-binary    New York  2020-05-22


In [122]:
# ---- Read the products and transactions tables from Postgresql ----- #

import sqlalchemy as db

# Create a database engine
engine = db.create_engine('postgresql://postgres:18118011@localhost:5432/postgres')

# Query the data
products_df = pd.read_sql('SELECT * FROM products', engine)
transactions_df = pd.read_sql('SELECT * FROM transactions', engine)



print(products_df.head())
print(transactions_df.head())


# --- Write to PostgreSQL --- #

# sales_data.to_sql('sales', engine, if_exists='append', index=False)
# customer_data_df.to_sql('customers', engine, if_exists='append', index=False)
# exchange_rates_df.to_sql('exchange_rates', engine, if_exists='append', index=False)

   product_id product_name     category   price  stock_available
0           1    Product_1     Clothing  121.34              348
1           2    Product_2  Electronics  104.49              591
2           3    Product_3  Electronics  126.17              368
3           4    Product_4        Books   44.70              309
4           5    Product_5        Books  162.85              816
   transaction_id  customer_id  product_id  quantity transaction_date  \
0               1          168         813         3       2022-03-05   
1               2          759         250         5             None   
2               3          681         786         2       2022-12-29   
3               4          230         291         1       2022-08-12   
4               5          551         774         8       2022-11-30   

   total_amount  
0        242.07  
1         -8.30  
2        185.06  
3        -16.51  
4           NaN  


In [124]:


def fill_missing_values(df):
    for col in df:
        if df[col].dtype == int or df[col].dtype == float:
            df[col].fillna(0,inplace=True)
        elif df[col].dtype == 'datetime64[ns]':
            df.fillna('2000-01-01', inplace=True)
    return df

def remove_duplicates(df):
    return df.drop_duplicates()


def handle_abnormal_values(df):
    for col in df:
        if df[col].dtype == int or df[col].dtype == float:
            df[col] = df[col].abs()
    return df


def run_pipeline(sales_data,products_df,transactions_df):
    
    sales_data = fill_missing_values(sales_data)
    sales_data = handle_abnormal_values(sales_data)
    sales_data = remove_duplicates(sales_data)
    
    products_df = fill_missing_values(products_df)
    products_df = handle_abnormal_values(products_df)
    products_df = remove_duplicates(products_df)
    
    # Calculate 'Total_Amount' where it's missing using products table
    merged_df = transactions_df.merge(products_df[['product_id', 'price']], on='product_id', how='left')
    merged_df['total_amount'].fillna(merged_df['quantity'] * merged_df['price'], inplace=True)
    transactions_df = merged_df.drop(columns=['price'])
    
    transactions_df = fill_missing_values(transactions_df)
    transactions_df['quantity'] = transactions_df['quantity'].abs()
    transactions_df['total_amount'] = transactions_df['total_amount'].abs()
    transactions_df = remove_duplicates(transactions_df)
        
    return transactions_df



processed_transactions = run_pipeline(sales_data,products_df,transactions_df)
print(sales_data.head(80))
print(products_df.head(20))
print(processed_transactions)


# --- Write to Database --- #
# engine = db.create_engine('postgresql://username:password@localhost:5432/dbname')
# sales_data.to_sql('processed_sales', engine, if_exists='append', index=False)
# processed_transactions.to_sql('processed_transactions', engine, if_exists='append', index=False)
# products_df.to_sql('processed_products', engine, if_exists='append', index=False)

KeyError: 'transaction_date'