In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, monotonically_increasing_id
from delta import *
from google.cloud import storage
from google.oauth2 import service_account
import pandas as pd
from support.alcmy_conn import engine, text

import os
import logging

from support.schemas import sales_schema, chn_group_schema

### Extracting data from the source (Bronze Layer)

All functions used the cell below:

In [33]:
def extract_source_data(
    path: str = "./source_data",
) -> list:
    """Basic function to list all basename files inside a directory.

    Args:
        path (str, optional): Path where we want to catch all files inside. Defaults to "./source_data".

    Returns:
        list: Return a list contanining all files basename on the source path inputed.
    """
    
    files = [os.path.join(path, x) for x in os.listdir(path)]

    return files


def load_to_gcs_data(files: list, path_bucket) -> None:
    """Function responsible for load infos to GCP bucket.

    Args:
        files (list): List which come all files basename where we want to 
        upload to bucket(Mainly comming infos from 'extract_source_data' function).

    Returns:
        String status with run sucessfully or not.
    """
    
    try:
        bucket_name = "poc_bucket_gcp"
        credentials = service_account.Credentials.from_service_account_file(
            "./support/key.json"
        )
        client = storage.Client(credentials=credentials)
        bucket = client.get_bucket(bucket_name)

        for file in files:
            print(file)
            logging.debug(f"File: {file} started the upload session")  #! Debug line
            file_name = os.path.basename(file)
            dest_path = os.path.join(path_bucket, file_name)

            blob = bucket.blob(dest_path)
            blob.upload_from_filename(file)
            logging.debug(f"File: {file} finished the upload session")  #! Debug line

        logging.info("Data files uploaded sucessfully")
        return "load_sucess"

    except Exception as error:
        logging.error(error)
        return "load_fail"

In [18]:
"""
Basically this cell get the data from the source and then make the ingestion on the GCP bucket
"""

files = extract_source_data()

load_to_gcs_data(files, "Bronze")

'load_sucess'

### Transform the data and prepare for the final layer (Silver Layer)

All functions used the cell below:

In [19]:
def get_data( client: object, bucket_name: str ,path: str, dest_path: str, mode: str = "r") -> None:
    """_summary_

    Args:
        client (object): _description_
        bucket_name (str): _description_
        path (str): _description_
        dest_path (str): _description_
        mode (str, optional): _description_. Defaults to "r".
    """
    
    bucket = client.get_bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=path)
    for blob in blobs:
        if blob.name.endswith(".csv"):
            file_name = os.path.join(dest_path, blob.name.split("/")[-1])
            blob.download_to_filename(file_name)
    return

def transform_sales_df(client: object, spark_instance: object) -> None:
    """_summary_

    Args:
        client (object): _description_
        spark_instance (object): _description_
        schema_model (object): _description_
    """
    
    file_path = [x for x in extract_source_data("./tmp/to_process/") if 'sales' in x][0]

    spark_df = (
        spark_instance.createDataFrame(
            pd.read_csv(file_path, sep="\t", encoding="utf-16le"),
            schema = sales_schema,
        )
    ).withColumn("DATE", to_date("DATE", "d/M/yyyy"))
        
    return spark_df


def transform_chnl_df(client: object, spark_instance: object) -> None:
    """_summary_

    Args:
        client (object): _description_
        spark_instance (object): _description_
        schema_model (object): _description_
    """
    
    file_path = [x for x in extract_source_data("./tmp/to_process/") if 'channel' in x][0]

    spark_df = (
        spark_instance
        .read
        .schema(chn_group_schema)
        .csv(file_path)
    )
        
    return spark_df

Silver Layer

In [20]:
credentials = service_account.Credentials.from_service_account_file(
    "support/key.json"
)
client = storage.Client(credentials=credentials)

spark = (
    SparkSession.builder.appName("Winair data process")
    .config("spark.python.profile.memory", True)
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.shuffle.partitions", "4")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
).getOrCreate()

In [21]:
get_data(client, bucket_name="poc_bucket_gcp", path="Bronze/", dest_path="./tmp/to_process/")

sales_df = transform_sales_df(client, spark)
chn_group_df = transform_chnl_df(client, spark)

In [22]:
for col in sales_df.columns:
    sales_df = (sales_df.withColumnRenamed(col, col.lower()))

for col in chn_group_df.columns:
    chn_group_df = (chn_group_df.withColumnRenamed(col, col.lower()))

sales_df.show(10)

+----------+-------------+-----------+-------------------+------------------+--------------------+-------+------------+-------------+--------+----+-----+------+
|      date|ce_brand_flvr|   brand_nm|btlr_org_lvl_c_desc|        chnl_group|     trade_chnl_desc|pkg_cat|pkg_cat_desc|  tsr_pckg_nm|$ volume|year|month|period|
+----------+-------------+-----------+-------------------+------------------+--------------------+-------+------------+-------------+--------+----+-----+------+
|2006-01-01|         3440|      LEMON|             CANADA|           LEISURE|         SPORT VENUE|   N20O|   20Z/600ML|.591L NRP 24L|   22.48|2006|    1|     1|
|2006-01-01|         3440|      LEMON|          NORTHEAST|            SUPERS|           SUPERETTE|   N20O|   20Z/600ML|  20Z NRP 24L|   100.0|2006|    1|     1|
|2006-01-01|         3554| STRAWBERRY|          SOUTHEAST|         WORKPLACE|      PLANT / OFFICE|   N20O|   20Z/600ML|  20Z NRP 24L|   66.14|2006|    1|     1|
|2006-01-01|         3441|  RASPBE

##### Channel group dimension creation:

In [23]:
chn_group_dim = chn_group_df.withColumn("id", monotonically_increasing_id())
chn_group_dim.show(5)

+-----------------+----------------+---------------+---+
|  trade_chnl_desc|trade_group_desc|trade_type_desc| id|
+-----------------+----------------+---------------+---+
|  TRADE_CHNL_DESC|TRADE_GROUP_DESC|TRADE_TYPE_DESC|  0|
|      SPORT VENUE|   ENTERTAINMENT|      ALCOHOLIC|  1|
|        SUPERETTE|        SERVICES|            MIX|  2|
|   PLANT / OFFICE|        SERVICES|            MIX|  3|
|MASS MERCHANDISER|         GROCERY|            MIX|  4|
+-----------------+----------------+---------------+---+
only showing top 5 rows



##### Pkg_cat dimension creation:

In [24]:
pkg_cat_dim = sales_df.select("pkg_cat", "pkg_cat_desc").distinct().withColumn("id", monotonically_increasing_id())
pkg_cat_dim.show()

+-------+------------+---+
|pkg_cat|pkg_cat_desc| id|
+-------+------------+---+
|   N56P|    500ML 6P|  0|
|   N20O|   20Z/600ML|  1|
|   N128|12Z/355M 8NR|  2|
+-------+------------+---+



##### Adjusting the fact table:

In [25]:
fact_table = (
    sales_df.join(
        chn_group_dim.withColumnRenamed("id", "trade_id"), 
        sales_df.trade_chnl_desc==chn_group_dim.trade_chnl_desc
    )
    .drop("trade_chnl_desc", "trade_group_desc", "trade_type_desc")
)

fact_table = (
    fact_table.join(
        pkg_cat_dim.withColumnRenamed("id", "pkg_id"),
        sales_df.pkg_cat==pkg_cat_dim.pkg_cat
    )
    .drop("pkg_cat", "pkg_cat_desc").withColumnRenamed("$ Volume", "volume")
)

