# Load Orders.csv

In [64]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [65]:
pip install requests


Note: you may need to restart the kernel to use updated packages.


In [66]:
pip install pandas

Note: you may need to restart the kernel to use updated packages.


In [67]:
pip install disutils

Note: you may need to restart the kernel to use updated packages.


In [68]:
pip install python3-distutils

Note: you may need to restart the kernel to use updated packages.


ERROR: Could not find a version that satisfies the requirement python3-distutils (from versions: none)
ERROR: No matching distribution found for python3-distutils


In [69]:
import os
from pyspark.sql import SparkSession
import requests

# Crear la sesión de Spark
spark = SparkSession.builder.appName("Spark").getOrCreate()

# URL del archivo CSV
url = "https://raw.githubusercontent.com/Digital-IFCO/data-engineering-test/refs/heads/main/resources/orders.csv"

# Descargar el archivo CSV a una ubicación local
csv_path = os.path.join(os.getcwd(), "orders.csv")
response = requests.get(url)
if response.status_code == 200:
    with open(csv_path, 'wb') as f:
        f.write(response.content)
    print(f"Archivo descargado en: {csv_path}")
else:
    print(f"Error al descargar el archivo. Código de estado: {response.status_code}")
    exit()

# Leer el archivo CSV directamente como un DataFrame de Spark, con el delimitador correcto
orders_df = spark.read.csv(csv_path, sep=';', header=True, inferSchema=True)

# Mostrar las primeras filas del DataFrame de Spark
orders_df.show(10, truncate=False)

Archivo descargado en: c:\Users\mouba\Desktop\orders.csv
+------------------------------------+--------+------------------------------------+---------------------+----------+---------------------------------------------------------------------------------------------------------------+------------------------------------------------------------+
|order_id                            |date    |company_id                          |company_name         |crate_type|contact_data                                                                                                   |salesowners                                                 |
+------------------------------------+--------+------------------------------------+---------------------+----------+---------------------------------------------------------------------------------------------------------------+------------------------------------------------------------+
|f47ac10b-58cc-4372-a567-0e02b2c3d479|29.01.22|1e2b47e6-499e-41c6-91d3

In [70]:
# Since contact_data is a nested column, let's explore it
orders_df.select("contact_data").show(10, truncate=False)

+---------------------------------------------------------------------------------------------------------------+
|contact_data                                                                                                   |
+---------------------------------------------------------------------------------------------------------------+
|"[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", ""city"":""Chicago"", ""cp"": ""12345""}]"   |
|"[{ ""contact_name"":""Maria"", ""contact_surname"":""Theresa"", ""city"":""Calcutta""}]"                      |
|"[{ ""contact_name"":""Para"", ""contact_surname"":""Cetamol"", ""city"":""Frankfurt am Oder"", ""cp"": 3934}]"|
|NULL                                                                                                           |
|NULL                                                                                                           |
|"[{ ""contact_name"":""John"", ""contact_surname"":""Krasinski"", ""city"":""New York""

In [71]:
from pyspark.sql.functions import regexp_replace, col, ArrayType, StructType, from_json
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

# Step 1: Replace escaped double quotes
orders_df = orders_df.withColumn("contact_data", 
                                 regexp_replace(col("contact_data"), r'""', '"'))

# Step 2: Replace the quotes at the beginning and end of the JSON string that might be incorrect
orders_df = orders_df.withColumn("contact_data", 
                                 regexp_replace(col("contact_data"), r'^"|"$', '')) 

# Step 3: Check if the changes to contact_data are correct
orders_df.select("contact_data").show(10, truncate=False)  # Display the first 10 rows of the 'contact_data' column to verify

# Step 4: Define the schema for contact_data
contact_data_schema = ArrayType(
    StructType([  # Define an array of structures for 'contact_data'
        StructField("contact_name", StringType(), True),  # 'contact_name' field of type String
        StructField("contact_surname", StringType(), True),  # 'contact_surname' field of type String
        StructField("city", StringType(), True),  # 'city' field of type String
        StructField("cp", StringType(), True)  # 'cp' field of type String
    ])
)


