# Mover información cruda a aumentada

## Importamos librerías

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import uuid

## Creamos variables claves del proyecto

In [0]:
# File locations
# Root of the data lake container (note the trailing slash).
dl_location = 'abfss://dataengineering@bidatareposig3kquinteroc.dfs.core.windows.net/'
# Folder whose sub-folders hold the raw CSV files; also ends with a slash.
raw_location = f'{dl_location}RAW/'

# Unity Catalog "catalog.schema." prefix prepended to target table names.
uc_location_aumented = 'medallion_architecture.aumented.'

# Other settings
# NOTE: this name shadows the `date_format` function brought in by
# `from pyspark.sql.functions import *`; it is kept because later cells
# reference it by this name.
date_format = 'dd/MM/yyyy'

## Creamos schemas para las diferentes tablas a leer

### Schema para las tablas transaccionales

In [0]:
# Explicit schema for the transactions CSV files. Every column is a nullable
# string, except QUANTITY and SPEND which are nullable doubles. The tuple
# order below is the column order of the schema.
df_transaction_schema = StructType(
    fields=[
        StructField(
            name,
            DoubleType() if name in ('QUANTITY', 'SPEND') else StringType(),
            True,
        )
        for name in (
            'SHOP_WEEK',
            'SHOP_DATE',
            'SHOP_WEEKDAY',
            'SHOP_HOUR',
            'QUANTITY',
            'SPEND',
            'PROD_CODE',
            'PROD_CODE_10',
            'PROD_CODE_20',
            'PROD_CODE_30',
            'PROD_CODE_40',
            'CUST_CODE',
            'seg_1',
            'seg_2',
            'BASKET_ID',
            'BASKET_SIZE',
            'BASKET_PRICE_SENSITIVITY',
            'BASKET_TYPE',
            'BASKET_DOMINANT_MISSION',
            'STORE_CODE',
            'STORE_FORMAT',
            'STORE_REGION',
        )
    ]
)

### Schema para las tablas de fecha

In [0]:
# Explicit schema for the time CSV files: three nullable string columns.
df_time_schema = (
    StructType()
    .add('shop_week', StringType(), True)
    .add('date_from', StringType(), True)
    .add('date_to', StringType(), True)
)

## Unity Catalog

In [0]:
%sql
-- Schema that holds the ingestion tracking table (`summary`), which records
-- the raw folders that have already been loaded. The COMMENT below used to be
-- a copy of the `aumented` schema description; it now describes this schema.
CREATE SCHEMA IF NOT EXISTS medallion_architecture.tracking
COMMENT 'En este schema va a ir guardado el registro de las carpetas ya procesadas'
;

In [0]:
%sql
-- Tracking table appended to by writing_tracking_table: one row per raw folder
-- registered as processed.
--   TRANSACTION_DATE: timestamp taken when the row is written
--   FOLDER_NAME:      folder path as listed under the RAW location
--   UUID:             random identifier generated for the row
CREATE TABLE IF NOT EXISTS medallion_architecture.tracking.summary(
    TRANSACTION_DATE TIMESTAMP,
    FOLDER_NAME STRING,
    UUID STRING
)

## Definición de funciones personalizadas

In [0]:
# Convert a text date column into a Date column
def converting_to_date(df: DataFrame, colname: str, date_format: str) -> DataFrame:
    """Replace `colname` with a date parsed from fixed character positions.

    The column text is sliced at characters 1-4, 5-6 and 7-8 and reassembled
    as "<7-8>/<5-6>/<1-4>" before being parsed. The caller in this notebook
    passes 'dd/MM/yyyy', so the slices act as year, month and day
    (presumably the raw value is laid out as yyyyMMdd; confirm against the data).

    Args:
        df: DataFrame containing `colname`.
        colname: Name of the column to convert in place.
        date_format: Pattern used to parse the reassembled text; it has to
            match the fixed "xx/xx/xxxx" layout built here.

    Returns:
        The DataFrame with `colname` replaced by the parsed date.
    """
    source = col(colname)
    year_part = substring(source, 1, 4)
    month_part = substring(source, 5, 2)
    day_part = substring(source, 7, 2)
    slash = lit('/')

    reassembled = concat(day_part, slash, month_part, slash, year_part)
    return df.withColumn(colname, to_date(reassembled, date_format))

In [0]:
# Replace null values with placeholders
def cleaning_nulls(df: DataFrame) -> DataFrame:
    """Fill nulls in the customer and segment columns with fixed placeholders.

    Args:
        df: DataFrame with the CUST_CODE, seg_1 and seg_2 columns.

    Returns:
        The DataFrame with nulls in those three columns replaced.
    """
    placeholders = {
        "CUST_CODE": "CUSTXXXXXXXXXX",
        "seg_1": "NA",
        "seg_2": "NA",
    }
    return df.fillna(placeholders)

