In [1]:
#1.Data Extraction-Reading data from all four input CSV files and handling errors for missing files.
#Libraries required
import csv
import os

def read_csv(file_path):
    # Checking if the file exists
    if not os.path.isfile(file_path):
        print(f"Error: {file_path} not found.")
        return []
    # if file exists, open the file and read the data
    with open(file_path, mode='r') as file:
            reader = csv.DictReader(file)
            return list(reader)
   
        
#reading data from all the files and printing first 5 entries to check
sales_data = read_csv(r'E:\Hero Vired\Python ETL project\sales.csv')
print("Sales Data (first 5 entries):")
print(sales_data[0:5])

products_data = read_csv(r'E:\Hero Vired\Python ETL project\Product.csv')
print("\nProducts Data (first 5 entries):")
print(products_data[0:5])

customers_data = read_csv(r'E:\Hero Vired\Python ETL project\Customer.csv')
print("\nCustomers Data (first 5 entries):")
print(customers_data[0:5])

employees_data = read_csv(r'E:\Hero Vired\Python ETL project\employee.csv')
print("\nEmployees Data (first 5 entries):")
print(employees_data[0:5])

Sales Data (first 5 entries):
[{'sale_id': '1001', 'date': '01-09-2024', 'customer_id': 'C101', 'product_id': 'P201', 'quantity': '2', 'price': '29.99'}, {'sale_id': '1002', 'date': '02-09-2024', 'customer_id': 'C102', 'product_id': 'P202', 'quantity': '1', 'price': '49.99'}, {'sale_id': '1003', 'date': '02-09-2024', 'customer_id': 'C103', 'product_id': 'P203', 'quantity': '3', 'price': '15.99'}, {'sale_id': '1004', 'date': '03-09-2024', 'customer_id': 'C101', 'product_id': 'P204', 'quantity': '1', 'price': '99.99'}, {'sale_id': '1005', 'date': '04-09-2024', 'customer_id': 'C104', 'product_id': 'P201', 'quantity': '2', 'price': '29.99'}]