+-----------------------------------------------------------------------------------------------+
|contact_data                                                                                   |
+-----------------------------------------------------------------------------------------------+
|[{ "contact_name":"Curtis", "contact_surname":"Jackson", "city":"Chicago", "cp": "12345"}]     |
|[{ "contact_name":"Maria", "contact_surname":"Theresa", "city":"Calcutta"}]                    |
|[{ "contact_name":"Para", "contact_surname":"Cetamol", "city":"Frankfurt am Oder", "cp": 3934}]|
|NULL                                                                                           |
|NULL                                                                                           |
|[{ "contact_name":"John", "contact_surname":"Krasinski", "city":"New York", "cp": "1203"}]     |
|NULL                                                                                           |
|[{ "contact_name":"

In [72]:
# Step 5: Parse the 'contact_data' column using the defined schema
orders_df = orders_df.withColumn("parsed_contact_data", from_json(col("contact_data"), contact_data_schema))  
# This step parses the 'contact_data' column into a structured format according to the defined schema

# Step 6: Verify the results of the parsing
orders_df.select("contact_data", "parsed_contact_data").show(10, truncate=False)  

# Display the original 'contact_data' and the parsed 'parsed_contact_data' columns for the first 10 rows to verify the parsing

+-----------------------------------------------------------------------------------------------+-------------------------------------------------+
|contact_data                                                                                   |parsed_contact_data                              |
+-----------------------------------------------------------------------------------------------+-------------------------------------------------+
|[{ "contact_name":"Curtis", "contact_surname":"Jackson", "city":"Chicago", "cp": "12345"}]     |[{Curtis, Jackson, Chicago, 12345}]              |
|[{ "contact_name":"Maria", "contact_surname":"Theresa", "city":"Calcutta"}]                    |[{Maria, Theresa, Calcutta, NULL}]               |
|[{ "contact_name":"Para", "contact_surname":"Cetamol", "city":"Frankfurt am Oder", "cp": 3934}]|[{Para, Cetamol, Frankfurt am Oder, 3934}]       |
|NULL                                                                                           |NULL           

The explode function is avoided as it doesn't handle empty rows in contact_data properly. Filtering rows before applying explode was considered, but using when with explode is not feasible.

In [73]:
from pyspark.sql import functions as F

# Step 7: Extract the first value from the arrays (in case there's only one value)
orders_df = orders_df.select(
    "order_id",  # Select the order_id column
    "date",  # Select the date column
    "company_id",  # Select the company_id column
    "company_name",  # Select the company_name column
    "crate_type",  # Select the crate_type column
    F.col("parsed_contact_data.contact_name")[0].alias("contact_name"),  # Extract the first contact_name from the array
    F.col("parsed_contact_data.contact_surname")[0].alias("contact_surname"),  # Extract the first contact_surname from the array
    F.col("parsed_contact_data.city")[0].alias("contact_city"),  # Extract the first city from the array
    F.col("parsed_contact_data.cp")[0].alias("contact_cp"),  # Extract the first cp (postal code) from the array
    "salesowners"  # Select the salesowners column
)

# Step 8: Display the result
orders_df.show(100)  # Show the first 100 rows of the transformed dataframe

+--------------------+--------+--------------------+--------------------+----------+------------+---------------+--------------------+----------+--------------------+
|            order_id|    date|          company_id|        company_name|crate_type|contact_name|contact_surname|        contact_city|contact_cp|         salesowners|
+--------------------+--------+--------------------+--------------------+----------+------------+---------------+--------------------+----------+--------------------+
|f47ac10b-58cc-437...|29.01.22|1e2b47e6-499e-41c...|     Fresh Fruits Co|   Plastic|      Curtis|        Jackson|             Chicago|     12345|Leonard Cohen, Lu...|
|f47ac10b-58cc-437...|21.02.22|0f05a8f1-2bdf-4be...|         Veggies Inc|      Wood|       Maria|        Theresa|            Calcutta|      NULL|Luke Skywalker, D...|
|f47ac10b-58cc-437...|03.04.22|1e2b47e6-499e-41c...|    Fresh Fruits c.o|     Metal|        Para|        Cetamol|   Frankfurt am Oder|      3934|      Luke Skywalker

##### Export orders_df.csv

In [90]:
import os
import pandas as pd

desktop_path = os.path.join(os.path.expanduser("~"), "Desktop", "orders_df.csv")

orders_df_export = orders_df.toPandas()
orders_df_export.to_csv(desktop_path, index=False, encoding="utf-8")


# Load Invoicing_data.json

In [75]:
# URL del archivo JSON
url = "https://raw.githubusercontent.com/Digital-IFCO/data-engineering-test/refs/heads/main/resources/invoicing_data.json"

# Descargar el archivo JSON a una ubicación local
json_path = os.path.join(os.getcwd(), "invoicing_data.json")
response = requests.get(url)
if response.status_code == 200:
    with open(json_path, 'wb') as f:
        f.write(response.content)
    print(f"Archivo descargado en: {json_path}")
else:
    print(f"Error al descargar el archivo. Código de estado: {response.status_code}")
    exit()

# Leer el archivo JSON directamente como un DataFrame de Spark
invoicing_df = spark.read.option("multiline", "true").json(json_path)

# Inspect the structure of the DataFrame by printing its schema
invoicing_df.printSchema()

# Mostrar las primeras filas del DataFrame de Spark
invoicing_df.show(10, truncate=False)

Archivo descargado en: c:\Users\mouba\Desktop\invoicing_data.json
root
 |-- data: struct (nullable = true)
 |    |-- invoices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- companyId: string (nullable = true)
 |    |    |    |-- grossValue: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- orderId: string (nullable = true)
 |    |    |    |-- vat: string (nullable = true)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [76]:
# Access the 'invoices' array inside the 'data' field and explode it into separate rows
invoicing_df = invoicing_df.selectExpr("data.invoices as invoices").selectExpr("explode(invoices) as invoice")

# Flatten the structure to access the fields directly from the 'invoice' object
invoicing_df = invoicing_df.select("invoice.*")

# Display the resulting DataFrame without truncating values
invoicing_df.show(truncate=False)

+------------------------------------+----------+------------------------------------+------------------------------------+---+
|companyId                           |grossValue|id                                  |orderId                             |vat|
+------------------------------------+----------+------------------------------------+------------------------------------+---+
|1e2b47e6-499e-41c6-91d3-09d12dddfbbd|324222    |e1e1e1e1-e1e1-e1e1-e1e1-e1e1e1e1e1e1|f47ac10b-58cc-4372-a567-0e02b2c3d479|0  |
|0f05a8f1-2bdf-4be7-8c82-4c9b58f04898|193498    |e2e2e2e2-e2e2-e2e2-e2e2-e2e2e2e2e2e2|f47ac10b-58cc-4372-a567-0e02b2c3d480|19 |
|1e2b47e6-499e-41c6-91d3-09d12dddfbbd|345498    |e3e3e3e3-e3e3-e3e3-e3e3-e3e3e3e3e3e3|f47ac10b-58cc-4372-a567-0e02b2c3d481|21 |
|1c4b0b50-1d5d-463a-b56e-1a6fd3aeb7d6|245412    |e4e4e4e4-e4e4-e4e4-e4e4-e4e4e4e4e4e4|f47ac10b-58cc-4372-a567-0e02b2c3d482|34 |
|34538e39-cd2e-4641-8d24-3c94146e6f16|145467    |e5e5e5e5-e5e5-e5e5-e5e5-e5e5e5e5e5e5|f47ac10b-58cc-4372

In [77]:
# Remove duplicate rows based on the 'orderID' column
invoicing_df = invoicing_df.dropDuplicates(["orderID"])

# Display the resulting DataFrame without truncating values
invoicing_df.show(truncate=False)

+------------------------------------+----------+------------------------------------+------------------------------------+---+
|companyId                           |grossValue|id                                  |orderId                             |vat|
+------------------------------------+----------+------------------------------------+------------------------------------+---+
|1e2b47e6-499e-41c6-91d3-09d12dddfbbd|324222    |e1e1e1e1-e1e1-e1e1-e1e1-e1e1e1e1e1e1|f47ac10b-58cc-4372-a567-0e02b2c3d479|0  |
|0f05a8f1-2bdf-4be7-8c82-4c9b58f04898|193498    |e2e2e2e2-e2e2-e2e2-e2e2-e2e2e2e2e2e2|f47ac10b-58cc-4372-a567-0e02b2c3d480|19 |
|1e2b47e6-499e-41c6-91d3-09d12dddfbbd|345498    |e3e3e3e3-e3e3-e3e3-e3e3-e3e3e3e3e3e3|f47ac10b-58cc-4372-a567-0e02b2c3d481|21 |
|1c4b0b50-1d5d-463a-b56e-1a6fd3aeb7d6|245412    |e4e4e4e4-e4e4-e4e4-e4e4-e4e4e4e4e4e4|f47ac10b-58cc-4372-a567-0e02b2c3d482|34 |
|34538e39-cd2e-4641-8d24-3c94146e6f16|145467    |e5e5e5e5-e5e5-e5e5-e5e5-e5e5e5e5e5e5|f47ac10b-58cc-4372

##### Export invoicing_df.csv

In [89]:
desktop_path = os.path.join(os.path.expanduser("~"), "Desktop", "invoicing_df.csv")

invoicing_df_export = invoicing_df.toPandas()
invoicing_df_export.to_csv(desktop_path, index=False, encoding="utf-8")

# Test 1: Distribution of Crate Type per Company
- Calculate the distribution of crate types per company (number of orders per type). Ensure to include unit tests to validate the correctness of your calculations.

In [79]:
# Step 1: Calculate the distribution of 'crate_type' by company (number of orders per crate type)
crate_distribution_df = orders_df.groupBy("company_name", "crate_type") \
    .agg(F.count("order_id").alias("order_count"))  # Group by company and crate type, then count the number of order_id per group

# Step 2: Display the result
crate_distribution_df.show(100, truncate=False)  # Show the first 100 rows of the distribution, with no truncation of columns

+---------------------+----------+-----------+
|company_name         |crate_type|order_count|
+---------------------+----------+-----------+
|Fresh Fruits Co      |Metal     |2          |
|healthy snacks c.o.  |Wood      |1          |
|Green World Ltd      |Plastic   |1          |
|Fresh Berries Inc    |Plastic   |1          |
|Seafood Global Inc   |Metal     |1          |
|Organic Veggies Ltd  |Plastic   |1          |
|Seafood Supplier GmbH|Metal     |3          |
|Farms Global Co      |Metal     |1          |
|Tropical Farms Ltd   |Wood      |1          |
|Fresh Fruits Co      |Wood      |1          |
|Veggie Haven Co      |Plastic   |1          |
|Healthy Choices Co   |Plastic   |1          |
|Healthy Snacks       |Plastic   |2          |
|Veggies Inc          |Plastic   |1          |
|Tropical Fruits Ltd  |Metal     |1          |
|Veggies Inc          |Wood      |2          |
|Seafood Supplier     |Plastic   |3          |
|Global Veggies Inc   |Metal     |1          |
|Tropical Fre

In [80]:
# For unit tests, we can check the following:
# 1. Verify that the total number of orders by company is correct
# 2. Ensure that there are no null values in the "company_name" or "crate_type" fields

# Test 1: Verify that there are no null values in "company_name" or "crate_type"
assert crate_distribution_df.filter(F.col("company_name").isNull()).count() == 0, "There are null values in 'company_name'"
assert crate_distribution_df.filter(F.col("crate_type").isNull()).count() == 0, "There are null values in 'crate_type'"

# Test 2: Verify that the distribution is calculated correctly for a specific company
# Assume we want to check the distribution of 'crate_type' for "Fresh Fruits Co"
company_name = "Fresh Fruits Co"
expected_distribution = crate_distribution_df.filter(F.col("company_name") == company_name).collect()

# Ensure that the distribution for "Fresh Fruits Co" is calculated
assert len(expected_distribution) > 0, f"No distribution found for company {company_name}"

# Test 2: DataFrame of Orders with Full Name of the Contact
- Provide a DataFrame (df_1) containing the following columns:

  order_id:	The order_id field must contain the unique identifier of the order.
  contact_full_name:	The contact_full_name field must contain the full name of the contact. In case this information is not available, the placeholder "John Doe" should be utilized.

  Include unit tests to verify that the full names are correctly extracted and the placeholder is used appropriately.

In [81]:
from pyspark.sql import functions as F

# Step 1: Create the DataFrame df_1 with order_id and contact_full_name
df_1 = orders_df.select(
    "order_id",
    # If contact_name or contact_surname is empty or null, use "John Doe"
    F.when(F.col("contact_name").isNull() | (F.col("contact_name") == ""), "John Doe")  # Check if contact_name is null or empty
     .otherwise(F.concat(F.col("contact_name"), F.lit(" "), F.col("contact_surname"))).alias("contact_full_name")  # Concatenate contact_name and contact_surname
)

# Step 2: Display the result
df_1.show(100, truncate=False)

# Unit tests:

# Test 1: Verify that the 'contact_full_name' column does not contain any null values
assert df_1.filter(F.col("contact_full_name").isNull()).count() == 0, "There are null values in 'contact_full_name'"

# Test 2: Verify that the placeholder "John Doe" is used when names or surnames are null or empty
# Check that "John Doe" is used when 'contact_name' and 'contact_surname' are empty
placeholder_count = df_1.filter(F.col("contact_full_name") == "John Doe").count()

assert placeholder_count > 0, "The placeholder 'John Doe' was not used correctly"


+------------------------------------+-----------------+
|order_id                            |contact_full_name|
+------------------------------------+-----------------+
|f47ac10b-58cc-4372-a567-0e02b2c3d479|Curtis Jackson   |
|f47ac10b-58cc-4372-a567-0e02b2c3d480|Maria Theresa    |
|f47ac10b-58cc-4372-a567-0e02b2c3d481|Para Cetamol     |
|f47ac10b-58cc-4372-a567-0e02b2c3d482|John Doe         |
|f47ac10b-58cc-4372-a567-0e02b2c3d483|John Doe         |
|f47ac10b-58cc-4372-a567-0e02b2c3d484|John Krasinski   |
|f47ac10b-58cc-4372-a567-0e02b2c3d485|John Doe         |
|f47ac10b-58cc-4372-a567-0e02b2c3d486|Jennifer Lopez   |
|f47ac10b-58cc-4372-a567-0e02b2c3d487|Liav Ichenbaum   |
|f47ac10b-58cc-4372-a567-0e02b2c3d488|Curtis Jackson   |
|f47ac10b-58cc-4372-a567-0e02b2c3d489|Anthony Pap      |
|f47ac10b-58cc-4372-a567-0e02b2c3d490|Natalia Romanov  |
|f47ac10b-58cc-4372-a567-0e02b2c3d491|Bruce Wayne      |
|f47ac10b-58cc-4372-a567-0e02b2c3d492|Clark Kent       |
|f47ac10b-58cc-4372-a567-0e02b2

# Test 3: DataFrame of Orders with Contact Address
- Provide a DataFrame (df_2) containing the following columns:

order_id: The order_id field must contain the unique identifier of the order.
contact_address:	The field for contact_address should adhere to the following information and format: "city name, postal code". In the event that the city name is not available, the placeholder "Unknown" should be used. Similarly, if the postal code is not known, the placeholder "UNK00" should be used.

Ensure to include unit tests to validate the address formatting and placeholder logic.

In [82]:
from pyspark.sql import functions as F

# Step 1: Create the DataFrame df_2 with order_id and contact_address
df_2 = orders_df.select(
    "order_id",
    # Constructing the contact_address
    F.concat_ws(
        ", ",  # Concatenate city and postal code with a comma separator
        F.when(F.col("contact_city").isNull() | (F.col("contact_city") == ""), "Unknown")  # If contact_city is null or empty, use "Unknown"
         .otherwise(F.col("contact_city")),  # Otherwise, use the actual contact_city value
        F.when(F.col("contact_cp").isNull() | (F.col("contact_cp") == ""), "UNK00")  # If contact_cp is null or empty, use "UNK00"
         .otherwise(F.col("contact_cp"))  # Otherwise, use the actual contact_cp value
    ).alias("contact_address")  # Alias the result as "contact_address"
)

# Step 2: Display the result
df_2.show(100, truncate=False)  # Show the first 100 rows without truncating values

# Unit tests:

# Test 1: Verify that there are no null values in the 'contact_address' column
assert df_2.filter(F.col("contact_address").isNull()).count() == 0, "There are null values in 'contact_address'"

# Test 2: Verify that the placeholder "Unknown" is used when the city is unavailable
unknown_city_count = df_2.filter(F.col("contact_address").contains("Unknown")).count()
assert unknown_city_count > 0, "The placeholder 'Unknown' was not used correctly for the city"

# Test 3: Verify that the placeholder "UNK00" is used when the postal code is unavailable
unk_cp_count = df_2.filter(F.col("contact_address").contains("UNK00")).count()
assert unk_cp_count > 0, "The placeholder 'UNK00' was not used correctly for the postal code"

+------------------------------------+-----------------------------+
|order_id                            |contact_address              |
+------------------------------------+-----------------------------+
|f47ac10b-58cc-4372-a567-0e02b2c3d479|Chicago, 12345               |
|f47ac10b-58cc-4372-a567-0e02b2c3d480|Calcutta, UNK00              |
|f47ac10b-58cc-4372-a567-0e02b2c3d481|Frankfurt am Oder, 3934      |
|f47ac10b-58cc-4372-a567-0e02b2c3d482|Unknown, UNK00               |
|f47ac10b-58cc-4372-a567-0e02b2c3d483|Unknown, UNK00               |
|f47ac10b-58cc-4372-a567-0e02b2c3d484|New York, 1203               |
|f47ac10b-58cc-4372-a567-0e02b2c3d485|Unknown, UNK00               |
|f47ac10b-58cc-4372-a567-0e02b2c3d486|Esplugues de Llobregat, UNK00|
|f47ac10b-58cc-4372-a567-0e02b2c3d487|Tel Aviv, UNK00              |
|f47ac10b-58cc-4372-a567-0e02b2c3d488|Chicago, 12345               |
|f47ac10b-58cc-4372-a567-0e02b2c3d489|Barcelona, 8023              |
|f47ac10b-58cc-4372-a567-0e02b2c3d

# Test 4: Calculation of Sales Team Commissions
The Sales Team requires your assistance in computing the commissions. It is possible for multiple salespersons to be associated with a single order, as they may have participated in different stages of the order. The salesowners field comprises a ranked list of the salespeople who have ownership of the order. The first individual on the list represents the primary owner, while the subsequent individuals, if any, are considered co-owners who have contributed to the acquisition process. The calculation of commissions follows a specific procedure:

- Main Owner: 6% of the net invoiced value.
- Co-owner 1 (second in the list): 2.5% of the net invoiced value.
- Co-owner 2 (third in the list): 0.95% of the net invoiced value.
- The rest of the co-owners do not receive anything.

Provide a list of the distinct sales owners and their respective commission earnings. The list should be sorted in order of descending performance, with the sales owners who have generated the highest commissions appearing first.

Hint: Raw amounts are represented in cents. Please provide euro amounts with two decimal places in the results.

Include unit tests to verify the correctness of the commission calculations and sorting order.

In [83]:
from pyspark.sql.functions import col, explode, split

# Step 1. Perform an inner join between invoices_df and orders_df using the 'order_id'
joined_df = invoicing_df.join(orders_df, invoicing_df["orderId"] == orders_df["order_id"], how="inner")
# The join is performed on the 'orderId' column from 'invoicing_df' and 'order_id' from 'orders_df'

# Step 2. Split 'salesowners' by commas to create an array and then explode the array into separate rows
joined_df = joined_df.select(
    col("orderId").alias("order_id"),  # Keep the 'order_id' column
    col("grossValue").cast("double").alias("gross_value"),  # Cast the grossValue to double
    split(col("salesowners"), ",").alias("salesowners"),  # Split 'salesowners' by commas into an array
    explode(split(col("salesowners"), ", ")).alias("salesowner"),  # Explode the array to create separate rows for each salesowner
    col("vat"),
    (col("grossValue") * ((100 - col("vat")) / 100)).cast("double").alias("net_value")  # Calculate the net value by subtracting VAT from the gross value
)

# Step 3: Display the first 100 rows of the resulting DataFrame without truncating the values
joined_df.show(100, truncate=False)

+------------------------------------+-----------+-----------------------------------------------------------------+-----------------+---+------------------+
|order_id                            |gross_value|salesowners                                                      |salesowner       |vat|net_value         |
+------------------------------------+-----------+-----------------------------------------------------------------+-----------------+---+------------------+
|f47ac10b-58cc-4372-a567-0e02b2c3d479|324222.0   |[Leonard Cohen,  Luke Skywalker,  Ammy Winehouse]                |Leonard Cohen    |0  |324222.0          |
|f47ac10b-58cc-4372-a567-0e02b2c3d479|324222.0   |[Leonard Cohen,  Luke Skywalker,  Ammy Winehouse]                |Luke Skywalker   |0  |324222.0          |
|f47ac10b-58cc-4372-a567-0e02b2c3d479|324222.0   |[Leonard Cohen,  Luke Skywalker,  Ammy Winehouse]                |Ammy Winehouse   |0  |324222.0          |
|f47ac10b-58cc-4372-a567-0e02b2c3d480|193498.0   |[L

In [84]:
# Step 7: Assign roles based on the position of 'salesowner' in the list
role_df = joined_df.withColumn(
    "role", 
    when(col("salesowner") == col("salesowners").getItem(0), "main_owner")  # First owner
    .when(col("salesowner") == col("salesowners").getItem(1), "co_owner_1")  # Second owner
    .when(col("salesowner") == col("salesowners").getItem(2), "co_owner_2")  # Third owner
    .otherwise("no_commission")  # For all other owners
)

# Step 8: Calculate commissions based on roles
commission_df = role_df.withColumn(
    "commission", 
    when(col("role") == "main_owner", col("net_value") * 0.06)  # 6% for the main owner
    .when(col("role") == "co_owner_1", col("net_value") * 0.025)  # 2.5% for the first co-owner
    .when(col("role") == "co_owner_2", col("net_value") * 0.0095)  # 0.95% for the second co-owner
    .otherwise(0)  # No commission for other roles
)

# Step 9: Sum commissions by 'salesowner' and order in descending order
result_df = commission_df.groupBy("salesowner") \
    .agg(F.sum("commission").alias("total_commission")) \
    .orderBy(col("total_commission").desc())  # Sort in descending order of total commission

# Show the result with total commissions by 'salesowner'
result_df.show(100, truncate=False)

+-----------------+------------------+
|salesowner       |total_commission  |
+-----------------+------------------+
|Leonard Cohen    |47715.678         |
|David Henderson  |26790.954         |
|Luke Skywalker   |25780.608         |
|Marianov Merschik|17478.899999999998|
|Ammy Winehouse   |16178.331000000002|
|Yuri Gagarin     |13674.276         |
|David Goliat     |9718.315200000001 |
|Chris Pratt      |8728.02           |
|Markus Söder     |2191.86           |
|Leon Leonov      |0.0               |
|Marie Curie      |0.0               |
|Vladimir Chukov  |0.0               |
+-----------------+------------------+



In [85]:
# Step 10: Convert to euros with two decimal places
# We divide the 'total_commission' by 100 to convert from cents to euros and round the result to two decimal places.
result_df = result_df.withColumn("total_commission", round(col("total_commission") / 100, 2))

# Display the result with the total commissions in euros
result_df.show(100, truncate=False)

+-----------------+----------------+
|salesowner       |total_commission|
+-----------------+----------------+
|Leonard Cohen    |477.16          |
|David Henderson  |267.91          |
|Luke Skywalker   |257.81          |
|Marianov Merschik|174.79          |
|Ammy Winehouse   |161.78          |
|Yuri Gagarin     |136.74          |
|David Goliat     |97.18           |
|Chris Pratt      |87.28           |
|Markus Söder     |21.92           |
|Leon Leonov      |0.0             |
|Marie Curie      |0.0             |
|Vladimir Chukov  |0.0             |
+-----------------+----------------+



In [86]:
# Step 11: Collect the results into a list
result = result_df.collect()

# Step 12: Ensure the commissions are positive (or zero)
# Loop through each row to check if 'total_commission' is greater than or equal to zero
for row in result:
    assert row["total_commission"] >= 0, f"Negative commission found for {row['salesowner']}: {row['total_commission']}"

# Step 13: Verify that the total commission values by 'salesowner' are consistent
# Assuming the input data is correct, we just check the general logic
for i in range(1, len(result)):
    # Check that 'salesowner' is not null
    assert result[i]["salesowner"] is not None, "'salesowner' cannot be null"
    
    # Check that 'total_commission' is not null
    assert result[i]["total_commission"] is not None, "'total_commission' cannot be null"
    
    # If there are any null values, they must be processed correctly by the system
    assert isinstance(result[i]["salesowner"], str), f"The value of 'salesowner' for {result[i]['salesowner']} is not a string"
    assert isinstance(result[i]["total_commission"], (int, float)), f"The value of 'total_commission' for {result[i]['salesowner']} is not numeric"

# Step 14: Verify that the aggregation and sorting are correct
# Ensure the total commission for each 'salesowner' is sorted in descending order
for i in range(1, len(result)):
    assert result[i-1]["total_commission"] >= result[i]["total_commission"], \
        f"The 'salesowner' {result[i-1]['salesowner']} should have a commission greater than or equal to {result[i]['salesowner']}"

# Test 5: DataFrame of Companies with Sales Owners
Provide a DataFrame (df_3) containing the following columns:

- company_id:	The company_id field must contain the unique identifier of the company.
- company_name:	The company_name field must contain the name of the company.
- list_salesowners:	The list_salesowners field should contain a unique and comma-separated list of salespeople who have participated in at least one order of the company. 

Please ensure that the list is sorted in ascending alphabetical order of the first name.

Hint: Consider the possibility of duplicate companies stored under multiple IDs in the database. Take this into account while devising a solution to this exercise.

Ensure to include unit tests to validate the uniqueness and sorting of the sales owners list, and the handling of duplicate companies.

In [87]:
from pyspark.sql import functions as F

# Step 1: Split the 'salesowners' string into a list and explode it into individual rows for each salesowner
df_3 = orders_df \
    .withColumn("salesowners_list", F.split(orders_df["salesowners"], ",")) \
    .withColumn("salesowners_list", F.explode("salesowners_list")) \
    .select("company_id", "company_name", "salesowners_list") \
    .withColumn("salesowners_list", F.trim("salesowners_list")) \
    .distinct()  # Remove duplicate entries for the same company_id and salesowner

# Step 2: Group by 'company_id' and aggregate company_name and salesowners_list
df_3 = df_3 \
    .groupBy("company_id") \
    .agg(
        F.concat_ws(", ", F.collect_set("company_name")).alias("company_name"),  # Concatenate company names, removing duplicates
        F.collect_list("salesowners_list").alias("salesowners_list")  # Collect all salesowners for each company_id into a list
    )

# Step 3: Sort the salesowners_list and remove any duplicates
df_3 = df_3 \
    .withColumn("salesowners_list", F.array_sort(F.array_distinct("salesowners_list")))  # Sort the salesowners list and remove duplicates

# Step 4: Convert the salesowners_list into a comma-separated string
df_3 = df_3 \
    .withColumn("list_salesowners", F.concat_ws(", ", "salesowners_list"))  # Concatenate the salesowners into a single string

# Step 5: Select the final columns and sort the result by company_name
df_3 = df_3 \
    .select("company_id", "company_name", "list_salesowners") \
    .sort("company_name")  # Sort the DataFrame by company_name in ascending order

# Display the result
df_3.show(100, truncate=False)


+------------------------------------+--------------------------------------+---------------------------------------------------------------------------+
|company_id                          |company_name                          |list_salesowners                                                           |
+------------------------------------+--------------------------------------+---------------------------------------------------------------------------+
|8f1c5d4a-9045-4be5-bb38-7f587f478a92|Farm Fresh Co                         |Ammy Winehouse, Chris Pratt, Leonard Cohen                                 |
|2122bb43-1a5e-4f8d-8bb9-4987c9d4a8df|Farm Fresh Ltd                        |Leonard Cohen, Marie Curie, Yuri Gagarin                                   |
|4a7561b1-1de1-420a-93ed-2c12a5bbd1ab|Farms Global Co                       |David Goliat, Leonard Cohen                                                |
|4a98d9ec-65f6-438f-9a0c-0d9e1a6f7c65|Fresh Berries Inc                     

In [88]:
# Test 1: Verify that salesowners are unique for each company
for row in df_3.collect():
    salesowners_list = row["list_salesowners"].split(",")  # Split the list of salesowners into individual items
    assert len(salesowners_list) == len(set(salesowners_list)), f"Duplicates found in the salesowner list for {row['company_name']}."  # Check if there are any duplicates

# Test 2: Verify that the list of salesowners is sorted alphabetically
for row in df_3.collect():
    salesowners_list = row["list_salesowners"].split(", ")  # Split the salesowners string into a list
    sorted_salesowners_list = sorted(salesowners_list)  # Sort the list alphabetically
    assert salesowners_list == sorted_salesowners_list, f"Salesowners are not sorted alphabetically for {row['company_name']}."  # Ensure the list is sorted

# Test 3: Verify that there is no duplication of companies
# Count the occurrences of each company_id in the DataFrame
duplicate_companies = df_3.groupBy("company_id").count().filter("count > 1")  # Group by company_id and count occurrences

# Verify that there are no duplicate companies
assert duplicate_companies.count() == 0, "Duplicate companies found in the DataFrame."  # Ensure no duplicates exist
