In [2]:
import pandas as pd
import sqlite3
import os
from datetime import datetime

class DataPipeline:
    """Small CSV -> pandas -> SQLite ETL pipeline for e-commerce data.

    The pipeline reads four CSV files (customers, products, orders and
    order items), derives date parts and customer segments, and writes the
    resulting tables plus two analytics views into a SQLite database.

    Each phase (`extract`, `transform`, `load`) returns ``True`` on success
    and ``False`` on failure; failures are logged rather than raised.
    """

    def __init__(self, data_dir='./data', db_path='./ecommerce.db'):
        """Store configuration and make sure the data directory exists.

        Args:
            data_dir: Directory searched first for the source CSV files.
            db_path: Path of the SQLite database file written by `load`.
        """
        self.data_dir = data_dir
        self.db_path = db_path
        self.log_file = 'pipelinelog.txt'

        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(data_dir, exist_ok=True)

        self.log(f"Pipeline initialized at {datetime.now()}")

    def log(self, message):
        """Append a timestamped message to the log file and echo it to stdout."""
        with open(self.log_file, 'a', encoding='utf-8') as f:
            # datetime.now must be *called*; the bare attribute would write
            # the method's repr instead of a timestamp.
            f.write(f"{datetime.now()}: {message}\n")
        print(message)

    def _source_path(self, filename):
        """Return the path to read `filename` from.

        Prefers ``<data_dir>/<filename>``; falls back to the bare filename
        (resolved against the working directory) so setups that keep the
        CSVs next to the script keep working.
        """
        candidate = os.path.join(self.data_dir, filename)
        return candidate if os.path.exists(candidate) else filename

    def extract(self):
        """Read the four source CSV files into DataFrame attributes.

        Returns:
            True if every file was read, False if any read failed (the
            error is logged).
        """
        self.log("Starting extraction phase")
        try:
            self.customers = pd.read_csv(self._source_path('customers.csv'))
            self.products = pd.read_csv(self._source_path('products.csv'))
            self.orders = pd.read_csv(self._source_path('orders.csv'))
            self.order_items = pd.read_csv(self._source_path('order_items.csv'))
            self.log(f"Extracted {len(self.customers)} customers, {len(self.products)} products, {len(self.orders)} orders")
            return True
        except Exception as e:
            self.log(f"Extraction error: {str(e)}")
            return False

    def transform(self):
        """Parse dates, add order month/year and segment customers.

        Adds ``order_month`` / ``order_year`` to the orders table and
        ``lifetime_value`` / ``segment`` to the customers table. Segments
        are Low [0, 100], Medium (100, 500], High (500, 1000] and
        VIP (> 1000); a negative lifetime value gets no segment (NaN).

        Returns:
            True on success, False if any step failed (the error is logged).
        """
        self.log("Starting transformation phase")
        try:
            self.orders['order_date'] = pd.to_datetime(self.orders['order_date'])
            self.customers['registration_date'] = pd.to_datetime(self.customers['registration_date'])

            self.orders['order_month'] = self.orders['order_date'].dt.month
            self.orders['order_year'] = self.orders['order_date'].dt.year

            # Lifetime value = sum of all order totals per customer.
            customer_totals = self.orders.groupby('customer_id')['total'].sum().reset_index()
            customer_totals.columns = ['customer_id', 'lifetime_value']

            # Left merge keeps customers without orders; they get a value of 0.
            self.customers = pd.merge(self.customers, customer_totals, on='customer_id', how='left')
            self.customers['lifetime_value'] = self.customers['lifetime_value'].fillna(0)

            # include_lowest=True closes the first bin on the left so that
            # customers with a lifetime value of exactly 0 land in 'Low'
            # instead of falling outside every bin.
            self.customers['segment'] = pd.cut(
                self.customers['lifetime_value'],
                bins=[0, 100, 500, 1000, float('inf')],
                labels=['Low', 'Medium', 'High', 'VIP'],
                include_lowest=True,
            )

            self.log("Transformation completed successfully")
            return True
        except Exception as e:
            self.log(f"Transformation error: {str(e)}")
            return False

    def load(self):
        """Write the tables and analytics views into the SQLite database.

        Existing tables are replaced; the ``monthly_sales`` and
        ``product_performance`` views are created only if missing.

        Returns:
            True on success, False if any step failed (the error is logged).
        """
        self.log("Starting load phase")
        try:
            conn = sqlite3.connect(self.db_path)
            try:
                # Save transformed data to database
                self.customers.to_sql('customers', conn, if_exists='replace', index=False)
                self.products.to_sql('products', conn, if_exists='replace', index=False)
                self.orders.to_sql('orders', conn, if_exists='replace', index=False)
                self.order_items.to_sql('order_items', conn, if_exists='replace', index=False)

                # Create some views for analytics
                conn.execute('''
                    CREATE VIEW IF NOT EXISTS monthly_sales AS
                    SELECT order_year, order_month, SUM(total) as monthly_revenue, COUNT(*) as order_count
                    FROM orders
                    GROUP BY order_year, order_month
                    ORDER BY order_year, order_month
                ''')

                conn.execute('''
                    CREATE VIEW IF NOT EXISTS product_performance AS
                    SELECT p.product_id, p.name, p.category,
                           COUNT(oi.order_id) as times_ordered,
                           SUM(oi.quantity) as units_sold,
                           SUM(oi.total) as revenue
                    FROM products p
                    LEFT JOIN order_items oi ON p.product_id = oi.product_id
                    GROUP BY p.product_id
                    ORDER BY revenue DESC
                ''')

                # Explicit commit so nothing depends on implicit transaction handling.
                conn.commit()
            finally:
                # Always release the connection, even when a write fails.
                conn.close()

            self.log("Data loaded successfully into SQLite database")
            return True
        except Exception as e:
            self.log(f"Load error: {str(e)}")
            return False

    def run_pipeline(self):
        """Run extract, transform and load in order, stopping at the first failure.

        Returns:
            True if all three phases succeeded, False otherwise.
        """
        self.log("Starting pipeline execution")

        if not self.extract():
            self.log("Pipeline failed at extraction phase")
            return False

        if not self.transform():
            self.log("Pipeline failed at transformation phase")
            return False

        if not self.load():
            self.log("Pipeline failed at load phase")
            return False

        self.log("Pipeline completed successfully")
        return True

# Script entry point: build a pipeline with the default data directory and
# database path, then run the extract, transform and load phases in order.
# The boolean result of run_pipeline() is not inspected here; each phase
# reports its own progress and errors through DataPipeline.log.
if __name__ == "__main__":
    pipeline = DataPipeline()
    pipeline.run_pipeline()

Pipeline initialized at 2025-04-13 14:11:37.499098
Starting pipeline execution
Starting extraction phase
Extracted 1000 customers, 100 products, 4204 orders
Starting transformation phase
Transformation completed successfully
Starting load phase
Data loaded successfully into SQLite database
Pipeline completed successfully
