# Universidad Internacional de La Rioja  

### Máster Universitario en Visual Analytics and Big Data  

---

### **Predicción y Análisis de la Demanda y Suministro de Productos entre la Comunidad Andina y España**  
**Presentado por:** Danilo Andrés Beleño Villafañe  

---

### **Notebook 2: Paso de la Zona de Datos Crudos a la Zona de Datos Confiables**  


### Configuracion incial

In [1]:
import logging
from google.cloud import storage
from pyspark.sql.functions import expr
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, regexp_replace, trim, udf

In [2]:
logging.basicConfig(level=logging.INFO)

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
spark

In [5]:
CONFIG = {
    "raw_bucket" : "data-factory-1-raw-data-zone",
    "trusted_bucket" : "data-factory-2-trusted-zone",
    "data_base_path" : "data/datacomex/",
}

### Funciones generale

In [6]:
def get_files_from_gcs(bucket_name, path_prefix):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    files = [blob.name for blob in bucket.list_blobs(prefix=path_prefix) if blob.name.endswith(".csv")]
    return files[::-1]

In [7]:
def decode_binary_content(binary_data):
    return binary_data.decode("utf-16le")

def decode_df(binary_df, col_names):
    decode_udf = udf(decode_binary_content, StringType())
    
    df = binary_df.select(
        decode_udf(col("content")).alias("decoded_content")
    ).selectExpr(
        "split(decoded_content, '\n') as lines"
    ).selectExpr(
        "explode(lines) as line"
    ).filter(
        "line != ''"
    ).selectExpr(
        "split(line, '\t') as columns"
    ).select(
        [expr(f"columns[{i}]").alias(name) for i, name in enumerate(col_names)]
    ).filter(
        ~col(col_names[0]).contains(col_names[0])
    )
    
    df = df.select(
        [regexp_replace(trim(col(c)), r'\r', '').alias(c) for c in df.columns]
    )
    return df

### Ingesta Tabla clasificacion_taric

In [8]:
source_taric_path = f'gs://{CONFIG["raw_bucket"]}/data/datacomex/clasificacion_taric/clasificacion_taric.csv'
target_taric_path = f'gs://{CONFIG["trusted_bucket"]}/data/datacomex/clasificacion_taric'
col_names = ['cod_taric', 'nivel_taric', 'descripcion_taric']

In [9]:
df = spark.read.format("binaryFile").load(source_taric_path)
df = decode_df(df, col_names).withColumn("nivel_taric", col("nivel_taric").cast("integer"))

In [10]:
df.write.options(
    compression="snappy"
).mode(
    "overwrite"
).parquet(target_taric_path)


                                                                                

### Ingesta Tabla paises

In [11]:
source_country_path = f'gs://{CONFIG["raw_bucket"]}/data/datacomex/paises/paises.csv'
target_country_path = f'gs://{CONFIG["trusted_bucket"]}/data/datacomex/paises'
col_names = ['cod_pais', 'pais']

In [None]:
df = spark.read.format("binaryFile").load(source_country_path)
df = decode_df(df, col_names)

In [13]:
df.write.options(
    compression="snappy"
).mode(
    "overwrite"
).parquet(target_country_path)

                                                                                

### Ingesta Tabla provincias

In [14]:
source_province_path = f'gs://{CONFIG["raw_bucket"]}/data/datacomex/provincias/provincias.csv'
target_province_path = f'gs://{CONFIG["trusted_bucket"]}/data/datacomex/provincias'
col_names = ['cod_provincia', 'nombre_provincia', 'cod_comunidad', 'nombre_comunidad']

In [15]:
df = spark.read.format("binaryFile").load(source_province_path)
df = decode_df(df, col_names)

In [16]:
df.write.options(
    compression="snappy"
).mode(
    "overwrite"
).parquet(target_province_path)


[Stage 2:>                                                          (0 + 1) / 1]

                                                                                

### Ingesta Tabla Sectores

In [17]:
source_sector_path = f'gs://{CONFIG["raw_bucket"]}/data/datacomex/sectores/sectores.csv'
target_sector_path = f'gs://{CONFIG["trusted_bucket"]}/data/datacomex/sectores'
col_names = ["cod_sec", "nivel_sec", "sector"]

In [18]:
df = spark.read.format("binaryFile").load(source_sector_path)
df = decode_df(df, col_names)

In [19]:
df.write.options(
    compression="snappy"
).mode(
    "overwrite"
).parquet(target_sector_path)


[Stage 3:>                                                          (0 + 1) / 1]

                                                                                

### Ingesta Tabla comex

In [20]:
source_comex_data_path = f'{CONFIG["data_base_path"]}taric/'
target_comex_data_path = f'gs://{CONFIG["trusted_bucket"]}/data/datacomex/comex'
file_paths = get_files_from_gcs(CONFIG["raw_bucket"], source_comex_data_path)
col_names = ['tipo_movimiento', 'anio', 'mes', 'estado', 'cod_pais', 'cod_provincia', 'euros', 'dolares', 'nivel_taric_original', 'cod_taric', 'kilogramos']

In [21]:
for file_path in file_paths:
    try:
        logging.info(f"Procesando archivo: {file_path}")
        
        df = spark.read.format("binaryFile").load(f'gs://{CONFIG["raw_bucket"]}/{file_path}')
        df = decode_df(df, col_names)

        df = (
            df.withColumn("anio", col("anio").cast("integer"))
              .withColumn("mes", col("mes").cast("integer"))
              .withColumn("euros", regexp_replace(col("euros"), ",", ".").cast("decimal(18, 2)"))
              .withColumn("dolares", regexp_replace(col("dolares"), ",", ".").cast("decimal(18, 2)"))
              .withColumn("kilogramos", regexp_replace(col("kilogramos"), ",", ".").cast("decimal(18, 2)"))
        )

        df.write.options(
            compression="snappy"
        ).mode(
            "append"
        ).partitionBy(
            "anio", "mes"
        ).parquet(target_comex_data_path)

    except Exception as e:
        logging.error(f"Error procesando archivo {file_path}: {e}")

INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_202406.csv
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_202405.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_202403.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_202402.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_202401.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_202312.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_202311.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_202309.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_202308.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_202307.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_202306.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_202305.csv       
INFO:root:Procesando archivo: data/

INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_201510.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_201509.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_201508.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_201507.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_201506.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_201505.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_201504.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_201503.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_201502.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_201501.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_201412.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_201411.csv       
INFO:root:Procesando archivo

INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_200609.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_200608.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_200607.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_200605.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_200604.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_200603.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_200602.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_200601.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_200512.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_200511.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_200510.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_200509.csv       
INFO:root:Procesando archivo

INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_199803.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_199802.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_199801.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_199712.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_199711.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_199710.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_199707.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_199706.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_199705.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_199704.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_199703.csv       
INFO:root:Procesando archivo: data/datacomex/taric/comex_taric_199702.csv       
INFO:root:Procesando archivo