# **Segmentos del script principal**

### Configuración de parametros y spark

In [1]:
import time
import argparse
import logging
import math
import boto3
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, DoubleType, IntegerType
from pyspark.sql.functions import substring


logger = logging.getLogger("FromScript")
logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

logger.info("Starting spark configuration...")

# ###############
# # SPARK CONFS #
# ###############
logger.info("Starting spark configuration...")

conf = SparkConf()

conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
conf.set("hive.metastore.schema.verification", "false")
conf.set("spark.sql.parquet.compression.codec", "snappy")
conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")

# S3A Configuration
conf.set("spark.hadoop.fs.s3a.connection.timeout", "30000")  # 30 segundos
conf.set("spark.hadoop.fs.s3a.connection.establish.timeout", "30000")  # 30 segundos
conf.set("spark.hadoop.fs.s3a.socket.timeout", "30000")  # 30 segundos
conf.set("spark.hadoop.fs.s3a.attempts.maximum", "10")  # Intentar 10 veces
conf.set("spark.hadoop.fs.s3a.retry.limit", "10")  # Límite de reintentos
conf.set("spark.hadoop.fs.s3a.retry.interval", "5000")  # Intervalo de reintento de 5 segundos
conf.set("spark.hadoop.fs.s3a.retry.throttle.delay", "5000")  # Retraso de aceleración de reintento de 5 segundos

# Aumentar timeout de broadcast y ajustar broadcast join threshold
conf.set("spark.sql.broadcastTimeout", "600")  # Aumentar timeout de broadcast a 600 segundos
conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  # Deshabilitar broadcast join

spark = (
    SparkSession
    .builder
    .appName("SparkTest")
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)

sc = spark.sparkContext
sc.setLogLevel("INFO")

# logger.info("Spark confs: %s", str(sc.getConf().getAll()))
logger.info("Ended spark configuration...")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3,,pyspark,idle,,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2024-08-16 13:07:40,631 - FromScript - INFO - Starting spark configuration...
2024-08-16 13:07:40,631 - FromScript - INFO - Starting spark configuration...
2024-08-16 13:07:40,666 - FromScript - INFO - Ended spark configuration...

In [None]:
# #########################
# ## ARGS FROM ENVs HOST ##
# #########################

# parser = argparse.ArgumentParser(description="Query inputs and outputs")
# parser.add_argument("--source-bucket", type=str)
# parser.add_argument("--artifacts-bucket", type=str)
# parser.add_argument("--feature-group-name", type=str)
# parser.add_argument("--date", type=str)
# parser.add_argument("--dominio", type=str)
# parser.add_argument("--database", type=str)
# parser.add_argument("--previous-months", type=str)
# parser.add_argument('--record-event-time-name', type=str, default='EventTime')
# args, _ = parser.parse_known_args()
# logger.info("Arguments: ")
# logger.info(args)

SOURCE_BUCKET = 'ibk-fs-source-data-173811917907'
ARTIFACTS_BUCKET = 'ibk-fs-artifacts-173811917907'
FEATURE_GROUP_NAME = 'ibk-fs-group-fg1-domain4-mes'
DATE_STR = '20240601'
DOMINIO = 'domain4'
DATABASE = 'e_perm_aws'
NUM_PREVIOUS_MONTHS = '2'
RECORD_EVENT_TIME_NAME = 'cod_mes_timestamp'

# ################
# # USER PARAMAS #
# ################

df_sample_value = 0.3  # TESTING
num_previous_months = "2"
partition_name = 'cod_mes'
prefix_querys = f'{DOMINIO}/{FEATURE_GROUP_NAME}/features'
prefix_data = f'{DOMINIO}/{FEATURE_GROUP_NAME}/data'
tmp_prefix_data = f'{DOMINIO}/{FEATURE_GROUP_NAME}/tmp_data'
path = f"s3://{SOURCE_BUCKET}/{prefix_data}"

# Querys keys s3
query1 = f'code/{prefix_querys}/sql/query.sql'

### Definición de funciones auxiliares

In [2]:
# ####################
# ## Querys Process ##
# ####################
# ADD YOUR QUERYS HERE, ONE FUNCTION FOR EACH QUERY WITH THE SAME STRUCTURE
# BUT DIFFERENT QUERY PARAMS IF NEEDED

def query_df(bucket_artifact, key_artifact, database, codmes):
    
    s3_client = boto3.client('s3')
    response = s3_client.get_object(Bucket=bucket_artifact, Key=key_artifact)
    query = response['Body'].read().decode('utf-8')

    query = query.format(
        database=database,
        codmes=codmes
    )

    return query

