In [1]:
from pipeline.config import INPUT_DIR, STATIC_DIR

# Show the configured input and static-data directories, one per line.
for configured_dir in (INPUT_DIR, STATIC_DIR):
    print(configured_dir)


/home/jovyan/work/data/input
/home/jovyan/work/data/statics


In [2]:
from pipeline.spark_session import create_spark_session

# Build the Spark session; later cells reference it through the `spark` name.
spark = create_spark_session()
# Bare last expression so Jupyter renders the session's rich repr.
spark


In [3]:
# Smoke test: a tiny single-column DataFrame of ids 0..9 proves the session
# can plan and execute a job. With one argument, range() treats it as `end`.
df = spark.range(10)
df.show()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [4]:
from pipeline.io_readers import read_customers, read_refunds, read_orders_for_date

# Load customers, refunds, and a single day of orders.
customers_df = read_customers(spark)
refunds_df = read_refunds(spark)
orders_df = read_orders_for_date(spark, "2025-03-01")

# Preview the first five rows of each input, in load order.
for preview_df in (customers_df, refunds_df, orders_df):
    preview_df.show(5)


+-----------+----------+---------+-----------------+--------+---------+
|customer_id|first_name|last_name|            email|    city|is_active|
+-----------+----------+---------+-----------------+--------+---------+
|      C0001|     User1|    Test1|user1@example.com|  Nantes|     true|
|      C0002|     User2|    Test2|user2@example.com|Toulouse|     true|
|      C0003|     User3|    Test3|user3@example.com|Bordeaux|     true|
|      C0004|     User4|    Test4|user4@example.com|Bordeaux|     true|
|      C0005|     User5|    Test5|user5@example.com|    Lyon|     true|
+-----------+----------+---------+-----------------+--------+---------+
only showing top 5 rows

+---------+-------------+------+----------+-------------------+
|refund_id|     order_id|amount|    reason|         created_at|
+---------+-------------+------+----------+-------------------+
|  R000001|O202503010089| -6.68|     delay|2025-03-01 14:03:41|
|  R000002|O202503010038| -8.89|   gesture|2025-03-01 22:16:56|
|  R000

In [5]:
from pipeline.spark_session import create_spark_session
from pipeline.io_readers import read_customers, read_orders_for_date
from pipeline.transformations import filter_paid_orders, join_active_customers

# Re-create the session and inputs so this cell runs on its own.
spark = create_spark_session()
customers_df = read_customers(spark)
orders_df = read_orders_for_date(spark, "2025-03-01")

# Keep paid orders, then join them with active customers.
paid_orders_df = filter_paid_orders(orders_df)
joined_df = join_active_customers(paid_orders_df, customers_df)

preview_columns = ["order_id", "customer_id", "payment_status", "city", "is_active"]
joined_df.select(*preview_columns).show(10, truncate=False)

# Row count after each stage shows how many rows each step removes.
stage_counts = (
    ("Orders count :", orders_df),
    ("Paid orders count :", paid_orders_df),
    ("After join with active customers :", joined_df),
)
for label, stage_df in stage_counts:
    print(label, stage_df.count())


+-------------+-----------+--------------+---------+---------+
|order_id     |customer_id|payment_status|city     |is_active|
+-------------+-----------+--------------+---------+---------+
|O202503010035|C0014      |paid          |Paris    |true     |
|O202503010079|C0016      |paid          |Lille    |true     |
|O202503010023|C0017      |paid          |Paris    |true     |
|O202503010050|C0045      |paid          |Bordeaux |true     |
|O202503010048|C0078      |paid          |Nice     |true     |
|O202503010097|C0103      |paid          |Marseille|true     |
|O202503010081|C0116      |paid          |Nice     |true     |
|O202503010030|C0127      |paid          |Nantes   |true     |
|O202503010036|C0135      |paid          |Paris    |true     |
|O202503010065|C0137      |paid          |Bordeaux |true     |
+-------------+-----------+--------------+---------+---------+
only showing top 10 rows

Orders count : 103
Paid orders count : 92
After join with active customers : 80


In [6]:
from pipeline.spark_session import create_spark_session
from pipeline.io_readers import read_customers, read_orders_for_date
from pipeline.transformations import explode_items, filter_paid_orders, join_active_customers

# Same paid-orders / active-customers pipeline as the previous cell, extended
# with explode_items: in the output below one order can span several rows,
# one per sku / qty / unit_price entry.
spark = create_spark_session()

customers_df = read_customers(spark)
orders_df = read_orders_for_date(spark, "2025-03-01")

paid_orders_df = filter_paid_orders(orders_df)
joined_df = join_active_customers(paid_orders_df, customers_df)
items_df = explode_items(joined_df)

# Preview the exploded rows, then the resulting schema.
items_df.show(n=10, truncate=False)
items_df.printSchema()


+-------------+-----------+-------+-------------------+---------+------+---+----------+
|order_id     |customer_id|channel|created_at         |city     |sku   |qty|unit_price|
+-------------+-----------+-------+-------------------+---------+------+---+----------+
|O202503010001|C0793      |app    |2025-03-01 20:36:44|Toulouse |SKU001|4  |24.9      |
|O202503010002|C0676      |web    |2025-03-01 11:30:49|Marseille|SKU042|4  |7.5       |
|O202503010003|C0642      |web    |2025-03-01 07:27:00|Toulouse |SKU014|1  |5.0       |
|O202503010005|C0571      |web    |2025-03-01 22:29:42|Toulouse |SKU001|1  |2.5       |
|O202503010007|C0464      |app    |2025-03-01 15:50:48|Nantes   |SKU018|1  |24.9      |
|O202503010007|C0464      |app    |2025-03-01 15:50:48|Nantes   |SKU046|5  |9.9       |
|O202503010007|C0464      |app    |2025-03-01 15:50:48|Nantes   |SKU047|1  |5.0       |
|O202503010008|C0317      |app    |2025-03-01 20:56:15|Marseille|SKU039|2  |15.0      |
|O202503010008|C0317      |app  

