In [1]:
# Helper function to read the CSV file and store all data regardless of category
def read_csv(file):
    """Read the product CSV and return one normalised dict per data row.

    Parameters
    ----------
    file : str
        Path to a comma-separated text file. The first line is the header and
        must contain the columns ``product_id``, ``product_name``,
        ``category``, ``discounted_price $``, ``actual_price $``,
        ``discount_percentage %``, ``rating`` and ``rating_count`` (any order).

    Returns
    -------
    list of dict
        One dict per non-blank data line with the keys ``product_id``,
        ``product_name``, ``category`` (stripped, lower-cased strings),
        ``discounted_price``, ``actual_price``, ``discounted_percentage``,
        ``rating`` (floats) and ``rating_count`` (int).
        An empty list is returned when the file does not exist, is empty, or
        contains no data rows.

    Raises
    ------
    ValueError
        If a required column is missing from the header while data rows are
        present, or if a numeric field cannot be converted.
    """
    try:
        with open(file, 'r') as f:
            lines = f.readlines()
    except FileNotFoundError:
        return []

    # A zero-byte file has no header line to index into.
    if not lines:
        return []

    # Strip every header name so "a, b" and "a,b" are treated alike.
    headers = [name.strip() for name in lines[0].strip().split(',')]

    # Blank lines (typically a trailing newline at end of file) hold no product.
    rows = [line.strip().split(',') for line in lines[1:] if line.strip()]
    if not rows:
        return []

    # Resolve each column position once instead of once per row.
    id_col = headers.index('product_id')
    name_col = headers.index('product_name')
    category_col = headers.index('category')
    discounted_col = headers.index('discounted_price $')
    actual_col = headers.index('actual_price $')
    percentage_col = headers.index('discount_percentage %')
    rating_col = headers.index('rating')
    count_col = headers.index('rating_count')

    data = []
    for fields in rows:
        data.append({
            # Text fields are stripped and lower-cased for case-insensitive matching.
            'product_id': fields[id_col].strip().lower(),
            'product_name': fields[name_col].strip().lower(),
            'category': fields[category_col].strip().lower(),
            'discounted_price': float(fields[discounted_col]),
            'actual_price': float(fields[actual_col]),
            'discounted_percentage': float(fields[percentage_col]),
            'rating': float(fields[rating_col]),
            'rating_count': int(fields[count_col])
        })
    return data


# Function to filter the data by category (case-insensitive)
def filter_by_category(data, category):
    """Return the products whose category equals ``category``, ignoring case.

    ``data`` is an iterable of product dicts carrying a ``'category'`` key.
    The result is a new list; matching products keep their original order.
    """
    wanted = category.lower()  # normalise the target once, outside the loop

    matches = []
    for item in data:
        if item['category'].lower() == wanted:
            matches.append(item)
    return matches


