In [5]:
import pandas as pd
import json
import csv
import unittest
from collections import defaultdict
import pandas.testing as pdt

In [7]:
def read_orders_from_csv(file_path):
    """
    Reads orders from a CSV file and returns a list of dictionaries.
    
    Each dictionary represents a row in the CSV with column names as keys.
    """
    with open(file_path, mode='r', encoding='utf-8') as csvfile:
        return list(csv.DictReader(csvfile, delimiter=';'))

def read_invoicing_data_from_json(file_path):
    """
    Reads invoicing data from a JSON file and returns a dictionary.
    
    The keys are the order IDs and the values are the corresponding invoice data.
    """
    with open(file_path, 'r', encoding='utf-8') as jsonfile:
        invoicing_data = json.load(jsonfile).get('data', {}).get('invoices', [])
        return {invoice['orderId']: invoice for invoice in invoicing_data}

In [None]:
Test 1: Distribution of Crate Type per Company

In [9]:
def standardize_company_name(name):
 
    #Standardize company_name for consistency and a better visualization
    name = name.lower()
    for suffix in [' co', ' co.', 'c.o', ' c.o.', ' inc', ' inc.', ' ltd', ' ltd.', ' gmbh',]:
        if name.endswith(suffix):
            return name[:-len(suffix)].strip()
    return name

def calculate_crate_distribution(orders):

    #Calculate crate distribution per company from the list of orders.
    distribution = {}
    for order in orders:
        company, crate = order.get('company_name'), order.get('crate_type')
        if company and crate:
            standardized_name = standardize_company_name(company)
            distribution.setdefault(standardized_name, {}).setdefault(crate, 0)
            distribution[standardized_name][crate] += 1
    return distribution

if __name__ == "__main__":
    orders = read_orders_from_csv('C:/Users/yilec/Downloads/orders.csv')
    df = pd.DataFrame.from_dict(calculate_crate_distribution(orders), orient='index').fillna(0).astype(int)
    df.index.name = 'Company'
    df.reset_index(inplace=True)
    display(df)

Unnamed: 0,Company,Plastic,Metal,Wood
0,fresh fruits,2,3,1
1,veggies,1,0,2
2,seafood supplier,3,3,0
3,meat packers,3,0,0
4,green veg,1,0,3
5,organic farms,1,2,0
6,tropical fruits,2,1,0
7,healthy snacks,2,0,2
8,fresh berries,1,0,0
9,veggies unlimited,1,0,0


In [11]:
class TestCrateDistribution(unittest.TestCase):

    def setUp(self):
        # Case 1: No orders
        self.empty_orders = []
        
        # Case 2: A single order for a specific company and crate type
        self.single_order = [{'company_name': 'Fresh Fruits Co', 'crate_type': 'Plastic'}]
        
        # Case 3: Multiple orders from the same company but with slight name variations and different crate types
        self.multiple_orders_same_company = [
            {'company_name': 'Fresh Fruits Co', 'crate_type': 'Plastic'},
            {'company_name': 'Fresh Fruits c.o.', 'crate_type': 'Metal'},
            {'company_name': 'FRESH FRUITS CO', 'crate_type': 'Plastic'}
        ]
        
        # Case 4: Multiple orders from different companies with various crate types
        self.multiple_orders_multiple_companies = [
            {'company_name': 'Fresh Fruits Co', 'crate_type': 'Plastic'},
            {'company_name': 'Veggies Inc', 'crate_type': 'Wood'},
            {'company_name': 'Fresh Fruits c.o.', 'crate_type': 'Metal'},
            {'company_name': 'Veggies Inc', 'crate_type': 'Wood'},
            {'company_name': 'Fresh Fruits Co', 'crate_type': 'Plastic'},
            {'company_name': 'Veggies Inc', 'crate_type': 'Plastic'}
        ]

    # Test that an empty list of orders returns an empty dictionary
    def test_empty_orders(self):
        self.assertEqual(calculate_crate_distribution(self.empty_orders), {})

    # Test that a single order is processed correctly with one crate type
    def test_single_order(self):
        self.assertEqual(calculate_crate_distribution(self.single_order), {'fresh fruits': {'Plastic': 1}})

    # Test that multiple orders with the same company (but with varied name formats) are normalized and aggregated correctly
    def test_multiple_orders_same_company(self):
        expected = {'fresh fruits': {'Plastic': 2, 'Metal': 1}}
        self.assertEqual(calculate_crate_distribution(self.multiple_orders_same_company), expected)

    # Test that multiple orders from different companies with different crate types are correctly grouped and counted
    def test_multiple_orders_multiple_companies(self):
        expected = {
            'fresh fruits': {'Plastic': 2, 'Metal': 1},
            'veggies': {'Wood': 2, 'Plastic': 1}
        }
        self.assertEqual(calculate_crate_distribution(self.multiple_orders_multiple_companies), expected)

