<a href="https://colab.research.google.com/github/PRONAY-gh/Hero-Vired/blob/main/ETL_Project_5th_Jan_2025.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [22]:
# Import
import csv
from datetime import datetime

def read_csv(file_path, encoding='utf-8'):
    """Read a CSV file into a list of row dictionaries.

    Args:
        file_path: Path of the CSV file to read.
        encoding: Text encoding used to decode the file. With the default
            'utf-8', a byte-order mark at the start of the file is kept as
            part of the first header name (e.g. '\\ufeffsale_id'); pass
            'utf-8-sig' to strip it.

    Returns:
        A list with one dict per data row, keyed by the header row, or None
        if the file is missing or cannot be read (an error is printed).
    """
    try:
        # newline='' hands line-ending handling to the csv module, so line
        # breaks embedded in quoted fields are preserved unchanged.
        with open(file_path, 'r', newline='', encoding=encoding) as file:
            return list(csv.DictReader(file))
    except FileNotFoundError:
        print(f"Error: File not found - {file_path}")
        return None
    except Exception as e:
        # Best-effort extract step: report the problem and signal failure
        # with None instead of aborting the whole run.
        print(f"Error reading {file_path}: {e}")
        return None

# Locations of the four input CSV files
sales_file, products_file, customers_file, employees_file = (
    '/content/sample_data/sales.csv',
    '/content/sample_data/Product.csv',
    '/content/sample_data/Customer.csv',
    '/content/sample_data/employee.csv',
)

# Extract step: load every file, in the same order as the paths above
sales, products, customers, employees = map(
    read_csv,
    (sales_file, products_file, customers_file, employees_file),
)

# Print the first 5 rows of each list of dictionaries
def print_first_5_rows(data, data_name):
    """Print a heading followed by at most the first five rows of `data`.

    When `data` is empty or None, a "No data available" line is printed
    instead of rows.
    """
    print(f"\n{data_name} data:")
    if not data:
        print(f"No data available for {data_name}")
        return
    # Slicing copes with fewer than five rows without an explicit bound check.
    for row in data[:5]:
        print(row)

# Preview each extracted dataset in turn
for _rows, _label in (
    (sales, "Sales"),
    (products, "Products"),
    (customers, "Customers"),
    (employees, "Employees"),
):
    print_first_5_rows(_rows, _label)

def clean_sales_data(sales):
    """Drop duplicate and incomplete rows from the sales data.

    A row is kept only if it has a non-empty sale id, that id has not been
    seen on an earlier row, and every value in the row is non-empty.

    Args:
        sales: List of sale row dicts, or None (e.g. when the extract step
            failed); None is treated as an empty list.

    Returns:
        A new list containing the retained row dicts in their original order.
    """
    seen_ids = set()
    cleaned_data = []
    for sale in sales or []:
        # The id column is the first header, so it carries a '\ufeff' prefix
        # when the file was decoded with its BOM left in place; accept both
        # spellings of the key.
        sale_id = sale.get('\ufeffsale_id') or sale.get('sale_id')
        if not sale_id or sale_id in seen_ids:
            continue
        if not all(sale.values()):
            continue
        seen_ids.add(sale_id)
        cleaned_data.append(sale)
    return cleaned_data

# Run the cleaning step on the extracted sales rows
cleaned_sales = clean_sales_data(sales)

# Show up to five cleaned rows, or a notice when nothing survived cleaning
print("\nCleaned sales data:")
if not cleaned_sales:
    print("No cleaned sales data available.")
else:
    for cleaned_row in cleaned_sales[:5]:
        print(cleaned_row)

def _id_value(row, field):
    """Return the row's value for `field`, preferring its BOM-prefixed key."""
    return row.get('\ufeff' + field, row.get(field))