# ####################
# ## Params Process ##
# ####################

def parameters_definition(input_date, input_previous_months):
    """
    Metodo parameters_definition:
    Metodo para obtener los parametros
    requeridos para consulta SQL
    """
    date_formatted = datetime.strptime(input_date, "%Y%m%d").date()
    codmes = str(date_formatted.year)+str(date_formatted.month).zfill(2)
    calendar_date = date_formatted.replace(day=1)
    input_previous_months = int(input_previous_months)
    month_dict = {}
    dates_dict = {}
    for i in range(0, input_previous_months+1):
        key = f'lcodmesm{str(i).zfill(2)}'
        value = calendar_date-relativedelta(months=i)
        dates_dict.update({key: value})
        value = str(value.year)+str(value.month).zfill(2)
        month_dict.update({key: value})
    codmes_previous = month_dict[f'lcodmesm{str(input_previous_months).zfill(2)}']
    return codmes, month_dict, dates_dict, calendar_date, codmes_previous

# #####################################
# ## Estimador de tamaño de archivos ##
# #####################################

def estimator_size(df, bucket, prefix, partition):
    s3_client = boto3.client('s3')
    prefix = prefix.rstrip('/') + '/'

    sample_df = df.sample(fraction=0.01)

    tmp_sample_path = f"s3://{bucket}/{prefix}"
    sample_df.write \
        .mode("overwrite") \
        .format("parquet") \
        .partitionBy(partition) \
        .save(tmp_sample_path)

    sample_df.unpersist()

    # ## Obtener la partición de la muestra ###

    partitions = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter='/')
    partition_prefixes = [content['Prefix'] for content in partitions.get('CommonPrefixes', [])]
    partition_prefix = partition_prefixes[0]

    def list_objects(bucket, prefix):
        total_size = 0
        s3_objects = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
        if 'Contents' in s3_objects:
            total_size = sum(obj['Size'] for obj in s3_objects['Contents'])
        return total_size

    # Calcular el tamaño total de todos los archivos en la partición seleccionada
    sample_dir_size = list_objects(bucket, partition_prefix)
    total_dir_size_estimated = sample_dir_size * 100  # ya que la muestra es del 1%

    desired_file_size = 150 * 1024 * 1024  # 150 MB
    num_files = math.ceil(total_dir_size_estimated / desired_file_size)

    s3_objects = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    delete_keys = [{'Key': obj['Key']} for obj in s3_objects.get('Contents', [])]

    if prefix in delete_keys[0]['Key']:
        s3_client.delete_objects(Bucket=bucket, Delete={'Objects': delete_keys})

    return num_files

# ######################
# ## Spark Transforms ##
# ######################

def transformations(df, input_record_event_time_name):
    """
    Metodo transformations:
    Transformaciones a las columnas y
    tipos de datos
    """
    df = df \
        .select(
            col("cod_mes").cast(IntegerType()),
            col("tip_doc").cast(IntegerType()),
            col("key_value").cast(StringType()),
            col("feature1").cast(StringType()),
            # col("feature2").cast(StringType()),
            # col("feature3").cast(StringType())
        )

    return df

# ESCHEMA .JSON DEFINITION
# {
#     "cod_mes": "Integral",
#     "tip_doc": "Integral",
#     "key_value": "String",
#     "feature1": "String",
#     "feature2": "String",
#     "feature3": "String"
# }

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Inicialización del proceso

In [3]:
# ################
# ## INIT PRAMS ##
# ################
codmes, month_dict, dates_dict, calendar_date, codmes_previous = parameters_definition(DATE_STR, num_previous_months)


# ########################
# ## LOAD QUERY FROM S3 ##
# ########################
query = query_df(ARTIFACTS_BUCKET, query1, DATABASE, codmes)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
for line in query.split('\n'):
    print(line)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

with t_tiempo as (
    select codmes, fecdia
        from (select DATE_FORMAT(TO_DATE(calendar_date, 'yyyy-MM-dd'),'yyyyMM') codmes,
                    TO_DATE(calendar_date, 'yyyy-MM-dd') fecdia,
                    TO_DATE(calendar_date, 'yyyy-MM-dd') calendar_date,
                    Row_Number() Over(ORDER BY calendar_date DESC) rnk
                from e_perm_aws.t_calendar_summary
                WHERE calendar_date <= DATE_FORMAT(TO_DATE('202406','yyyyMM'), 'yyyy-MM-dd')
            ) as A
        where A.rnk = 1
    ),
    t_cliente as (
        select p_codmes as cod_mes, key_value,ingreso_bruto as ingreso_bruto,cod_tipo_documento as tip_doc
        from e_perm_aws.t_aws_360_cliente_mes_d3
        where cast (p_codmes as int) = 202406
    )

	select t.cod_mes, t.key_value,t.ingreso_bruto,t.tip_doc
	from t_cliente as t

