In [176]:
# Importar módulo necesario
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, udf, regexp_replace, lit, trim,  unix_timestamp, from_unixtime, datediff, to_date, date_format
from pyspark.sql.types import StringType

from datetime import datetime

In [177]:
# Create the SparkSession (getOrCreate reuses an already-active session if there is one)
spark = SparkSession.builder.getOrCreate()

In [178]:
# Load the three semicolon-separated input tables; the first row of each file is the header.
df_arda_mesamf_especies = (
    spark.read
    .options(sep=";", header=True)
    .csv("./santander/tablas_para_metricas/arda_mesamf_especies_202404231048.csv")
)
df_cartera_de_especies_2 = (
    spark.read
    .options(sep=";", header=True)
    .csv("./santander/tablas_para_metricas/cartera_de_especies_2_202404231126.csv")
)
df_metrics_currency = (
    spark.read
    .options(sep=";", header=True)
    .csv("./santander/tablas_para_metricas/metrics_currency.csv")
)

In [179]:
# DataFrames are immutable: drop() returns a NEW DataFrame, so its result has to
# be captured for the column to actually be removed before the join. A separate
# name is used so the raw frame stays untouched and this cell is re-runnable.
df_cartera_sin_fecha_emision = df_cartera_de_especies_2.drop("fecha_emision")

# Left join cartera -> arda on the trimmed species code (especie == cod_especie),
# then left join the currency table on arda.cotiza == currency.more_detail.
df_join_cartera_arda_currency = (
    df_cartera_sin_fecha_emision.alias("cartera")
    .join(
        df_arda_mesamf_especies.alias("arda"),
        trim(col("cartera.especie")) == trim(col("arda.cod_especie")),
        how="left",
    )
    .join(
        df_metrics_currency.alias("currency"),
        col("arda.cotiza") == col("currency.more_detail"),
        how="left",
    )
)





In [180]:
def handle_isin(isin):
    """Return ``isin`` unchanged, or a twelve-zero placeholder when it is missing.

    Args:
        isin: The ISIN value; may be ``None`` or an empty string.

    Returns:
        ``'000000000000'`` when ``isin`` is ``None`` or ``''``; otherwise the
        value that was passed in.
    """
    is_missing = isin is None or isin == ''
    return '000000000000' if is_missing else isin

def calculate_mat(date_str, reference_date='2024-03-27'):
    """Classify a date into a bucket label based on its distance to a reference date.

    The distance is ``reference_date - date_str`` in whole days:

    * fewer than 90 days     -> ``'MAT2'``
    * 90 to 364 days         -> ``'MAT3'``
    * 365 to 729 days        -> ``'MAT4'``
    * 730 days or more       -> ``'MAT5'``

    NOTE(review): because the difference is reference minus date, every date
    later than the reference yields a negative distance and therefore 'MAT2'.
    If this is fed maturity dates, the subtraction may be inverted — confirm
    the intended business rule. 'MAT1' is never returned: the branch that
    produced it was unreachable and has been removed.

    Args:
        date_str: Date formatted as ``YYYY-MM-DD``; ``None`` or a blank string
            means "no date".
        reference_date: Date (``YYYY-MM-DD``) the distance is measured from.
            Defaults to the previously hard-coded partition date.

    Returns:
        The bucket label, or ``None`` when no date was supplied.

    Raises:
        ValueError: If either date is not in ``YYYY-MM-DD`` format.
    """
    # Blank values would otherwise make strptime raise and fail the whole Spark job.
    if date_str is None or not date_str.strip():
        return None

    # Named iso_format (not date_format) to avoid shadowing the pyspark import.
    iso_format = "%Y-%m-%d"
    parsed = datetime.strptime(date_str.strip(), iso_format)
    reference = datetime.strptime(reference_date, iso_format)
    diff = (reference - parsed).days

    if diff < 90:
        return 'MAT2'
    if diff < 365:
        return 'MAT3'
    if diff < 365 * 2:
        return 'MAT4'
    return 'MAT5'

