# PROYECTO BIG DATA

Dataset = https://www.kaggle.com/datasets/albedox/cms-open-payment-dataset-2025?select=train_dataset.csv

### Introducción

El dataset seleccionado para este proyecto es CMS Open Payment Dataset 2025 que muestra las relaciones financieras entre manufacturadores de medicinas, equipamiento, biológicos y suminstros medicos y doctores practicantes y no practicantes. Estas relaciones financieras pueden crear conflicto de interés, es por eso que em CMS (Centers for Medicare & Medicaid Services) creó un programa para que haya transparencia entre las transacciones de estos.

En este proyecto se planea predecir el tipo de producto que compraría un médico de acuerdo a los atributos relevantes del comprador.


## Dataset

Este dataset cuenta con 12 millones de registros y tiene un peso de 4.81 GB

* payment_id = Identificador único asignado por el sistema para la transacción de pago.

* payment_amount = Valor total del pago en dólares estadounidenses (USD).

* payment_number = Número de pagos individuales incluidos en el monto total.

* address_full = Dirección comercial principal completa del receptor del pago.

* address_country = País principal donde se encuentra el negocio del receptor del pago.

* address_state = Estado principal del negocio del receptor del pago (abreviatura de 2 letras).

* address_city = Ciudad principal del negocio del receptor del pago.

* zip_code = Código postal (de 5 o 9 dígitos) de la ubicación principal del negocio del receptor.

* payment_day = Día del mes en que se realizó el pago.

* payment_month = Mes en que se realizó el pago.

* payment_year = Año en que se realizó el pago.

* publication_day = Día del mes en que se publicó el registro del pago.

* publication_month = Mes en que se publicó el registro del pago.

* publication_year = Año en que se publicó el registro del pago.

* change_type = Indicador que muestra si el registro es nuevo o agregado.

* indicator_third_party = Indica si el pago fue realizado a un tercero.

* indicator_related_product = Indica si el pago está relacionado con un producto específico.

* indicator_covered = Indica si el producto relacionado está “cubierto” bajo las reglas de Open Payments.

* identity_type = Designación profesional del receptor del pago.

* first_name = Nombre del receptor cubierto.

* last_name = Apellido del receptor cubierto.

* manufacturer_name = Nombre de la empresa que realizó el pago.

* manufacturer_state = Estado donde se encuentra ubicada la empresa que realizó el pago.

* manufacturer_country = País donde se encuentra ubicada la empresa que realizó el pago.

* payment_form = Método o forma del pago.

* payment_nature = Motivo o naturaleza del pago.

* product_type = Categoría del producto asociado con el pago.

* therapeutic_area = Categoría o área terapéutica del producto asociado con el pago.

* product_name = Nombre comercial del producto asociado con el pago.

* primary_type = Tipo principal de medicina practicada por el receptor.

* specialty_group = Especialidad médica principal del receptor.

* subspecialty = Subespecialidad específica del receptor.

* license_code = Abreviatura de 2 letras del estado correspondiente a la licencia médica principal del receptor.

In [1]:
#Bibliotecas para poder trabajar con Spark
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz
!tar xf spark-3.5.6-bin-hadoop3.tgz
#Configuración de Spark con Python
!pip install -q findspark
!pip install pyspark

#Estableciendo variable de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.6-bin-hadoop3"

