# Marvik Technical Test 

```plaintext
+-------------------+        +--------------------+        +-------------------+        +-------------------+
|                   |        |                    |        |                   |        |                   |
| AWS S3 Bucket     |        | Data Transformation|        | Generate Report   |        | Local File        |
| (Input Data)      | -----> | (Filter by Quarter | -----> | (Per Quarter and  | -----> | (Output Report)   |
| - Customer Data   |        |  and Traffic       |        |  Traffic Source)  |        |                   |
| - Product Data    |        |  Source)           |        |                   |        |                   |
| - Order Data      |        |                    |        |                   |        |                   |
+-------------------+        +--------------------+        +-------------------+        +-------------------+
```

The solution accepts any year as input, including several years or a continuous range of years, and always returns Q1 results.

In [10]:
!pip install pyspark==3.5.0
!wget -q https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
!wget -q https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.710/aws-java-sdk-bundle-1.12.710.jar



# Imports

In [1]:
from pyspark.sql import SparkSession, DataFrame, Window
# Note: `sum` is aliased to avoid shadowing the built-in; `round` here is Spark's column function
from pyspark.sql.functions import col, sum as _sum, count, lag, round
from typing import List
import os

# Functions

We work with Q1 data for each selected year. Each year's Q1 is compared with the Q1 of the previous year in the input, so we can calculate the changes in number of purchases and profit from the previous period.
Example:
    We request 2023 and 2024 -> this loads Q1 data for 2023 and Q1 data for 2024
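
As an illustration of the scope described above, here is a minimal plain-Python sketch of which (year, month) pairs are loaded for a given list of years. The names `Q1_MONTHS`, `q1_periods` and `in_scope` are hypothetical helpers introduced only for this example; the notebook itself applies the same rule as a Spark filter.

```python
from typing import Iterable, List, Tuple

Q1_MONTHS = (1, 2, 3)  # January through March


def q1_periods(years: Iterable[int]) -> List[Tuple[int, int]]:
    """Return the (year, month) pairs in scope, sorted and de-duplicated."""
    return [(year, month) for year in sorted(set(years)) for month in Q1_MONTHS]


def in_scope(year: int, month: int, years: Iterable[int]) -> bool:
    """True when the year was requested and the month falls in Q1."""
    return year in set(years) and month in Q1_MONTHS


print(q1_periods([2023, 2024]))
# [(2023, 1), (2023, 2), (2023, 3), (2024, 1), (2024, 2), (2024, 3)]
```

A row from April 2024 would give `in_scope(2024, 4, [2023, 2024]) == False`, so it would be excluded.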

In [None]:
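def clean_data(df: DataFrame) -> DataFrame:
    """Drop rows that cannot be used for the report."""
    # Rows missing the period or price fields cannot be aggregated
    df = df.dropna(subset=["created_at_year", "created_at_month", "sale_price", "product_cost"])
    # Negative prices or costs are treated as invalid records
    df = df.filter((col("sale_price") >= 0) & (col("product_cost") >= 0))
    # Remove fully identical rows
    df = df.dropDuplicates()

    return df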

In [None]:
def read_data_from_s3(spark: SparkSession, bucket_name: str, years: List[int]) -> DataFrame:
    """
    Read data from S3 bucket and filter by year and month.
    - Must be Q1 data
    - Years is a list of years inputted by user
    """
    # Strip slashes first so a bucket_name ending in "/" does not produce "//" in the path
    base_path = f"s3a://{bucket_name.strip('/')}/"
    df = spark.read.parquet(base_path)
    df = df.filter(
        (col("created_at_year").isin(years)) & (col("created_at_month").between(1, 3))
    )
    
    # Print the number of rows grouped by created_at_year
    row_counts = df.groupBy("created_at_year").count()
    row_counts.show()

    df = clean_data(df)

    # Sanity check: after filtering, no row should fall outside the requested years or Q1
    invalid_data = df.filter(
        (~col("created_at_year").isin(years)) | (~col("created_at_month").between(1, 3))
    )
    if invalid_data.count() > 0:
        raise ValueError("Process contains data outside Q1 or years not included in the years list.")

    return df

1. We group by year and traffic source
2. We aggregate the sum of the profit (sale price minus product cost)
3. We aggregate a count of all the purchased items
4. We calculate the percentage change in the number of purchases between each year's Q1 and the previous year's Q1
5. We calculate the percentage change in the profit between each year's Q1 and the previous year's Q1

We could also calculate these changes month by month. Since the problem asks for a quarter comparison, we leave it at that.
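
The percentage-change steps above can be illustrated with a small plain-Python sketch. The helpers `pct_change` and `pct_changes` are hypothetical names for this example only; the notebook computes the same formula with Spark's `lag()` over a window. The sample values are the Adwords purchase counts for 2019 to 2023 from this notebook's report output. Returning 0.0 for a zero baseline is an assumption of this sketch.

```python
from typing import List, Optional


def pct_change(current: float, previous: Optional[float]) -> float:
    """Percentage change vs. the previous period, rounded to 2 decimals.

    Returns 0.0 when there is no previous period (first year) or, as an
    assumption of this sketch, when the previous value is zero.
    """
    if previous is None or previous == 0:
        return 0.0
    return round((current - previous) / previous * 100, 2)


def pct_changes(values: List[float]) -> List[float]:
    """Apply pct_change to a series ordered by year."""
    changes = []
    previous: Optional[float] = None
    for value in values:
        changes.append(pct_change(value, previous))
        previous = value
    return changes


# Adwords purchases in Q1 of 2019..2023, taken from the report output
adwords_purchases = [49, 615, 1217, 2170, 3692]
print(pct_changes(adwords_purchases))
# [0.0, 1155.1, 97.89, 78.31, 70.14]
```

Note that the 0.0 reported for the first year means "no previous period to compare with", not "no change".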

In [3]:
def transform_data(df: DataFrame) -> DataFrame:
    """
    Transforms the input data to generate Q1 metrics per year and traffic source.
    Number of purchases and profit are calculated for each traffic source.
    The function returns a DataFrame with the following columns:
    - created_at_year: The year whose Q1 the row describes
    - traffic_source: The source of the traffic (e.g., Organic, YouTube, Email, etc.)
    - profit: The profit made from that traffic source (sum of sale_price - product_cost)
    - number_of_purchases: The total number of purchases from that traffic source
    - percentage_change_purchases: Percentage change in the number of purchases from the previous year's Q1
    - percentage_change_profit: Percentage change in the profit from the previous year's Q1
    The first year of each traffic source has no previous period, so both changes are set to 0.

    Input dataset: Q1 of selected years
    """
    # Group by year and traffic source to calculate total metrics
    df = df.groupBy("created_at_year", "traffic_source").agg(
        round(_sum(col("sale_price") - col("product_cost")), 2).alias("profit"),  # Profit is sale_price - product_cost
        count("product_id").alias("number_of_purchases")
    )

    # Define a window partitioned by traffic_source and ordered by year
    window_spec = Window.partitionBy("traffic_source").orderBy("created_at_year")

    # Calculate percentage change in purchases and profit
    df = df.withColumn(
        "percentage_change_purchases",
        (col("number_of_purchases") - lag("number_of_purchases").over(window_spec)) /
        lag("number_of_purchases").over(window_spec) * 100
    ).withColumn(
        "percentage_change_profit",
        (col("profit") - lag("profit").over(window_spec)) /
        lag("profit").over(window_spec) * 100
    )
    # Round percentage change columns to 2 decimals
    df = df.withColumn("percentage_change_purchases", round(col("percentage_change_purchases"), 2)) \
           .withColumn("percentage_change_profit", round(col("percentage_change_profit"), 2))
   
    # lag() returns null for the first year of each traffic source (there is no previous row), so we replace those nulls with 0
    df = df.fillna(0, subset=["percentage_change_purchases", "percentage_change_profit"])
    
    return df

# Arguments

In [4]:
years = [2019, 2020, 2021, 2022, 2023, 2024]

# Process

In [5]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /content/hadoop-aws-3.3.4.jar,/content/aws-java-sdk-bundle-1.12.710.jar pyspark-shell'

In [6]:

spark = SparkSession.builder \
    .appName("Read Public S3 Parquet") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4") \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider') \
    .master("local[*]") \
    .getOrCreate()
# Set the logging level to ERROR to keep Spark's warning messages out of the cell output
spark.sparkContext.setLogLevel("ERROR")

Ivy Default Cache set to: /home/ibum/.ivy2/cache
The jars for the packages stored in: /home/ibum/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-10be102f-cc9d-4cf7-ad73-9361f46aee5e;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 389ms :: artifacts dl 17ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]

In [7]:
bucket_name = "da-exercise/thelook_ecommerce/purchases/"

Q1 runs from January 1st to March 31st (inclusive).
The source data also contains rows outside our scope (for example, April's data), so we keep only the rows whose month is between January (1) and March (3).

In [8]:
data_raw = read_data_from_s3(spark, bucket_name, years)
data_raw.show(5)

                                                                                

+-------+----------+---------+--------------------+---+------+-----------+--------+-------------+--------+----------+------------+----------------+------------------+--------------------+------------+----------+-------------------+----------------+---------------+--------------+--------------------+
|user_id|first_name|last_name|               email|age|gender|postal_code|    city|      country|order_id|product_id|product_cost|product_category|product_department|        product_name|order_status|sale_price|         created_at|created_at_month|created_at_year|traffic_source|          session_id|
+-------+----------+---------+--------------------+---+------+-----------+--------+-------------+--------+----------+------------+----------------+------------------+--------------------+------------+----------+-------------------+----------------+---------------+--------------+--------------------+
|  28950|     Scott|  Sanders|scottsanders@exam...| 26|     M|   359-0025|      所|        Japan| 

In [11]:
data = transform_data(data_raw)
data.show()



+---------------+--------------+---------+-------------------+---------------------------+------------------------+
|created_at_year|traffic_source|   profit|number_of_purchases|percentage_change_purchases|percentage_change_profit|
+---------------+--------------+---------+-------------------+---------------------------+------------------------+
|           2019|       Adwords|  1486.38|                 49|                        0.0|                     0.0|
|           2020|       Adwords| 18650.94|                615|                     1155.1|                 1154.79|
|           2021|       Adwords| 38513.75|               1217|                      97.89|                   106.5|
|           2022|       Adwords| 67450.07|               2170|                      78.31|                   75.13|
|           2023|       Adwords|115143.54|               3692|                      70.14|                   70.71|
|           2024|       Adwords|265011.25|               8332|          

                                                                                

In [None]:
# No other data cleaning processes are needed
data_cleaned = data.dropDuplicates()

In [12]:

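# Name the report after the requested year range
output_path = f"report_{min(years)}_{max(years)}.csv"
# Spark creates a directory with this name containing part files, not a single CSV file
data_cleaned.write.csv(output_path, header=True)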

print(f"Cleaned data saved to {output_path}")



Cleaned data saved to report_2019_2024.csv
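
Spark's `DataFrame.write.csv` creates a directory of part files rather than one file. If a single local file is preferred, one option for a report this small is to collect the rows to the driver and write them with Python's `csv` module. The sketch below is a hedged illustration, not part of the original pipeline: `write_report` and `sample_path` are hypothetical names, and the rows are plain dictionaries copied from the first two Adwords lines of the report so the example runs without Spark. With a real DataFrame the rows could come from `[row.asDict() for row in data_cleaned.collect()]`.

```python
import csv
import os
import tempfile
from typing import Dict, List


def write_report(rows: List[Dict[str, object]], path: str) -> None:
    """Write a list of row dictionaries to a single CSV file with a header."""
    if not rows:
        raise ValueError("no rows to write")
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)


# Sample rows copied from the report output (Adwords, 2019 and 2020)
rows = [
    {"created_at_year": 2019, "traffic_source": "Adwords",
     "profit": 1486.38, "number_of_purchases": 49},
    {"created_at_year": 2020, "traffic_source": "Adwords",
     "profit": 18650.94, "number_of_purchases": 615},
]

# Written to the temp directory so the example does not touch the working directory
sample_path = os.path.join(tempfile.gettempdir(), "report_sample.csv")
write_report(rows, sample_path)
```

Collecting to the driver only makes sense when the aggregated result is small, as it is here (one row per year and traffic source).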


                                                                                