<h1>Step 1: Data Extraction </h1>

<h3 style = "color: blue;"> Read data from all four input CSV files</br>
    Handle potential errors such as missing files</h3>

In [None]:
import csv 
import os

files = {"Sales_data":r"./Data files/sales.csv",
        "Products_data":r"./Data files/Product.csv",
        "Customers_data":r"./Data files/Customer.csv",
        "Employees_data":r"./Data files/employee.csv"}

    
def read_csv(file_path):
    try:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"The file {file_path} is not present.")
    
        with open(file_path, "r") as file:
            csvReader = csv.reader(file)
            csv_data = list(csvReader)  
        return csv_data

    except FileNotFoundError as fnf_error:
        print(f"Error: {fnf_error}")
    except csv.Error as csv_error:
        print(f"Error reading CSV file: {csv_error}")

    
def extract_all_data(files): 
    all_data = {}
    for filename,path in files.items():
        csvRead = read_csv(path)
        if csvRead != None:
            colheader = csvRead[0]
            data = []
        
            for i in csvRead[1:]:
                record = {}
                for j in range(len(colheader)):
                    record[colheader[j]] = i[j]
                data.append(record)
    
        all_data[filename] = data
    
    return all_data

all_data = extract_all_data(files)

print(all_data)

<h1>Step 2: Data Transformation </h1>

<h3 style = "color: blue;">1.Clean the data by correcting data types</h3>

In [None]:
def correct_data_types(all_data):
    for data , content in all_data.items():
        for record in content:
            for key, value in record.items():    
                if value.isdigit():
                   record[key] = int(value)
                else:
                    try:
                        record[key] = float(value)
                    except ValueError:
                        pass

    
    return all_data

corrected_data_type_data = correct_data_types(all_data)

print(corrected_data_type_data)

<h3 style = "color: blue;">2.Clean the data by addressing missing values</h3>

In [None]:
def missing_values(corrected_data_type_data):
    for data , content in corrected_data_type_data.items():
        for record in content:
            for key, value in record.items():
                if value is None or value == '':
                    if key == 'quantity' or key == 'sale_id':
                        record[key] = 0
                    elif key == 'price' or key == 'cost_price' or key == 'salary':
                        record[key] = 0.0
                    else:
                        record[key] = None
                        
    return corrected_data_type_data

corrected_data = missing_values(corrected_data_type_data)

print(corrected_data)

<h3 style = "color: blue;">3.Integrate the Sales data with other data</h3>

In [None]:
product_dict = {product['product_id']: product for product in corrected_data['Products_data']}
customer_dict = {customer['customer_id']: customer for customer in corrected_data['Customers_data']}

integrated_data = []

for sale in corrected_data['Sales_data']:
    product_id = sale['product_id']
    product_info = product_dict.get(product_id, {})
    customer_id = sale['customer_id']
    customer_info = customer_dict.get(customer_id, {})
    
    
    ###Merge sale data with product info and customer_info
    
    integrated_record = {**sale, **product_info, **customer_info}
    integrated_data.append(integrated_record)
    
    
###Print the merged data

for record in integrated_data:
    print(record)


<h3 style = "color: blue;">4.Adding additional date dimensions in the Integrated data</h3>

In [None]:
import math

for data in integrated_data:
    ###convert the date column into month, day and year
    
    month, day, year = list(map(int, data['date'].split('/')))
    
    data['month'] = month
    data['day'] = day
    data['year'] = year
    data['quarter'] = math.ceil(month/3)

<h1>Step 3.1: Data Analysis</h1>

<h3 style = "color: blue;">1.Calculate total sales and profit by product category: </h3>

In [None]:
category_sales = {}

for data in integrated_data:
    if data['category'] not in category_sales.keys():
        category_sales[data['category']] = {'sales': int(data['quantity'])*float(data['price']),
                                            'total_cost_price': int(data['quantity'])*float(data['cost_price'])}
    else:
        category_sales[data['category']]['sales'] += int(data['quantity'])*float(data['price'])
        category_sales[data['category']]['total_cost_price'] += int(data['quantity'])*float(data['cost_price'])
        
for key, value in category_sales.items():
    print(f"Total sales in {key} category = {value['sales']:.2f} and profit percentage = {((value['sales']-value['total_cost_price'])/value['total_cost_price'])*100:.2f}")

<h3 style = "color: blue;">2.Identify Product wise Sales</h3>

In [None]:
prd_sales = {}