def calculate_days(date_str):
    """Convert a ``YYYY-MM-DD`` date into a day count, returned as a string.

    The count is the number of days between 1900-01-01 and the given date,
    plus 2. That is the same as counting days from 1899-12-30, which is how
    it is computed here.

    NOTE(review): the result looks like a spreadsheet-style serial date
    number — confirm with the consumer of the ``ddate`` column.

    Args:
        date_str: Date formatted as ``YYYY-MM-DD``, or ``None``.

    Returns:
        The day count as a string, or ``None`` when ``date_str`` is ``None``.
    """
    if date_str is None:
        return None
    parsed = datetime.strptime(date_str, "%Y-%m-%d")
    # 1900-01-01 shifted back by the two-day offset.
    origin = datetime(1899, 12, 30)
    return str((parsed - origin).days)

# Wrap the plain-Python helpers as Spark UDFs that produce string columns.
handle_isin_udf = udf(handle_isin, returnType=StringType())
calculate_mat_udf = udf(calculate_mat, returnType=StringType())
ddate_udf = udf(calculate_days, returnType=StringType())

In [182]:
# Spot check: evaluate calculate_days on one sample date and display the result.
calculate_days("2038-01-09")

'50414'

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 42474)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 257, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/

In [181]:
# Derive the output columns from the joined frame.
df_to_load = (
    df_join_cartera_arda_currency
    .withColumn("nomb", col("arda.nombre_corto"))
    .withColumn("ddate", ddate_udf(col("cartera.fecha_vencimiento")))
    # The cleaned ISIN goes into a dot-free column name. withColumn("arda.isin", ...)
    # would add a NEW column literally called "arda.isin" (only addressable with
    # backticks), while the expression "arda.isin" keeps resolving to the original
    # column of the "arda" alias, so the placeholder would never reach the output.
    .withColumn("isin_clean", handle_isin_udf(col("arda.isin")))
    .withColumn("mat", calculate_mat_udf(col("cartera.fecha_vencimiento")))
    .withColumn("tc", col("transaction_currency"))
)

# Keep only the columns to load, in their final order and with their final names.
df_to_load = df_to_load.selectExpr(
    "cartera as cartera",
    "especie as especie",
    "sec_id as sec_id",
    "rubro_altair as rubro_altair",
    "valor_nominal as valor_nominal",
    "valor_contable as valor_contable",
    "arda.fecha_emision as fecha_emision",
    "fecha_incorporacion_tenencia as fecha_incorporacion_tenencia",
    "cartera.fecha_vencimiento as fecha_vencimiento",
    "tipo_valoracion as tipo_valoracion",
    "nomb as nomb",
    "ddate as ddate",
    "mat as mat",
    "isin_clean as isin",
    "tc as tc",
    "ocig",
    "gros",
    "rnm",
)

# Parse fecha_emision as dd/MM/yyyy and cast back to string (rendered as yyyy-MM-dd).
df_to_load = df_to_load.withColumn(
    "fecha_emision", to_date(col("fecha_emision"), "dd/MM/yyyy").cast("string")
)

# Save the df_to_load DataFrame as CSV (currently disabled)
# df_to_load.write.csv("./santander/tablas_para_metricas/cartera_de_especies_202404231126.csv", sep=";", mode="overwrite")
df_to_load.show(50, False)
                                                    

+-------+-------+------+------------+-------------+---------------+-------------+----------------------------+-----------------+---------------+---------------+-----+----+------------+---+----+----+----+
|cartera|especie|sec_id|rubro_altair|valor_nominal|valor_contable |fecha_emision|fecha_incorporacion_tenencia|fecha_vencimiento|tipo_valoracion|nomb           |ddate|mat |isin        |tc |ocig|gros|rnm |
+-------+-------+------+------------+-------------+---------------+-------------+----------------------------+-----------------+---------------+---------------+-----+----+------------+---+----+----+----+
|TRD    |ARN38D |null  |1250405     |8118.0       |4241655.0      |2020-09-04   |2020-09-08                  |2038-01-09       |1              |BONO U$ 2038   |50414|MAT2|US040114HU71|840|null|null|null|
|TRD    |ARN41E |null  |1250405     |151942.0     |58289663.51    |2020-09-04   |2020-09-08                  |2041-07-09       |2              |BONO EU 2041   |51691|MAT2|XS2177365363|