In [5]:
import polars as pl
from deltalake import DeltaTable, write_deltalake

In [6]:
bronze_path = 'C:/Users/Usuario/Desktop/multiple_source_ETL/data/bronze_layer'
silver_path = 'C:/Users/Usuario/Desktop/multiple_source_ETL/data/silver_layer'
gold_path = 'C:/Users/Usuario/Desktop/multiple_source_ETL/data/gold_layer'

## **Converting everything to the same format (Parquet)**

### **Reading from bronze layer**

In [7]:
# Customers
df_customers_2023 = pl.read_parquet(f'{bronze_path}/2023/2023_customers.parquet')
df_customers_2024 = pl.read_csv(f'{bronze_path}/2024/customers.csv')
df_customers_2025 = pl.read_parquet(f'{bronze_path}/2025/customers.parquet')

# Employees
df_employees_2023 = pl.read_csv(f'{bronze_path}/2023/2023_employees.csv')
df_employees_2024 = pl.read_csv(f'{bronze_path}/2024/employees.csv')
df_employees_2025 = pl.read_csv(f'{bronze_path}/2025/employees.csv')

# Orders
df_orders_2023 = pl.read_parquet(f'{bronze_path}/2023/2023_orders.parquet')
df_orders_2024 = pl.read_csv(f'{bronze_path}/2024/orders.csv')
df_orders_2025 = pl.read_parquet(f'{bronze_path}/2025/orders.parquet')

# Products
df_products_2023 = pl.read_csv(f'{bronze_path}/2023/2023_products.csv')
df_products_2024 = pl.read_csv(f'{bronze_path}/2024/products.csv')
df_products_2025 = pl.read_csv(f'{bronze_path}/2025/products.csv')

### **Writing to silver layer**

In [8]:
# Customers
df_customers_2023.write_parquet(f'{silver_path}/2023/customers/customers_2023.parquet')
df_customers_2024.write_parquet(f'{silver_path}/2024/customers/customers_2024.parquet')
df_customers_2025.write_parquet(f'{silver_path}/2025/customers/customers_2025.parquet')

# Employees
df_employees_2023.write_parquet(f'{silver_path}/2023/employees/employees_2023.parquet')
df_employees_2024.write_parquet(f'{silver_path}/2024/employees/employees_2024.parquet')
df_employees_2025.write_parquet(f'{silver_path}/2025/employees/employees_2025.parquet')

# Orders
df_orders_2023.write_parquet(f'{silver_path}/2023/orders/orders_2023.parquet')
df_orders_2024.write_parquet(f'{silver_path}/2024/orders/orders_2024.parquet')
df_orders_2025.write_parquet(f'{silver_path}/2025/orders/orders_2025.parquet')

# Products
df_products_2023.write_parquet(f'{silver_path}/2023/products/products_2023.parquet')
df_products_2024.write_parquet(f'{silver_path}/2024/products/products_2024.parquet')
df_products_2025.write_parquet(f'{silver_path}/2025/products/products_2025.parquet')

### **Reading (as `Parquet`) from silver layer**

In [9]:
# Customers
df_customers_2023 = pl.read_parquet(f'{silver_path}/2023/customers/customers_2023.parquet')
df_customers_2024 = pl.read_parquet(f'{silver_path}/2024/customers/customers_2024.parquet')
df_customers_2025 = pl.read_parquet(f'{silver_path}/2025/customers/customers_2025.parquet')

# Employees
df_employees_2023 = pl.read_parquet(f'{silver_path}/2023/employees/employees_2023.parquet')
df_employees_2024 = pl.read_parquet(f'{silver_path}/2024/employees/employees_2024.parquet')
df_employees_2025 = pl.read_parquet(f'{silver_path}/2025/employees/employees_2025.parquet')

# Orders
df_orders_2023 = pl.read_parquet(f'{silver_path}/2023/orders/orders_2023.parquet')
df_orders_2024 = pl.read_parquet(f'{silver_path}/2024/orders/orders_2024.parquet')
df_orders_2025 = pl.read_parquet(f'{silver_path}/2025/orders/orders_2025.parquet')

# Products
df_products_2023 = pl.read_parquet(f'{silver_path}/2023/products/products_2023.parquet')
df_products_2024 = pl.read_parquet(f'{silver_path}/2024/products/products_2024.parquet')
df_products_2025 = pl.read_parquet(f'{silver_path}/2025/products/products_2025.parquet')