#Buscando e inicializando la instalación de Spark
import findspark
findspark.init()
findspark.find()

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
[33m0% [Connecting to archive.ubuntu.com (185.125.190.81)] [1 InRelease 14.2 kB/129[0m[33m0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Waiting f[0m                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
                                                                               Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:4 https://cli.github.com/packages stable InRelease [3,917 B]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchp

'/content/spark-3.5.6-bin-hadoop3'

In [2]:
#Verificar la funcionalidad de Pyspark
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.appName('PySpark_prueba4').getOrCreate()
spark_session

In [3]:
# Importar dataset desde google drive
from google.colab import drive
drive.mount('/content/gdrive')

DATA = '/content/gdrive/MyDrive/datasets/pagos/train_dataset.csv'

df = spark_session.read.option('header', 'true').csv(DATA)

df.show()

Mounted at /content/gdrive
+----------+--------------+--------------+--------------------+---------------+-------------+---------------+--------+-----------+-------------+------------+---------------+-----------------+----------------+-----------+---------------------+-------------------------+-----------------+--------------------+----------+-----------------+--------------------+------------------+--------------------+-----------------+------------------+------------+--------------------+------------+-------------------+--------------------+--------------------+------------+--------------------+
|payment_id|payment_amount|payment_number|        address_full|address_country|address_state|   address_city|zip_code|payment_day|payment_month|payment_year|publication_day|publication_month|publication_year|change_type|indicator_third_party|indicator_related_product|indicator_covered|       identity_type|first_name|        last_name|   manufacturer_name|manufacturer_state|manufacturer_countr

In [4]:
# Droppear columnas que no son útiles para el análisis
# La razón principal de droppear estas columnas varía entre que no son
# importantes para el análisis o son columnas categóricas entre 200 y 10,000
# categoías únicas que hacen el análisis sumamente complejo y el modelado
# muy tardado.

df = df.drop("payment_id", "address_full", "address_city", "address_state", "zip_code", "payment_year", "publication_day", "publication_month", "publication_year", "change_type", "indicator_third_party", "indicator_related_product", "indicator_covered", "first_name", "last_name", "manufacturer_name", "manufacturer_country", "manufacturer_state", "therapeutic_area", "product_name", "subspecialty", "license_code", "updated_at")
df.show()


+--------------+--------------+---------------+-----------+-------------+--------------------+-----------------+------------------+------------+-------------------+--------------------+
|payment_amount|payment_number|address_country|payment_day|payment_month|       identity_type|     payment_form|    payment_nature|product_type|       primary_type|     specialty_group|
+--------------+--------------+---------------+-----------+-------------+--------------------+-----------------+------------------+------------+-------------------+--------------------+
|            60|             1|  UNITED STATES|     FRIDAY|        MARCH|           PHYSICIAN|ITEMS OR SERVICES| FOOD AND BEVERAGE|     UNKNOWN|     MEDICAL DOCTOR|ALLOPATHIC & OSTE...|
|            48|             1|  UNITED STATES|  WEDNESDAY|         JUNE|           PHYSICIAN|ITEMS OR SERVICES| FOOD AND BEVERAGE|      DEVICE|     MEDICAL DOCTOR|ALLOPATHIC & OSTE...|
|            40|             1|  UNITED STATES|    TUESDAY|        APR

In [5]:
from pyspark.sql import functions as F

# Quitamos los registros que no pertenecen a estados unidos ya que estos son muy
# pocos y no son significativos para el modelo

df = df.filter(F.col("address_country") == "UNITED STATES")
df = df.drop("address_country")

df.show()

+--------------+--------------+-----------+-------------+--------------------+-----------------+------------------+------------+-------------------+--------------------+
|payment_amount|payment_number|payment_day|payment_month|       identity_type|     payment_form|    payment_nature|product_type|       primary_type|     specialty_group|
+--------------+--------------+-----------+-------------+--------------------+-----------------+------------------+------------+-------------------+--------------------+
|            60|             1|     FRIDAY|        MARCH|           PHYSICIAN|ITEMS OR SERVICES| FOOD AND BEVERAGE|     UNKNOWN|     MEDICAL DOCTOR|ALLOPATHIC & OSTE...|
|            48|             1|  WEDNESDAY|         JUNE|           PHYSICIAN|ITEMS OR SERVICES| FOOD AND BEVERAGE|      DEVICE|     MEDICAL DOCTOR|ALLOPATHIC & OSTE...|
|            40|             1|    TUESDAY|        APRIL|           PHYSICIAN|ITEMS OR SERVICES| FOOD AND BEVERAGE|     UNKNOWN|DOCTOR OF OPTOMETRY|EY

In [6]:
from pyspark.sql import functions as F
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler, IndexToString
)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# =========================
# 0) Asegurar tipos numéricos
# =========================

# Como las columnas payment_amount y payment_number vienen como string,
# las convertimos a double.

df_num = (
    df
    .withColumn("payment_amount", F.col("payment_amount").cast("double"))
    .withColumn("payment_number", F.col("payment_number").cast("double"))
)

df_num.printSchema()

# =========================
# 1) Indexar la columna target product_type -> label
# =========================

# Elegimos a la columna "product_type" como la columna target

label_indexer = StringIndexer(
    inputCol="product_type",
    outputCol="label",
    handleInvalid="keep"
).fit(df_num)

# Para convertir de vuelta los índices a texto
label_converter = IndexToString(
    inputCol="prediction",
    outputCol="predicted_product_type",
    labels=label_indexer.labels
)

# =========================
# 2) Columnas a usar
# =========================
numeric_cols = [
    "payment_amount",
    "payment_number"
]

categorical_cols = [
    "payment_day", "payment_month",
    "identity_type",
    "payment_form", "payment_nature",
    "primary_type", "specialty_group"
]

# =========================
# 3) Indexers + OneHotEncoders
# =========================

# codificación de las columnas categóricas

indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in categorical_cols
]

encoders = [
    OneHotEncoder(inputCols=[f"{c}_idx"], outputCols=[f"{c}_oh"])
    for c in categorical_cols
]

feature_cols = numeric_cols + [f"{c}_oh" for c in categorical_cols]

assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# =========================
# 4) Random Forest multiclase
# =========================

# Modelo random forest como clasificador

rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=10,
    maxDepth=4,
    subsamplingRate=0.8,
    featureSubsetStrategy="auto",
    seed=42
)

# =========================
# 5) Pipeline completo
# =========================
stages = [label_indexer] + indexers + encoders + [assembler, rf, label_converter]
pipeline = Pipeline(stages=stages)

# =========================
# 6) Train / Test split
# =========================
train_df, test_df = df_num.randomSplit([0.8, 0.2], seed=42)

# =========================
# 7) Entrenar modelo
# =========================
rf_model = pipeline.fit(train_df)

# =========================
# 8) Predicciones
# =========================
pred_test = rf_model.transform(test_df)

pred_test.select(
    "product_type",
    "predicted_product_type",
    "label",
    "prediction",
    "probability"
).show(10, truncate=False)

# =========================
# 9) Métricas multiclase
# =========================
acc_eval = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)
f1_eval = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)

