In [1]:
## 1. Data Extraction

import csv
from datetime import datetime

# Function to read a CSV file and return its contents as a list of dictionaries
def read_csv_file(file_name):
    try:
        with open(file_name, mode='r', newline='') as file:
            reader = csv.DictReader(file)
            data = [row for row in reader]
        return data
    except FileNotFoundError:
        print(f"Error: The file {file_name} was not found.")
        return []
    except Exception as e:
        print(f"Error reading {file_name}: {e}")
        return []

# Extract data from the CSV files
sales_data = read_csv_file(r'D:\Dipesh\Hero Vired\Python\Project\2_Building simple ETL pipeline\CSV\sales.csv')
products_data = read_csv_file(r'D:\Dipesh\Hero Vired\Python\Project\2_Building simple ETL pipeline\CSV\products.csv')
customers_data = read_csv_file(r'D:\Dipesh\Hero Vired\Python\Project\2_Building simple ETL pipeline\CSV\customers.csv')
employees_data = read_csv_file(r'D:\Dipesh\Hero Vired\Python\Project\2_Building simple ETL pipeline\CSV\employees.csv')

In [3]:
## 2. Data Transformation

# If you need to clean sales data (handling missing values, fixing date formats)
def clean_sales_data(sales_data):
    cleaned_sales = []
    for sale in sales_data:
        # Fix date format if necessary (assuming sales.csv uses 'DD-MM-YYYY' format)
        sale['date'] = datetime.strptime(sale['date'], '%d-%m-%Y').strftime('%Y-%m-%d')
        # Convert quantity and price to correct types
        sale['quantity'] = int(sale['quantity']) if sale['quantity'] else 0
        sale['price'] = float(sale['price']) if sale['price'] else 0.0
        cleaned_sales.append(sale)
    return cleaned_sales

# Clean the sales data
cleaned_sales_data = clean_sales_data(sales_data)

# Function to integrate sales data with product and customer data (excluding employee data)
def integrate_sales_data(sales, products, customers):
    integrated_data = []
    for sale in sales:
        product = next((p for p in products if p['product_id'] == sale['product_id']), None)
        customer = next((c for c in customers if c['customer_id'] == sale['customer_id']), None)

        if product and customer:
            total_amount = int(sale['quantity']) * float(sale['price'])
            profit_margin = total_amount - (int(sale['quantity']) * float(product['cost_price']))

            integrated_data.append({
                'sale_id': sale['sale_id'],
                'product_name': product['product_name'],
                'category': product['category'],
                'customer_name': customer['name'],
                'quantity': sale['quantity'],
                'total_amount': total_amount,
                'profit_margin': profit_margin,
                'sale_date': sale['date'],
                'year': datetime.strptime(sale['date'], '%Y-%m-%d').year,
                'month': datetime.strptime(sale['date'], '%Y-%m-%d').month
            })
    return integrated_data

# Use the modified function for integrating sales data with product and customer data
integrated_sales_data = integrate_sales_data(cleaned_sales_data, products_data, customers_data)

In [5]:
## 3. Data Loading