if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False)

....
----------------------------------------------------------------------
Ran 4 tests in 0.003s

OK


In [None]:
Test 2: DataFrame of Orders with Full Name of the Contact

In [13]:
def extract_contact_full_name(contact_data_str):
    # Returns a full name based on the contact data string.
    if not contact_data_str or not contact_data_str.strip():
        return 'John Doe'

    try:
        # Replace single quotes with double quotes and load the JSON.
        contact_data_str = contact_data_str.replace("'", '"')
        contact_data = json.loads(contact_data_str)
        # Ensure contact_data is a dictionary.
        if isinstance(contact_data, list) and contact_data:
            contact_data = contact_data[0]

        if not isinstance(contact_data, dict):
            return 'John Doe'

        # Get the contact's first name and surname.
        contact_name = contact_data.get('contact_name', '')
        contact_surname = contact_data.get('contact_surname', '')

        # Create the full name, returning 'John Doe' if empty.
        return f"{contact_name} {contact_surname}".strip() or 'John Doe'

    except json.JSONDecodeError:
        # Return the default name if there is a JSON error.
        return 'John Doe'

def create_contact_dataframe(orders):

    data = [{
        'order_id': order.get('order_id'),
        'contact_full_name': extract_contact_full_name(order.get('contact_data'))
    } for order in orders]

    return pd.DataFrame(data)

if __name__ == "__main__":

    orders = read_orders_from_csv('C:/Users/yilec/Downloads/orders.csv')
    df_contacts = create_contact_dataframe(orders)
    pd.set_option('display.max_rows', None)
    pd.set_option('display.max_columns', None)
    display(df_contacts)

Unnamed: 0,order_id,contact_full_name
0,f47ac10b-58cc-4372-a567-0e02b2c3d479,Curtis Jackson
1,f47ac10b-58cc-4372-a567-0e02b2c3d480,Maria Theresa
2,f47ac10b-58cc-4372-a567-0e02b2c3d481,Para Cetamol
3,f47ac10b-58cc-4372-a567-0e02b2c3d482,John Doe
4,f47ac10b-58cc-4372-a567-0e02b2c3d483,John Doe
5,f47ac10b-58cc-4372-a567-0e02b2c3d484,John Krasinski
6,f47ac10b-58cc-4372-a567-0e02b2c3d485,John Doe
7,f47ac10b-58cc-4372-a567-0e02b2c3d486,Jennifer Lopez
8,f47ac10b-58cc-4372-a567-0e02b2c3d487,Liav Ichenbaum
9,f47ac10b-58cc-4372-a567-0e02b2c3d488,Curtis Jackson


In [15]:
def extract_contact_full_name(contact_data_str):
    # Default name to return if no valid data is provided
    default_name = "John Doe"

    if not contact_data_str:
        return default_name

    try:
        contact_data = json.loads(contact_data_str.replace("'", '"'))
        # Check if data is a list, if so, get the first element, otherwise, use the object directly
        contact_info = contact_data[0] if isinstance(contact_data, list) else contact_data
        # Extract and concatenate name and surname, default name if empty
        return f"{contact_info.get('contact_name', '')} {contact_info.get('contact_surname', '')}".strip() or default_name
    except (json.JSONDecodeError, IndexError):

        return default_name

