In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Challenge").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/03 19:13:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
products_df = spark.read.csv("../../data/products.csv", header=True)

products_df.show(5)

+---+---------+--------+-----+
| id|     name|category|price|
+---+---------+--------+-----+
|  1|Product_1|   Pants|92.55|
|  2|Product_2|  Shirts|43.11|
|  3|Product_3| Jackets|59.02|
|  4|Product_4|   Shoes|49.65|
|  5|Product_5|   Pants|44.59|
+---+---------+--------+-----+
only showing top 5 rows



In [3]:
orders_df = spark.read.csv("../../data/orders.csv", header=True).withColumnRenamed("id", "order_id")

# Show the DataFrame
orders_df.show(5)

+--------+----------+--------+------------+
|order_id|product_id|quantity|created_date|
+--------+----------+--------+------------+
|       1|        11|       1|'2024-12-01'|
|       2|        17|       2|'2024-12-01'|
|       3|        19|       1|'2024-12-01'|
|       4|        12|       1|'2024-12-01'|
|       5|        11|       2|'2024-12-01'|
+--------+----------+--------+------------+
only showing top 5 rows



In [4]:
data_df = orders_df.join(products_df, orders_df.product_id == products_df.id, "left")
data_df.show(5)

+--------+----------+--------+------------+---+----------+--------+-----+
|order_id|product_id|quantity|created_date| id|      name|category|price|
+--------+----------+--------+------------+---+----------+--------+-----+
|       1|        11|       1|'2024-12-01'| 11|Product_11| Dresses|69.06|
|       2|        17|       2|'2024-12-01'| 17|Product_17|  Shirts|98.51|
|       3|        19|       1|'2024-12-01'| 19|Product_19|  Shirts|51.94|
|       4|        12|       1|'2024-12-01'| 12|Product_12| Jackets|50.99|
|       5|        11|       2|'2024-12-01'| 11|Product_11| Dresses|69.06|
+--------+----------+--------+------------+---+----------+--------+-----+
only showing top 5 rows



In [5]:
data_df = data_df.withColumn("total_price_br", data_df.quantity * data_df.price)
data_df.show(5)

+--------+----------+--------+------------+---+----------+--------+-----+--------------+
|order_id|product_id|quantity|created_date| id|      name|category|price|total_price_br|
+--------+----------+--------+------------+---+----------+--------+-----+--------------+
|       1|        11|       1|'2024-12-01'| 11|Product_11| Dresses|69.06|         69.06|
|       2|        17|       2|'2024-12-01'| 17|Product_17|  Shirts|98.51|        197.02|
|       3|        19|       1|'2024-12-01'| 19|Product_19|  Shirts|51.94|         51.94|
|       4|        12|       1|'2024-12-01'| 12|Product_12| Jackets|50.99|         50.99|
|       5|        11|       2|'2024-12-01'| 11|Product_11| Dresses|69.06|        138.12|
+--------+----------+--------+------------+---+----------+--------+-----+--------------+
only showing top 5 rows



In [6]:
data_df = data_df.withColumnRenamed("created_date","order_created_date")
data_df = data_df.withColumnRenamed("name","product_name")
data_df.show(5)

+--------+----------+--------+------------------+---+------------+--------+-----+--------------+
|order_id|product_id|quantity|order_created_date| id|product_name|category|price|total_price_br|
+--------+----------+--------+------------------+---+------------+--------+-----+--------------+
|       1|        11|       1|      '2024-12-01'| 11|  Product_11| Dresses|69.06|         69.06|
|       2|        17|       2|      '2024-12-01'| 17|  Product_17|  Shirts|98.51|        197.02|
|       3|        19|       1|      '2024-12-01'| 19|  Product_19|  Shirts|51.94|         51.94|
|       4|        12|       1|      '2024-12-01'| 12|  Product_12| Jackets|50.99|         50.99|
|       5|        11|       2|      '2024-12-01'| 11|  Product_11| Dresses|69.06|        138.12|
+--------+----------+--------+------------------+---+------------+--------+-----+--------------+
only showing top 5 rows



In [7]:
import requests as req
import os
API_URL = "https://api.freecurrencyapi.com/v1/latest?currencies=USD&base_currency=BRL"
API_KEY = os.environ.get("API_KEY")

response = req.get(API_URL,headers={"apikey": API_KEY})
response.raise_for_status()
data = response.json()
usd_convert = data["data"]["USD"]
usd_convert

0.1842916856

In [8]:
from pyspark.sql.functions import lit
data_df = data_df.withColumn("total_price_us", data_df.total_price_br * lit(usd_convert))
data_df.show(5)

+--------+----------+--------+------------------+---+------------+--------+-----+--------------+-----------------+
|order_id|product_id|quantity|order_created_date| id|product_name|category|price|total_price_br|   total_price_us|
+--------+----------+--------+------------------+---+------------+--------+-----+--------------+-----------------+
|       1|        11|       1|      '2024-12-01'| 11|  Product_11| Dresses|69.06|         69.06|  12.727183807536|
|       2|        17|       2|      '2024-12-01'| 17|  Product_17|  Shirts|98.51|        197.02|  36.309147896912|
|       3|        19|       1|      '2024-12-01'| 19|  Product_19|  Shirts|51.94|         51.94|   9.572110150064|
|       4|        12|       1|      '2024-12-01'| 12|  Product_12| Jackets|50.99|         50.99|9.397033048744001|
|       5|        11|       2|      '2024-12-01'| 11|  Product_11| Dresses|69.06|        138.12|  25.454367615072|
+--------+----------+--------+------------------+---+------------+--------+-----

In [9]:
data_df.select(
    "order_created_date",
    "order_id",
    "product_name",
    "quantity",
    "total_price_br",
    "total_price_us"
).write.csv("fixed_order_full_information.csv", header=True)

In [10]:
from pyspark.sql.functions import max
df_mao = data_df.groupBy("order_created_date").count()
max_df_mao = df_mao.select(max("count").alias("count"))
date_max_amount_orders = df_mao.join(max_df_mao,
    df_mao["count"] == max_df_mao["count"], "inner"
).select("order_created_date")

date_max_amount_orders.show()

+------------------+
|order_created_date|
+------------------+
|      '2024-12-06'|
+------------------+



In [11]:
from pyspark.sql.functions import sum
df_group_p = data_df.groupBy("product_id").agg(
    sum("quantity").alias("quantity"),
    sum("price").alias("price")
)

max_df_group_p = df_group_p.select(max("quantity").alias("quantity"))
most_demanded_product = df_group_p.join(max_df_group_p,
    df_group_p["quantity"] == max_df_group_p["quantity"], "inner"
)

most_demanded_product = most_demanded_product.select("product_id","price")
most_demanded_product.show()


+----------+------+
|product_id| price|
+----------+------+
|         5|356.72|
+----------+------+



In [12]:
from pyspark.sql.functions import sum, col, collect_list, concat_ws
most_demanded_categories = data_df.groupBy("category").agg(
    sum("quantity").alias("quantity")
).orderBy(col("quantity").desc()).select("category").limit(3).select(
    concat_ws(",", collect_list("category")).alias("category")
)

most_demanded_categories.show()

+--------------------+
|            category|
+--------------------+
|Shirts,Jackets,Pants|
+--------------------+



In [14]:
date_max_amount_orders.join(
    most_demanded_product
).join(
    most_demanded_categories
).show()

+------------------+----------+------+--------------------+
|order_created_date|product_id| price|            category|
+------------------+----------+------+--------------------+
|      '2024-12-06'|         5|356.72|Shirts,Jackets,Pants|
+------------------+----------+------+--------------------+

