# Transforming data between the bronze, silver and gold layers - Data Lake

### Configuring access to the Minio/S3 object repository

In [None]:
! pip install -r requirements.txt

In [51]:
import logging
from datetime import datetime
from io import BytesIO
from typing import Union

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

from minio import Minio
from toml import load


# Load the MinIO connection settings from the local TOML credentials file.
with open("credentials.toml", "r") as toml_file:
    credentials = load(toml_file)

# Every connection setting lives under the "minio_credentials" table.
minio_credentials = credentials.get("minio_credentials")
url_minio = minio_credentials.get("url_minio")
access_key = minio_credentials.get("access_key")
secret_key = minio_credentials.get("secret_key")

# Shared client used by the bucket helpers; secure=False means no TLS.
minio_client = Minio(url_minio, access_key=access_key, secret_key=secret_key, secure=False)


def import_objects_bucket(bucket_name: str, path: str, extension: str = "csv", sep: str = ";") -> Union[object, bool]:
    """
    Import all files found under a bucket prefix into a single dataframe.

    Every object listed under ``{path}/`` is downloaded, parsed with pandas
    and the resulting dataframes are concatenated with a fresh index.
    Entries that the listing reports as directories are skipped.

    Args:
        bucket_name: Name of the bucket to read from.
        path: Key prefix (without the trailing slash) that holds the files.
        extension: ``"parquet"`` parses the objects as Parquet; any other
            value parses them as CSV.
        sep: Field delimiter used for CSV parsing; ignored for Parquet.

    Returns:
        The concatenated dataframe, or ``False`` when nothing could be loaded
        (listing or parsing failed, or no file exists under the prefix).
        The reason for a ``False`` result is written to the log.
    """
    logger = logging.getLogger(__name__)

    try:
        list_dataframes = []

        for obj in minio_client.list_objects(bucket_name, prefix=f"{path}/"):
            # Directory entries have no content to download.
            if obj.is_dir:
                continue

            response = minio_client.get_object(bucket_name, obj.object_name)
            try:
                payload = BytesIO(response.read())
            finally:
                # Always hand the HTTP connection back to the pool,
                # even when reading the body fails.
                response.close()
                response.release_conn()

            if extension == "parquet":
                tmp_dataframe = pd.read_parquet(payload)
            else:
                tmp_dataframe = pd.read_csv(payload, sep=sep)

            list_dataframes.append(tmp_dataframe)

        # pd.concat raises on an empty list; report the real cause instead.
        if not list_dataframes:
            logger.warning("No objects found under '%s/%s/'", bucket_name, path)
            return False

        return pd.concat(list_dataframes, ignore_index=True)
    except Exception:
        # Keep the best-effort contract (False on failure) but leave a trace.
        logger.exception("Failed to import objects from '%s/%s/'", bucket_name, path)
        return False

def put_dataframe_bucket(
    dataframe: object,
    bucket_name: str,
    path: str,
    file_prefix: str = "processing_obt",
) -> bool:
    """
    Write a dataframe in parquet format to the bucket.

    The object is stored as ``{path}/{file_prefix}_{YYYYmmddHHMMSS}.parquet``,
    using the local time at the moment of the call. Two calls with the same
    ``path`` and ``file_prefix`` within the same second produce the same
    object name.

    Args:
        dataframe: pandas dataframe to serialise.
        bucket_name: Name of the destination bucket.
        path: Key prefix (without the trailing slash) for the new object.
        file_prefix: Leading part of the file name; the default keeps the
            historical ``processing_obt`` naming.

    Returns:
        ``True`` when the object was uploaded, ``False`` otherwise. The
        reason for a ``False`` result is written to the log.
    """
    try:
        parquet_buffer = BytesIO()
        pq.write_table(pa.Table.from_pandas(dataframe), parquet_buffer)

        # getbuffer() exposes the written bytes without copying them.
        size = parquet_buffer.getbuffer().nbytes
        parquet_buffer.seek(0)

        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        object_name = f"{path}/{file_prefix}_{timestamp}.parquet"

        minio_client.put_object(bucket_name, object_name, parquet_buffer, size)
        return True
    except Exception:
        # Keep the best-effort contract (False on failure) but leave a trace.
        logging.getLogger(__name__).exception(
            "Failed to write dataframe to '%s/%s/'", bucket_name, path
        )
        return False


### Importing bronze layer data for processing - CUSTOMER

In [2]:
# Load every CSV object under "customers/" in the "bronze" bucket into one
# dataframe; import_objects_bucket returns False instead of raising on failure.
customers_dataframe = import_objects_bucket(bucket_name="bronze", path="customers")

In [None]:
customers_dataframe

### Importing bronze layer data for processing - SALES