class TestContactFullNameExtraction(unittest.TestCase):
    def test_missing_contact_name(self):
        # Test case when contact name is missing, should return only the surname
        self.assertEqual(extract_contact_full_name('[{ "contact_surname":"Jackson"}]'), 'Jackson')

    def test_missing_contact_surname(self):
        # Test case when contact surname is missing, should return only the name
        self.assertEqual(extract_contact_full_name('[{ "contact_name":"Curtis"}]'), 'Curtis')

    def test_missing_both_names(self):
        # Test case when both name and surname are missing, should return default name
        self.assertEqual(extract_contact_full_name('[{ "city":"Chicago"}]'), 'John Doe')

if __name__ == '__main__':

    unittest.main(argv=['first-arg-is-ignored'], exit=False)

.......
----------------------------------------------------------------------
Ran 7 tests in 0.002s

OK


In [None]:
Test 3: DataFrame of Orders with Contact Address

In [17]:
def extract_contact_address(contact_data_str):
    
    city, postal_code = 'Unknown', 'UNK00'

    # Validate and process the contact data string
    if contact_data_str:
        try:
            # Convert the contact data string to JSON format
            contact_data = json.loads(contact_data_str.replace("'", '"'))
            # If contact_data is a list, retrieve the first element
            if isinstance(contact_data, list):
                contact_data = contact_data[0] if contact_data else {}

            city = contact_data.get('city', city)
            postal_code = str(contact_data.get('cp', postal_code))
        
        except json.JSONDecodeError:
 
            pass

    # Return formatted address with city and postal code
    return f"{city}, {postal_code}"

def create_contact_address_dataframe(orders):
    # Extract contact addresses and create a dataframe with order id and address
    data = [{'order_id': order.get('order_id'), 
             'contact_address': extract_contact_address(order.get('contact_data'))} 
            for order in orders]
    return pd.DataFrame(data)

if __name__ == "__main__":

    orders = read_orders_from_csv('C:/Users/yilec/Downloads/orders.csv')
    df_contact_addresses = create_contact_address_dataframe(orders)
    pd.set_option('display.max_rows', None)
    pd.set_option('display.max_columns', None)
    display(df_contact_addresses)

Unnamed: 0,order_id,contact_address
0,f47ac10b-58cc-4372-a567-0e02b2c3d479,"Chicago, 12345"
1,f47ac10b-58cc-4372-a567-0e02b2c3d480,"Calcutta, UNK00"
2,f47ac10b-58cc-4372-a567-0e02b2c3d481,"Frankfurt am Oder, 3934"
3,f47ac10b-58cc-4372-a567-0e02b2c3d482,"Unknown, UNK00"
4,f47ac10b-58cc-4372-a567-0e02b2c3d483,"Unknown, UNK00"
5,f47ac10b-58cc-4372-a567-0e02b2c3d484,"New York, 1203"
6,f47ac10b-58cc-4372-a567-0e02b2c3d485,"Unknown, UNK00"
7,f47ac10b-58cc-4372-a567-0e02b2c3d486,"Esplugues de Llobregat, UNK00"
8,f47ac10b-58cc-4372-a567-0e02b2c3d487,"Tel Aviv, UNK00"
9,f47ac10b-58cc-4372-a567-0e02b2c3d488,"Chicago, 12345"


In [19]:
class TestContactAddressExtraction(unittest.TestCase):
    
    # This function extracts the city and cp from a JSON string
    
    def extract_contact_address(self, contact_data_str):

        contact_data = json.loads(contact_data_str)[0]

        return f"{contact_data.get('city', 'Unknown')}, {contact_data.get('cp', 'UNK00')}"

    # Test case for when the city is missing in the input JSON
    def test_missing_city(self):

        self.assertEqual(self.extract_contact_address('[{ "cp": "12345"}]'), 'Unknown, 12345')

    # Test case for when the cp is missing in the input JSON
    def test_missing_cp(self):

        self.assertEqual(self.extract_contact_address('[{ "city": "Chicago"}]'), 'Chicago, UNK00')

    # Test case for when both city and cp are missing in the input JSON
    def test_missing_both_city_and_cp(self):

        self.assertEqual(self.extract_contact_address('[{}]'), 'Unknown, UNK00')