for data in integrated_data:
    if data['product_id'] not in prd_sales.keys():
        prd_sales[data['product_id']] = {'sales': data['quantity']*data['price'],
                                         'product_name': data['product_name']}
    else:
        prd_sales[data['product_id']]['sales'] += data['quantity']*data['price']

for key,value in prd_sales.items():
    print(f"The product {value['product_name']} has sales of {value['sales']}")  

<h3 style = "color: blue;">3.Top selling and lowest selling product based on sales</h3>

In [None]:
prd_sales_sorted = dict(sorted(prd_sales.items(),key= lambda x: x[1]['sales'], reverse = True))

print(f"The top selling product is: {prd_sales_sorted[list(prd_sales_sorted.keys())[0]]['product_name']} and lowest selling product is: {prd_sales_sorted[list(prd_sales_sorted.keys())[-1]]['product_name']}")

<h3 style = "color: blue;">4.Identify Customer wise Sales</h3>

In [None]:
customer_sales = {}

for data in integrated_data:
    if data['customer_id'] not in customer_sales.keys():
        customer_sales[data['customer_id']] = {'sales': data['quantity']*data['price'],
                                         'customer_name': data['name']}
    else:
        customer_sales[data['customer_id']]['sales'] += data['quantity']*data['price']

for key,value in customer_sales.items():
    print(f"The Customer {value['customer_name']} has sales of {value['sales']}")  

<h3 style = "color: blue;">5.Top buying and lowest buying customer based on sales</h3>

In [None]:
customer_sales_sorted = dict(sorted(customer_sales.items(),key= lambda x: x[1]['sales'], reverse = True))

print(f"The most important customer is: {customer_sales_sorted[list(customer_sales_sorted.keys())[0]]['customer_name']} and the least important customer is: {customer_sales_sorted[list(customer_sales_sorted.keys())[-1]]['customer_name']}")

<h3 style="color: blue;">6.Daily sales analysis</h3>

In [None]:
date_wise_sales = {}

for data in integrated_data:
    if data['date'] not in date_wise_sales.keys():
        date_wise_sales[data['date']] = data['quantity']*data['price']                                 
    else:
        date_wise_sales[data['date']] += data['quantity']*data['price']

for key,value in date_wise_sales.items():
    print(f"{value:.2f} amount of sales happened on {key}")

<h3 style="color: blue;">7.Top selling and lowest selling dates based on sales</h3>

In [None]:
date_wise_sales_sorted = dict(sorted(date_wise_sales.items(),key= lambda x: x[1], reverse = True))

###top selling dates
highest_sales_date_wise = date_wise_sales_sorted[list(date_wise_sales_sorted.keys())[0]]
print(f'The dates having highest sales of amount {highest_sales_date_wise} are -')
for date, sales in date_wise_sales_sorted.items():
    if sales == highest_sales_date_wise:
        print(date)
        
###lowest selling dates
lowest_sales_date_wise = date_wise_sales_sorted[list(date_wise_sales_sorted.keys())[-1]]
print(f'The dates having lowest sales of amount {lowest_sales_date_wise} are -')
for date, sales in date_wise_sales_sorted.items():
    if sales == lowest_sales_date_wise:
        print(date)

<h3 style="color:blue;">8.Monthly sales analysis</h3>

In [None]:
month_wise_sales = {}

for data in integrated_data:
    if data['month'] not in month_wise_sales.keys():
        month_wise_sales[data['month']] = data['quantity']*data['price']                                 
    else:
        month_wise_sales[data['month']] += data['quantity']*data['price']
        
for key,value in month_wise_sales.items():
    print(f"{value:.2f} amount of sales happened on month {key}")

<h3 style="color:blue;">9.Top selling and lowest selling months based on sales</h3>

In [102]:
month_wise_sales_sorted = dict(sorted(month_wise_sales.items(),key= lambda x: x[1], reverse = True))

###Highest selling month
highest_sales_month_wise = list(month_wise_sales_sorted.keys())[0]
print(f'Highest selling month - {highest_sales_month_wise} with amount = {month_wise_sales_sorted[highest_sales_month_wise]}')

###Lowest selling month
lowest_sales_month_wise = list(month_wise_sales_sorted.keys())[-1]
print(f'Lowest selling month - {lowest_sales_month_wise} with amount = {month_wise_sales_sorted[lowest_sales_month_wise]}')

Highest selling month - 10 with amount = 4677.059999999999
Lowest selling month - 1 with amount = 589.92


<h3 style="color:blue;">10.Quarterly sales analysis</h3>

In [None]:
quarter_wise_sales = {}

