In [None]:
# %%

# imports and constants

import json
import os
from datetime import date
from typing import Any, Union

import polars as pl
import requests
from loguru import logger
from pydantic import BaseModel

# Static mapping from product category to sales rep, loaded once at import time.
# The file name is relative, so it is resolved against the current working directory.
CATEGORY_TO_SALES_REP: dict[str, str]
CATEGORY_TO_SALES_REP_FILE_NAME: str = 'category_to_sales_rep.json'
# Pass the encoding explicitly: without it open() falls back to the platform's
# locale encoding, which can mis-decode non-ASCII names in the JSON file.
with open(CATEGORY_TO_SALES_REP_FILE_NAME, encoding='utf-8') as f:
    CATEGORY_TO_SALES_REP = json.load(f)
# Log only after the file has been parsed and closed successfully.
logger.info(
    f'Loaded {CATEGORY_TO_SALES_REP_FILE_NAME} into CATEGORY_TO_SALES_REP constant')

In [None]:
# %%

# WRITE API WRAPPER
# The API documentation is available at https://valueworks-case-study-service-ggaxa6fhg7apbsd6.germanywestcentral-01.azurewebsites.net/docs


class APIWrapper:
    """Client skeleton for the case-study REST service.

    Holds the service base URL, a registry of the queryable objects and a
    ``requests.Session`` that carries the credentials passed to the constructor.
    """

    API_URL: str = 'https://valueworks-case-study-service-ggaxa6fhg7apbsd6.germanywestcentral-01.azurewebsites.net'

    # Registry keyed by logical object name. Each entry lists the object's
    # `api_name` (presumably the endpoint path segment - confirm against the
    # API docs) and the column names of its rows.
    OBJECTS: dict[str, Any] = {
        'product': {'api_name': 'products',
                    'columns': ['product_id', 'product_name', 'category', 'price']},
        'order': {'api_name': 'orders',
                  'columns': ['order_id', 'customer_id', 'order_date']},
        'order_item': {'api_name': 'order_items',
                       'columns': ['order_item_id', 'order_id', 'product_id', 'quantity']},
    }

    def __init__(self, username: str, password: str) -> None:
        """Create a session and attach the given credentials to it.

        Args:
            username: User name stored in the session's ``auth`` tuple.
            password: Password stored in the session's ``auth`` tuple.
        """
        http_session = requests.Session()
        # requests interprets an (user, password) tuple as HTTP Basic auth.
        http_session.auth = (username, password)
        self.session = http_session

    def get_all_rows_of_object(self, object: str) -> list[dict[str, Any]]:
        """Retrieve every row of one object (currently an unimplemented stub).

        Args:
            object: Name of the object to fetch; used in the log messages.

        Returns:
            An empty list when the retrieved count differs from the reported
            count. While the two counts below are still placeholder strings
            they always differ, so the stub always logs an error and returns
            ``[]``. The success path does not return the rows yet.
        """
        # TODO: write the logic to retrieve all rows for a single object using the paginated endpoints

        fetched_count = "Should be a number: the number of rows for the object queried"
        reported_count = "Should be a number: the number of rows for the object reported by the API"

        # Completeness guard: only report success when everything the API
        # announced has actually been retrieved.
        if fetched_count == reported_count:
            logger.success(f'Successfully retrieved all data for {object}')
        else:
            logger.error(
                f'Data length of retrieved data is not equal to total for {object}')
            return []

        # TODO: return the rows as a list of dictionaries
        # return out

In [None]:
# %%

# QUERY THE DATA


# Credentials are read from the environment when the variables are set, so real
# secrets never have to be written into the notebook. When they are unset the
# original placeholder values are used unchanged.
USERNAME: str = os.environ.get(
    'CASE_STUDY_API_USERNAME', "I WILL GIVE YOU THE USERNAME")
PASSWORD: str = os.environ.get(
    'CASE_STUDY_API_PASSWORD', "I WILL GIVE YOU THE PASSWORD")

api = APIWrapper(username=USERNAME, password=PASSWORD)

# Call get_all_rows_of_object once per object in api.OBJECTS and key the
# returned rows by object name, so that data looks like this:
# data = {
#     'product': [...the rows of products],
#     'order': [...the rows of orders],
#     'order_item': [...the rows of order_items],
# }
data: dict[str, list[dict[str, Any]]] = {
    object_name: api.get_all_rows_of_object(object_name)
    for object_name in api.OBJECTS
}

In [None]:
# %%

# TRANSFORM THE DATA

# THIS IS THE DESIRED SCHEMA


class TransformedOrderItemsObject(BaseModel):
    """Desired schema of a single transformed order-item row.

    Combines fields of the order item, its order and its product, plus the
    derived fields described in the comments below.
    """
    # NOTE(review): how `month` is derived is not specified here - presumably
    # the month of order_date expressed as a date; confirm.
    month: date
    order_item_id: int
    order_id: int
    customer_id: int
    order_date: date
    product_id: int
    product_name: str
    # product_category: the product's `category`, renamed to product_category
    product_category: str
    # unit_price: the product's `price`, renamed to unit_price
    unit_price: float
    quantity: int
    # item_total: quantity * price (price taken from the product)
    item_total: float
    # sales_rep: looked up via the static product_category -> sales rep mapping
    # held in CATEGORY_TO_SALES_REP
    sales_rep: str


# Load the raw rows into Polars DataFrames, one per object.
# TODO: uncomment the following three lines once `data` holds the rows of all
# three objects (the lookups fail for keys that are missing from `data`).
# df_p = pl.DataFrame(data['product'])
# df_o = pl.DataFrame(data['order'])
# df_oi = pl.DataFrame(data['order_item'])


# TODO: Join the three frames and derive the fields of
# TransformedOrderItemsObject from the existing columns (sales_rep is added in
# the next cell).
# Placeholder string: replace it with the resulting DataFrame.
transformed_df = 'TODO'

In [None]:
# %%

# TODO: Add a `sales_rep` column to transformed_df by looking up each row's
# product category in CATEGORY_TO_SALES_REP (the product-category -> sales-rep mapping).

# Placeholder string: replace it with the DataFrame that includes `sales_rep`.
transformed_df = "TODO"

In [None]:
# %%

# THIS IS JUST A CHECK TO MAKE SURE THE DATA IS TRANSFORMED CORRECTLY
# TODO: uncomment the code and execute it
# transformed_data = transformed_df.to_dicts()


# Build one TransformedOrderItemsObject per row to verify that the schema is correct;
# construction fails with a validation error if a row does not match the schema.
# TODO: uncomment the code and execute it

# transformed_objects = [TransformedOrderItemsObject(
#     **item) for item in transformed_data]

In [None]:
# %%

# Now perform the aggregation using Polars methods to achieve the same result.
# NOTE(review): "the same result" has no visible reference in this notebook -
# confirm which aggregation (grouping keys and measures) is expected here.
# Placeholder string: replace it with the aggregated DataFrame.
result_df = "TODO"

# TODO: uncomment the code and execute it
# print(result_df)