if __name__ == "__main__":

    unittest.main(argv=['first-arg-is-ignored'], exit=False)

..........
----------------------------------------------------------------------
Ran 10 tests in 0.004s

OK


In [None]:
Test 4: Calculation of Sales Team Commissions

In [21]:
def calculate_net_value(gross_value, vat):
    # Calculate the net value from gross value by removing the VAT percentage.
    return gross_value / (1 + vat / 100)

def calculate_commissions(orders, invoices):

    commissions = defaultdict(float)

    commission_rates = [0.06, 0.025, 0.0095]  

    # Iterate through each order to calculate commissions.
    for order in orders:

        order_id, salesowners_str = order.get('order_id'), order.get('salesowners')

        invoice = invoices.get(order_id)
        # Skip calculation if invoice or sales owners are missing.
        if not invoice or not salesowners_str:
            continue

        # Calculate the net value of the order by removing VAT from the gross value.
        net_value = calculate_net_value(int(invoice['grossValue']) / 100, float(invoice['vat']))

        salesowners = [owner.strip() for owner in salesowners_str.split(',') if owner.strip()]

        # Distribute the net value among sales owners based on the commission rates.
        for idx, owner in enumerate(salesowners[:len(commission_rates)]):
            commissions[owner] += net_value * commission_rates[idx]


    return pd.DataFrame(
        sorted({owner: round(amount, 2) for owner, amount in commissions.items()}.items(), 
               key=lambda x: x[1], reverse=True), 
        columns=['Sales Owner', 'Total Commission (€)']
    )

if __name__ == "__main__":

    orders = read_orders_from_csv('C:/Users/yilec/Downloads/orders.csv')

    invoices = read_invoicing_data_from_json('C:/Users/yilec/Downloads/invoicing_data.json')

    display(calculate_commissions(orders, invoices))

Unnamed: 0,Sales Owner,Total Commission (€)
0,Leonard Cohen,650.31
1,David Henderson,463.29
2,Luke Skywalker,377.61
3,David Goliat,214.96
4,Ammy Winehouse,209.52
5,Marianov Merschik,188.61
6,Yuri Gagarin,154.62
7,Chris Pratt,114.08
8,Vladimir Chukov,72.83
9,Marie Curie,70.52