In [26]:
fact_table.show(5)

+----------+-------------+-----------+-------------------+-----------------+-------------+------+----+-----+------+--------+------+
|      date|ce_brand_flvr|   brand_nm|btlr_org_lvl_c_desc|       chnl_group|  tsr_pckg_nm|volume|year|month|period|trade_id|pkg_id|
+----------+-------------+-----------+-------------------+-----------------+-------------+------+----+-----+------+--------+------+
|2006-01-01|         3440|      LEMON|             CANADA|          LEISURE|.591L NRP 24L| 22.48|2006|    1|     1|       1|     1|
|2006-01-01|         3440|      LEMON|          NORTHEAST|           SUPERS|  20Z NRP 24L| 100.0|2006|    1|     1|       2|     1|
|2006-01-01|         3554| STRAWBERRY|          SOUTHEAST|        WORKPLACE|  20Z NRP 24L| 66.14|2006|    1|     1|       3|     1|
|2006-01-01|         3441|  RASPBERRY|            MIDWEST|MASS MERCHANDISER|  20Z NRP 24L| 222.5|2006|    1|     1|       4|     1|
|2006-01-01|         3440|      LEMON|               WEST|MASS MERCHANDISER|

In [29]:
chn_group_dim.write.mode("overwrite").parquet("/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/chn_group_dim.parquet")
pkg_cat_dim.write.mode("overwrite").parquet("/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/pkg_cat_dim.parquet")
fact_table.write.mode("overwrite").parquet("/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/fact_table.parquet")

In [38]:
load_to_gcs_data(extract_source_data("/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/chn_group_dim.parquet"), "Silver/chn_group_dim")
load_to_gcs_data(extract_source_data("/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/pkg_cat_dim.parquet"), "Silver/pkg_cat_dim")
load_to_gcs_data(extract_source_data("/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/fact_table.parquet"), "Silver/fact_table")

/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/chn_group_dim.parquet/_SUCCESS
/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/chn_group_dim.parquet/part-00000-f49f18a5-61ca-47c8-948e-0ac551525c7c-c000.snappy.parquet
/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/chn_group_dim.parquet/._SUCCESS.crc
/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/chn_group_dim.parquet/.part-00000-f49f18a5-61ca-47c8-948e-0ac551525c7c-c000.snappy.parquet.crc
/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/pkg_cat_dim.parquet/part-00000-bb783ff5-83a0-4e98-a0d2-4d96fb998a13-c000.snappy.parquet
/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/pkg_cat_dim.parquet/_SUCCESS
/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/pkg_cat_dim.parquet/.part-00000-bb783ff5-83a0-4e98-a0d2-4d96fb998a13-c000.snappy.parquet.crc
/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/pkg_cat_dim.parquet/._SUCCESS.crc
/home/nathan/Projetos/pessoal/Inbev_Case/tmp/processed/fact_tabl

'load_sucess'

##### Insert data into postgres tables:

In [12]:
pd_chn_dim = chn_group_dim.toPandas()
pd_pkg_dim = pkg_cat_dim.toPandas()
pd_fact_tb = fact_table.toPandas()

In [13]:
pd_fact_tb.to_sql("fact_sales", engine, if_exists='replace', index=False)
pd_chn_dim.to_sql("trade_chnl_dim", engine, if_exists='replace', index=False)
pd_pkg_dim.to_sql("pkg_dim", engine, if_exists='replace', index=False)

2024-02-04 22:39:12,017 INFO sqlalchemy.engine.Engine select pg_catalog.version()


INFO:sqlalchemy.engine.Engine:select pg_catalog.version()


2024-02-04 22:39:12,018 INFO sqlalchemy.engine.Engine [raw sql] {}


INFO:sqlalchemy.engine.Engine:[raw sql] {}


2024-02-04 22:39:12,019 INFO sqlalchemy.engine.Engine select current_schema()


INFO:sqlalchemy.engine.Engine:select current_schema()


2024-02-04 22:39:12,020 INFO sqlalchemy.engine.Engine [raw sql] {}


INFO:sqlalchemy.engine.Engine:[raw sql] {}


2024-02-04 22:39:12,021 INFO sqlalchemy.engine.Engine show standard_conforming_strings


INFO:sqlalchemy.engine.Engine:show standard_conforming_strings


2024-02-04 22:39:12,022 INFO sqlalchemy.engine.Engine [raw sql] {}


INFO:sqlalchemy.engine.Engine:[raw sql] {}


2024-02-04 22:39:12,024 INFO sqlalchemy.engine.Engine BEGIN (implicit)


INFO:sqlalchemy.engine.Engine:BEGIN (implicit)


2024-02-04 22:39:12,025 INFO sqlalchemy.engine.Engine select 'Hello'


INFO:sqlalchemy.engine.Engine:select 'Hello'


2024-02-04 22:39:12,025 INFO sqlalchemy.engine.Engine [generated in 0.00145s] {}


INFO:sqlalchemy.engine.Engine:[generated in 0.00145s] {}


[('Hello',)]
2024-02-04 22:39:12,027 INFO sqlalchemy.engine.Engine ROLLBACK


INFO:sqlalchemy.engine.Engine:ROLLBACK


2024-02-04 22:39:12,029 INFO sqlalchemy.engine.Engine BEGIN (implicit)


INFO:sqlalchemy.engine.Engine:BEGIN (implicit)


2024-02-04 22:39:12,035 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


2024-02-04 22:39:12,036 INFO sqlalchemy.engine.Engine [generated in 0.00154s] {'table_name': 'fact_sales', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}


INFO:sqlalchemy.engine.Engine:[generated in 0.00154s] {'table_name': 'fact_sales', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}


2024-02-04 22:39:12,040 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


2024-02-04 22:39:12,041 INFO sqlalchemy.engine.Engine [cached since 0.005919s ago] {'table_name': 'fact_sales', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}


INFO:sqlalchemy.engine.Engine:[cached since 0.005919s ago] {'table_name': 'fact_sales', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}


2024-02-04 22:39:12,045 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s]) AND pg_catalog.pg_class.relpersistence != %(relpersistence_1)s AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s]) AND pg_catalog.pg_class.relpersistence != %(relpersistence_1)s AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


2024-02-04 22:39:12,046 INFO sqlalchemy.engine.Engine [generated in 0.00133s] {'param_1': 'r', 'param_2': 'p', 'relpersistence_1': 't', 'nspname_1': 'pg_catalog'}


INFO:sqlalchemy.engine.Engine:[generated in 0.00133s] {'param_1': 'r', 'param_2': 'p', 'relpersistence_1': 't', 'nspname_1': 'pg_catalog'}


2024-02-04 22:39:12,050 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s]) AND pg_catalog.pg_class.relpersistence != %(relpersistence_1)s AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s]) AND pg_catalog.pg_class.relpersistence != %(relpersistence_1)s AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


2024-02-04 22:39:12,050 INFO sqlalchemy.engine.Engine [generated in 0.00083s] {'param_1': 'v', 'relpersistence_1': 't', 'nspname_1': 'pg_catalog'}


INFO:sqlalchemy.engine.Engine:[generated in 0.00083s] {'param_1': 'v', 'relpersistence_1': 't', 'nspname_1': 'pg_catalog'}