Products Data (first 5 entries):
[{'product_id': 'P201', 'product_name': 'Wireless Mouse', 'category': 'Electronics', 'cost_price': '20'}, {'product_id': 'P202', 'product_name': 'LED Desk Lamp', 'category': 'Home & Office', 'cost_price': '35'}, {'product_id': 'P203', 'product_name': 'Notebook Set', 'category': 'Stationery', 'cost_price': '10'}, {'prod

In [30]:
#2. Data Transformation-To check for missing values
def check_for_missing_values(data, dataset_name):
    missing_found = False  # Flag to track if any missing values are found
    
    for i, row in enumerate(data):  # Iterate over each row in the dataset
        for key, value in row.items():  # Iterate over each key (column) in the row
            # Check for None or empty string
            if value is None or value == '':
                print(f"Missing value found in {dataset_name}, row {i + 1}, column '{key}': {value}")
                missing_found = True  # Set the flag to True if a missing value is found
    #print if there is no missing files
    if not missing_found:
        print(f"No missing values found in the dataset: {dataset_name}.")

# reading the files and calling the function for checking missing values
sales_data = read_csv(r'E:\Hero Vired\Python ETL project\sales.csv')
check_for_missing_values(sales_data, "sales_data")

products_data = read_csv(r'E:\Hero Vired\Python ETL project\Product.csv')
check_for_missing_values(products_data, "products_data")

customers_data = read_csv(r'E:\Hero Vired\Python ETL project\Customer.csv')
check_for_missing_values(customers_data, "customers_data")

employees_data = read_csv(r'E:\Hero Vired\Python ETL project\employee.csv')
check_for_missing_values(employees_data, "employees_data")


No missing values found in the dataset: sales_data.
No missing values found in the dataset: products_data.
No missing values found in the dataset: customers_data.
No missing values found in the dataset: employees_data.


In [32]:
#function to count missing values
def count_missing_values(data):
    missing_count = 0
    for row in data:
        for key in row:
            if row[key] == '':  # Check for empty strings
                missing_count += 1
    return missing_count

# Function to handle missing values
def handle_missing_values(data, default_value=None):
    """Replace missing values (empty strings) in the dataset with a default value."""
    for row in data:
        for key in row:
            if row[key] == '':  # Check for empty strings
                row[key] = default_value  # Replace with the default value
    return data

# Count missing values before handling
print("Missing values before handling:")
print(f"Sales Data: {count_missing_values(sales_data)}")
print(f"Products Data: {count_missing_values(products_data)}")
print(f"Customers Data: {count_missing_values(customers_data)}")
print(f"Employees Data: {count_missing_values(employees_data)}")

# Calling the function for handling missing values
sales_data = handle_missing_values(sales_data)
products_data = handle_missing_values(products_data)
customers_data = handle_missing_values(customers_data)
employees_data = handle_missing_values(employees_data)


# Count missing values after handling
print("\nMissing values after handling:")
print(f"Sales Data: {count_missing_values(sales_data)}")
print(f"Products Data: {count_missing_values(products_data)}")
print(f"Customers Data: {count_missing_values(customers_data)}")
print(f"Employees Data: {count_missing_values(employees_data)}")

Missing values before handling:
Sales Data: 0
Products Data: 0
Customers Data: 0
Employees Data: 0

Missing values after handling:
Sales Data: 0
Products Data: 0
Customers Data: 0
Employees Data: 0


In [35]:
#To correct data types through convert_value.
#Libraries required
from datetime import datetime
import csv

def convert_value(value, target_type):
    """Convert value to the specified target type."""
    if value is None or value == '' or value=='None': #if there is no value,or if it's an empty string or if a string like none is present
        return None #return value as none
    try:
        if target_type == int:
            return int(value)
        elif target_type == float:
            return float(value)
        elif target_type == datetime: #for all dates, conerting to datetime object
            if isinstance(value, datetime): #to check if the value is already a datetime object
                return value  #Return the existing date time object
            #print(f"Converting date: '{value}'") #debugging
            return datetime.strptime(value.strip(), '%d-%m-%Y') # to remove any leading or trailing spaces
        else:
            return str(value)  # Default to string for any other type
    except ValueError as e:
        print(f"Error converting value '{value}' to {target_type}: {e}")
        return None 
    
#Function to correct data types
def convert_data_types(sales_data, products_data, customers_data, employees_data):
    # Convert data types for sales_data
    for row in sales_data:
        row['sale_id'] = convert_value(row['sale_id'], int)  # sale_id is an integer
        row['customer_id'] = convert_value(row['customer_id'], str)  # customer_id is a string
        row['product_id'] = convert_value(row['product_id'], str)  # product_id is a string
        row['quantity'] = convert_value(row['quantity'], int) #qty is integer
        row['price'] = convert_value(row['price'], float) #price contains decimal
        row['date'] = convert_value(row['date'], datetime) 

    # Convert data types for products_data
    for row in products_data:
        row['product_id'] = convert_value(row['product_id'], str)  # product_id is a string
        row['product_name'] = convert_value(row['product_name'], str) # product_name is a string
        row['category'] = convert_value(row['category'], str) # category is a string
        row['cost_price'] = convert_value(row['cost_price'], float) #cost_price contains decimal

    # Convert data types for customers_data
    for row in customers_data:
        row['customer_id'] = convert_value(row['customer_id'], str)  #customer_id is a string
        row['name'] = convert_value(row['name'], str) #name is a string
        row['email'] = convert_value(row['email'], str) #email is a string
        row['address'] = convert_value(row['address'], str) #address is a string
        row['join_date'] = convert_value(row['join_date'], datetime)

    # Convert data types for employees_data
    for row in employees_data:
        row['employee_id'] = str(row['employee_id'])  # employee_id is a string
        row['name'] = convert_value(row['name'], str) #name is a string
        row['department'] = convert_value(row['department'], str) #department is a string
        row['hire_date'] = convert_value(row['hire_date'], datetime)
        row['salary'] = convert_value(row['salary'], float) #salary contains decimal


# Calling the function for correcting data types and printing first 5 rows to check
convert_data_types(sales_data, products_data, customers_data, employees_data)
print("First 5 rows of sales_data:")
for row in sales_data[:5]:
    print(row)
    
print("\nFirst 5 rows of products_data:")
for row in products_data[:5]:
    print(row)

print("\nFirst 5 rows of customers_data:")
for row in customers_data[:5]:
    print(row)

print("\nFirst 5 rows of employees_data:")
for row in employees_data[:5]:
    print(row)

First 5 rows of sales_data:
{'sale_id': 1001, 'date': datetime.datetime(2024, 9, 1, 0, 0), 'customer_id': 'C101', 'product_id': 'P201', 'quantity': 2, 'price': 29.99}
{'sale_id': 1002, 'date': datetime.datetime(2024, 9, 2, 0, 0), 'customer_id': 'C102', 'product_id': 'P202', 'quantity': 1, 'price': 49.99}
{'sale_id': 1003, 'date': datetime.datetime(2024, 9, 2, 0, 0), 'customer_id': 'C103', 'product_id': 'P203', 'quantity': 3, 'price': 15.99}
{'sale_id': 1004, 'date': datetime.datetime(2024, 9, 3, 0, 0), 'customer_id': 'C101', 'product_id': 'P204', 'quantity': 1, 'price': 99.99}
{'sale_id': 1005, 'date': datetime.datetime(2024, 9, 4, 0, 0), 'customer_id': 'C104', 'product_id': 'P201', 'quantity': 2, 'price': 29.99}

First 5 rows of products_data:
{'product_id': 'P201', 'product_name': 'Wireless Mouse', 'category': 'Electronics', 'cost_price': 20.0}
{'product_id': 'P202', 'product_name': 'LED Desk Lamp', 'category': 'Home & Office', 'cost_price': 35.0}
{'product_id': 'P203', 'product_name

In [36]:
#Integrating sales data with products_data and customers_data.Also, calculating derived fields like total sale amount and profit margin.
#function to integrate data
def integrate_data(sales_data, products_data, customers_data):
    # Create lookup dictionaries for products, customers, and employees
    product_lookup = {p['product_id']: p for p in products_data}
    customer_lookup = {c['customer_id']: c for c in customers_data}
#initializing empty dataset
    integrated_data = []
    for sale in sales_data:
        product = product_lookup.get(sale['product_id'], {})
        customer = customer_lookup.get(sale['customer_id'], {})
         # Employee info cant be retrieved as there is nothing common in sales and employee files.

        if product and customer:  # Ensure all mappings exist
            total_amount = int(sale['quantity']) * float(sale['price']) #formula to derive total amount
            cost_price = product['cost_price'] 
            profit_margin = (total_amount - (cost_price * int(sale['quantity']))) / total_amount #formula to derive profit margin
# add to the empty dataset
            integrated_data.append({
                "sale_id": sale['sale_id'],
                "date": sale['date'],
                "customer_id": sale['customer_id'],
                "quantity": int(sale['quantity']),
                "price": float(sale['price']),
                "product_id": sale['product_id'],
                "product_name": product['product_name'],
                "category": product['category'],
                "customer_name": customer['name'],
                "total_amount": total_amount,
                "profit_margin": profit_margin,
            })

    return integrated_data

# Integrate the data
integrated_sales_data = integrate_data(sales_data, products_data, customers_data)

# Print the first 5 entries of the integrated data
print("Integrated Sales Data (first 5 entries):")
for record in integrated_sales_data[:5]:  # Slicing to get the first 5 records
    print(record)

Integrated Sales Data (first 5 entries):
{'sale_id': 1001, 'date': datetime.datetime(2024, 9, 1, 0, 0), 'customer_id': 'C101', 'quantity': 2, 'price': 29.99, 'product_id': 'P201', 'product_name': 'Wireless Mouse', 'category': 'Electronics', 'customer_name': 'Alex Johnson', 'total_amount': 59.98, 'profit_margin': 0.3331110370123374}
{'sale_id': 1002, 'date': datetime.datetime(2024, 9, 2, 0, 0), 'customer_id': 'C102', 'quantity': 1, 'price': 49.99, 'product_id': 'P202', 'product_name': 'LED Desk Lamp', 'category': 'Home & Office', 'customer_name': 'Emily Davis', 'total_amount': 49.99, 'profit_margin': 0.2998599719943989}
{'sale_id': 1003, 'date': datetime.datetime(2024, 9, 2, 0, 0), 'customer_id': 'C103', 'quantity': 3, 'price': 15.99, 'product_id': 'P203', 'product_name': 'Notebook Set', 'category': 'Stationery', 'customer_name': 'Michael Brown', 'total_amount': 47.97, 'profit_margin': 0.37460913070669166}
{'sale_id': 1004, 'date': datetime.datetime(2024, 9, 3, 0, 0), 'customer_id': 'C1

In [37]:
#Creating date dimension with time based attributes
#Libraries required
from datetime import datetime, timedelta
import csv

#date dimension only for sales file
sales_data = read_csv(r'E:\Hero Vired\Python ETL project\sales.csv')

#function to create date dimension
def create_date_dimension(sales_data):
    # Extract the date range from sales data
    min_date = min(datetime.strptime(row['date'], '%d-%m-%Y') for row in sales_data)
    max_date = max(datetime.strptime(row['date'], '%d-%m-%Y') for row in sales_data)

    # Initialize an empty list to hold the date dimension
    date_dimension = []

    # Loop through each date in the range
    current_date = min_date
    while current_date <= max_date:
        # Extract attributes
        year = current_date.year
        month = current_date.month
        month_name = current_date.strftime('%B')
        quarter = (month - 1) // 3 + 1
        day = current_date.day
        day_of_week = current_date.weekday()  # Monday=0, Sunday=6
        day_name = current_date.strftime('%A')
        is_weekend = day_of_week >= 5  # Saturday=5, Sunday=6

        # Append the attributes as a dictionary
        date_dimension.append({
            'date': current_date.strftime('%d-%m-%Y'),  # Format date as string
            'year': year,
            'month': month,
            'month_name': month_name,
            'quarter': quarter,
            'day': day,
            'day_of_week': day_of_week,
            'day_name': day_name,
            'is_weekend': is_weekend
        })

        # Move to the next day
        current_date += timedelta(days=1)

    return date_dimension

# Create the date dimension only for sales file
date_dimension_list = create_date_dimension(sales_data)



# Print the first 5 entries of the date dimension
print("Date Dimension (first 5 entries):")
for entry in date_dimension_list[:5]:
    print (entry)

Date Dimension (first 5 entries):
{'date': '01-09-2024', 'year': 2024, 'month': 9, 'month_name': 'September', 'quarter': 3, 'day': 1, 'day_of_week': 6, 'day_name': 'Sunday', 'is_weekend': True}
{'date': '02-09-2024', 'year': 2024, 'month': 9, 'month_name': 'September', 'quarter': 3, 'day': 2, 'day_of_week': 0, 'day_name': 'Monday', 'is_weekend': False}
{'date': '03-09-2024', 'year': 2024, 'month': 9, 'month_name': 'September', 'quarter': 3, 'day': 3, 'day_of_week': 1, 'day_name': 'Tuesday', 'is_weekend': False}
{'date': '04-09-2024', 'year': 2024, 'month': 9, 'month_name': 'September', 'quarter': 3, 'day': 4, 'day_of_week': 2, 'day_name': 'Wednesday', 'is_weekend': False}
{'date': '05-09-2024', 'year': 2024, 'month': 9, 'month_name': 'September', 'quarter': 3, 'day': 5, 'day_of_week': 3, 'day_name': 'Thursday', 'is_weekend': False}


In [None]:
#3. Data Loading
# Function to write a CSV file
def write_csv(file_path, data, fieldnames):
    with open(file_path, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)

# Generate comprehensive sales report
comprehensive_report_path = r'E:\Hero Vired\Python project\comprehensive_sales_report.csv'
write_csv(comprehensive_report_path, integrated_sales_data, integrated_sales_data[0].keys())
print(f"Comprehensive sales report generated at: {comprehensive_report_path}")

# Generate summary report
summary_report = []
summary_lookup = {}

for record in integrated_sales_data:
    customer_id = record['customer_id']
    product_id = record['product_id']
    total_amount = record['total_amount']
    quantity = record['quantity']

    # Create a unique key for each customer-product combination
    key = (customer_id, product_id)

    if key not in summary_lookup:
        summary_lookup[key] = {
            'customer_id': customer_id,
            'customer_name': record['customer_name'],
            'product_id': product_id,
            'product_name': record['product_name'],
            'total_sales': 0,
            'total_quantity': 0
        }

    summary_lookup[key]['total_sales'] += total_amount
    summary_lookup[key]['total_quantity'] += quantity

# Convert summary lookup to a list
for key, summary in summary_lookup.items():
    summary_report.append({
        'customer_id': summary['customer_id'],
        'customer_name': summary['customer_name'],
        'product_id': summary['product_id'],
        'product_name': summary['product_name'],
        'total_sales': summary['total_sales'],
        'total_quantity': summary['total_quantity']
    })

# Write summary report to CSV
summary_report_path = r'E:\Hero Vired\Python project\summary_sales_report.csv'
write_csv(summary_report_path, summary_report, summary_report[0].keys())
print(f"Summary sales report generated at: {summary_report_path}")

In [21]:
#4. Data Analysis
#Libraries and modules required
from collections import defaultdict
from datetime import datetime
# Create a lookup dictionary for products
product_lookup = {p['product_id']: p for p in products_data}

# 1. Calculate total sales and profit by product category
category_sales = defaultdict(lambda: {'total_sales': 0, 'total_profit': 0})

for record in integrated_sales_data:
    category = record['category']
    total_amount = record['total_amount']
    
    # Use the product_id to get the product details from the lookup dictionary
    product = product_lookup.get(record['product_id'], {})
    cost_price = float(product.get('cost_price', 0))  # Default to 0 if not found
    profit = total_amount - (cost_price * record['quantity'])

    category_sales[category]['total_sales'] += total_amount
    category_sales[category]['total_profit'] += profit

print("\nTotal Sales and Profit by Product Category:")
for category, values in category_sales.items():
    print(f"Category: {category}, Total Sales: {values['total_sales']:.2f}, Total Profit: {values['total_profit']:.2f}")

# 2. Identify top-selling products
product_sales = defaultdict(lambda: {'total_sales': 0, 'total_quantity': 0})

for record in integrated_sales_data:
    product_id = record['product_id']
    total_amount = record['total_amount']
    quantity = record['quantity']

    product_sales[product_id]['total_sales'] += total_amount
    product_sales[product_id]['total_quantity'] += quantity

top_selling_products = sorted(product_sales.items(), key=lambda x: x[1]['total_sales'], reverse=True)[:5]

print("\nTop-Selling Products:")
for product_id, values in top_selling_products:
    product_name = product_lookup.get(product_id, {}).get('product_name', 'Unknown')  # Use lookup dictionary
    print(f"Product: {product_name}, Total Sales: {values['total_sales']:.2f}, Total Quantity: {values['total_quantity']}")

# 2. Identify key customers
customer_sales = defaultdict(lambda: {'total_sales': 0, 'total_quantity': 0})

for record in integrated_sales_data:
    customer_id = record['customer_id']
    total_amount = record['total_amount']
    quantity = record['quantity']

    customer_sales[customer_id]['total_sales'] += total_amount
    customer_sales[customer_id]['total_quantity'] += quantity

key_customers = sorted(customer_sales.items(), key=lambda x: x[1]['total_sales'], reverse=True)[:5]

print("\nKey Customers:")
for customer_id, values in key_customers:
    customer_name = next((c['name'] for c in customers_data if c['customer_id'] == customer_id), "Unknown")
    print(f"Customer ID: {customer_id}, Name: {customer_name}, Total Sales: {values['total_sales']:.2f}, Total Quantity: {values['total_quantity']}")
# 3. Analyze sales trends over time (daily, monthly, quarterly).
    
from collections import defaultdict
from datetime import datetime

# Initialize dictionaries to hold sales data for trends
daily_sales = defaultdict(float)
monthly_sales = defaultdict(float)
quarterly_sales = defaultdict(float)

# Iterate through the integrated sales data to calculate sales trends
for record in integrated_sales_data:
    date = record.get('date')  # Safely get the date
    if date is None:
        print("Warning: Date is None for record:", record)
        continue  # Skip this record

    # Check if date is already a datetime object
    if isinstance(date, datetime):
        date_obj = date  # Use the existing datetime object
    else:
        # Convert date from dd-mm-yyyy to datetime object
        try:
            date_obj = datetime.strptime(date, '%d-%m-%Y')  # Assuming input is in dd-mm-yyyy format
        except ValueError as e:
            print(f"Error parsing date '{date}': {e}")
            continue  # Skip this record if date parsing fails

    # Format the date for daily sales
    formatted_date = date_obj.strftime('%Y-%m-%d')

    # Daily sales
    daily_sales[formatted_date] += record['total_amount']

    # Monthly sales
    month = formatted_date[:7]  # Extract YYYY-MM format
    monthly_sales[month] += record['total_amount']

    # Quarterly sales
    year = formatted_date[:4]
    month_num = int(formatted_date[5:7])
    quarter = (month_num - 1) // 3 + 1
    quarterly_key = f"{year}-Q{quarter}"
    quarterly_sales[quarterly_key] += record['total_amount']
    
print("\nBrief summary report")
# Print daily sales trends
print("\nDaily Sales Trends:")
for date, total in sorted(daily_sales.items()):
    print(f"Date: {date}, Total Sales: {total:.2f}")

# Print monthly sales trends
print("\nMonthly Sales Trends:")
for month, total in sorted(monthly_sales.items()):
    print(f"Month: {month}, Total Sales: {total:.2f}")

# Print quarterly sales trends
print("\nQuarterly Sales Trends:")
for quarter, total in sorted(quarterly_sales.items()):
    print(f"Quarter: {quarter}, Total Sales: {total:.2f}")


Total Sales and Profit by Product Category:
Category: Electronics, Total Sales: 12228.14, Total Profit: 3283.14
Category: Home & Office, Total Sales: 4649.07, Total Profit: 1394.07
Category: Stationery, Total Sales: 1439.10, Total Profit: 539.10

Top-Selling Products:
Product: Bluetooth Speaker, Total Sales: 9499.05, Total Quantity: 95
Product: LED Desk Lamp, Total Sales: 4649.07, Total Quantity: 93
Product: Wireless Mouse, Total Sales: 2729.09, Total Quantity: 91
Product: Notebook Set, Total Sales: 1439.10, Total Quantity: 90

Key Customers:
Customer ID: C123, Name: Jackson Carter, Total Sales: 999.90, Total Quantity: 10
Customer ID: C111, Name: William Young, Total Sales: 799.92, Total Quantity: 8
Customer ID: C107, Name: Matthew Clark, Total Sales: 599.94, Total Quantity: 6
Customer ID: C119, Name: Jacob Adams, Total Sales: 599.94, Total Quantity: 6
Customer ID: C103, Name: Michael Brown, Total Sales: 547.92, Total Quantity: 8

Brief summary report

Daily Sales Trends:
Date: 2024-0