In [4]:
# Load every CSV object under "sales/" in the "bronze" bucket into one
# dataframe; import_objects_bucket returns False instead of raising on failure.
sales_dataframe = import_objects_bucket(bucket_name="bronze", path="sales")

In [None]:
sales_dataframe

### Importing bronze layer data for processing - LOGS

In [6]:
# Load every CSV object under "logs/" in the "bronze" bucket into one
# dataframe; import_objects_bucket returns False instead of raising on failure.
logs_dataframe = import_objects_bucket(bucket_name="bronze", path="logs")

In [None]:
logs_dataframe

# Preparation of the data transformation environment

In [None]:
import duckdb

# Open an in-memory DuckDB database that every SQL cell in this notebook uses.
connection = duckdb.connect(database=":memory:", read_only=False)
# Alternative: back the database with a local file instead of memory.
#connection = duckdb.connect(database="data-lake.db", read_only=False)

# Expose the bronze dataframes to SQL under the table names the queries reference.

connection.register("customers_table", customers_dataframe)
connection.register("sales_table", sales_dataframe)
connection.register("logs_table", logs_dataframe)

### Query execution test

In [None]:
# Smoke test: one multi-statement script that sets the memory limit, selects
# the "memory" catalog and lists the tables registered on the connection.
query = """
SET memory_limit='10GB';
-- SELECT * FROM customers_table;
USE memory;
SHOW TABLES;
"""
# NOTE(review): presumably only the result of the last statement (SHOW TABLES)
# ends up in the dataframe — confirm.
result_query = connection.execute(query).df()
result_query

### Transformation of the sales base

In [None]:
# Aggregate sales_table to one row per customer_id.
#   Inner query: builds sale_datetime from sale_date + sale_time, splits
#   sale_value into sold / pending / cancelled amounts according to
#   sale_status, and tags each product with its status.
#   Outer query: counts the sales, takes the first/last sale timestamp, sums
#   the amounts, joins the product tags into one string, and orders by
#   count_sales (output column 2) in descending order.
# NOTE(review): lpad(sale_time, 5, '0') assumes sale_time is text shaped like
# "H:MM" or "HH:MM" — confirm against the bronze files.
# NOTE(review): the alias "frist_sale_datetime" (sic) is also referenced by the
# consolidation query, so any rename has to change both places together.
query = (
"""
SELECT  customer_id,
        count(*) count_sales,
        min(sale_datetime) frist_sale_datetime,
        max(sale_datetime) last_sale_datetime,
        sum(sale_value) amount,
        sum(amount_sold) as amount_sold,
        sum(amount_pending) as amount_pending,
        sum(amount_cancelled) as amount_cancelled,
        string_agg(product_and_status, ', ') list_of_products_and_status
FROM (
    SELECT  customer_id,
            cast(concat(sale_date, ' ', lpad(sale_time, 5, '0'), ':00') as timestamp) sale_datetime,
            sale_value,
            case when sale_status = 'completed' then sale_value else 0.00 end as amount_sold,
            case when sale_status = 'pending' then sale_value else 0.00 end as amount_pending,
            case when sale_status = 'cancelled' then sale_value else 0.00 end as amount_cancelled,
            concat(product_sold, '-', sale_status) product_and_status
    FROM sales_table
    )
GROUP BY customer_id
ORDER BY 2 DESC;
"""
)

# Run the aggregation and show the per-customer result as the cell output.
transformation_sales_result_dataframe = connection.execute(query).df()
transformation_sales_result_dataframe

### Transformation of the logs base

In [None]:
# Aggregate logs_table to one row per customer_id: first and last access
# timestamps (built from access_date + access_time), a comma-separated list of
# "page-device-ip" access origins (spaces in pages_visited become underscores),
# and the average session_duration divided by 60 and rounded to a whole number.
# NOTE(review): the division by 60 assumes session_duration is in seconds — confirm.
# NOTE(review): lpad(access_time, 5, '0') assumes access_time is text shaped
# like "H:MM" or "HH:MM" — confirm against the bronze files.
# NOTE(review): the alias "frist_access" (sic) is also referenced by the
# consolidation query, so any rename has to change both places together.
query = (
"""
-- SELECT * FROM logs_dataframe;

SELECT  customer_id,
        min(cast(concat(access_date, ' ', lpad(access_time, 5, '0'), ':00') as datetime)) frist_access,
        max(cast(concat(access_date, ' ', lpad(access_time, 5, '0'), ':00') as datetime)) last_access,
        string_agg(concat(replace(pages_visited, ' ', '_'), '-', device, '-', source_ip), ', ') list_of_access_origin,
        round((avg(session_duration) / 60), 0) avg_navigation_in_minutes
FROM logs_table
GROUP BY customer_id;
"""
)

