In [1]:
!pip install requests pandas mysql-connector-python SQLAlchemy

Defaulting to user installation because normal site-packages is not writeable


In [3]:
import requests
import pandas as pd

# API Configuration
API_URL = "https://8b1gektg00.execute-api.us-east-1.amazonaws.com/default/engineer-test"
API_KEY = "GSJ3bdgMaoa8bHOXGs5YQHRYxmHMSx96WUmbVAjj"  
HEADERS = {"x-api-key": API_KEY}

# Define the request payload for January 2023 data
payload = {
    "start_date": "2023-01-01",
    "end_date": "2023-01-31"
}

def fetch_transactions():
    """Fetch transaction data from the API."""
    response = requests.post(API_URL, json=payload, headers=HEADERS)
    
    if response.status_code == 200:
        data = response.json()
        return pd.DataFrame(data)  # Convert to Pandas DataFrame
    else:
        print(f"Error: {response.status_code}, {response.text}")
        return pd.DataFrame()  # Return an empty DataFrame if request fails

# Fetch the data
df_raw = fetch_transactions()

# Display the first few rows
df_raw.head()


Unnamed: 0,customer_id,product_id,transaction_date,transaction_amount,transaction_type,spend_category,product_category
0,4,32,2023-01-01 13:07:24,168.44,Outgoing,Electronics,
1,6,32,2023-01-01 15:56:07,832.92,Outgoing,Electronics,
2,7,37,2023-01-01 03:08:20,67.06,Outgoing,Clothing,
3,3,34,2023-01-01 04:06:55,333.12,Outgoing,,
4,4,36,2023-01-01 11:30:27,348.9,Outgoing,Home,


In [5]:
# Convert 'transaction_date' to datetime format
df_raw['transaction_date'] = pd.to_datetime(df_raw['transaction_date'], errors='coerce')

# Drop rows where transaction_date is NaT (invalid date format)
df_raw = df_raw.dropna(subset=['transaction_date'])

# Drop duplicates
df_raw = df_raw.drop_duplicates()

# Filter out negative transaction amounts
df_cleaned = df_raw[df_raw['transaction_amount'] >= 0]

#Fix null values in product category by taking values from spend_category where it is null
df_cleaned['product_category'] = df_cleaned['product_category'].fillna(df_cleaned['spend_category'])

# Display cleaned data
df_cleaned.head()


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_cleaned['product_category'] = df_cleaned['product_category'].fillna(df_cleaned['spend_category'])


Unnamed: 0,customer_id,product_id,transaction_date,transaction_amount,transaction_type,spend_category,product_category
0,4,32,2023-01-01 13:07:24,168.44,Outgoing,Electronics,Electronics
1,6,32,2023-01-01 15:56:07,832.92,Outgoing,Electronics,Electronics
2,7,37,2023-01-01 03:08:20,67.06,Outgoing,Clothing,Clothing
3,3,34,2023-01-01 04:06:55,333.12,Outgoing,,
4,4,36,2023-01-01 11:30:27,348.9,Outgoing,Home,Home


In [7]:
def categorize_amount(amount):
    """Categorize transaction amount."""
    if amount < 50:
        return "Low"
    elif 50 <= amount <= 200:
        return "Medium"
    else:
        return "High"

# Apply categorization
df_cleaned['transaction_category'] = df_cleaned['transaction_amount'].apply(categorize_amount)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_cleaned['transaction_category'] = df_cleaned['transaction_amount'].apply(categorize_amount)


In [9]:
# Create a full independent copy
df_transformed = df_cleaned.copy()

# Apply categorization
df_transformed['transaction_category'] = df_transformed['transaction_amount'].apply(categorize_amount)

# Display the transformed DataFrame
df_transformed.head()


Unnamed: 0,customer_id,product_id,transaction_date,transaction_amount,transaction_type,spend_category,product_category,transaction_category
0,4,32,2023-01-01 13:07:24,168.44,Outgoing,Electronics,Electronics,Medium
1,6,32,2023-01-01 15:56:07,832.92,Outgoing,Electronics,Electronics,High
2,7,37,2023-01-01 03:08:20,67.06,Outgoing,Clothing,Clothing,Medium
3,3,34,2023-01-01 04:06:55,333.12,Outgoing,,,High
4,4,36,2023-01-01 11:30:27,348.9,Outgoing,Home,Home,High


