In [0]:
import pandas as pd
import numpy as np

Get the official US dollar exchange rate, in Argentine pesos, from the API for a specific date

In [0]:
import requests

# Date (YYYY-MM-DD) of the exchange-rate quote used for the USD conversions
# later in the notebook.
date = "2025-12-17"

# The response is treated as a list of records carrying "fecha", "casa" and
# "venta" keys; selection by date and exchange house is done client-side.
# Alternative endpoints tried previously:
#   https://dolarapi.com/v1/dolares/oficial/{date}
#   https://dolarapi.com/v1/dolares/oficial/
url = "https://api.argentinadatos.com/v1/cotizaciones/dolares"

# The timeout keeps the notebook from hanging indefinitely on an unresponsive
# API; raise_for_status() reports HTTP errors here rather than while the body
# is being parsed or filtered.
response = requests.get(url, timeout=30)
response.raise_for_status()
data = response.json()

# Keep only the "oficial" quote for the requested date.
filtered = [
    item
    for item in data
    if item.get("fecha") == date and item.get("casa") == "oficial"
]

print(filtered)

# The "venta" value of the first match is used as the pesos-per-dollar rate.
dolar = filtered[0].get("venta") if filtered else None

# Stop here when no rate is available: otherwise None would be carried into
# the USD divisions of the gold layer instead of being reported.
if dolar is None:
    raise ValueError(f"No official dollar quote ('venta') found for {date}")

print(dolar)

**Bronze Layer - Import CSV and Create Delta Table**

In [0]:
# Load the Precios_Medicamentos CSV (semicolon-separated, first row is the
# header) from the bronze volume into a Spark DataFrame.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("sep", ";")
    .load("/Volumes/workspace/default/bronze/Precios_Medicamentos.csv")
)

In [0]:
# Preview the DataFrame exactly as loaded from the CSV (original column names).
display(df)

In [0]:
# Rename the raw CSV headers to the column names used by the rest of the
# notebook. Note that two of the source headers ("DROGA " and "MARCA ") end
# with a trailing space.
column_renames = {
    "DROGA ": "Droga",
    "MARCA ": "Marca",
    "PRESENTACION": "Presentacion",
    "LABORATORIO": "Laboratorio",
    "COBERTURA": "Porcentaje_Cobertura",
    "COPAGO": "Precio_Afiliado_PAMI_$",
}
for raw_name, clean_name in column_renames.items():
    df = df.withColumnRenamed(raw_name, clean_name)

In [0]:
# To rebuild the bronze table from scratch, drop it first:
#spark.sql("DROP TABLE IF EXISTS workspace.default.bronze_Precios_Medicamentos")

# Append the DataFrame's rows to the bronze Delta table. Because the mode is
# "append", each run of this cell adds the CSV rows again on top of whatever
# the table already holds.
bronze_writer = df.write.format("delta").mode("append")
bronze_writer.saveAsTable("workspace.default.bronze_Precios_Medicamentos")


**Silver Layer - Limpieza, Calidad y Estandarización de los Datos**

In [0]:
from pyspark.sql.functions import regexp_replace, col

In [0]:
# Read the bronze table. The fully qualified name is the same one used when
# the table is written, so this read does not depend on the session's current
# catalog and schema.
df = spark.read.table("workspace.default.bronze_Precios_Medicamentos")

In [0]:
# Build the silver DataFrame: keep rows whose Presentacion contains "mg"
# (case-sensitive match), drop exact duplicate rows, then convert the price
# and coverage columns from text to doubles.
# NOTE(review): the price regex keeps only digits and "." — if the source
# uses "," as the decimal separator, the decimals would be merged into the
# integer part. Confirm the number format in the CSV.
rows_in_mg = df.filter(df.Presentacion.contains("mg")).dropDuplicates()

# Strip everything except digits and dots from the price, then cast.
price_as_double = regexp_replace(
    col("Precio_Afiliado_PAMI_$"), "[^0-9.]", ""
).cast("double")

# Strip the percent sign from the coverage, then cast.
coverage_as_double = regexp_replace(
    col("Porcentaje_Cobertura"), "%", ""
).cast("double")

df_silver = (
    rows_in_mg
    .withColumn("Precio_Afiliado_PAMI_$", price_as_double)
    .withColumn("Porcentaje_Cobertura", coverage_as_double)
)

In [0]:
# Preview the cleaned silver DataFrame (numeric price and coverage columns).
display(df_silver)

In [0]:
# Replace the contents of the silver Delta table with the freshly cleaned
# data. df_silver is rebuilt from the whole bronze table on every run, so
# "overwrite" keeps the table equal to the latest result; "append" would add
# another full copy of the rows each time the notebook is re-run.
df_silver.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.default.silver_Precios_Medicamentos")


**Gold Layer - Visualización**

In [0]:
from pyspark.sql.functions import sum, avg, count, min, max, countDistinct, when, col, format_number

In [0]:
# Fail fast when no usable exchange rate is available: without this check a
# missing (None) or zero `dolar` would be carried into the USD divisions below
# instead of being reported.
if not dolar:
    raise ValueError("Exchange rate 'dolar' is not set; cannot compute USD prices")

# Derive the gold columns from the silver data:
#   Precio_Total_$            affiliate price divided by the coverage fraction
#                             (null when coverage is 0, avoiding a division by zero)
#   Precio_Afiliado_PAMI_USD  affiliate price divided by `dolar`
#   Precio_Total_USD          total price divided by `dolar`
# NOTE(review): the total is computed as price / (coverage / 100). If the
# coverage is the share paid by the insurer, price / (1 - coverage / 100) may
# be what is intended — confirm against the data definition.
df_gold = df_silver.withColumn(
    "Precio_Total_$",
    when(
        col("Porcentaje_Cobertura") == 0, None
    ).otherwise(
        col("Precio_Afiliado_PAMI_$") / (col("Porcentaje_Cobertura") / 100)
    )
).withColumn(
    "Precio_Afiliado_PAMI_USD",
    col("Precio_Afiliado_PAMI_$") / dolar
).withColumn(
    "Precio_Total_USD",
    col("Precio_Total_$") / dolar
)

# For display purposes, add string versions of the computed columns formatted
# with two decimal places; the numeric columns are kept alongside them.
df_gold = df_gold.withColumn(
    "Precio_Total_$_fmt", format_number(col("Precio_Total_$"), 2)
).withColumn(
    "Precio_Afiliado_PAMI_USD_fmt", format_number(col("Precio_Afiliado_PAMI_USD"), 2)
).withColumn(
    "Precio_Total_USD_fmt", format_number(col("Precio_Total_USD"), 2)
)

In [0]:
# Preview the gold DataFrame, including the derived total/USD and formatted columns.
display(df_gold)

In [0]:
# Replace the contents of the gold Delta table with the freshly computed
# data. df_gold is derived from the full df_silver on every run, so
# "overwrite" keeps the table equal to the latest result; "append" would add
# another full copy of the rows each time the notebook is re-run.
df_gold.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("workspace.default.gold_Precios_Medicamentos")