In [7]:
from pipeline.spark_session import create_spark_session
from pipeline.io_readers import read_customers, read_orders_for_date
from pipeline.transformations import (
    filter_paid_orders,
    join_active_customers,
    explode_items,
    filter_negative_prices,
)

# Every name used below is imported above, so this cell does not depend on
# imports executed by earlier cells (matches the self-contained cells above).
spark = create_spark_session()

customers_df = read_customers(spark)
orders_df = read_orders_for_date(spark, "2025-03-01")

paid_df = filter_paid_orders(orders_df)
joined_df = join_active_customers(paid_df, customers_df)
items_df = explode_items(joined_df)

# filter_negative_prices returns a (clean, rejected) pair of DataFrames;
# presumably "rejected" holds rows with a negative unit_price — confirm in
# pipeline.transformations.
clean_items_df, rejected_items_df = filter_negative_prices(items_df)

print("Total items :", items_df.count())
print("Valid items :", clean_items_df.count())
print("Rejected items :", rejected_items_df.count())

clean_items_df.show(10, truncate=False)
rejected_items_df.show(10, truncate=False)


Total items : 199
Valid items : 198
Rejected items : 1
+-------------+-----------+-------+-------------------+---------+------+---+----------+
|order_id     |customer_id|channel|created_at         |city     |sku   |qty|unit_price|
+-------------+-----------+-------+-------------------+---------+------+---+----------+
|O202503010001|C0793      |app    |2025-03-01 20:36:44|Toulouse |SKU001|4  |24.9      |
|O202503010002|C0676      |web    |2025-03-01 11:30:49|Marseille|SKU042|4  |7.5       |
|O202503010003|C0642      |web    |2025-03-01 07:27:00|Toulouse |SKU014|1  |5.0       |
|O202503010005|C0571      |web    |2025-03-01 22:29:42|Toulouse |SKU001|1  |2.5       |
|O202503010007|C0464      |app    |2025-03-01 15:50:48|Nantes   |SKU018|1  |24.9      |
|O202503010007|C0464      |app    |2025-03-01 15:50:48|Nantes   |SKU046|5  |9.9       |
|O202503010007|C0464      |app    |2025-03-01 15:50:48|Nantes   |SKU047|1  |5.0       |
|O202503010008|C0317      |app    |2025-03-01 20:56:15|Marseille|

In [8]:
from pipeline.spark_session import create_spark_session
from pipeline.io_readers import read_customers, read_refunds, read_orders_for_date
from pipeline.transformations import (
    filter_paid_orders,
    join_active_customers,
    explode_items,
    filter_negative_prices,
)
from pipeline.aggregations import compute_daily_city_sales

# Rebuild the full one-day pipeline. Every name is imported above so the cell
# does not depend on imports executed by earlier cells.
spark = create_spark_session()

customers_df = read_customers(spark)
refunds_df = read_refunds(spark)
orders_df = read_orders_for_date(spark, "2025-03-01")

paid_df = filter_paid_orders(orders_df)
joined_df = join_active_customers(paid_df, customers_df)
items_df = explode_items(joined_df)
clean_items_df, rejected_items_df = filter_negative_prices(items_df)

# Aggregate clean items plus refunds into rows keyed by date / city / channel
# (the key columns shown in this cell's output).
# NOTE(review): in this cell's saved output, net_revenue_eur equals
# gross_revenue_eur + refunds_eur (e.g. 325.4 + 25.55 = 350.95), so refunds
# *increase* net revenue. Raw refund amounts are negative (see the refunds
# preview in the input-loading cell), so this looks like a sign error inside
# compute_daily_city_sales — TODO confirm and fix there.
# NOTE(review): money columns are floating point and show noise such as
# 776.8000000000001; consider rounding or a DecimalType in the aggregation.
daily_city_sales_df = compute_daily_city_sales(clean_items_df, refunds_df)

daily_city_sales_df.show(20, truncate=False)
daily_city_sales_df.printSchema()


+----------+---------+-------+------------+----------------+----------+------------------+------------------+------------------+
|date      |city     |channel|orders_count|unique_customers|items_sold|gross_revenue_eur |refunds_eur       |net_revenue_eur   |
+----------+---------+-------+------------+----------------+----------+------------------+------------------+------------------+
|2025-03-01|Bordeaux |app    |4           |4               |27        |325.4             |25.549999999999997|350.95            |
|2025-03-01|Paris    |web    |7           |7               |53        |776.8000000000001 |6.76              |783.5600000000001 |
|2025-03-01|Nantes   |web    |1           |1               |8         |77.4              |0.0               |77.4              |
|2025-03-01|Marseille|app    |9           |9               |69        |594.4             |62.45             |656.85            |
|2025-03-01|Nice     |app    |6           |6               |50        |733.6             |24.7000

In [9]:
from pipeline.writers import write_daily_summary_csv

# Persist the daily summary for the processed date and show where it was written.
summary_date = "2025-03-01"
output_path = write_daily_summary_csv(daily_city_sales_df, summary_date)
print(output_path)


/home/jovyan/work/output/daily_summary/daily_summary_20250301.csv