In [2]:
# Helper function to read the sales data from the TXT file
def read_txt(file):
    """Read yearly unit sales from a text file.

    Each non-blank line is expected to look like
    ``Year: 2020, <product_id>: <units>, <product_id>: <units>, ...``.
    Whitespace around commas and colons is ignored, and blank lines or empty
    entries (e.g. from a trailing comma) are skipped.

    Parameters
    ----------
    file : str
        Path to the sales text file.

    Returns
    -------
    dict
        ``{product_id: {year: units_sold}}`` with lower-cased product IDs,
        integer years and integer unit counts. An empty dict is returned when
        the file does not exist.

    Raises
    ------
    ValueError
        If a year or unit count is not an integer, or an entry has no colon.
    IndexError
        If the first entry of a line has no colon-separated year value.
    """
    sales_data = {}
    try:
        with open(file, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue  # tolerate blank lines anywhere in the file

                # Split on bare commas and strip, so "a,b" and "a, b" both work.
                year_part, *sales = [part.strip() for part in line.split(',')]
                year = int(year_part.split(':', 1)[1])

                for product_sales in sales:
                    if not product_sales:
                        continue  # empty entry left behind by a trailing comma
                    # Split on the last colon: the unit count is always the final piece.
                    product_id, units_sold = product_sales.rsplit(':', 1)
                    product_id = product_id.strip().lower()
                    sales_data.setdefault(product_id, {})[year] = int(units_sold)
    except FileNotFoundError:
        return {}
    return sales_data


In [3]:
# Category analysed in Tasks 1 and 2. filter_by_category lower-cases both sides
# before comparing, so the capitalisation used here does not matter.
category = 'computers&accessories' 

In [4]:
# Input files passed to main(). Relative paths are resolved by open() against
# the current working directory; adjust them if the files live elsewhere.
CSVfile = 'Amazon_products.csv'  # Product rows parsed by read_csv
TXTfile = 'Amazon_sales.txt'  # Yearly unit sales parsed by read_txt

In [5]:
# Task 1: Find the product with the highest and lowest discounted prices in the selected data (filtered by category)
def task_1(selected_data):
    """Return ``[id_of_highest, id_of_lowest]`` by discounted price.

    ``selected_data`` is a list of product dicts with ``'product_id'`` and
    ``'discounted_price'`` keys. When several products share the extreme
    price, the one appearing first in the list wins. An empty input yields
    ``[None, None]``.
    """
    if not selected_data:
        return [None, None]

    # Seed both extremes with the first product, then scan the remainder once.
    priciest = cheapest = selected_data[0]
    highest = lowest = priciest['discounted_price']

    for item in selected_data[1:]:
        price = item['discounted_price']
        # Strict comparisons keep the earliest product on ties.
        if price > highest:
            priciest, highest = item, price
        if price < lowest:
            cheapest, lowest = item, price

    return [priciest['product_id'], cheapest['product_id']]

In [6]:
# Helper function for Task 2: Calculate the mean, median, and mean absolute deviation (MAD) of the actual prices in the selected data
def task_2(selected_data, min_rating_count=1000):
    """Summarise actual prices of well-rated products.

    Parameters
    ----------
    selected_data : list of dict
        Product dicts with ``'actual_price'`` and ``'rating_count'`` keys.
    min_rating_count : int, optional
        Only products whose ``rating_count`` is strictly greater than this
        value are included. Defaults to 1000.

    Returns
    -------
    list
        ``[mean, median, mean_absolute_deviation]`` of the included actual
        prices, each rounded to 4 decimal places, or ``[0, 0, 0]`` when no
        product passes the filter.
    """
    prices = [x['actual_price'] for x in selected_data if x['rating_count'] > min_rating_count]

    if not prices:
        return [0, 0, 0]

    count = len(prices)
    mean = sum(prices) / count

    # Median: middle value for an odd count, average of the two middle values
    # for an even count (taking only the upper-middle element is wrong there).
    ordered = sorted(prices)
    middle = count // 2
    if count % 2:
        median = ordered[middle]
    else:
        median = (ordered[middle - 1] + ordered[middle]) / 2

    # Mean absolute deviation around the mean.
    mad = sum([abs(x - mean) for x in prices]) / count

    return [round(mean, 4), round(median, 4), round(mad, 4)]


In [7]:
# Helper function for Task 3: Calculate Standard Deviation of Discounted Percentages for each category
def task_3(data):
    """Return per-category sample standard deviations of discount percentage.

    Only products whose ``'rating'`` lies in the closed range 3.3 to 4.3
    contribute. Their ``'discounted_percentage'`` values are grouped by
    ``'category'``; categories with fewer than two contributing products are
    skipped. Each deviation uses the n-1 denominator and is rounded to 4
    decimal places. The result is a list sorted from largest to smallest.
    """
    grouped = {}
    for item in data:
        if not (3.3 <= item['rating'] <= 4.3):
            continue
        grouped.setdefault(item['category'], []).append(item['discounted_percentage'])

    deviations = []
    for values in grouped.values():
        n = len(values)
        if n < 2:
            continue  # a single observation has no sample deviation
        avg = sum(values) / n
        squared_error = sum([(v - avg) ** 2 for v in values])
        deviations.append(round((squared_error / (n - 1)) ** 0.5, 4))

    deviations.sort(reverse=True)
    return deviations


In [8]:
# Helper function for Task 4: Calculate the correlation coefficient (case-insensitive product IDs)
def task_4(product_ids, sales_data):
    """Pearson correlation between the yearly sales of two products.

    Parameters
    ----------
    product_ids : list
        ``[id_a, id_b]``; IDs are compared case-insensitively. ``task_1``
        produces ``[None, None]`` when no product was found, which is treated
        as "no data".
    sales_data : dict
        ``{product_id: {year: units_sold}}`` with lower-case product IDs.

    Returns
    -------
    float or int
        Correlation coefficient rounded to 4 decimal places. ``0`` is returned
        when inputs are missing, when either ID is ``None``, when both series
        are all zero, or when either series has zero variance.
    """
    if not product_ids or not sales_data:
        return 0

    # [None, None] is truthy, so it passes the check above; calling .lower()
    # on None would raise AttributeError. Fewer than two IDs is likewise unusable.
    if len(product_ids) < 2 or product_ids[0] is None or product_ids[1] is None:
        return 0

    first_sales_by_year = sales_data.get(product_ids[0].lower(), {})
    second_sales_by_year = sales_data.get(product_ids[1].lower(), {})

    # Use every year present anywhere in the sales data so both series align.
    all_years = set()
    for yearly_data in sales_data.values():
        all_years.update(yearly_data.keys())

    # Chronological series; a year without a record counts as 0 units.
    years = sorted(all_years)
    max_sales = [first_sales_by_year.get(year, 0) for year in years]
    min_sales = [second_sales_by_year.get(year, 0) for year in years]

    if all(s == 0 for s in max_sales) and all(s == 0 for s in min_sales):
        return 0

    mean_max = sum(max_sales) / len(max_sales)
    mean_min = sum(min_sales) / len(min_sales)

    # Numerator: sum of (x_i - mean_x) * (y_i - mean_y).
    numerator = sum([(x - mean_max) * (y - mean_min) for x, y in zip(max_sales, min_sales)])

    # Denominator: sqrt(sum (x_i - mean_x)^2 * sum (y_i - mean_y)^2).
    spread_max = sum([(x - mean_max) ** 2 for x in max_sales])
    spread_min = sum([(y - mean_min) ** 2 for y in min_sales])
    denominator = (spread_max * spread_min) ** 0.5

    # A constant series has no variance; correlation is undefined, report 0.
    if denominator == 0:
        return 0

    return round(numerator / denominator, 4)


In [9]:
# Define the main function
def main(CSVfile, TXTfile, category):
    """Run Tasks 1-4 and return their outputs as ``(OP1, OP2, OP3, OP4)``.

    Tasks 1 and 2 operate on the products of ``category`` only, Task 3 on
    every product in ``CSVfile``, and Task 4 on the two products found by
    Task 1 together with the sales read from ``TXTfile``.
    """
    all_products = read_csv(CSVfile)
    category_products = filter_by_category(all_products, category)

    # Category-scoped tasks.
    price_extremes = task_1(category_products)
    price_summary = task_2(category_products)

    # Task 3 spans all categories, so it receives the unfiltered data.
    category_stddevs = task_3(all_products)

    # Task 4 correlates sales of the two products identified by Task 1.
    sales_correlation = task_4(price_extremes, read_txt(TXTfile))

    return price_extremes, price_summary, category_stddevs, sales_correlation


In [10]:
# Run the whole pipeline once; OP1-OP4 receive the results of Tasks 1-4 in order.
OP1, OP2, OP3, OP4 = main(CSVfile, TXTfile, category)

In [11]:
# Task 1: [product_id with the highest discounted price, product_id with the lowest]
OP1

['b07vtfn6hm', 'b08y5kxr6z']

In [12]:
# Task 2: [mean, median, mean absolute deviation] of actual prices for products with rating_count > 1000
OP2

[2018.8, 800.0, 2132.48]

In [13]:
# Task 3: per-category standard deviations of discount percentage (ratings 3.3-4.3), largest first
OP3

[0.297, 0.2654, 0.2311, 0.198, 0.1701, 0.1596, 0.0071]

In [14]:
# Task 4: correlation between the yearly sales of the two products reported in OP1
OP4

-0.0232