def integrate_data(sales, products, customers):
    """Join sales rows with product and customer details and derive fields.

    Each sale whose product_id and customer_id both resolve is expanded into
    a record holding the sale fields (quantity as int, price as float), the
    product name and category, the customer name, the total amount
    (quantity * price), and the day of week, month name and quarter derived
    from the sale date (parsed as month/day/year).

    Args:
        sales: List of sale row dicts; None is treated as empty.
        products: List of product row dicts; None is treated as empty.
        customers: List of customer row dicts; None is treated as empty.

    Returns:
        A list of integrated record dicts. Sales with an unknown product or
        customer are skipped silently; sales whose date, quantity or price
        cannot be parsed are skipped after printing an error.
    """
    # The id column is the first header of each file, so its key carries a
    # '\ufeff' prefix when the file was decoded with the BOM left in place.
    product_lookup = {_id_value(p, 'product_id'): p for p in products or []}
    customer_lookup = {_id_value(c, 'customer_id'): c for c in customers or []}

    integrated_data = []
    for sale in sales or []:
        product = product_lookup.get(sale['product_id'])
        customer = customer_lookup.get(sale['customer_id'])
        if not (product and customer):
            continue

        sale_id = _id_value(sale, 'sale_id')
        try:
            sale_date = datetime.strptime(sale['date'], '%m/%d/%Y')
            quantity = int(sale['quantity'])
            price = float(sale['price'])
        except ValueError as e:
            print(f"Error processing data for sale ID: {sale_id}, Error: {e}")
            continue

        integrated_data.append({
            "sale_id": sale_id,
            "date": sale['date'],
            "customer_id": sale['customer_id'],
            "product_id": sale['product_id'],
            "quantity": quantity,
            "price": price,
            "product_name": product['product_name'],
            "category": product['category'],
            "customer_name": customer['name'],
            "total_amount": quantity * price,
            "day_of_week": sale_date.strftime('%A'),
            "month": sale_date.strftime('%B'),
            # Months 1-3 -> quarter 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4.
            "quarter": (sale_date.month - 1) // 3 + 1,
        })
    return integrated_data

# Transform step: enrich the cleaned sales with product and customer details
transformed_sales = integrate_data(cleaned_sales, products, customers)

# Show up to five transformed rows, or a notice when the result is empty
print("\nTransformed sales data:")
if not transformed_sales:
    print("No transformed sales data available.")
else:
    for transformed_row in transformed_sales[:5]:
        print(transformed_row)



def write_csv(file_path, data, headers):
    """Write row dictionaries to a UTF-8 CSV file with a header line.

    `headers` supplies the column names and their order. A confirmation is
    printed on success; any failure is printed rather than raised.
    """
    try:
        with open(file_path, 'w', newline='', encoding='utf-8') as out_file:
            csv_writer = csv.DictWriter(out_file, fieldnames=headers)
            # Header line first, then every row in one call.
            csv_writer.writeheader()
            csv_writer.writerows(data)
        print(f"Data successfully written to {file_path}")
    except Exception as err:
        print(f"Error writing to {file_path}: {err}")



# Load step: column order is taken from the first transformed record;
# with no records, an empty header list is used.
if transformed_sales:
    output_headers = transformed_sales[0].keys()
else:
    output_headers = []
write_csv('combined_sales_report.csv', transformed_sales, output_headers)


Sales data:
{'\ufeffsale_id': '1001', 'date': '9/1/2024', 'customer_id': 'C101', 'product_id': 'P201', 'quantity': '2', 'price': '29.99'}
{'\ufeffsale_id': '1002', 'date': '9/2/2024', 'customer_id': 'C102', 'product_id': 'P202', 'quantity': '1', 'price': '49.99'}
{'\ufeffsale_id': '1003', 'date': '9/2/2024', 'customer_id': 'C103', 'product_id': 'P203', 'quantity': '3', 'price': '15.99'}
{'\ufeffsale_id': '1004', 'date': '9/3/2024', 'customer_id': 'C101', 'product_id': 'P204', 'quantity': '1', 'price': '99.99'}
{'\ufeffsale_id': '1005', 'date': '9/4/2024', 'customer_id': 'C104', 'product_id': 'P201', 'quantity': '2', 'price': '29.99'}

Products data:
{'\ufeffproduct_id': 'P201', 'product_name': 'Wireless Mouse', 'category': 'Electronics', 'cost_price': '20'}
{'\ufeffproduct_id': 'P202', 'product_name': 'LED Desk Lamp', 'category': 'Home & Office', 'cost_price': '35'}
{'\ufeffproduct_id': 'P203', 'product_name': 'Notebook Set', 'category': 'Stationery', 'cost_price': '10'}
{'\ufeffprodu