In [11]:
# Calculate total transaction value per customer
customer_totals = df_transformed.groupby("customer_id")["transaction_amount"].sum().reset_index()
customer_totals.rename(columns={"transaction_amount": "total_transaction_value"}, inplace=True)

# Merge back into the transformed dataset
df_final = df_transformed.merge(customer_totals, on="customer_id", how="left")

# Display transformed data
df_final.head()


Unnamed: 0,customer_id,product_id,transaction_date,transaction_amount,transaction_type,spend_category,product_category,transaction_category,total_transaction_value
0,4,32,2023-01-01 13:07:24,168.44,Outgoing,Electronics,Electronics,Medium,274222.9
1,6,32,2023-01-01 15:56:07,832.92,Outgoing,Electronics,Electronics,High,387381.7
2,7,37,2023-01-01 03:08:20,67.06,Outgoing,Clothing,Clothing,Medium,97124600.0
3,3,34,2023-01-01 04:06:55,333.12,Outgoing,,,High,124239000.0
4,4,36,2023-01-01 11:30:27,348.9,Outgoing,Home,Home,High,274222.9


In [21]:
# Check for missing categories(Data Quality check)
missing_categories = df_final[df_final['product_category'].isnull() | df_final['spend_category'].isnull()]

if not missing_categories.empty:
    print(f"Warning: {len(missing_categories)} rows have missing categories!")
    # Log missing rows to a file
    missing_categories.to_csv('missing_categories.csv', index=False)
    #Note: This check filters rows where either the product_category or spend_category column is missing and logs them to a CSV file for further review.





In [25]:
# Check for negative transaction amounts(Data Quality check)
negative_transactions = df_final[df_final['transaction_amount'] < 0]

if not negative_transactions.empty:
    print(f"Warning: {len(negative_transactions)} rows have negative transaction amounts!")
    # Log negative rows to a file
    negative_transactions.to_csv('negative_transactions.csv', index=False)
    #No rows have negative transaction amounts as those were taken care of in the transformation phase


In [27]:
# Validate that 'transaction_amount' is numeric(Data Quality check)
if not pd.to_numeric(df_final['transaction_amount'], errors='coerce').notnull().all():
    print("Error: Some rows have invalid transaction amounts.")
    # Log invalid rows to a file
    df_invalid = df_final[pd.to_numeric(df_final['transaction_amount'], errors='coerce').isnull()]
    df_invalid.to_csv('invalid_transaction_amounts.csv', index=False)



In [13]:
!pip install mysql-connector-python

Defaulting to user installation because normal site-packages is not writeable


In [19]:
import pandas as pd
import mysql.connector
import numpy as np
from concurrent.futures import ThreadPoolExecutor

# Ensure 'transaction_date' is in MySQL-compatible format
df_final['transaction_date'] = pd.to_datetime(df_final['transaction_date'], errors='coerce').dt.strftime('%Y-%m-%d %H:%M:%S')

# Function to load a chunk of data
def load_data_chunk(chunk):
    try:
        conn = mysql.connector.connect(
            host="127.0.0.1",
            user="root",
            password="Assesment!23",
            database="transactions_db"
        )
        cursor = conn.cursor()

        insert_query = """
        INSERT INTO Customer.TransactionsOptimized (customer_id, product_id, transaction_amount, transaction_date, transaction_category, total_transaction_value, product_category, spend_category)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """

        # Ensure that the chunk has the correct column order and no missing values
        data_tuples = [
            (row.customer_id, row.product_id, row.transaction_amount, row.transaction_date, 
             row.transaction_category, row.total_transaction_value, row.product_category, row.spend_category)
            for row in chunk.itertuples(index=False, name="Row")  # Using named tuples
            if None not in (row.customer_id, row.product_id, row.transaction_amount, row.transaction_date,
                            row.transaction_category, row.total_transaction_value, row.product_category, row.spend_category)  # Ensuring no missing values
        ]

        if data_tuples:  # Only execute if there's data
            cursor.executemany(insert_query, data_tuples)
            conn.commit()

        cursor.close()
        conn.close()
    
    except Exception as e:
        print(f"Error loading data chunk: {e}")

# Define batch size (e.g., 10,000 rows per batch)
batch_size = 10000
chunks = [df_final.iloc[i:i + batch_size] for i in range(0, len(df_final), batch_size)]

# Execute the loading process in parallel
with ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(load_data_chunk, chunks)

print("Data successfully loaded into MySQL using parallel processing!")


Data successfully loaded into MySQL using parallel processing!
