# Step 1: Extract Data

In [119]:

import csv
import os

def extract(file_paths):
    data = []
    for file_path in file_paths:
        try:
            with open(file_path, mode='r') as file:
                reader = csv.DictReader(file)
                for row in reader:
                    data.append(row)
        except FileNotFoundError:
            print(f"Error: File {file_path} not found.")
        except csv.Error as e:
            print(f"Error reading {file_path}: {e}")
    return data
        

# Step 2: Transform Data

In [68]:
def clean_data(data):
    # Fill missing values and correct data types
    for row in data:
        for key, value in row.items():
            if value == '' or value is None:
                row[key] = 'Unknown'
    return data

In [116]:
def integrate_data(sales, products, customers, employees):
    integrated_data = []
    for sale in sales:
        product = next((p for p in products if p['product_id'] == sale['product_id']), {})
        customer = next((c for c in customers if c['customer_id'] == sale['customer_id']), {})
        employee = next((e for e in employees if e['employee_id'] == sale['employee_id']), {})
        
        integrated_row = {
            **sale,
            **product,
            **customer,
            **employee,
            'TotalSaleAmount': float(sale.get('Quantity', 0)) * float(product.get('Price', 0)),
            'ProfitMargin': float(product.get('ProfitMargin', 0))
        }
        integrated_data.append(integrated_row)
    return integrated_data

def create_date_dimension(data):
    for row in data:
        sale_date = datetime.strptime(row['SaleDate'], '%Y-%m-%d')
        row['Month'] = sale_date.month
        row['Quarter'] = (sale_date.month - 1) // 3 + 1
        row['Year'] = sale_date.year
    return data

# Step 3: Data Loading

In [108]:
def load_data(data, output_path):
    try:
        keys = data[0].keys()
        with open(output_path, 'w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=keys)
            writer.writeheader()
            writer.writerows(data)
        print(f"Data loading successful to {output_path}")
    except Exception as e:
        print(f"Error in data loading: {e}")


# Step 4: Analyze Data

In [109]:
def analyze_data(data):
    category_sales = defaultdict(float)
    for row in data:
        category_sales[row['Category']] += row['TotalSaleAmount']

    print("Total sales by product category:")
    for category, total in category_sales.items():
        print(f"{category}: {total}")

# ETL process

In [118]:
def etl_process():
    # Extract
    sales = extract_data('Sales.csv')
    products = extract_data('Product.csv')
    customers = extract_data('Customer.csv')
    employees = extract_data('Employee.csv')

    if not all([sales, products, customers, employees]):
        return

    # Transform
    sales = clean_data(sales)
    products = clean_data(products)
    customers = clean_data(customers)
    employees = clean_data(employees)

    integrated_data = integrate_data(sales, products, customers, employees)
    transformed_data = create_date_dimension(integrated_data)

    # Load
    load_data(transformed_data, 'Comprehensive_Sales_Report.csv')

    # Analyze
    analyze_data(transformed_data)

# Execute the ETL process
etl_process()

Data extraction successful for Sales.csv
Data extraction successful for Product.csv
Data extraction successful for Customer.csv
Data extraction successful for Employee.csv


KeyError: 'employee_id'