# Run the aggregation and show the per-customer result as the cell output.
transformation_logs_result_dataframe = connection.execute(query).df()
transformation_logs_result_dataframe

### Consolidation of the customer, sales and logs bases into an OBT (One Big Table) for the silver layer

In [None]:
# Register the per-customer sales and logs aggregates as DuckDB tables so they
# can be joined with customers_table.

connection.register("transformation_sales_result_table", transformation_sales_result_dataframe)
connection.register("transformation_logs_result_table", transformation_logs_result_dataframe)

In [None]:
# Build the OBT (One Big Table): one row per customer from customers_table,
# enriched with the per-customer sales and logs aggregates. LEFT JOINs keep
# customers that have no sales and/or no logs (their aggregate columns are
# NULL). A missing state is replaced by 'Not Found'.
# Both joins read the tables registered on the connection, so the query does
# not depend on a Python variable of a particular name being in scope.
query = (
"""
SELECT  t1.customer_id,
        t1.client_name,
        t1.email_address,
        t1.phone_number,
        t1.registration_date,
        t1.date_of_birth,
        t1.sex,
        t1.address,
        t1.city,
        coalesce(t1.state, 'Not Found') state,
        t1.customer_category,
        t2.count_sales,
        t2.frist_sale_datetime,
        t2.last_sale_datetime,
        t2.amount,
        t2.amount_sold,
        t2.amount_pending,
        t2.amount_cancelled,
        t2.list_of_products_and_status,
        t3.frist_access,
        t3.last_access,
        t3.list_of_access_origin,
        t3.avg_navigation_in_minutes
FROM customers_table t1
LEFT JOIN transformation_sales_result_table t2
ON t1.customer_id = t2.customer_id
LEFT JOIN transformation_logs_result_table t3
ON t1.customer_id = t3.customer_id
"""
)

# Run the consolidation and show the OBT as the cell output.
result_consolidation_obt_dataframe = connection.execute(query).df()
result_consolidation_obt_dataframe

# Writing the processing result to the silver layer

In [None]:
# Persist the OBT as a timestamped parquet object under "processing_obt/" in
# the "silver" bucket; the cell shows True on success and False on failure.
put_dataframe_bucket(
    dataframe=result_consolidation_obt_dataframe,
    bucket_name="silver",
    path="processing_obt",
)

# Reading existing silver layer data from parquet files

In [14]:
# Read back every parquet object under "processing_obt/" in the "silver"
# bucket and stack them into one dataframe (False is returned on failure).
processing_obt_dataframe = import_objects_bucket(
    bucket_name="silver",
    path="processing_obt",
    extension="parquet",
)

In [None]:
processing_obt_dataframe

In [None]:
# Register the silver OBT read back from the bucket as a DuckDB table so the
# gold-layer query can select from it.

connection.register("processing_obt_table", processing_obt_dataframe)

In [None]:
# Create a mailing list of customers with pending sales who have already
# purchased something at some point (amount_sold > 0), whose average browsing
# time is at least 5 minutes, and whose pending amount is greater than 15,000.


# Other query examples (defined for reference; not executed by this cell)

query_metadata = (
"""
-- SELECT  table_name,
--         column_name,
--         data_type        
-- FROM information_schema.columns WHERE table_name = 'processing_obt_table';
USE information_schema;
SHOW TABLEs;
"""
)

# "idade" is Portuguese for "age"; the column name is kept because it is part
# of the gold-layer output. date_sub counts COMPLETE years between the two
# dates, so the age only increases on the birthday; date_diff would count
# calendar-year boundaries and overstate the age before the birthday.
query = (
"""
SELECT  customer_id,
        client_name,
        email_address,
        phone_number,
        date_sub('year', cast(date_of_birth as date), today()) as idade
FROM processing_obt_table
WHERE avg_navigation_in_minutes >= 5
AND list_of_products_and_status like '%pending%'
AND amount_pending > 15000
AND amount_sold > 0;
"""
)

# Run the selection and show the mailing list as the cell output.
result_gold = connection.execute(query).df()
result_gold

In [None]:
# Persist the mailing list as a timestamped parquet object under
# "processing_mailing_list/" in the "gold" bucket; the cell shows True on
# success and False on failure.
put_dataframe_bucket(
    dataframe=result_gold,
    bucket_name="gold",
    path="processing_mailing_list",
)

# Viewing the mailing list result on the gold layer

In [None]:
# Read back every parquet object under "processing_mailing_list/" in the
# "gold" bucket (False is returned on failure) and show it as the cell output.
processing_mailing_list_dataframe = import_objects_bucket(
    bucket_name="gold",
    path="processing_mailing_list",
    extension="parquet",
)
processing_mailing_list_dataframe