print(f"Accuracy test: {acc_eval.evaluate(pred_test):.4f}")
print(f"F1 test      : {f1_eval.evaluate(pred_test):.4f}")

# =========================
# 10) Guardar resultados a CSV
# =========================
output_path = "/content/drive/MyDrive/pagos/predicciones_rf_product_type"

(pred_test
    .select(
        "payment_amount", "payment_number",
        "payment_day", "payment_month",
        "identity_type",
        "payment_form", "payment_nature",
        "primary_type", "specialty_group",
        "product_type", "predicted_product_type",
        "label", "prediction"
    )
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(output_path)
)

print(f"✅ Resultados guardados en: {output_path}")



root
 |-- payment_amount: double (nullable = true)
 |-- payment_number: double (nullable = true)
 |-- payment_day: string (nullable = true)
 |-- payment_month: string (nullable = true)
 |-- identity_type: string (nullable = true)
 |-- payment_form: string (nullable = true)
 |-- payment_nature: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- primary_type: string (nullable = true)
 |-- specialty_group: string (nullable = true)

+------------+----------------------+-----+----------+----------------------------------------------------------------------------------------------------------+
|product_type|predicted_product_type|label|prediction|probability                                                                                               |
+------------+----------------------+-----+----------+----------------------------------------------------------------------------------------------------------+
|DRUG        |DRUG                  |0.0  |0.0       |[0.6

# Conclusiones

Con los resultados de este modelo podemos ver que hay poder explicativo para poder predecir cuál es la categoría pertenece el producto que un médico podría llegar a comprar. Esto le permite a las manufacturadoras hacer un ajuste a sus producciones para el siguiente año y así optimizar sus recursos para tener mayores ingresos y evitar pérdidas.

Cabe recalacar que como el dataset tiene 12 millones de registros, se tuvo que hacer múltiples ajustes para que el tiempo de modelado fuera con un tiempo adecuado. Estas consideraciones fueron el bajar los parámetros del modelo para que no fuera tan complejo, esto quiere decir que hay posibilidad de mejorar el rendimiento del modelo para tener mejor precisión. La otra consideración es que se tuvo que eliminar ciertas columnas que inicialmente eran parte del análisis (como lo era manufacturer_state) pero que a la hora de aplicarles one-hot encoding, íbamos a terminar con 50 columnas adicionales.