In [None]:
import os
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine

# Creating input files for the forecast application

This notebook creates the parquet files for the forecast application we explain in Chapter 7 of Getting Started with Taipy.

**In Chapter 7, this step is optional, so you can skip it if you don't have a PostgreSQL database or don't want to reproduce this step.**

**This notebook's output is a set of four parquet files. Those files are already included in the repository, in case you don't want to reproduce this step. You can find them in `/src/data`.**

First, you need to create the database. In our case, we named it `adventure_works_dw` ("dw" stands for "Data Warehouse").

The files to create the tables and insert all the necessary data are in a [dedicated GitHub repository](https://github.com/enarroied/AdventureWorksSimplified). You need to run two scripts (the insert script may take a few minutes). And that's it.

In order to make this notebook work, you'll need to add your credentials in the cell below. If your credentials are right and you installed all the necessary libraries from `requirements.txt`, this notebook should run fine!

In [None]:
# Connection parameters. Edit the values in this cell only; the other cells of
# the notebook read them and should not need any change.

# Database user (here, the default "postgres" account).
username = "postgres"
# Password, read from the PG_PASSWORD environment variable so that it is not
# written in the notebook itself.
password = os.getenv("PG_PASSWORD")
# Server location.
host, port = "localhost", "5432"
# Name of the database to connect to.
database = "adventure_works_dw"

In [None]:
def create_connector(username, password, host, port, database):
    """
    Build a SQLAlchemy engine for a PostgreSQL database (psycopg2 driver).

    The user name and password are percent-encoded before being placed in the
    connection URL, so credentials containing characters such as "@", ":" or
    "/" do not corrupt the URL.

    Args:
        username (str): Database user name.
        password (str | None): Database password. When None (for example, the
            environment variable holding it is not set), the password is left
            out of the URL instead of being sent as the literal text "None".
        host (str): Database server host name or address.
        port (str | int): Database server port.
        database (str): Name of the database to connect to.

    Returns:
        The engine returned by `create_engine` for the assembled URL.
    """
    # safe="" also encodes "/", which would otherwise be read as a URL separator.
    credentials = quote(username, safe="")
    if password is not None:
        credentials = f"{credentials}:{quote(password, safe='')}"
    # NOTE(review): without a password in the URL, the driver presumably falls
    # back on its own lookup (PGPASSWORD / .pgpass) -- confirm for your setup.

    # SQLAlchemy connection string
    connection_string = f"postgresql+psycopg2://{credentials}@{host}:{port}/{database}"

    return create_engine(connection_string)


# Engine used by the queries in this notebook, built from the values set in
# the parameters cell.
connector = create_connector(username, password, host, port, database)


def sql_to_df(query, connector=connector):
    """
    Run a SQL query and return its result as a DataFrame.

    Args:
        query (str): SQL statement to execute.
        connector: Connection passed to `pd.read_sql_query` as `con`. Defaults
            to the `connector` object that exists when this function is defined.

    Returns:
        pd.DataFrame: The value returned by `pd.read_sql_query`.
    """
    return pd.read_sql_query(sql=query, con=connector)

## Import data for the Sales Forecast Application

First, retrieve data from the database:

* We only want bikes, that is, products whose sub-category is 1 (mountain bikes), 2 (road bikes) or 3 (touring bikes).
* We retrieve only the columns we need for the app. We clean the data by trimming the columns with a `CHAR` type, and we transform some columns for better visualizations: day of week as 2-letter codes, and sub-categories as text instead of codes (*note that the original, complete AdventureWorksDW database has a subcategory table; here we just add a `CASE WHEN` statement to handle it*).


In [None]:
# Load the rows of factinternetsales joined to dimproduct, dimdate and
# dimcustomer, restricted to product sub-category keys 1, 2 and 3.
# The CASE expressions turn the day-of-week number and the sub-category key
# into text labels, and derive a "generation" label from the birth year.
# "sales" is computed in SQL as unitprice * orderquantity.
df_sales = sql_to_df(
    """
SELECT
	fulldatealternatekey AS date,
	CASE
		WHEN daynumberofweek = 1 THEN 'Su'
		WHEN daynumberofweek = 2 THEN 'Mo'
		WHEN daynumberofweek = 3 THEN 'Tu'
		WHEN daynumberofweek = 4 THEN 'We'
		WHEN daynumberofweek = 5 THEN 'Th'
		WHEN daynumberofweek = 6 THEN 'Fr'
		WHEN daynumberofweek = 7 THEN 'Sa'
		ELSE '??'
	END AS day,
	productalternatekey AS product,
	CASE 
		WHEN productsubcategorykey = 1 THEN 'Mountain'
		WHEN productsubcategorykey = 2 THEN 'Road'
		WHEN productsubcategorykey = 3 THEN 'Touring'
		ELSE 'UNKNOWN'
	END AS type,
	englishproductname AS name,
	color AS color,
	trim(style) AS style, 
	customeralternatekey AS customer,
	extract(year from birthdate) AS birth,
	CASE WHEN extract(year from birthdate) > 1980 THEN 'Millenial'
		WHEN extract(year from birthdate) BETWEEN 1966 AND 1980 THEN 'Gen X'
        WHEN extract(year from birthdate) BETWEEN 1945 AND 1965 THEN 'Boomers'
        WHEN extract(year from birthdate) < 1945 THEN 'Silent'
        ELSE 'Unknown'
    END AS generation,
	gender AS gender,
	unitprice AS unit_price,
	orderquantity AS items,
	unitprice * orderquantity AS sales
FROM
    factinternetsales
	JOIN dimproduct ON dimproduct.productkey = factinternetsales.productkey
	JOIN dimdate on  dimdate.datekey = factinternetsales.orderdatekey
	JOIN dimcustomer on dimcustomer.customerkey = factinternetsales.customerkey
WHERE 
	productsubcategorykey IN (1, 2, 3)"""
)

In [None]:
# Preview four random rows. The fixed random_state makes the preview identical
# on every "Restart & Run All", so the saved output does not change between runs.
df_sales.sample(4, random_state=42)

In [None]:
# Show the dtype of each column of df_sales at this point in the notebook
df_sales.dtypes

In [None]:
# DataFrame size before type transformation: deep memory usage summed over all
# columns, divided by 1024 ** 2 and rounded to 2 decimals.
print(
    "DataFrame size before changing data types: "
    f"{round(df_sales.memory_usage(deep=True).sum() / 1024**2, 2)} Mb"
)

In [None]:
# Cast every column to an explicit dtype. The columns stored as "category" are
# listed together; the remaining columns are listed one by one.
df_sales = df_sales.astype(
    {
        "date": "datetime64[ns]",
        "name": str,
        "generation": str,
        "birth": "int",
        "items": "int",
        "unit_price": "float",
        "sales": "float",
        **dict.fromkeys(
            ["day", "product", "type", "color", "style", "customer", "gender"],
            "category",
        ),
    }
)

In [None]:
# Same measurement as before the cast: deep memory usage summed over all
# columns, divided by 1024 ** 2 and rounded to 2 decimals.
print(
    "DataFrame size after changing data types: "
    f"{round(df_sales.memory_usage(deep=True).sum() / 1024**2, 2)} Mb"
)

## Explorations

We can plot some of the data to see how it's distributed, and look for anomalies.

In [None]:
# Rows whose "sales" value is negative (an empty result means there are none)
df_sales.loc[df_sales["sales"].lt(0)]

In [None]:
# True when at least one cell of the DataFrame holds a missing value
df_sales.isna().any(axis=None)

We don't have negative sales, and we don't have missing values; this dataset is rather clean.

Let's look at sales evolution:

In [None]:
def group_by_dimensions_and_facts(df, dimension_columns, orderby="sales"):
    """
    Group a DataFrame by dimension columns and total its two fact columns.

    For every observed combination of the dimension columns:
        - "sales" is the sum of the "sales" column.
        - "items" is the sum of the "items" column (the quantity sold).

    Args:
        df (pd.DataFrame): Input DataFrame. It must contain the "sales" and
            "items" columns plus every column in `dimension_columns`. It is
            not modified.
        dimension_columns (list of str): Column names to group by (dimensions).
        orderby (str | list of str): Column name, or list of column names, used
            to sort the result in descending order. Defaults to "sales".

    Returns:
        pd.DataFrame: One row per combination of dimensions, with the dimension
        columns followed by "sales" and "items", rounded to 2 decimals and
        sorted by `orderby` in descending order.
    """
    return (
        # groupby does not modify its input, so no defensive copy is needed.
        # observed=True keeps only the combinations of categorical values that
        # actually occur in the data.
        df.groupby(dimension_columns, observed=True)
        # "items" holds a quantity, so it is summed like "sales"; counting rows
        # would under-report whenever a row has a quantity greater than 1.
        .agg(sales=("sales", "sum"), items=("items", "sum"))
        .sort_values(by=orderby, ascending=False)
        # Format and return the DataFrame
        .round(2)
        .reset_index()
    )

In [None]:
# Totals per date, sorted on the "date" column
df_sales_by_date = group_by_dimensions_and_facts(
    df=df_sales, dimension_columns=["date"], orderby="date"
)
df_sales_by_date.head(n=2)

In [None]:
# Scatter plot of the per-date sales totals
df_sales_by_date.plot.scatter(x="date", y="sales")

## Create a simplified DataFrame

By creating a smaller DataFrame, we can increase the efficiency of the application.

We'll still use the bigger DataFrame to display data, in a table that doesn't update. But we can create a second subset, with fewer columns and some level of pre-aggregation, to reduce the effort required to aggregate data.

We remove the following columns, that we won't use for aggregation:

* product (the product ID, we'll use the "name", it's less efficient, but better for end users).
* customer (the customer ID).
* birth (we won't aggregate by birthdate -not enough data-, but by "generation").

In [None]:
# Number of rows in the full DataFrame
df_sales.shape[0]

In [None]:
# Aggregate on every column kept as a dimension; "product", "customer" and
# "birth" are left out of the grouping.
df_sales_simplified = group_by_dimensions_and_facts(
    df=df_sales,
    dimension_columns=[
        "date",
        "day",
        "type",
        "name",
        "color",
        "style",
        "generation",
        "gender",
        "unit_price",
    ],
)
df_sales_simplified.head(n=5)

In [None]:
# Number of rows in the simplified DataFrame
df_sales_simplified.shape[0]

### Aggregate by customer type

This will only be used for table display, we'll use the simplified DataFrame for charts.

In [None]:
# One row per combination of customer, birth year, generation and gender
df_sales_by_customer = group_by_dimensions_and_facts(
    df=df_sales,
    dimension_columns=["customer", "birth", "generation", "gender"],
)

df_sales_by_customer.head(n=5)

### Aggregate by product type

This will only be used for table display, we'll use the simplified DataFrame for charts

In [None]:
# Two-step aggregation per product:
#   1. group by the product columns plus "unit_price";
#   2. regroup without "unit_price", keeping its maximum and summing the
#      "sales" and "items" totals produced by step 1.
df_sales_by_product = (
    group_by_dimensions_and_facts(
        df=df_sales,
        dimension_columns=["product", "name", "type", "color", "style", "unit_price"],
    )
    # We use the maximum unit_price value
    .groupby(["product", "name", "type", "color", "style"], observed=True)
    .agg({"unit_price": "max", "sales": "sum", "items": "sum"})
    .sort_values(by="sales", ascending=False)
    .round(2)
    .reset_index()
)

df_sales_by_product.head(n=5)

## Save DataFrames

We save each DataFrame:

* As CSV, for reference and so we can have a quick way to inspect data manually when we want to debug or as we build our dashboard, or to check some data with our clients.
* As a parquet file for our application, it's a more efficient format.

In [None]:
# Save csv files

# Create the output folder first: to_csv does not create missing directories,
# so the writes below would fail if ./csv_files does not exist yet.
os.makedirs("./csv_files", exist_ok=True)

df_sales.to_csv("./csv_files/sales.csv", index=False)
df_sales_simplified.to_csv("./csv_files/sales_simplified.csv", index=False)
df_sales_by_customer.to_csv("./csv_files/sales_by_customer.csv", index=False)
df_sales_by_product.to_csv("./csv_files/sales_by_product.csv", index=False)

In [None]:
# Make sure the target folder for the parquet files exists
# (exist_ok=True: no error if it is already there).
os.makedirs("../src/data", exist_ok=True)

In [None]:
# Write each DataFrame to its parquet file, without the index
for frame, target in (
    (df_sales, "../src/data/sales.parquet"),
    (df_sales_simplified, "../src/data/sales_simplified.parquet"),
    (df_sales_by_customer, "../src/data/sales_by_customer.parquet"),
    (df_sales_by_product, "../src/data/sales_by_product.parquet"),
):
    frame.to_parquet(target, index=False)