for data in integrated_data:
    if data['quarter'] not in quarter_wise_sales.keys():
        quarter_wise_sales[data['quarter']] = data['quantity']*data['price']                                 
    else:
        quarter_wise_sales[data['quarter']] += data['quantity']*data['price']
        
for key,value in quarter_wise_sales.items():
    print(f"{value:.2f} amount of sales happened in Quarter {key}")

<h3 style="color:blue;">11.Top selling and lowest selling quarter based on sales</h3>

In [110]:
quarter_wise_sales_sorted = dict(sorted(quarter_wise_sales.items(),key= lambda x: x[1], reverse = True))

###Highest selling quarter
highest_sales_quarter_wise = list(quarter_wise_sales_sorted.keys())[0]
print(f'Highest selling quarter - {highest_sales_quarter_wise} with amount = {quarter_wise_sales_sorted[highest_sales_quarter_wise]:.2f}')

###Lowest selling quarter
lowest_sales_quarter_wise = list(quarter_wise_sales_sorted.keys())[-1]
print(f'Lowest selling quarter - {lowest_sales_quarter_wise} with amount = {quarter_wise_sales_sorted[lowest_sales_quarter_wise]:.2f}')

Highest selling quarter - 4 with amount = 13511.25
Lowest selling quarter - 1 with amount = 589.92


<h3 style="color:blue;">12.Total sales amount</h3>

In [None]:
total_amt = 0

for data in integrated_data:
    total_amt += data['quantity']*data['price']

print(f"The total sales amount is: {total_amt}")

<h3 style="color:blue;">13.Total quantity sold</h3>

In [None]:
total_qty = 0

for data in integrated_data:
    total_qty += data['quantity']

print(f"Total quantity sold: {total_qty}")

<h3 style="color:blue;">14.Product wise quantity</h3>

In [None]:
prd_qty = {}

for data in integrated_data:
    if data['product_id'] not in prd_qty.keys():
        prd_qty[data['product_id']] = data['quantity']
    else:
        prd_qty[data['product_id']] += data['quantity']

for key,value in prd_qty.items():
    print(f"The product {key} has quantity of {value}") 

<h3 style="color:blue;">15.Product wise quantity</h3>

In [None]:
cust_qty = {}

for data in integrated_data:
    if data['customer_id'] not in cust_qty.keys():
        cust_qty[data['customer_id']] = int(data['quantity'])
    else:
        cust_qty[data['customer_id']] += int(data['quantity'])

for key,value in cust_qty.items():
    print(f"The quantity purchased by customer {key} is {value}")   

<h3 style="color:blue;">16.Product wise Cost of Good Sold(COGS)</h3>

In [None]:
prd_cogs_sales = {}

for data in integrated_data:
    if data['product_id'] not in prd_cogs_sales.keys():
        prd_cogs_sales[data['product_id']] = {'sales': data['quantity']*data['cost_price'],
                                         'product_name': data['product_name']}
    else:
        prd_cogs_sales[data['product_id']]['sales'] += data['quantity']*data['cost_price']

for key,value in prd_cogs_sales.items():
    print(f"The product {value['product_name']} has COGS of {value['sales']}")  
    

<h3 style="color:blue;">17.Product wise Gross-Profit</h3>

In [None]:
#Formula: (Sales of a product - COGS of a product)

for key in prd_sales.keys():
    product_name = prd_sales[key]['product_name']
    gross_profit = prd_sales[key]['sales'] - prd_cogs_sales[key]['sales']
    print(f'{product_name} has Gross profit amount = {gross_profit}')

<h3 style="color:blue;">18.Product wise cost price</h3>

In [None]:
prd_costs = {}

for data in integrated_data:
    if data['product_id'] not in prd_costs.keys():
        prd_costs[data['product_id']] = {'cost_price': data['quantity']*data['cost_price'],
                                         'product_name': data['product_name']}
    else:
        prd_costs[data['product_id']]['cost_price'] += data['quantity']*data['cost_price']

for key,value in prd_costs.items():
    print(f"The product {value['product_name']} has cost of {value['cost_price']}")  

<h3 style="color:blue;">19.Most and least costliest product based on cost price</h3>

In [None]:
prd_costs_sorted = dict(sorted(prd_costs.items(),key= lambda x: x[1]['cost_price'], reverse = True))


print(f"The costly product is: {prd_costs_sorted[list(prd_costs_sorted.keys())[0]]['product_name']} and least costly product is: {prd_costs_sorted[list(prd_costs_sorted.keys())[-1]]['product_name']}")

