In [1]:
import pandas as pd
# Create a DataFrame with mock data
data = {
    'transaction_id': [1, 2, 3],
    'customer_id': [101, 102, 103],
    'product_id': [1001, 1002, 1003],
    'quantity': [2, 1, 5],
    'timestamp': ['2024-10-31 10:00:00', '2024-10-31 11:00:00', '2024-10-31 12:00:00']
}
df = pd.DataFrame(data)
df.to_csv('sales_data.csv', index=False)

In [2]:
df = pd.read_csv('sales_data.csv')
print(df)

   transaction_id  customer_id  product_id  quantity            timestamp
0               1          101        1001         2  2024-10-31 10:00:00
1               2          102        1002         1  2024-10-31 11:00:00
2               3          103        1003         5  2024-10-31 12:00:00


In [3]:
df['sale_date'] = pd.to_datetime(df['timestamp'])
df.drop(columns=['timestamp'], inplace=True)
print(df)

   transaction_id  customer_id  product_id  quantity           sale_date
0               1          101        1001         2 2024-10-31 10:00:00
1               2          102        1002         1 2024-10-31 11:00:00
2               3          103        1003         5 2024-10-31 12:00:00


In [4]:
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
import time

# Create a connection to the database. Using SQLite for simplicity, an engine with a timeout & serialized mode can help manage multiple connections
engine = create_engine('sqlite:///sales.db', connect_args={'timeout': 60, 'check_same_thread': False})  

# Load the data into the 'sales' table. Retry Logic: Implement a retry mechanism to handle the database is locked error by retrying the operation after a short delay.
retries = 5
for attempt in range(retries):
    try:
        # Use a context manager to ensure the connection is closed properly
        with engine.connect() as connection:
            df.to_sql('sales', con=connection, if_exists='append', index=False)
        break
    except OperationalError as e:
        if 'database is locked' in str(e):
            time.sleep(5)  # Wait for 5 seconds before retrying
        else:
            raise

In [5]:
import os

# Use environment variables for configuration
db_url = os.getenv('DATABASE_URL', 'sqlite:///sales.db')
file_path = os.getenv('FILE_PATH', 'sales_data.csv')

# Create a connection to the database
engine = create_engine(db_url, connect_args={'timeout': 60, 'check_same_thread': False})

# Read and load the data
df = pd.read_csv(file_path)
df['sale_date'] = pd.to_datetime(df['timestamp'])
df.drop(columns=['timestamp'], inplace=True)

# Retry logic to handle database is locked error
retries = 5
for attempt in range(retries):
    try:
        # Use a context manager to ensure the connection is closed properly
        with engine.connect() as connection:
            df.to_sql('sales', con=connection, if_exists='append', index=False)
        break
    except OperationalError as e:
        if 'database is locked' in str(e):
            time.sleep(5)  # Wait for 5 seconds before retrying
        else:
            raise


In [8]:
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)

# Use environment variables for configuration
db_url = os.getenv('DATABASE_URL', 'sqlite:///sales.db')
file_path = os.getenv('FILE_PATH', 'sales_data.csv')

# Create a connection to the database with a timeout
engine = create_engine(db_url, connect_args={'timeout': 60, 'check_same_thread': False})

# Retry logic to handle database is locked error
retries = 5
for attempt in range(retries):
    try:
        logging.info("ETL process started")
        
        # Read and load the data
        df = pd.read_csv(file_path)
        df['sale_date'] = pd.to_datetime(df['timestamp'])
        df.drop(columns=['timestamp'], inplace=True)
        
        # Use a context manager to ensure the connection is closed properly
        with engine.connect() as connection:
            df.to_sql('sales', con=connection, if_exists='append', index=False)
        
        logging.info("ETL process completed successfully")
        break
    except OperationalError as e:
        if 'database is locked' in str(e):
            logging.warning(f"Database is locked, retrying... ({attempt + 1}/{retries})")
            time.sleep(5)  # Wait for 5 seconds before retrying
        else:
            logging.error(f"OperationalError occurred: {e}")
            raise
    except Exception as e:
        logging.error(f"Unexpected error occurred: {e}")
        raise

INFO:root:ETL process started
INFO:root:ETL process started
INFO:root:ETL process started
INFO:root:ETL process started
INFO:root:ETL process started


In [9]:
from cryptography.fernet import Fernet

# Generate a key for encryption
key = Fernet.generate_key()
cipher_suite = Fernet(key)

# Encrypt customer IDs
df['customer_id'] = df['customer_id'].apply(lambda x: cipher_suite.encrypt(str(x).encode()).decode())

In [10]:
# Check for new columns and handle them
expected_columns = {'transaction_id', 'customer_id', 'product_id', 'quantity', 'sale_date'}
new_columns = set(df.columns) - expected_columns

if new_columns:
    logging.warning(f"New columns detected: {new_columns}")
    # Handle new columns appropriately