Now, all the files have the same format: **(Parquet)**

---
## **Employees Transformations**

1. Creating the column "level" based on employees years of experience

2. Updating senior employees (with salaries lower than 5000) salary by 10%

In [10]:
def transform_employees(df: pl.DataFrame) -> pl.DataFrame:
    return (

      # 1. Creating the column "level" based on employees YOE
        df.with_columns([
            pl.when(pl.col("years_of_experience") < 2)
              .then(pl.lit("Junior"))
              .when((pl.col("years_of_experience") >= 2) & (pl.col("years_of_experience") < 5))
              .then(pl.lit("Semi Senior"))
              .otherwise(pl.lit("Senior"))
              .alias("level")
        ])

        # 2. Updating senior employees (with salaries lower than 5000) salary by 10%
        .with_columns([
            pl.when((pl.col("level") == "Senior") & (pl.col("salary") < 5000))
              .then(pl.col("salary") * 1.1)
              .otherwise(pl.col("salary"))
              .alias("salary")
        ])
    )

df_employees_2023 = transform_employees(df_employees_2023)
df_employees_2024 = transform_employees(df_employees_2024)
df_employees_2025 = transform_employees(df_employees_2025)

---
## **Orders Transformations**

Adding the column "`total`" as (`quantity` * `amount`)

In [11]:
def transform_orders(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns([
    (pl.col("quantity") * pl.col("amount")).alias("total")
])

df_orders_2023 = transform_orders(df_orders_2023)
df_orders_2024 = transform_orders(df_orders_2024)
df_orders_2025 = transform_orders(df_orders_2025)

---
## **Customers Transformations**

Assigning a category to each customer based on their amount of orders (`order_id`)

In [12]:
def transform_customers(df_customers: pl.DataFrame, df_orders: pl.DataFrame) -> pl.DataFrame:

    # 1. Count orders by client
    orders_count = df_orders.group_by("customer_id").agg(
        pl.count("order_id").alias("num_orders")
    )
    
    # 2. Join with clients
    df = df_customers.join(orders_count, on="customer_id", how="left")
    
    # 3. Replace nulls with 0
    df = df.with_columns([
        pl.col("num_orders").fill_null(0)
    ])
    
    # 4. Categorize clients by amount of orders
    df = df.with_columns([
        pl.when(pl.col("num_orders") < 5)
          .then(pl.lit("bronze"))
          .when((pl.col("num_orders") >= 5) & (pl.col("num_orders") < 10))
          .then(pl.lit("silver"))
          .when((pl.col("num_orders") >= 10) & (pl.col("num_orders") < 20))
          .then(pl.lit("gold"))
          .otherwise(pl.lit("diamond"))
          .alias("category")
    ])

    return df

df_customers_2023 = transform_customers(df_customers_2023, df_orders_2023)
df_customers_2024 = transform_customers(df_customers_2024, df_orders_2024)
df_customers_2025 = transform_customers(df_customers_2025, df_orders_2025)

## **Writing clean data to Gold Layer**

In [13]:
df_employees_2023.write_parquet(f"{gold_path}/employees/employees_2023.parquet")
df_employees_2024.write_parquet(f"{gold_path}/employees/employees_2024.parquet")
df_employees_2025.write_parquet(f"{gold_path}/employees/employees_2025.parquet")

df_customers_2023.write_parquet(f"{gold_path}/customers/customers_2023.parquet")
df_customers_2024.write_parquet(f"{gold_path}/customers/customers_2024.parquet")
df_customers_2025.write_parquet(f"{gold_path}/customers/customers_2025.parquet")

df_products_2023.write_parquet(f"{gold_path}/products/products_2023.parquet")
df_products_2024.write_parquet(f"{gold_path}/products/products_2024.parquet")
df_products_2025.write_parquet(f"{gold_path}/products/products_2025.parquet")

df_orders_2023.write_parquet(f"{gold_path}/orders/orders_2023.parquet")
df_orders_2024.write_parquet(f"{gold_path}/orders/orders_2024.parquet")
df_orders_2025.write_parquet(f"{gold_path}/orders/orders_2025.parquet")