<h1>Step 3.2: Data analysis on Employee data(As employee data is not able to be integrated)</h1>

<h3 style="color:blue;">1.Total number of employees</h3>

In [None]:
print(f"Total number of employees: {len(corrected_data['Employees_data'])}")

<h3 style="color:blue;">2.Number of employees per department</h3>

In [None]:
emp_per_dept = {}

for data in corrected_data['Employees_data']:
    if data['department'] not in emp_per_dept.keys():
        emp_per_dept[data['department']] = 1
    else:
        emp_per_dept[data['department']] += 1
        
for key, value in emp_per_dept.items():
    print(f"{value} employees work under {key} department.")

<h3 style="color:blue;">3.Total salary per department</h3>

In [None]:
sal_per_dept = {}

for data in corrected_data['Employees_data']:
    if data['department'] not in sal_per_dept.keys():
        sal_per_dept[data['department']] = data['salary']
    else:
        sal_per_dept[data['department']] += data['salary']
        
for key, value in sal_per_dept.items():
    print(f"Total salary {value} spends for {key} department.")

<h3 style="color:blue;">4.Average salary per department</h3>

In [None]:
avg_sal_per_dept = {}

for data in corrected_data['Employees_data']:
    if data['department'] not in avg_sal_per_dept.keys():
        avg_sal_per_dept[data['department']] = {'salary': data['salary'], 'employees_count': 1}
    else:
        avg_sal_per_dept[data['department']]['salary'] += data['salary']
        avg_sal_per_dept[data['department']]['employees_count'] += 1

for key,value in avg_sal_per_dept.items():
    print(f"{key} department give average salary of {value['salary']/value['employees_count']:.2f}")

<h3 style="color:blue;">5.Departments paying highest and lowest average salary</h3>

In [None]:
avg_sal_per_dept_sorted = dict(sorted(avg_sal_per_dept.items(), key = lambda x: x[1]['salary']/x[1]['employees_count'], reverse= True))

print(f"{list(avg_sal_per_dept_sorted.keys())[0]} department paying highest average salary and {list(avg_sal_per_dept_sorted.keys())[-1]} department paying lowest average salary")

<h1> Step 4: Data Loading</h1>

<h3 style= "color: blue;">1.A comprehensive sales report combining information from all sources</h3>

In [None]:
output_file_path = "comprehensive_report_file.csv"

fieldnames = list(integrated_data[0].keys())

with open(output_file_path, mode='w',newline='') as file:
    cvsWriter = csv.writer(file)
    cvsWriter.writerow(fieldnames)

for data in integrated_data:
    values= [value for key,value in data.items()]
    with open(output_file_path, mode='a',newline='') as file:
        cvsWriter = csv.writer(file)
        cvsWriter.writerow(values)

print(f'Comprehensive sales report generated: {output_file_path}')

<h3 style= "color: blue;">2.A summary report with aggregated sales data</h3>

In [108]:
summary_output_file_path = "summary_sales_report.csv"

with open(summary_output_file_path,mode='w',newline = '') as file:
    csvWriter = csv.writer(file)
    csvWriter.writerow(["Total Sales Amount is:",total_amt])
    csvWriter.writerow(["Total Quantity Sold is:",total_qty])
    csvWriter.writerow(["Top selling product is:",prd_sales_sorted[list(prd_sales_sorted.keys())[0]]['product_name']])
    csvWriter.writerow(["Least selling product is:",prd_sales_sorted[list(prd_sales_sorted.keys())[-1]]['product_name']])
    csvWriter.writerow(["The most important customer is:",customer_sales_sorted[list(customer_sales_sorted.keys())[0]]['customer_name']])
    csvWriter.writerow(["The least important customer is:",customer_sales_sorted[list(customer_sales_sorted.keys())[-1]]['customer_name']])
    csvWriter.writerow(["Highest amount of sales on a single day as of now is:",highest_sales_date_wise])
    csvWriter.writerow(["Lowest amount of sales on a single day as of now is:",lowest_sales_date_wise])
    csvWriter.writerow(["Highest amount of sales on a month as of now is:",month_wise_sales_sorted[highest_sales_month_wise]])
    csvWriter.writerow(["Lowest amount of sales on a month as of now is:",month_wise_sales_sorted[lowest_sales_month_wise]])
    csvWriter.writerow(["Highest amount of sales on a quarter as of now is:",quarter_wise_sales_sorted[highest_sales_quarter_wise]])
    csvWriter.writerow(["Lowest amount of sales on a quarter as of now is:",quarter_wise_sales_sorted[lowest_sales_quarter_wise]])