In [23]:
class TestCommissionCalculations(unittest.TestCase):

    def setUp(self):
        # Sample data setup for orders and invoices
        self.orders = [
            {'order_id': 'order1', 'salesowners': 'Alice, Bob, Charlie'},
            {'order_id': 'order2', 'salesowners': 'Bob, Alice'},
            {'order_id': 'order3', 'salesowners': 'Charlie'},
            {'order_id': 'order4', 'salesowners': 'Dana'},
            {'order_id': 'order5', 'salesowners': ''}
        ]
        self.invoices = {
            'order1': {'grossValue': '119000', 'vat': '19'},
            'order2': {'grossValue': '238000', 'vat': '19'},
            'order3': {'grossValue': '100000', 'vat': '0'},
            'order4': {'grossValue': '121000', 'vat': '21'},
            'order5': {'grossValue': '100000', 'vat': '19'}
        }

    def assert_commission_df_equal(self, commissions_df, expected_data):
        # Test to assert equality of two dataframes
        expected_df = pd.DataFrame(expected_data, columns=['Sales Owner', 'Total Commission (€)'])
        commissions_df = pd.DataFrame(commissions_df, columns=['Sales Owner', 'Total Commission (€)'])
        
        # Check if both DataFrames aren't empty before sorting to avoid errors
        if not expected_df.empty and not commissions_df.empty:
            expected_df = expected_df.sort_values(by='Sales Owner').reset_index(drop=True)
            commissions_df = commissions_df.sort_values(by='Sales Owner').reset_index(drop=True)
        
        pdt.assert_frame_equal(commissions_df, expected_df)

    def test_calculate_net_value(self):
        # Test for correct net value calculation from gross value and VAT
        gross_value, vat, expected_net = 119000, 19, 1000.00
        net_value = calculate_net_value(gross_value, vat) / 100
        self.assertAlmostEqual(net_value, expected_net)

    def test_calculate_commissions(self):
        # Test for calculating commissions with multiple sales owners per order
        expected_commissions = [
            {'Sales Owner': 'Bob', 'Total Commission (€)': 145.0},
            {'Sales Owner': 'Alice', 'Total Commission (€)': 110.0},
            {'Sales Owner': 'Charlie', 'Total Commission (€)': 69.5},
            {'Sales Owner': 'Dana', 'Total Commission (€)': 60.0}
        ]
        self.assert_commission_df_equal(calculate_commissions(self.orders, self.invoices), expected_commissions)

    def test_commission_calculation_accuracy(self):
        # Test accuracy of commission calculation with specific data for an order
        orders = [{'order_id': 'order1', 'salesowners': 'Alice, Bob, Charlie'}]
        invoices = {'order1': {'grossValue': '119000', 'vat': '19'}}
        expected_commissions = [
            {'Sales Owner': 'Alice', 'Total Commission (€)': 60.0},
            {'Sales Owner': 'Bob', 'Total Commission (€)': 25.0},
            {'Sales Owner': 'Charlie', 'Total Commission (€)': 9.5}
        ]
        self.assert_commission_df_equal(calculate_commissions(orders, invoices), expected_commissions)

    def test_commission_sorting_order(self):
        # Test to verify if the commissions are sorted in descending order
        commissions_df = calculate_commissions(self.orders, self.invoices)
        
        # Extract the Total Commission (€) column and check if it is sorted in descending order
        commission_values = commissions_df['Total Commission (€)'].values
        self.assertTrue(
            all(commission_values[i] >= commission_values[i + 1] for i in range(len(commission_values) - 1)),
        )

if __name__ == '__main__':
    unittest.main(argv=['first-arg-is-ignored'], exit=False)

..............
----------------------------------------------------------------------
Ran 14 tests in 0.009s

OK


In [None]:
Test 5: DataFrame of Companies with Sales Owners

In [25]:
def create_company_salesowners_dataframe(orders):

    company_dict = {}
    
    # Iterate through each order in the orders list
    for order in orders:
        company_id = order.get('company_id')
        company_name = order.get('company_name')
        salesowners = order.get('salesowners', '')

        standardized_name = standardize_company_name(company_name)
        
        # Get or create an entry in the dictionary for the company name
        company = company_dict.setdefault(standardized_name, {
            'company_id': company_id,
            'company_name': company_name,
            'salesowners_set': set()  
        })

        # Update to the company with the lowest id if current id is smaller
        if company_id < company['company_id']:
            company['company_id'], company['company_name'] = company_id, company_name
            
        # If salesowners exist, update the set with the unique owners
        if salesowners:
            company['salesowners_set'].update({owner.strip() for owner in salesowners.split(',') if owner.strip()})
            
    # Convert the dictionary to a list of dictionaries, sorted salesowners joined into a single string
    data = [
        {
            'company_id': company['company_id'],
            'company_name': company['company_name'],
            'list_salesowners': ', '.join(sorted(company['salesowners_set']))  
        }
        for company in company_dict.values()
    ]

    return pd.DataFrame(data) 

if __name__ == "__main__":

    orders = read_orders_from_csv('C:/Users/yilec/Downloads/orders.csv')

    df_3 = create_company_salesowners_dataframe(orders)
    pd.options.display.max_colwidth = None
    display(df_3)