In [0]:
def writing_info(df: DataFrame, table_name: str) -> None:
    """Append `df` in Delta format to a table under `uc_location_aumented`.

    Args:
        df: Data to append.
        table_name: Table name without catalog/schema; the module-level
            prefix `uc_location_aumented` is prepended to it.
    """
    target_table = f"{uc_location_aumented}{table_name}"
    writer = df.write.format("delta").mode("append")
    writer.saveAsTable(target_table)

In [0]:
def writing_tracking_table(raw_folders: list) -> None:
    """Register folders as processed in ``medallion_architecture.tracking.summary``.

    Appends one row per entry of `raw_folders`, holding the folder name, a
    random UUID and the current timestamp.

    Args:
        raw_folders: Folder paths to register. An empty list is a no-op.
    """
    # Nothing to record: return before touching Spark, because a DataFrame
    # cannot be inferred from an empty list when only column names are given.
    if not raw_folders:
        return

    rows = [(folder,) for folder in raw_folders]

    df_tracking = (
        spark.createDataFrame(rows, schema=['FOLDER_NAME'])
        # Spark's built-in uuid() is used instead of a Python UDF: the UDF was
        # not declared non-deterministic, and the built-in avoids shipping
        # rows to a Python worker.
        .withColumn("UUID", expr("uuid()"))
        .withColumn("TRANSACTION_DATE", current_timestamp())
    )

    (
        df_tracking.write
        .mode("append")
        .format("delta")
        .option("mergeSchema", "true")
        .saveAsTable("medallion_architecture.tracking.summary")
    )

## Ejecución del código

In [0]:
# Debug view: print the directory listing returned for `raw_location`.
folders = dbutils.fs.ls(raw_location)
print(folders)

In [0]:
# Keep only the path (first field) of every entry listed under `raw_location`.
raw_folders = [entry.path for entry in dbutils.fs.ls(raw_location)]
raw_folders

In [0]:
# Folder names already registered in the tracking table.
df_folders = spark.sql("SELECT FOLDER_NAME FROM medallion_architecture.tracking.summary")
# Collect through the DataFrame API rather than `.rdd`: RDD access is not
# supported on every Unity Catalog compute access mode, and the query already
# selects the single column needed.
processed_folders = [row["FOLDER_NAME"] for row in df_folders.collect()]
processed_folders

In [0]:
%sql
-- Target schema for the tables written by writing_info
-- (its table prefix is 'medallion_architecture.aumented.').
CREATE SCHEMA IF NOT EXISTS medallion_architecture.aumented
COMMENT 'En este schema va a ir guardada la informacion agregada'
;

In [0]:
def _read_csv(path, schema):
    """Read headered CSV file(s) at `path` using the given explicit schema."""
    return (
        spark.read
        .format('csv')
        .schema(schema)
        .option('header', True)
        .load(path)
    )


def _ingest_batch(transactions_path, time_path, folders):
    """Load, clean and append one batch, then register its folders.

    Args:
        transactions_path: Path or glob of the transactions CSV files.
        time_path: Path or glob of the time CSV file(s).
        folders: Folder paths to record in the tracking table.

    Returns:
        The cleaned transactions DataFrame that was appended.
    """
    transactions = _read_csv(transactions_path, df_transaction_schema)
    time_table = _read_csv(time_path, df_time_schema)

    transactions = converting_to_date(
        df=transactions, colname="SHOP_DATE", date_format=date_format
    )
    transactions = cleaning_nulls(transactions)

    writing_info(df=transactions, table_name="Transactions")
    writing_info(df=time_table, table_name="Time_table")
    # Tracking is written last, so a folder is only marked as processed after
    # both of its tables were appended.
    writing_tracking_table(folders)
    return transactions


# Stays None when there is nothing new to ingest, so the final display can be
# skipped instead of failing on an undefined name.
df_transactions = None

if not processed_folders:
    # First run: load every folder at once through a glob.
    # `raw_location` already ends with '/', so no extra slash is added.
    df_transactions = _ingest_batch(
        f"{raw_location}*/transactions_*.csv",
        f"{raw_location}*/time.csv",
        raw_folders,
    )
else:
    # Incremental run: ingest only the folders missing from the tracking table.
    already_processed = set(processed_folders)
    for raw_folder in raw_folders:
        if raw_folder in already_processed:
            print(f"Folder: {raw_folder} already processed")
            continue
        df_transactions = _ingest_batch(
            f"{raw_folder}transactions_*.csv",
            f"{raw_folder}time.csv",
            [raw_folder],
        )

if df_transactions is not None:
    display(df_transactions)
else:
    print("No new folders to process")

In [0]:
%sql
-- Manual reset statements, left commented out; uncomment to drop the objects
-- before re-running the notebook from scratch.
-- NOTE(review): this notebook creates the `tracking` and `aumented` schemas,
-- not `raw`; confirm that the DROP SCHEMA below targets the intended schema.
--DROP TABLE IF EXISTS medallion_architecture.aumented.transactions;
--DROP TABLE IF EXISTS medallion_architecture.aumented.time_table;
--DROP SCHEMA IF EXISTS medallion_architecture.raw;
--DROP TABLE IF EXISTS medallion_architecture.tracking.summary