# Function to write data to a CSV file
def write_csv_file(file_name, data, fieldnames):
    try:
        with open(file_name, mode='w', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(data)
        print(f"Data written to {file_name} successfully.")
    except Exception as e:
        print(f"Error writing to {file_name}: {e}")

# Write the full integrated sales data to a CSV
fieldnames = ['sale_id', 'product_name', 'category', 'customer_name', 
              'quantity', 'total_amount', 'profit_margin', 'sale_date', 'year', 'month']
write_csv_file('integrated_sales_report.csv', integrated_sales_data, fieldnames)

# Generate a summary report (total sales by category)
summary_data = []
categories = set([sale['category'] for sale in integrated_sales_data])
for category in categories:
    total_sales = sum(sale['total_amount'] for sale in integrated_sales_data if sale['category'] == category)
    total_profit = sum(sale['profit_margin'] for sale in integrated_sales_data if sale['category'] == category)
    summary_data.append({'category': category, 'total_sales': total_sales, 'total_profit': total_profit})

# Write the summary report to a CSV
summary_fieldnames = ['category', 'total_sales', 'total_profit']
write_csv_file('sales_summary_report.csv', summary_data, summary_fieldnames)

Data written to integrated_sales_report.csv successfully.
Data written to sales_summary_report.csv successfully.


In [7]:
## 4. Data Analysis Functions
# Function to calculate total sales, transactions, and products sold
def calculate_total_sales(integrated_sales_data):
    total_sales = sum(sale['total_amount'] for sale in integrated_sales_data)
    total_transactions = len(integrated_sales_data)
    total_products_sold = sum(int(sale['quantity']) for sale in integrated_sales_data)
    return total_sales, total_transactions, total_products_sold

# Function to calculate total sales by product category
def sales_by_category(integrated_sales_data):
    category_sales = {}
    for sale in integrated_sales_data:
        category = sale['category']
        if category in category_sales:
            category_sales[category] += sale['total_amount']
        else:
            category_sales[category] = sale['total_amount']
    return category_sales

# Function to find top-selling products by revenue
def top_selling_products(integrated_sales_data, top_n=5):
    product_sales = {}
    for sale in integrated_sales_data:
        product = sale['product_name']
        if product in product_sales:
            product_sales[product] += sale['total_amount']
        else:
            product_sales[product] = sale['total_amount']
    
    # Sort products by total sales and return the top N
    sorted_products = sorted(product_sales.items(), key=lambda x: x[1], reverse=True)
    return sorted_products[:top_n]

# Function to find key customers by total purchase amount
def key_customers(integrated_sales_data, top_n=5):
    customer_sales = {}
    for sale in integrated_sales_data:
        customer = sale['customer_name']
        if customer in customer_sales:
            customer_sales[customer] += sale['total_amount']
        else:
            customer_sales[customer] = sale['total_amount']
    
    # Sort customers by total sales and return the top N
    sorted_customers = sorted(customer_sales.items(), key=lambda x: x[1], reverse=True)
    return sorted_customers[:top_n]

# Function to calculate monthly sales trends
def sales_trends_by_month(integrated_sales_data):
    monthly_sales = {}
    for sale in integrated_sales_data:
        month_year = f"{sale['month']}-{sale['year']}"
        if month_year in monthly_sales:
            monthly_sales[month_year] += sale['total_amount']
        else:
            monthly_sales[month_year] = sale['total_amount']
    
    # Sort by month and year for chronological order
    sorted_trends = sorted(monthly_sales.items(), key=lambda x: datetime.strptime(x[0], '%m-%Y'))
    return sorted_trends

# Function to calculate total profit
def calculate_total_profit(integrated_sales_data):
    total_profit = sum(sale['profit_margin'] for sale in integrated_sales_data)
    return total_profit

In [9]:
## 5. Generate Textual Report
def generate_report(integrated_sales_data):
    # Calculate core metrics
    total_sales, total_transactions, total_products_sold = calculate_total_sales(integrated_sales_data)
    category_sales = sales_by_category(integrated_sales_data)
    top_products = top_selling_products(integrated_sales_data)
    top_customers = key_customers(integrated_sales_data)
    sales_trends = sales_trends_by_month(integrated_sales_data)
    total_profit = calculate_total_profit(integrated_sales_data)
    
    # Generate the textual report
    report = f"--- Sales Analysis Report: September 2024 - December 2024 ---\n\n"
    
    # Overview of Data
    report += f"**Overview of the Data**\n"
    report += f"Total Sales: ${total_sales:.2f}\n"
    report += f"Total Transactions: {total_transactions}\n"
    report += f"Total Products Sold: {total_products_sold}\n\n"

    # Sales by Category
    report += f"**Sales by Product Category**\n"
    for category, sales in category_sales.items():
        report += f"- {category}: ${sales:.2f}\n"
    
    # Top-Selling Products
    report += f"\n**Top-Selling Products**\n"
    for product, sales in top_products:
        report += f"- {product}: ${sales:.2f}\n"
    
    # Key Customers
    report += f"\n**Top Customers**\n"
    for customer, sales in top_customers:
        report += f"- {customer}: ${sales:.2f}\n"

    # Sales Trends by Month
    report += f"\n**Sales Trends by Month**\n"
    for month_year, sales in sales_trends:
        report += f"- {month_year}: ${sales:.2f}\n"

    # Profit Analysis
    report += f"\n**Profit Analysis**\n"
    report += f"Total Profit: ${total_profit:.2f}\n"

    return report

In [11]:
## 6. Save or Print the Report
# Generate the report based on the integrated sales data
report = generate_report(integrated_sales_data)

# Print the report to the console
print(report)

# Optional: Save the report to a text file
with open('sales_analysis_report.txt', 'w') as report_file:
    report_file.write(report)

--- Sales Analysis Report: September 2024 - December 2024 ---

**Overview of the Data**
Total Sales: $18316.31
Total Transactions: 125
Total Products Sold: 369

**Sales by Product Category**
- Electronics: $12228.14
- Home & Office: $4649.07
- Stationery: $1439.10

**Top-Selling Products**
- Bluetooth Speaker: $9499.05
- LED Desk Lamp: $4649.07
- Wireless Mouse: $2729.09
- Notebook Set: $1439.10

**Top Customers**
- Jackson Carter: $999.90
- William Young: $799.92
- Matthew Clark: $599.94
- Jacob Adams: $599.94
- Michael Brown: $547.92

**Sales Trends by Month**
- 9-2024: $4215.14
- 10-2024: $4677.06
- 11-2024: $4541.10
- 12-2024: $4293.09
- 1-2025: $589.92

**Profit Analysis**
Total Profit: $5216.31