Unnamed: 0,company_id,company_name,list_salesowners
0,1e2b47e6-499e-41c6-91d3-09d12dddfbbd,Fresh Fruits Co,"Ammy Winehouse, David Henderson, Leon Leonov, Leonard Cohen, Luke Skywalker"
1,0f05a8f1-2bdf-4be7-8c82-4c9b58f04898,Veggies Inc,"David Goliat, Leon Leonov, Leonard Cohen, Luke Skywalker"
2,1c4b0b50-1d5d-463a-b56e-1a6fd3aeb7d6,Seafood Supplier,"Ammy Winehouse, David Goliat, Leonard Cohen, Markus Söder"
3,34538e39-cd2e-4641-8d24-3c94146e6f16,Meat Packers Ltd,"Chris Pratt, David Henderson, Leon Leonov, Marianov Merschik"
4,fa14c3ed-3c48-49f4-bd69-4d7f5b5f4b1b,Green Veg Co,"Ammy Winehouse, Chris Pratt, David Henderson, Leonard Cohen, Luke Skywalker"
5,acdb6f30-764f-404e-8b8e-7e7e3e6fa1a9,Organic Farms,"Chris Pratt, David Henderson, Leon Leonov, Leonard Cohen"
6,5f0bdbdf-1d84-4c23-957c-8bb8c0ddc89d,Tropical Fruits Ltd,"David Goliat, David Henderson, Yuri Gagarin"
7,20dfef10-8f4e-45a1-82fc-123f4ab2a4a5,healthy snacks c.o.,"Ammy Winehouse, Chris Pratt, Luke Skywalker, Marianov Merschik, Marie Curie, Vladimir Chukov"
8,4a98d9ec-65f6-438f-9a0c-0d9e1a6f7c65,Fresh Berries Inc,"Ammy Winehouse, Leonard Cohen"
9,f3b70f8d-bb74-4d96-b1f2-88e5c0c68b14,Meat Express Ltd,"Chris Pratt, Marie Curie, Yuri Gagarin"


In [27]:
class TestCompanySalesownersDataframe(unittest.TestCase):

    def setUp(self):
        
        # Define a list of order dictionaries, each containing order_id, company_id, company_name, and salesowners
        self.orders = [
            {'order_id': 'order1', 'company_id': 'id1', 'company_name': 'Acme Co', 'salesowners': 'Alice, Bob'},
            {'order_id': 'order2', 'company_id': 'id2', 'company_name': 'Acme Co.', 'salesowners': 'Charlie'},
            {'order_id': 'order3', 'company_id': 'id3', 'company_name': 'Beta Inc', 'salesowners': 'Dana'},
            {'order_id': 'order4', 'company_id': 'id4', 'company_name': 'Gamma Ltd', 'salesowners': 'Eve, Frank'},
            {'order_id': 'order5', 'company_id': 'id5', 'company_name': 'Acme c.o.', 'salesowners': 'George, Alice'},
            {'order_id': 'order6', 'company_id': 'id6', 'company_name': 'Gamma ltd', 'salesowners': 'Heidi'}
        ]
        self.df = create_company_salesowners_dataframe(self.orders)

    def test_dataframe_creation_and_content(self):
        # Test the length of the dataframe and the unique company names
        self.assertEqual(len(self.df), 3)  
         # Define expected unique company names for validation
        expected_company_names = {'Acme Co', 'Beta Inc', 'Gamma Ltd'}
    
        self.assertEqual(set(self.df['company_name']), expected_company_names)

        acme_row = self.df[self.df['company_name'] == 'Acme Co'].iloc[0]
        self.assertEqual(acme_row['company_id'], 'id1')
        self.assertEqual(acme_row['list_salesowners'], 'Alice, Bob, Charlie, George')

        gamma_row = self.df[self.df['company_name'] == 'Gamma Ltd'].iloc[0]
        self.assertEqual(gamma_row['list_salesowners'], 'Eve, Frank, Heidi')

    def test_no_duplicate_company_names(self):
        # Check that there are no duplicate company names 
        company_names = self.df['company_name'].tolist()

        self.assertEqual(len(company_names), len(set(company_names)))

if __name__ == '__main__':

    unittest.main(argv=['first-arg-is-ignored'], exit=False)

................
----------------------------------------------------------------------
Ran 16 tests in 0.009s

OK