2024-02-04 22:39:12,053 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s]) AND pg_catalog.pg_class.relpersistence != %(relpersistence_1)s AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s]) AND pg_catalog.pg_class.relpersistence != %(relpersistence_1)s AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


2024-02-04 22:39:12,053 INFO sqlalchemy.engine.Engine [cached since 0.003646s ago] {'param_1': 'm', 'relpersistence_1': 't', 'nspname_1': 'pg_catalog'}


INFO:sqlalchemy.engine.Engine:[cached since 0.003646s ago] {'param_1': 'm', 'relpersistence_1': 't', 'nspname_1': 'pg_catalog'}


2024-02-04 22:39:12,058 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_attribute.attname AS name, pg_catalog.format_type(pg_catalog.pg_attribute.atttypid, pg_catalog.pg_attribute.atttypmod) AS format_type, (SELECT pg_catalog.pg_get_expr(pg_catalog.pg_attrdef.adbin, pg_catalog.pg_attrdef.adrelid) AS pg_get_expr_1 
FROM pg_catalog.pg_attrdef 
WHERE pg_catalog.pg_attrdef.adrelid = pg_catalog.pg_attribute.attrelid AND pg_catalog.pg_attrdef.adnum = pg_catalog.pg_attribute.attnum AND pg_catalog.pg_attribute.atthasdef) AS "default", pg_catalog.pg_attribute.attnotnull AS not_null, pg_catalog.pg_class.relname AS table_name, pg_catalog.pg_description.description AS comment, pg_catalog.pg_attribute.attgenerated AS generated, (SELECT json_build_object(%(json_build_object_2)s, pg_catalog.pg_attribute.attidentity = %(attidentity_1)s, %(json_build_object_3)s, pg_catalog.pg_sequence.seqstart, %(json_build_object_4)s, pg_catalog.pg_sequence.seqincrement, %(json_build_object_5)s, pg_catalog.pg_seque

INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_attribute.attname AS name, pg_catalog.format_type(pg_catalog.pg_attribute.atttypid, pg_catalog.pg_attribute.atttypmod) AS format_type, (SELECT pg_catalog.pg_get_expr(pg_catalog.pg_attrdef.adbin, pg_catalog.pg_attrdef.adrelid) AS pg_get_expr_1 
FROM pg_catalog.pg_attrdef 
WHERE pg_catalog.pg_attrdef.adrelid = pg_catalog.pg_attribute.attrelid AND pg_catalog.pg_attrdef.adnum = pg_catalog.pg_attribute.attnum AND pg_catalog.pg_attribute.atthasdef) AS "default", pg_catalog.pg_attribute.attnotnull AS not_null, pg_catalog.pg_class.relname AS table_name, pg_catalog.pg_description.description AS comment, pg_catalog.pg_attribute.attgenerated AS generated, (SELECT json_build_object(%(json_build_object_2)s, pg_catalog.pg_attribute.attidentity = %(attidentity_1)s, %(json_build_object_3)s, pg_catalog.pg_sequence.seqstart, %(json_build_object_4)s, pg_catalog.pg_sequence.seqincrement, %(json_build_object_5)s, pg_catalog.pg_sequence.seqmin, %(json_build

2024-02-04 22:39:12,059 INFO sqlalchemy.engine.Engine [generated in 0.00126s] {'json_build_object_2': 'always', 'attidentity_1': 'a', 'json_build_object_3': 'start', 'json_build_object_4': 'increment', 'json_build_object_5': 'minvalue', 'json_build_object_6': 'maxvalue', 'json_build_object_7': 'cache', 'json_build_object_8': 'cycle', 'attidentity_2': '', 'attnum_1': 0, 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog', 'filter_names_1': 'fact_sales'}


INFO:sqlalchemy.engine.Engine:[generated in 0.00126s] {'json_build_object_2': 'always', 'attidentity_1': 'a', 'json_build_object_3': 'start', 'json_build_object_4': 'increment', 'json_build_object_5': 'minvalue', 'json_build_object_6': 'maxvalue', 'json_build_object_7': 'cache', 'json_build_object_8': 'cycle', 'attidentity_2': '', 'attnum_1': 0, 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog', 'filter_names_1': 'fact_sales'}


2024-02-04 22:39:12,070 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_type.typname AS name, pg_catalog.format_type(pg_catalog.pg_type.typbasetype, pg_catalog.pg_type.typtypmod) AS attype, NOT pg_catalog.pg_type.typnotnull AS nullable, pg_catalog.pg_type.typdefault AS "default", pg_catalog.pg_type_is_visible(pg_catalog.pg_type.oid) AS visible, pg_catalog.pg_namespace.nspname AS schema, domain_constraints.condefs, domain_constraints.connames 
FROM pg_catalog.pg_type JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_type.typnamespace LEFT OUTER JOIN (SELECT pg_catalog.pg_constraint.contypid AS contypid, array_agg(pg_catalog.pg_get_constraintdef(pg_catalog.pg_constraint.oid, %(pg_get_constraintdef_1)s)) AS condefs, array_agg(CAST(pg_catalog.pg_constraint.conname AS TEXT)) AS connames 
FROM pg_catalog.pg_constraint 
WHERE pg_catalog.pg_constraint.contypid != %(contypid_1)s GROUP BY pg_catalog.pg_constraint.contypid) AS domain_constraints ON pg_catalog.pg_type.

INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_type.typname AS name, pg_catalog.format_type(pg_catalog.pg_type.typbasetype, pg_catalog.pg_type.typtypmod) AS attype, NOT pg_catalog.pg_type.typnotnull AS nullable, pg_catalog.pg_type.typdefault AS "default", pg_catalog.pg_type_is_visible(pg_catalog.pg_type.oid) AS visible, pg_catalog.pg_namespace.nspname AS schema, domain_constraints.condefs, domain_constraints.connames 
FROM pg_catalog.pg_type JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_type.typnamespace LEFT OUTER JOIN (SELECT pg_catalog.pg_constraint.contypid AS contypid, array_agg(pg_catalog.pg_get_constraintdef(pg_catalog.pg_constraint.oid, %(pg_get_constraintdef_1)s)) AS condefs, array_agg(CAST(pg_catalog.pg_constraint.conname AS TEXT)) AS connames 
FROM pg_catalog.pg_constraint 
WHERE pg_catalog.pg_constraint.contypid != %(contypid_1)s GROUP BY pg_catalog.pg_constraint.contypid) AS domain_constraints ON pg_catalog.pg_type.oid = domain_constraints

2024-02-04 22:39:12,071 INFO sqlalchemy.engine.Engine [generated in 0.00114s] {'pg_get_constraintdef_1': True, 'contypid_1': 0, 'typtype_1': 'd'}


INFO:sqlalchemy.engine.Engine:[generated in 0.00114s] {'pg_get_constraintdef_1': True, 'contypid_1': 0, 'typtype_1': 'd'}


2024-02-04 22:39:12,078 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_type.typname AS name, pg_catalog.pg_type_is_visible(pg_catalog.pg_type.oid) AS visible, pg_catalog.pg_namespace.nspname AS schema, lbl_agg.labels AS labels 
FROM pg_catalog.pg_type JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_type.typnamespace LEFT OUTER JOIN (SELECT pg_catalog.pg_enum.enumtypid AS enumtypid, array_agg(CAST(pg_catalog.pg_enum.enumlabel AS TEXT) ORDER BY pg_catalog.pg_enum.enumsortorder) AS labels 
FROM pg_catalog.pg_enum GROUP BY pg_catalog.pg_enum.enumtypid) AS lbl_agg ON pg_catalog.pg_type.oid = lbl_agg.enumtypid 
WHERE pg_catalog.pg_type.typtype = %(typtype_1)s ORDER BY pg_catalog.pg_namespace.nspname, pg_catalog.pg_type.typname


INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_type.typname AS name, pg_catalog.pg_type_is_visible(pg_catalog.pg_type.oid) AS visible, pg_catalog.pg_namespace.nspname AS schema, lbl_agg.labels AS labels 
FROM pg_catalog.pg_type JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_type.typnamespace LEFT OUTER JOIN (SELECT pg_catalog.pg_enum.enumtypid AS enumtypid, array_agg(CAST(pg_catalog.pg_enum.enumlabel AS TEXT) ORDER BY pg_catalog.pg_enum.enumsortorder) AS labels 
FROM pg_catalog.pg_enum GROUP BY pg_catalog.pg_enum.enumtypid) AS lbl_agg ON pg_catalog.pg_type.oid = lbl_agg.enumtypid 
WHERE pg_catalog.pg_type.typtype = %(typtype_1)s ORDER BY pg_catalog.pg_namespace.nspname, pg_catalog.pg_type.typname


2024-02-04 22:39:12,080 INFO sqlalchemy.engine.Engine [generated in 0.00133s] {'typtype_1': 'e'}


INFO:sqlalchemy.engine.Engine:[generated in 0.00133s] {'typtype_1': 'e'}


2024-02-04 22:39:12,083 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.oid, pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s AND pg_catalog.pg_class.relname IN (%(filter_names_1)s)


INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.oid, pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s AND pg_catalog.pg_class.relname IN (%(filter_names_1)s)


2024-02-04 22:39:12,084 INFO sqlalchemy.engine.Engine [generated in 0.00105s] {'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog', 'filter_names_1': 'fact_sales'}


INFO:sqlalchemy.engine.Engine:[generated in 0.00105s] {'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog', 'filter_names_1': 'fact_sales'}


2024-02-04 22:39:12,088 INFO sqlalchemy.engine.Engine SELECT attr.conrelid, array_agg(CAST(attr.attname AS TEXT) ORDER BY attr.ord) AS cols, attr.conname, min(attr.description) AS description, NULL AS extra 
FROM (SELECT con.conrelid AS conrelid, con.conname AS conname, con.conindid AS conindid, con.description AS description, con.ord AS ord, pg_catalog.pg_attribute.attname AS attname 
FROM pg_catalog.pg_attribute JOIN (SELECT pg_catalog.pg_constraint.conrelid AS conrelid, pg_catalog.pg_constraint.conname AS conname, pg_catalog.pg_constraint.conindid AS conindid, unnest(pg_catalog.pg_constraint.conkey) AS attnum, generate_subscripts(pg_catalog.pg_constraint.conkey, %(generate_subscripts_1)s) AS ord, pg_catalog.pg_description.description AS description 
FROM pg_catalog.pg_constraint LEFT OUTER JOIN pg_catalog.pg_description ON pg_catalog.pg_description.objoid = pg_catalog.pg_constraint.oid 
WHERE pg_catalog.pg_constraint.contype = %(contype)s AND pg_catalog.pg_constraint.conrelid IN (%(

INFO:sqlalchemy.engine.Engine:SELECT attr.conrelid, array_agg(CAST(attr.attname AS TEXT) ORDER BY attr.ord) AS cols, attr.conname, min(attr.description) AS description, NULL AS extra 
FROM (SELECT con.conrelid AS conrelid, con.conname AS conname, con.conindid AS conindid, con.description AS description, con.ord AS ord, pg_catalog.pg_attribute.attname AS attname 
FROM pg_catalog.pg_attribute JOIN (SELECT pg_catalog.pg_constraint.conrelid AS conrelid, pg_catalog.pg_constraint.conname AS conname, pg_catalog.pg_constraint.conindid AS conindid, unnest(pg_catalog.pg_constraint.conkey) AS attnum, generate_subscripts(pg_catalog.pg_constraint.conkey, %(generate_subscripts_1)s) AS ord, pg_catalog.pg_description.description AS description 
FROM pg_catalog.pg_constraint LEFT OUTER JOIN pg_catalog.pg_description ON pg_catalog.pg_description.objoid = pg_catalog.pg_constraint.oid 
WHERE pg_catalog.pg_constraint.contype = %(contype)s AND pg_catalog.pg_constraint.conrelid IN (%(oids_1)s)) AS con ON pg_

2024-02-04 22:39:12,089 INFO sqlalchemy.engine.Engine [generated in 0.00090s] {'generate_subscripts_1': 1, 'contype': 'p', 'oids_1': 16575}


INFO:sqlalchemy.engine.Engine:[generated in 0.00090s] {'generate_subscripts_1': 1, 'contype': 'p', 'oids_1': 16575}


2024-02-04 22:39:12,096 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname, pg_catalog.pg_constraint.conname, CASE WHEN (pg_catalog.pg_constraint.oid IS NOT NULL) THEN pg_catalog.pg_get_constraintdef(pg_catalog.pg_constraint.oid, %(pg_get_constraintdef_1)s) END AS anon_1, nsp_ref.nspname, pg_catalog.pg_description.description 
FROM pg_catalog.pg_class LEFT OUTER JOIN pg_catalog.pg_constraint ON pg_catalog.pg_class.oid = pg_catalog.pg_constraint.conrelid AND pg_catalog.pg_constraint.contype = %(contype_1)s LEFT OUTER JOIN pg_catalog.pg_class AS cls_ref ON cls_ref.oid = pg_catalog.pg_constraint.confrelid LEFT OUTER JOIN pg_catalog.pg_namespace AS nsp_ref ON cls_ref.relnamespace = nsp_ref.oid LEFT OUTER JOIN pg_catalog.pg_description ON pg_catalog.pg_description.objoid = pg_catalog.pg_constraint.oid JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3

INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname, pg_catalog.pg_constraint.conname, CASE WHEN (pg_catalog.pg_constraint.oid IS NOT NULL) THEN pg_catalog.pg_get_constraintdef(pg_catalog.pg_constraint.oid, %(pg_get_constraintdef_1)s) END AS anon_1, nsp_ref.nspname, pg_catalog.pg_description.description 
FROM pg_catalog.pg_class LEFT OUTER JOIN pg_catalog.pg_constraint ON pg_catalog.pg_class.oid = pg_catalog.pg_constraint.conrelid AND pg_catalog.pg_constraint.contype = %(contype_1)s LEFT OUTER JOIN pg_catalog.pg_class AS cls_ref ON cls_ref.oid = pg_catalog.pg_constraint.confrelid LEFT OUTER JOIN pg_catalog.pg_namespace AS nsp_ref ON cls_ref.relnamespace = nsp_ref.oid LEFT OUTER JOIN pg_catalog.pg_description ON pg_catalog.pg_description.objoid = pg_catalog.pg_constraint.oid JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param

2024-02-04 22:39:12,098 INFO sqlalchemy.engine.Engine [generated in 0.00140s] {'pg_get_constraintdef_1': True, 'contype_1': 'f', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog', 'filter_names_1': 'fact_sales'}


INFO:sqlalchemy.engine.Engine:[generated in 0.00140s] {'pg_get_constraintdef_1': True, 'contype_1': 'f', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog', 'filter_names_1': 'fact_sales'}


2024-02-04 22:39:12,105 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_index.indrelid, cls_idx.relname AS relname_index, pg_catalog.pg_index.indisunique, pg_catalog.pg_constraint.conrelid IS NOT NULL AS has_constraint, pg_catalog.pg_index.indoption, cls_idx.reloptions, pg_catalog.pg_am.amname, CASE WHEN (pg_catalog.pg_index.indpred IS NOT NULL) THEN pg_catalog.pg_get_expr(pg_catalog.pg_index.indpred, pg_catalog.pg_index.indrelid) END AS filter_definition, pg_catalog.pg_index.indnkeyatts, pg_catalog.pg_index.indnullsnotdistinct, idx_cols.elements, idx_cols.elements_is_expr 
FROM pg_catalog.pg_index JOIN pg_catalog.pg_class AS cls_idx ON pg_catalog.pg_index.indexrelid = cls_idx.oid JOIN pg_catalog.pg_am ON cls_idx.relam = pg_catalog.pg_am.oid LEFT OUTER JOIN (SELECT idx_attr.indexrelid AS indexrelid, min(idx_attr.indrelid) AS min_1, array_agg(idx_attr.element ORDER BY idx_attr.ord) AS elements, array_agg(idx_attr.is_expr ORDER BY idx_attr.ord) AS elements_is_expr 
FROM (SELECT idx.in

INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_index.indrelid, cls_idx.relname AS relname_index, pg_catalog.pg_index.indisunique, pg_catalog.pg_constraint.conrelid IS NOT NULL AS has_constraint, pg_catalog.pg_index.indoption, cls_idx.reloptions, pg_catalog.pg_am.amname, CASE WHEN (pg_catalog.pg_index.indpred IS NOT NULL) THEN pg_catalog.pg_get_expr(pg_catalog.pg_index.indpred, pg_catalog.pg_index.indrelid) END AS filter_definition, pg_catalog.pg_index.indnkeyatts, pg_catalog.pg_index.indnullsnotdistinct, idx_cols.elements, idx_cols.elements_is_expr 
FROM pg_catalog.pg_index JOIN pg_catalog.pg_class AS cls_idx ON pg_catalog.pg_index.indexrelid = cls_idx.oid JOIN pg_catalog.pg_am ON cls_idx.relam = pg_catalog.pg_am.oid LEFT OUTER JOIN (SELECT idx_attr.indexrelid AS indexrelid, min(idx_attr.indrelid) AS min_1, array_agg(idx_attr.element ORDER BY idx_attr.ord) AS elements, array_agg(idx_attr.is_expr ORDER BY idx_attr.ord) AS elements_is_expr 
FROM (SELECT idx.indexrelid AS indexrelid, 

2024-02-04 22:39:12,106 INFO sqlalchemy.engine.Engine [generated in 0.00100s] {'attnum_1': 0, 'ord_1': 1, 'pg_get_indexdef_1': True, 'attnum_2': 0, 'generate_subscripts_1': 1, 'param_1': 'p', 'param_2': 'u', 'param_3': 'x', 'oids_1': 16575}


INFO:sqlalchemy.engine.Engine:[generated in 0.00100s] {'attnum_1': 0, 'ord_1': 1, 'pg_get_indexdef_1': True, 'attnum_2': 0, 'generate_subscripts_1': 1, 'param_1': 'p', 'param_2': 'u', 'param_3': 'x', 'oids_1': 16575}


2024-02-04 22:39:12,114 INFO sqlalchemy.engine.Engine SELECT attr.conrelid, array_agg(CAST(attr.attname AS TEXT) ORDER BY attr.ord) AS cols, attr.conname, min(attr.description) AS description, bool_and(pg_catalog.pg_index.indnullsnotdistinct) AS indnullsnotdistinct 
FROM (SELECT con.conrelid AS conrelid, con.conname AS conname, con.conindid AS conindid, con.description AS description, con.ord AS ord, pg_catalog.pg_attribute.attname AS attname 
FROM pg_catalog.pg_attribute JOIN (SELECT pg_catalog.pg_constraint.conrelid AS conrelid, pg_catalog.pg_constraint.conname AS conname, pg_catalog.pg_constraint.conindid AS conindid, unnest(pg_catalog.pg_constraint.conkey) AS attnum, generate_subscripts(pg_catalog.pg_constraint.conkey, %(generate_subscripts_1)s) AS ord, pg_catalog.pg_description.description AS description 
FROM pg_catalog.pg_constraint LEFT OUTER JOIN pg_catalog.pg_description ON pg_catalog.pg_description.objoid = pg_catalog.pg_constraint.oid 
WHERE pg_catalog.pg_constraint.contype

INFO:sqlalchemy.engine.Engine:SELECT attr.conrelid, array_agg(CAST(attr.attname AS TEXT) ORDER BY attr.ord) AS cols, attr.conname, min(attr.description) AS description, bool_and(pg_catalog.pg_index.indnullsnotdistinct) AS indnullsnotdistinct 
FROM (SELECT con.conrelid AS conrelid, con.conname AS conname, con.conindid AS conindid, con.description AS description, con.ord AS ord, pg_catalog.pg_attribute.attname AS attname 
FROM pg_catalog.pg_attribute JOIN (SELECT pg_catalog.pg_constraint.conrelid AS conrelid, pg_catalog.pg_constraint.conname AS conname, pg_catalog.pg_constraint.conindid AS conindid, unnest(pg_catalog.pg_constraint.conkey) AS attnum, generate_subscripts(pg_catalog.pg_constraint.conkey, %(generate_subscripts_1)s) AS ord, pg_catalog.pg_description.description AS description 
FROM pg_catalog.pg_constraint LEFT OUTER JOIN pg_catalog.pg_description ON pg_catalog.pg_description.objoid = pg_catalog.pg_constraint.oid 
WHERE pg_catalog.pg_constraint.contype = %(contype)s AND pg_ca

2024-02-04 22:39:12,115 INFO sqlalchemy.engine.Engine [generated in 0.00106s] {'generate_subscripts_1': 1, 'contype': 'u', 'oids_1': 16575}


INFO:sqlalchemy.engine.Engine:[generated in 0.00106s] {'generate_subscripts_1': 1, 'contype': 'u', 'oids_1': 16575}


2024-02-04 22:39:12,118 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname, pg_catalog.pg_description.description 
FROM pg_catalog.pg_class LEFT OUTER JOIN pg_catalog.pg_description ON pg_catalog.pg_class.oid = pg_catalog.pg_description.objoid AND pg_catalog.pg_description.objsubid = %(objsubid_1)s JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s AND pg_catalog.pg_class.relname IN (%(filter_names_1)s)


INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname, pg_catalog.pg_description.description 
FROM pg_catalog.pg_class LEFT OUTER JOIN pg_catalog.pg_description ON pg_catalog.pg_class.oid = pg_catalog.pg_description.objoid AND pg_catalog.pg_description.objsubid = %(objsubid_1)s JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s AND pg_catalog.pg_class.relname IN (%(filter_names_1)s)


2024-02-04 22:39:12,118 INFO sqlalchemy.engine.Engine [generated in 0.00092s] {'objsubid_1': 0, 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog', 'filter_names_1': 'fact_sales'}


INFO:sqlalchemy.engine.Engine:[generated in 0.00092s] {'objsubid_1': 0, 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog', 'filter_names_1': 'fact_sales'}


2024-02-04 22:39:12,121 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname, pg_catalog.pg_constraint.conname, CASE WHEN (pg_catalog.pg_constraint.oid IS NOT NULL) THEN pg_catalog.pg_get_constraintdef(pg_catalog.pg_constraint.oid, %(pg_get_constraintdef_1)s) END AS anon_1, pg_catalog.pg_description.description 
FROM pg_catalog.pg_class LEFT OUTER JOIN pg_catalog.pg_constraint ON pg_catalog.pg_class.oid = pg_catalog.pg_constraint.conrelid AND pg_catalog.pg_constraint.contype = %(contype_1)s LEFT OUTER JOIN pg_catalog.pg_description ON pg_catalog.pg_description.objoid = pg_catalog.pg_constraint.oid JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s AND pg_catalog.pg_class.relname IN (%(filter_names_1)s) ORDER

INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname, pg_catalog.pg_constraint.conname, CASE WHEN (pg_catalog.pg_constraint.oid IS NOT NULL) THEN pg_catalog.pg_get_constraintdef(pg_catalog.pg_constraint.oid, %(pg_get_constraintdef_1)s) END AS anon_1, pg_catalog.pg_description.description 
FROM pg_catalog.pg_class LEFT OUTER JOIN pg_catalog.pg_constraint ON pg_catalog.pg_class.oid = pg_catalog.pg_constraint.conrelid AND pg_catalog.pg_constraint.contype = %(contype_1)s LEFT OUTER JOIN pg_catalog.pg_description ON pg_catalog.pg_description.objoid = pg_catalog.pg_constraint.oid JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s AND pg_catalog.pg_class.relname IN (%(filter_names_1)s) ORDER BY pg_catalog.pg_class.

2024-02-04 22:39:12,122 INFO sqlalchemy.engine.Engine [generated in 0.00079s] {'pg_get_constraintdef_1': True, 'contype_1': 'c', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog', 'filter_names_1': 'fact_sales'}


INFO:sqlalchemy.engine.Engine:[generated in 0.00079s] {'pg_get_constraintdef_1': True, 'contype_1': 'c', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog', 'filter_names_1': 'fact_sales'}


2024-02-04 22:39:12,125 INFO sqlalchemy.engine.Engine 
DROP TABLE fact_sales


INFO:sqlalchemy.engine.Engine:
DROP TABLE fact_sales


2024-02-04 22:39:12,126 INFO sqlalchemy.engine.Engine [no key 0.00105s] {}


INFO:sqlalchemy.engine.Engine:[no key 0.00105s] {}


2024-02-04 22:39:12,131 INFO sqlalchemy.engine.Engine COMMIT


INFO:sqlalchemy.engine.Engine:COMMIT


InternalError: (psycopg2.errors.DependentObjectsStillExist) ERRO:  não pode remover tabela fact_sales porque outros objetos dependem dele
DETAIL:  visão trade_group_per_region depende de tabela fact_sales
visão brand_per_month depende de tabela fact_sales
visão brand_per_region depende de tabela fact_sales
HINT:  Utilize DROP ... CASCADE para remover os objetos dependentes também.

[SQL: 
DROP TABLE fact_sales]
(Background on this error at: https://sqlalche.me/e/20/2j85)

##### Creating views (summary tables):

In [None]:
create_trade_view = """
CREATE OR REPLACE VIEW Trade_Group_per_Region AS
    SELECT
        t_dim.trade_group_desc,
        sales.btlr_org_lvl_c_desc,
        SUM(sales.volume) AS volume
    FROM fact_sales sales
    INNER JOIN trade_chnl_dim t_dim ON
        sales.trade_id = t_dim.id
    GROUP BY
        t_dim.trade_group_desc,
        sales.btlr_org_lvl_c_desc
"""

create_brand_view = """
CREATE OR REPLACE VIEW Brand_per_Month AS
    SELECT
        brand_nm,
        month,
        SUM(volume) AS volume
    FROM fact_sales
    GROUP BY
        brand_nm,
        month
"""

create_brand_p_region_view = """
CREATE OR REPLACE VIEW Brand_per_Region AS
    SELECT
        brand_nm,
        btlr_org_lvl_c_desc,
        SUM(volume) AS volume
    FROM fact_sales
    GROUP BY
        brand_nm,
        btlr_org_lvl_c_desc
"""

with engine.connect() as conn:
    conn.execute(text(create_trade_view))
    conn.execute(text(create_brand_view))
    conn.execute(text(create_brand_p_region_view))
    conn.commit()

2024-02-04 22:33:29,832 INFO sqlalchemy.engine.Engine BEGIN (implicit)


INFO:sqlalchemy.engine.Engine:BEGIN (implicit)


2024-02-04 22:33:29,834 INFO sqlalchemy.engine.Engine 
CREATE OR REPLACE VIEW Trade_Group_per_Region AS
    SELECT
        t_dim.trade_group_desc,
        sales.btlr_org_lvl_c_desc,
        SUM(sales.volume) AS volume
    FROM fact_sales sales
    INNER JOIN trade_chnl_dim t_dim ON
        sales.trade_id = t_dim.id
    GROUP BY
        t_dim.trade_group_desc,
        sales.btlr_org_lvl_c_desc



INFO:sqlalchemy.engine.Engine:
CREATE OR REPLACE VIEW Trade_Group_per_Region AS
    SELECT
        t_dim.trade_group_desc,
        sales.btlr_org_lvl_c_desc,
        SUM(sales.volume) AS volume
    FROM fact_sales sales
    INNER JOIN trade_chnl_dim t_dim ON
        sales.trade_id = t_dim.id
    GROUP BY
        t_dim.trade_group_desc,
        sales.btlr_org_lvl_c_desc



2024-02-04 22:33:29,836 INFO sqlalchemy.engine.Engine [generated in 0.00408s] {}


INFO:sqlalchemy.engine.Engine:[generated in 0.00408s] {}


2024-02-04 22:33:29,839 INFO sqlalchemy.engine.Engine 
CREATE OR REPLACE VIEW Brand_per_Month AS
    SELECT
        brand_nm,
        month,
        SUM(volume) AS volume
    FROM fact_sales
    GROUP BY
        brand_nm,
        month



INFO:sqlalchemy.engine.Engine:
CREATE OR REPLACE VIEW Brand_per_Month AS
    SELECT
        brand_nm,
        month,
        SUM(volume) AS volume
    FROM fact_sales
    GROUP BY
        brand_nm,
        month



2024-02-04 22:33:29,840 INFO sqlalchemy.engine.Engine [generated in 0.00110s] {}


INFO:sqlalchemy.engine.Engine:[generated in 0.00110s] {}


2024-02-04 22:33:29,842 INFO sqlalchemy.engine.Engine 
CREATE OR REPLACE VIEW Brand_per_Region AS
    SELECT
        brand_nm,
        btlr_org_lvl_c_desc,
        SUM(volume) AS volume
    FROM fact_sales
    GROUP BY
        brand_nm,
        btlr_org_lvl_c_desc



INFO:sqlalchemy.engine.Engine:
CREATE OR REPLACE VIEW Brand_per_Region AS
    SELECT
        brand_nm,
        btlr_org_lvl_c_desc,
        SUM(volume) AS volume
    FROM fact_sales
    GROUP BY
        brand_nm,
        btlr_org_lvl_c_desc



2024-02-04 22:33:29,842 INFO sqlalchemy.engine.Engine [generated in 0.00076s] {}


INFO:sqlalchemy.engine.Engine:[generated in 0.00076s] {}


2024-02-04 22:33:29,843 INFO sqlalchemy.engine.Engine COMMIT


INFO:sqlalchemy.engine.Engine:COMMIT


##### Question awnsers: 

Question 1 - What are the Top 3 Trade Groups (TRADE_GROUP_DESC) for each Region (Btlr_Org_LVL_C_Desc) in sales ($ Volume)? 

In [None]:
df_q1 = pd.read_sql("""
        WITH RankedTradeGroups AS (
        SELECT
            trade_group_desc,
            btlr_org_lvl_c_desc AS Region,
            SUM(volume) AS TotalSales,
            RANK() OVER (PARTITION BY btlr_org_lvl_c_desc ORDER BY SUM(volume) DESC) AS SalesRank
        FROM
            trade_group_per_region
        GROUP BY
            trade_group_desc, btlr_org_lvl_c_desc
        )

        SELECT
            trade_group_desc,
            Region,
            TotalSales
        FROM RankedTradeGroups
        WHERE
            SalesRank <= 3
    """,
    engine
)

2024-02-04 22:34:07,284 INFO sqlalchemy.engine.Engine BEGIN (implicit)


INFO:sqlalchemy.engine.Engine:BEGIN (implicit)


2024-02-04 22:34:07,286 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


2024-02-04 22:34:07,287 INFO sqlalchemy.engine.Engine [cached since 168.1s ago] {'table_name': '\n        WITH RankedTradeGroups AS (\n        SELECT\n            trade_group_desc,\n            btlr_org_lvl_c_desc AS Region,\n            SUM(vol ... (274 characters truncated) ...       trade_group_desc,\n            Region,\n            TotalSales\n        FROM RankedTradeGroups\n        WHERE\n            SalesRank <= 3\n    ', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}


INFO:sqlalchemy.engine.Engine:[cached since 168.1s ago] {'table_name': '\n        WITH RankedTradeGroups AS (\n        SELECT\n            trade_group_desc,\n            btlr_org_lvl_c_desc AS Region,\n            SUM(vol ... (274 characters truncated) ...       trade_group_desc,\n            Region,\n            TotalSales\n        FROM RankedTradeGroups\n        WHERE\n            SalesRank <= 3\n    ', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}


2024-02-04 22:34:07,288 INFO sqlalchemy.engine.Engine 
        WITH RankedTradeGroups AS (
        SELECT
            trade_group_desc,
            btlr_org_lvl_c_desc AS Region,
            SUM(volume) AS TotalSales,
            RANK() OVER (PARTITION BY btlr_org_lvl_c_desc ORDER BY SUM(volume) DESC) AS SalesRank
        FROM
            trade_group_per_region
        GROUP BY
            trade_group_desc, btlr_org_lvl_c_desc
        )

        SELECT
            trade_group_desc,
            Region,
            TotalSales
        FROM RankedTradeGroups
        WHERE
            SalesRank <= 3
    


INFO:sqlalchemy.engine.Engine:
        WITH RankedTradeGroups AS (
        SELECT
            trade_group_desc,
            btlr_org_lvl_c_desc AS Region,
            SUM(volume) AS TotalSales,
            RANK() OVER (PARTITION BY btlr_org_lvl_c_desc ORDER BY SUM(volume) DESC) AS SalesRank
        FROM
            trade_group_per_region
        GROUP BY
            trade_group_desc, btlr_org_lvl_c_desc
        )

        SELECT
            trade_group_desc,
            Region,
            TotalSales
        FROM RankedTradeGroups
        WHERE
            SalesRank <= 3
    


2024-02-04 22:34:07,289 INFO sqlalchemy.engine.Engine [raw sql] {}


INFO:sqlalchemy.engine.Engine:[raw sql] {}


2024-02-04 22:34:07,296 INFO sqlalchemy.engine.Engine ROLLBACK


INFO:sqlalchemy.engine.Engine:ROLLBACK


In [None]:
df_q1.head()

Unnamed: 0,trade_group_desc,region,totalsales
0,GROCERY,CANADA,168639.89
1,SERVICES,CANADA,83748.15
2,ACADEMIC,CANADA,53117.543
3,GROCERY,GREAT LAKES,380297.47
4,SERVICES,GREAT LAKES,157798.64


In [None]:
import plotly.express as px

fig = px.bar(df_q1, x='trade_group_desc', y='totalsales', color='region',
    title='Top 3 Trade Groups by Sales in Each Region',
    labels={'trade_group_desc': 'Trade Group', 'TotalSales': 'Total Sales ($)'}
)

# Show the plot
fig.show()

  sf: grouped.get_group(s if len(s) > 1 else s[0])


Question 2 - How much sales ($ Volume) each brand (BRAND_NM) achieved per month?

In [None]:
df_q2 = pd.read_sql("""
    SELECT
        month,
        brand_nm,
        SUM(volume) AS TotalSales
    FROM public.brand_per_month
    GROUP BY
        month,
        brand_nm
    ORDER BY
        month, 
        brand_nm
    """,
    engine
)

2024-02-04 22:34:10,300 INFO sqlalchemy.engine.Engine BEGIN (implicit)


INFO:sqlalchemy.engine.Engine:BEGIN (implicit)


2024-02-04 22:34:10,303 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


2024-02-04 22:34:10,304 INFO sqlalchemy.engine.Engine [cached since 171.1s ago] {'table_name': '\n    SELECT\n        month,\n        brand_nm,\n        SUM(volume) AS TotalSales\n    FROM public.brand_per_month\n    GROUP BY\n        month,\n        brand_nm\n    ORDER BY\n        month, \n        brand_nm\n    ', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}


INFO:sqlalchemy.engine.Engine:[cached since 171.1s ago] {'table_name': '\n    SELECT\n        month,\n        brand_nm,\n        SUM(volume) AS TotalSales\n    FROM public.brand_per_month\n    GROUP BY\n        month,\n        brand_nm\n    ORDER BY\n        month, \n        brand_nm\n    ', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}


2024-02-04 22:34:10,307 INFO sqlalchemy.engine.Engine 
    SELECT
        month,
        brand_nm,
        SUM(volume) AS TotalSales
    FROM public.brand_per_month
    GROUP BY
        month,
        brand_nm
    ORDER BY
        month, 
        brand_nm
    


INFO:sqlalchemy.engine.Engine:
    SELECT
        month,
        brand_nm,
        SUM(volume) AS TotalSales
    FROM public.brand_per_month
    GROUP BY
        month,
        brand_nm
    ORDER BY
        month, 
        brand_nm
    


2024-02-04 22:34:10,308 INFO sqlalchemy.engine.Engine [raw sql] {}


INFO:sqlalchemy.engine.Engine:[raw sql] {}


2024-02-04 22:34:10,312 INFO sqlalchemy.engine.Engine ROLLBACK


INFO:sqlalchemy.engine.Engine:ROLLBACK


In [None]:
df_q2.head()

Unnamed: 0,month,brand_nm,totalsales
0,1,LEMON,505846.1
1,1,RASPBERRY,392552.12
2,1,STRAWBERRY,315902.75
3,2,LEMON,550941.7
4,2,RASPBERRY,409662.03


In [None]:
fig = px.scatter(df_q2, x='month', y='totalsales', color='brand_nm',
    title='Monthly Sales Volume by Brand',
    trendline='ols',
    labels={'month': 'Month', 'TotalSales': 'Total Sales ($)'}
)

fig.show()





Question 3 - Which are the lowest brand (BRAND_NM) in sales ($ Volume) for each region (Btlr_Org_LVL_C_Desc)?

In [None]:
df_q3 = pd.read_sql(""" 
    WITH RankedBrandRegion AS (
	SELECT
		brand_nm,
		btlr_org_lvl_c_desc AS Region,
		SUM(volume) AS TotalSales,
		RANK() OVER (PARTITION BY btlr_org_lvl_c_desc ORDER BY SUM(volume) ASC) AS BrandSalesRank
	FROM
		brand_per_region
	GROUP BY
		brand_nm, btlr_org_lvl_c_desc
    )

    SELECT
        brand_nm,
        Region,
        TotalSales
    FROM RankedBrandRegion
    WHERE
        BrandSalesRank <= 1
    """, 
    engine
)

2024-02-04 22:34:14,427 INFO sqlalchemy.engine.Engine BEGIN (implicit)


INFO:sqlalchemy.engine.Engine:BEGIN (implicit)


2024-02-04 22:34:14,428 INFO sqlalchemy.engine.Engine SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


INFO:sqlalchemy.engine.Engine:SELECT pg_catalog.pg_class.relname 
FROM pg_catalog.pg_class JOIN pg_catalog.pg_namespace ON pg_catalog.pg_namespace.oid = pg_catalog.pg_class.relnamespace 
WHERE pg_catalog.pg_class.relname = %(table_name)s AND pg_catalog.pg_class.relkind = ANY (ARRAY[%(param_1)s, %(param_2)s, %(param_3)s, %(param_4)s, %(param_5)s]) AND pg_catalog.pg_table_is_visible(pg_catalog.pg_class.oid) AND pg_catalog.pg_namespace.nspname != %(nspname_1)s


2024-02-04 22:34:14,429 INFO sqlalchemy.engine.Engine [cached since 175.3s ago] {'table_name': ' \n    WITH RankedBrandRegion AS (\n\tSELECT\n\t\tbrand_nm,\n\t\tbtlr_org_lvl_c_desc AS Region,\n\t\tSUM(volume) AS TotalSales,\n\t\tRANK() OVER (PAR ... (152 characters truncated) ...     )\n\n    SELECT\n        brand_nm,\n        Region,\n        TotalSales\n    FROM RankedBrandRegion\n    WHERE\n        BrandSalesRank <= 1\n    ', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}


INFO:sqlalchemy.engine.Engine:[cached since 175.3s ago] {'table_name': ' \n    WITH RankedBrandRegion AS (\n\tSELECT\n\t\tbrand_nm,\n\t\tbtlr_org_lvl_c_desc AS Region,\n\t\tSUM(volume) AS TotalSales,\n\t\tRANK() OVER (PAR ... (152 characters truncated) ...     )\n\n    SELECT\n        brand_nm,\n        Region,\n        TotalSales\n    FROM RankedBrandRegion\n    WHERE\n        BrandSalesRank <= 1\n    ', 'param_1': 'r', 'param_2': 'p', 'param_3': 'f', 'param_4': 'v', 'param_5': 'm', 'nspname_1': 'pg_catalog'}


2024-02-04 22:34:14,430 INFO sqlalchemy.engine.Engine  
    WITH RankedBrandRegion AS (
	SELECT
		brand_nm,
		btlr_org_lvl_c_desc AS Region,
		SUM(volume) AS TotalSales,
		RANK() OVER (PARTITION BY btlr_org_lvl_c_desc ORDER BY SUM(volume) ASC) AS BrandSalesRank
	FROM
		brand_per_region
	GROUP BY
		brand_nm, btlr_org_lvl_c_desc
    )

    SELECT
        brand_nm,
        Region,
        TotalSales
    FROM RankedBrandRegion
    WHERE
        BrandSalesRank <= 1
    


INFO:sqlalchemy.engine.Engine: 
    WITH RankedBrandRegion AS (
	SELECT
		brand_nm,
		btlr_org_lvl_c_desc AS Region,
		SUM(volume) AS TotalSales,
		RANK() OVER (PARTITION BY btlr_org_lvl_c_desc ORDER BY SUM(volume) ASC) AS BrandSalesRank
	FROM
		brand_per_region
	GROUP BY
		brand_nm, btlr_org_lvl_c_desc
    )

    SELECT
        brand_nm,
        Region,
        TotalSales
    FROM RankedBrandRegion
    WHERE
        BrandSalesRank <= 1
    


2024-02-04 22:34:14,431 INFO sqlalchemy.engine.Engine [raw sql] {}


INFO:sqlalchemy.engine.Engine:[raw sql] {}


2024-02-04 22:34:14,437 INFO sqlalchemy.engine.Engine ROLLBACK


INFO:sqlalchemy.engine.Engine:ROLLBACK


In [None]:
df_q3

Unnamed: 0,brand_nm,region,totalsales
0,STRAWBERRY,CANADA,74009.57
1,STRAWBERRY,GREAT LAKES,208747.9
2,STRAWBERRY,MIDWEST,149206.78
3,STRAWBERRY,NORTHEAST,195581.55
4,RASPBERRY,SOUTHEAST,223958.52
5,GRAPE,SOUTHWEST,7.5
6,STRAWBERRY,WEST,134511.14


In [None]:
fig = px.bar(df_q3, x='brand_nm', y='totalsales', color='region',
    title='Lowest Sales Volume by Brand in Each Region',
    labels={'brand_nm': 'Brand', 'lowestsales': 'Lowest Sales ($)'}
)

fig.show()





### Roadmap to make the project more robust:

1. **Pipeline Orchestration:**
   - Integrate a pipeline orchestration tool, such as Apache Airflow, to facilitate the execution, monitoring and scheduling of pipeline steps.

2. **Monitoring and recording:**
   - Implement a monitoring and logging solution (has been started, but more robustness needs to be added) to track pipeline performance, identify possible failures and improve debugging capacity.

3. **Separation of Environments:**
   - Set up separate development, testing, and production environments to ensure robust testing before deploying changes to the production environment.

4. **Authentication and Authorization:**
   - Strengthen security by implementing authentication and authorization mechanisms at all layers of the pipeline.

5. **Code Version:**
   - Use of a version control system, such as Github.

6. **Dockerization:**
   - Consider “dockerizing” pipeline components to create more isolated environments and ensure consistency between different stages and environments.

7. **Error Handling:**
   - Improve error handling at all stages of the pipeline, implementing strategies to robustly handle failures and ensure workflow continuity.

8. **Partitioning and Performance Tuning:**
   - Use of efficient partitioning techniques in Spark to improve data processing performance, especially when dealing with large volumes (To be studied...).

9. **Metadata and Documentation:**
   - Maintain detailed metadata about the data and transformations performed. Document the pipeline to make it easier to understand and maintain in the future.

10. **(CI/CD) Workflow:**
    - Implement CI/CD practices to automate pipeline construction, testing and deployment, avoiding the risk of manually installed errors.

11. **Scalability:**
    - Design the pipeline to be scalable. This may involve implementing scalable Spark clusters and automatically scaling resources in the Cloud.

12. **Data Security:**
    - Ensure that sensitive data is protected at all stages of the pipeline, considering the application of encryption and data masking as necessary.