--------------
### Ejecucion SQL con spark

La ejecución perezosa en PySpark significa que PySpark no ejecuta las operaciones inmediatamente cuando las definís en tu código. En lugar de eso, construye un plan de ejecución y espera hasta que realmente necesite producir un resultado. Esto permite optimizar el proceso antes de ejecutar cualquier operación.


### ¿Cómo funciona?
1. **Definición de Transformaciones**: Cuando escribís transformaciones en PySpark (como `filter`, `map`, `select`), estas no se ejecutan inmediatamente. PySpark simplemente agrega estas operaciones a un plan de ejecución.
2. **Ejecución Diferida**: PySpark no realiza ninguna acción hasta que encuentra una operación que necesite un resultado concreto, como escribir datos a un archivo o mostrar datos.

### Métodos comunes que "rompen" la ejecución perezosa (Acciones):
1. **`show()`**: Muestra algunas filas del DataFrame en la consola.
2. **`collect()`**: Recoge los datos y los trae al programa como una lista en Python.
3. **`count()`**: Cuenta el número de filas del DataFrame.
4. **`write()`**: Guarda el DataFrame en un archivo (por ejemplo, en formato Parquet).

Estos métodos fuerzan a PySpark a ejecutar todas las transformaciones que definiste previamente para poder generar el resultado final.
|
**Ejemplo**: Si hacés varias transformaciones sobre un DataFrame y terminás con un `df.show()`, PySpark recién en ese momento ejecutará todas las transformaciones en cadena y mostrará el resultado.

In [5]:
df = spark.sql(query)
df = df.sample(fraction=0.05).limit(1000)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
df.explain()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
CollectLimit 1000
+- *(1) Sample 0.0, 0.05, false, 272233669539652872
   +- *(1) Project [p_codmes#180 AS cod_mes#4, key_value#28, ingreso_bruto#34, cod_tipo_documento#27 AS tip_doc#6]
      +- *(1) ColumnarToRow
         +- FileScan parquet e_perm_aws.t_aws_360_cliente_mes_d3[cod_tipo_documento#27,key_value#28,ingreso_bruto#34,p_codmes#180] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://interbank-datalake-prod-us-east-1-195034049652-stage/pre-stage/da..., PartitionFilters: [isnotnull(p_codmes#180), (cast(p_codmes#180 as int) = 202406)], PushedFilters: [], ReadSchema: struct<cod_tipo_documento:int,key_value:string,ingreso_bruto:decimal(38,10)>

### Proceso de análisis en Spark SQL:
1. **Análisis de la Sintaxis (Parsing)**: Cuando escribís una query, Spark revisa que esté bien escrita. Si hay un error en la sintaxis o usás una función que no existe (como `date_parse`), Spark te avisa antes de ejecutar la consulta.

2. **Análisis de los Elementos (Analysis)**: Spark verifica que las columnas, tablas y funciones que mencionás en la query existan y sean correctas. Si algo no está bien, te mostrará un error antes de seguir.

3. **Optimización (Optimization)**: Si todo está bien, Spark reorganiza la consulta para hacerla más eficiente.

Entonces, sí, Spark detecta errores como funciones que no existen antes de traer los datos o ejecutar la consulta.

In [7]:
# ###########################
# Revisar el esquema de datos
# ###########################
for s in df.schema:
    print(s)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

StructField('cod_mes', StringType(), True)
StructField('key_value', StringType(), True)
StructField('ingreso_bruto', DecimalType(38,10), True)
StructField('tip_doc', IntegerType(), True)

In [14]:
# Accion para traer los datos a memoria // Rompemos la ejecucion perezosa >>> lazy evaluation
df.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Interrupted by user


In [9]:
# def nueva_feature(df):
#     df = df.withColumn("feature1", substring("key_value", 1, 8))
#     return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Aplicar esquemas de datos

In [None]:
df = transformations(df, RECORD_EVENT_TIME_NAME)

In [None]:
# df2.createOrReplaceTempView("table")