In [None]:
!apt-get install openjdk-8-jdk-headless -qq < /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download the correct version of Apache Spark (3.5.4)
!wget -q https://downloads.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz

# Verify the download and extract the file
import os
if os.path.exists("spark-3.5.4-bin-hadoop3.tgz"):
    print("Spark tar file downloaded successfully.")
    # Extract the tar file
    !tar xf spark-3.5.4-bin-hadoop3.tgz
else:
    print("Spark tar file not found. Download might have failed.")
!tar xf spark-3.5.4-bin-hadoop3.tgz
!pip install pyspark
!pip install findspark==1.4.2


Spark tar file downloaded successfully.


In [None]:
# Set environment variables (update SPARK_HOME if using a different version)
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"

# Initialize findspark
import findspark
findspark.init()

# Create and test Spark session
from pyspark.sql import SparkSession

try:
    spark = SparkSession.builder.appName('Spark_1').getOrCreate()
    print("Apache Spark version:", spark.version)
except Exception as e:
    print("Error initializing Spark session:", e)


Apache Spark version: 3.5.4


In [None]:
# Importar librerías necesarias
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator

# Crear una sesión de Spark
spark = SparkSession.builder \
    .appName("Covid Data Ingestion") \
    .getOrCreate()

# Cargar el archivo CSV subido
filename = "Personas_por_entidad_federativa_activas_en_sistema_con_Tratamiento_Antirretroviral_-_noviembre_2024.csv"  # Reemplaza con la ruta del archivo subido
dfi = spark.read.options(header='true', inferSchema='true').csv(filename)

# Mostrar las primeras filas del DataFrame
dfi.show()

# Mostrar el esquema del DataFrame
dfi.printSchema()

# Llenar valores nulos si es necesario
dfi = dfi.fillna({"consumo_arv_mensual": 0, "numero_pacientes": 0})

# Convertir 'corte_mes' a tipo entero
dfi = dfi.withColumn("corte_mes", F.col("corte_mes").cast("int"))

# Crear una columna de fecha combinando corte_anio y corte_mes
dfi = dfi.withColumn("fecha", F.concat(F.col("corte_anio"), F.lit("-"), F.col("corte_mes"), F.lit("-01")))

# Seleccionar las columnas necesarias
dfi = dfi.select("nombre_medicamento", "consumo_arv_mensual", "numero_pacientes", "corte_anio", "corte_mes", "fecha")
from pyspark.sql.functions import mean, when, isnull

# Calculate the mean of 'numero_pacientes', 'corte_anio', 'corte_mes'
mean_numero_pacientes = dfi.select(mean(col('numero_pacientes'))).collect()[0][0]
mean_corte_anio = dfi.select(mean(col('corte_anio'))).collect()[0][0]
mean_corte_mes = dfi.select(mean(col('corte_mes'))).collect()[0][0]
# Calculate the mean of 'corte_mes', handling potential None result
mean_corte_mes_result = dfi.select(mean(col('corte_mes')).cast("integer")).collect()[0][0]  # Force to integer
mean_corte_mes = mean_corte_mes_result if mean_corte_mes_result is not None else 0 # Use 0 if None

# Impute nulls using the calculated means or a constant value for categorical features
dfi = dfi.fillna({
    "numero_pacientes": mean_numero_pacientes,
    "corte_anio": int(mean_corte_anio), # cast to int if 'corte_anio' is an integer column
    "corte_mes": int(mean_corte_mes), # cast to int if 'corte_mes' is an integer column
})
# Convertir las columnas categóricas y numéricas a formato numérico usando VectorAssembler
assembler = VectorAssembler(inputCols=["numero_pacientes", "corte_anio", "corte_mes"], outputCol="features", handleInvalid="skip")
dfi = assembler.transform(dfi)

# Paso 2: Dividir los datos en conjunto de entrenamiento y prueba
train_data, test_data = dfi.randomSplit([0.8, 0.2], seed=123)

# Paso 3: Crear el modelo de regresión
lr = LinearRegression(featuresCol="features", labelCol="consumo_arv_mensual")

# Paso 4: Entrenar el modelo
lr_model = lr.fit(train_data)

# Paso 5: Hacer predicciones
predictions = lr_model.transform(test_data)

# Paso 6: Evaluar el modelo
evaluator = RegressionEvaluator(labelCol="consumo_arv_mensual", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print(f"RMSE: {rmse}")

# Paso 7: Predecir el medicamento más consumido en el siguiente mes
medicamento_predicho = predictions \
    .groupBy("nombre_medicamento") \
    .agg(F.sum("prediction").alias("prediccion_total")) \
    .orderBy(F.col("prediccion_total").desc()) \
    .limit(1)

# Mostrar el medicamento con la predicción más alta
medicamento_predicho.show()


+-----------------+---------------------+--------------------+-------------------+--------------------+-------------+----------+------------+-------------------+----------------+
|clave_medicamento|establecimiento_salud|      unidad_almacen|              corte|  nombre_medicamento|unidad_medida|corte_anio|   corte_mes|consumo_arv_mensual|numero_pacientes|
+-----------------+---------------------+--------------------+-------------------+--------------------+-------------+----------+------------+-------------------+----------------+
|  010.000.4272.00|            ZACATECAS|CAPASITS - Fresnillo|2024-11-25 06:00:00|   ABACAVIR SOLUCION|           ml|      2024|Noviembre 25|                0.0|               0|
|  010.000.6203.00|            ZACATECAS|CAPASITS - Fresnillo|2024-11-25 06:00:00|BICTEGRAVIR 50 mg...|       Envase|      2024|Noviembre 25|              130.0|             130|
|  010.000.5862.00|            ZACATECAS|CAPASITS - Fresnillo|2024-11-25 06:00:00|    DARUNAVIR 150 mg|  

In [None]:
# Importar biblioteca para subir archivos en Google Colab
from google.colab import files

# Subir el archivo CSV
uploaded = files.upload()

# Verificar los archivos subidos
for filename in uploaded.keys():
    print(f"Archivo subido: {filename}")

# Importar PySpark para trabajar con datos
from pyspark.sql import SparkSession

# Crear una sesión de Spark
spark = SparkSession.builder \
    .appName("Covid Data Ingestion") \
    .getOrCreate()

# Cargar el archivo CSV subido
dfi = spark.read.options(header='true', inferSchema='true').csv(filename)

# Mostrar las primeras filas del DataFrame
dfi.show()

# Mostrar el esquema del DataFrame
dfi.printSchema()

# Contar el número de filas en el DataFrame
total_filas = dfi.count()
print(f"Total de filas: {total_filas}")

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.sql import functions as F

# Paso 1: Preprocesamiento - Convertir las columnas en características y etiqueta
# Aquí usamos 'consumo_arv_mensual' como la etiqueta (label) y otras columnas como características

# Crear un DataFrame con las columnas relevantes
dfi = dfi.select("nombre_medicamento", "consumo_arv_mensual", "numero_pacientes", "corte_anio", "corte_mes")

# Llenar valores nulos si es necesario
dfi = dfi.fillna({"consumo_arv_mensual": 0, "numero_pacientes": 0})
dfi = dfi.withColumn("corte_mes", F.col("corte_mes").cast("int"))
dfi = dfi.withColumn("corte_mes", col("corte_mes").cast("int"))
# Crear una columna de fecha combinando corte_anio y corte_mes
dfi = dfi.withColumn("fecha", F.concat(F.col("corte_anio"), F.lit("-"), F.col("corte_mes"), F.lit("-01")))


# Convertir las columnas categóricas y numéricas a formato numérico
# Aquí combinamos 'numero_pacientes' y 'fecha' como características


assembler = VectorAssembler(inputCols=["numero_pacientes", "corte_anio", "corte_mes"], outputCol="features")
dfi = assembler.transform(dfi)

# Paso 2: Dividir los datos en conjunto de entrenamiento y prueba
train_data, test_data = dfi.randomSplit([0.8, 0.2], seed=123)

# Paso 3: Crear el modelo de regresión
lr = LinearRegression(featuresCol="features", labelCol="consumo_arv_mensual")

# Paso 4: Entrenar el modelo
lr_model = lr.fit(train_data)

# Paso 5: Hacer predicciones
predictions = lr_model.transform(test_data)

# Paso 6: Evaluar el modelo
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="consumo_arv_mensual", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print(f"RMSE: {rmse}")

# Paso 7: Predecir el medicamento más consumido en el siguiente mes
# Ordenar las predicciones por el consumo mensual y seleccionar el medicamento con mayor consumo

medicamento_predicho = predictions \
    .groupBy("nombre_medicamento") \
    .agg(F.sum("prediction").alias("prediccion_total")) \
    .orderBy(F.col("prediccion_total").desc()) \
    .limit(1)

# Mostrar el medicamento con la predicción más alta
medicamento_predicho.show()



Saving Personas_por_entidad_federativa_activas_en_sistema_con_Tratamiento_Antirretroviral_-_noviembre_2024.csv to Personas_por_entidad_federativa_activas_en_sistema_con_Tratamiento_Antirretroviral_-_noviembre_2024 (2).csv
Archivo subido: Personas_por_entidad_federativa_activas_en_sistema_con_Tratamiento_Antirretroviral_-_noviembre_2024 (2).csv
+-----------------+---------------------+--------------------+-------------------+--------------------+-------------+----------+------------+-------------------+----------------+
|clave_medicamento|establecimiento_salud|      unidad_almacen|              corte|  nombre_medicamento|unidad_medida|corte_anio|   corte_mes|consumo_arv_mensual|numero_pacientes|
+-----------------+---------------------+--------------------+-------------------+--------------------+-------------+----------+------------+-------------------+----------------+
|  010.000.4272.00|            ZACATECAS|CAPASITS - Fresnillo|2024-11-25 06:00:00|   ABACAVIR SOLUCION|           ml|

Py4JJavaError: An error occurred while calling o1213.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 167.0 failed 1 times, most recent failure: Lost task 0.0 in stage 167.0 (TID 150) (a5517985eee2 executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$4112/1142805557`: (struct<numero_pacientes_double_VectorAssembler_f247711c1a1b:double,corte_anio_double_VectorAssembler_f247711c1a1b:double,corte_mes_double_VectorAssembler_f247711c1a1b:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1264)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1265)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 34 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2488)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1202)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1196)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$2(RDD.scala:1289)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1256)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1242)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1242)
	at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:107)
	at org.apache.spark.ml.regression.LinearRegression.trainWithNormal(LinearRegression.scala:456)
	at org.apache.spark.ml.regression.LinearRegression.$anonfun$train$1(LinearRegression.scala:354)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:329)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:186)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:114)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (`VectorAssembler$$Lambda$4112/1142805557`: (struct<numero_pacientes_double_VectorAssembler_f247711c1a1b:double,corte_anio_double_VectorAssembler_f247711c1a1b:double,corte_mes_double_VectorAssembler_f247711c1a1b:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:198)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1264)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1265)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:858)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 34 more


In [None]:
spark = SparkSession.builder \
    .appName("Covid Data Ingestion") \
    .config("spark.driver.memory", "30g") \
    .getOrCreate()

In [None]:
# Instalar las bibliotecas necesarias
!pip install ipython-sql
!pip install sqlalchemy

# Cargar la extensión SQL
%load_ext sql


Collecting jedi>=0.16 (from ipython->ipython-sql)
  Downloading jedi-0.19.2-py2.py3-none-any.whl.metadata (22 kB)
Downloading jedi-0.19.2-py2.py3-none-any.whl (1.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m17.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: jedi
Successfully installed jedi-0.19.2


In [None]:
# Crear una conexión a una base de datos SQLite
%sql sqlite:///my_database.db
# Contar el número de filas
total_filas = dfi.count()
print(f"Total de filas: {total_filas}")

Total de filas: 8100


In [None]:
# Obtener estadísticas descriptivas
dfi.describe().show()

+-------+-----------------+---------------------+--------------------+--------------------+-------------+----------+------------+-------------------+------------------+
|summary|clave_medicamento|establecimiento_salud|      unidad_almacen|  nombre_medicamento|unidad_medida|corte_anio|   corte_mes|consumo_arv_mensual|  numero_pacientes|
+-------+-----------------+---------------------+--------------------+--------------------+-------------+----------+------------+-------------------+------------------+
|  count|             8100|                 8100|                8100|                8100|         8100|      8100|        8100|               8100|              8100|
|   mean|             NULL|                 NULL|                NULL|                NULL|         NULL|    2024.0|        NULL|  21.07088888888889|20.720123456790123|
| stddev|             NULL|                 NULL|                NULL|                NULL|         NULL|       0.0|        NULL| 203.84898066664354|203.45

In [None]:
# Filtrar las filas donde 'nombre_medicamento' sea 'DARUNAVIR 150 mg'
dfi.filter(dfi['nombre_medicamento'] == "DARUNAVIR 150 mg").show()

+-----------------+---------------------+--------------------+-------------------+------------------+-------------+----------+------------+-------------------+----------------+
|clave_medicamento|establecimiento_salud|      unidad_almacen|              corte|nombre_medicamento|unidad_medida|corte_anio|   corte_mes|consumo_arv_mensual|numero_pacientes|
+-----------------+---------------------+--------------------+-------------------+------------------+-------------+----------+------------+-------------------+----------------+
|  010.000.5862.00|            ZACATECAS|CAPASITS - Fresnillo|2024-11-25 06:00:00|  DARUNAVIR 150 mg|  COMPRIMIDOS|      2024|Noviembre 25|                0.0|               0|
|  010.000.5862.00|            ZACATECAS|CAPASITS - Guadalupe|2024-11-25 06:00:00|  DARUNAVIR 150 mg|  COMPRIMIDOS|      2024|Noviembre 25|                0.0|               0|
|  010.000.5862.00|              YUCATAN|   CAPASITS - M�rida|2024-11-25 06:00:00|  DARUNAVIR 150 mg|  COMPRIMIDOS|

In [None]:
from pyspark.sql.functions import upper, trim, col

# Convertir a mayúsculas y eliminar espacios en blanco
dfi = dfi.withColumn("nombre_medicamento", upper(trim(col("nombre_medicamento"))))



In [None]:
# Mostrar las primeras filas y el esquema limpio
dfi.show()
dfi.printSchema()


+-----------------+---------------------+--------------------+-------------------+--------------------+-------------+----------+------------+-------------------+----------------+
|clave_medicamento|establecimiento_salud|      unidad_almacen|              corte|  nombre_medicamento|unidad_medida|corte_anio|   corte_mes|consumo_arv_mensual|numero_pacientes|
+-----------------+---------------------+--------------------+-------------------+--------------------+-------------+----------+------------+-------------------+----------------+
|  010.000.4272.00|            ZACATECAS|CAPASITS - Fresnillo|2024-11-25 06:00:00|   ABACAVIR SOLUCION|           ml|      2024|Noviembre 25|                0.0|               0|
|  010.000.6203.00|            ZACATECAS|CAPASITS - Fresnillo|2024-11-25 06:00:00|BICTEGRAVIR 50 MG...|       Envase|      2024|Noviembre 25|              130.0|             130|
|  010.000.5862.00|            ZACATECAS|CAPASITS - Fresnillo|2024-11-25 06:00:00|    DARUNAVIR 150 MG|  

In [None]:

# Importar funciones necesarias de PySpark
from pyspark.sql.functions import col, concat_ws, to_date, lit

# Crear una nueva columna 'corte' combinando corte_anio y corte_mes en formato 'yyyy-MM'
dfi = dfi.withColumn("corte", to_date(concat_ws("-", col("corte_anio"), col("corte_mes"), lit("01")), "yyyy-MM-dd"))

# Verificar el nuevo esquema
dfi.printSchema()

# Crear una vista temporal para consultas SQL
dfi.createOrReplaceTempView("tabla_medicamentos")

# Mostrar una consulta simple para verificar los datos
spark.sql("""
SELECT
    clave_medicamento,
    establecimiento_salud,
    unidad_almacen,
    corte,
    nombre_medicamento,
    unidad_medida,
    consumo_arv_mensual,
    numero_pacientes
FROM tabla_medicamentos
LIMIT 10
""").show()

# Confirmación de la creación de la vista temporal
print("Vista SQL 'tabla_medicamentos' creada exitosamente.")


root
 |-- clave_medicamento: string (nullable = true)
 |-- establecimiento_salud: string (nullable = true)
 |-- unidad_almacen: string (nullable = true)
 |-- corte: date (nullable = true)
 |-- nombre_medicamento: string (nullable = true)
 |-- unidad_medida: string (nullable = true)
 |-- corte_anio: integer (nullable = true)
 |-- corte_mes: string (nullable = true)
 |-- consumo_arv_mensual: double (nullable = true)
 |-- numero_pacientes: integer (nullable = true)

+-----------------+---------------------+--------------------+-----+--------------------+-------------+-------------------+----------------+
|clave_medicamento|establecimiento_salud|      unidad_almacen|corte|  nombre_medicamento|unidad_medida|consumo_arv_mensual|numero_pacientes|
+-----------------+---------------------+--------------------+-----+--------------------+-------------+-------------------+----------------+
|  010.000.4272.00|            ZACATECAS|CAPASITS - Fresnillo| NULL|   ABACAVIR SOLUCION|           ml|      

Análisis basado en el DataFrame dfi
a. Identificación de Dimensiones y Hechos
Dimensiones:

clave_medicamento: Identificador único del medicamento.
establecimiento_salud: Centro de salud asociado al consumo del medicamento.
unidad_almacen: Almacén que distribuye los medicamentos.
nombre_medicamento: Nombre comercial o técnico del medicamento.
unidad_medida: Unidad en la que se mide el consumo (tabletas, frascos, etc.).
corte: Fecha de corte en formato yyyy-MM-dd que combina corte_anio y corte_mes.

Hechos:

---



---



consumo_arv_mensual: Cantidad total de medicamento consumido en un mes.
numero_pacientes: Número de pacientes que recibieron el medicamento en el periodo indicado.

In [None]:
# Crear una vista temporal
dfi.createOrReplaceTempView("tabla_medicamentos")

# Consulta SQL para contar valores nulos por columna
spark.sql("""
SELECT
    SUM(CASE WHEN clave_medicamento IS NULL THEN 1 ELSE 0 END) AS nulos_clave_medicamento,
    SUM(CASE WHEN establecimiento_salud IS NULL THEN 1 ELSE 0 END) AS nulos_establecimiento_salud,
    SUM(CASE WHEN unidad_almacen IS NULL THEN 1 ELSE 0 END) AS nulos_unidad_almacen,
    SUM(CASE WHEN nombre_medicamento IS NULL THEN 1 ELSE 0 END) AS nulos_nombre_medicamento,
    SUM(CASE WHEN unidad_medida IS NULL THEN 1 ELSE 0 END) AS nulos_unidad_medida,
    SUM(CASE WHEN corte_anio IS NULL THEN 1 ELSE 0 END) AS nulos_corte_anio,
    SUM(CASE WHEN corte_mes IS NULL THEN 1 ELSE 0 END) AS nulos_corte_mes,
    SUM(CASE WHEN consumo_arv_mensual IS NULL THEN 1 ELSE 0 END) AS nulos_consumo_arv_mensual,
    SUM(CASE WHEN numero_pacientes IS NULL THEN 1 ELSE 0 END) AS nulos_numero_pacientes
FROM tabla_medicamentos
""").show()


+-----------------------+---------------------------+--------------------+------------------------+-------------------+----------------+---------------+-------------------------+----------------------+
|nulos_clave_medicamento|nulos_establecimiento_salud|nulos_unidad_almacen|nulos_nombre_medicamento|nulos_unidad_medida|nulos_corte_anio|nulos_corte_mes|nulos_consumo_arv_mensual|nulos_numero_pacientes|
+-----------------------+---------------------------+--------------------+------------------------+-------------------+----------------+---------------+-------------------------+----------------------+
|                      0|                          0|                   0|                       0|                  0|               0|              0|                        0|                     0|
+-----------------------+---------------------------+--------------------+------------------------+-------------------+----------------+---------------+-------------------------+--------------

b. Jerarquías en las Dimensiones
Dimensiones con Jerarquías:

Jerarquía Temporal (en la dimensión corte):

Nivel 1: Año (corte_anio).
Nivel 2: Mes (corte_mes).
Nivel 3: Fecha (corte).
Jerarquía de Establecimiento:

Nivel 1: Estado (puede derivarse si existe una columna relacionada o información geográfica del establecimiento).
Nivel 2: Municipio (asociado al estado, si se tienen los datos).
Nivel 3: Nombre del establecimiento (establecimiento_salud).
Jerarquía del Medicamento:

Nivel 1: Categoría del medicamento (por ejemplo, antirretrovirales).
Nivel 2: Nombre del principio activo (puede derivarse de nombre_medicamento si se tiene un catálogo).
Nivel 3: Nombre del medicamento específico (nombre_medicamento).


In [None]:
# Consulta para obtener la jerarquía de medicamentos (categoría, principio activo y nombre del medicamento)
spark.sql("""
SELECT
    SUBSTRING(nombre_medicamento, 1, 15) AS categoria,
    nombre_medicamento AS principio_activo,
    nombre_medicamento
FROM tabla_medicamentos
GROUP BY categoria, principio_activo, nombre_medicamento
ORDER BY categoria
""").show()



+---------------+--------------------+--------------------+
|      categoria|    principio_activo|  nombre_medicamento|
+---------------+--------------------+--------------------+
|ABACAVIR 600 MG|ABACAVIR 600 MG, ...|ABACAVIR 600 MG, ...|
|ABACAVIR SOLUCI|   ABACAVIR SOLUCION|   ABACAVIR SOLUCION|
|ABACAVIR TABLET|    ABACAVIR TABLETA|    ABACAVIR TABLETA|
|BICTEGRAVIR 50 |BICTEGRAVIR 50 MG...|BICTEGRAVIR 50 MG...|
|DARUNAVIR 150 M|    DARUNAVIR 150 MG|    DARUNAVIR 150 MG|
|DARUNAVIR 400 M|    DARUNAVIR 400 MG|    DARUNAVIR 400 MG|
|DARUNAVIR 600 M|    DARUNAVIR 600 MG|    DARUNAVIR 600 MG|
|DARUNAVIR 75 MG|     DARUNAVIR 75 MG|     DARUNAVIR 75 MG|
|DARUNAVIR 800 M|DARUNAVIR 800 MG,...|DARUNAVIR 800 MG,...|
|DOLUTEGRAVIR 10|  DOLUTEGRAVIR 10 MG|  DOLUTEGRAVIR 10 MG|
|DOLUTEGRAVIR 50|DOLUTEGRAVIR 50 M...|DOLUTEGRAVIR 50 M...|
|DOLUTEGRAVIR 50|  DOLUTEGRAVIR 50 MG|  DOLUTEGRAVIR 50 MG|
|DOLUTEGRAVIR 50|DOLUTEGRAVIR 50 M...|DOLUTEGRAVIR 50 M...|
|DOLUTEGRAVIR 5M|DOLUTEGRAVIR 5MG ...|DO

In [None]:
# Consulta para obtener la jerarquía de establecimiento (estado, municipio y nombre del establecimiento)
spark.sql("""
SELECT
    SUBSTRING(establecimiento_salud, 1, 10) AS estado,  -- Ajustar según datos disponibles
    establecimiento_salud AS municipio,  -- Asegúrate de tener la columna de municipio
    establecimiento_salud
FROM tabla_medicamentos
GROUP BY estado, municipio, establecimiento_salud
ORDER BY estado, municipio
""").show()


+----------+--------------------+---------------------+
|    estado|           municipio|establecimiento_salud|
+----------+--------------------+---------------------+
|AGUASCALIE|      AGUASCALIENTES|       AGUASCALIENTES|
|BAJA CALIF|     BAJA CALIFORNIA|      BAJA CALIFORNIA|
|BAJA CALIF| BAJA CALIFORNIA SUR|  BAJA CALIFORNIA SUR|
|  CAMPECHE|            CAMPECHE|             CAMPECHE|
|   CHIAPAS|             CHIAPAS|              CHIAPAS|
| CHIHUAHUA|           CHIHUAHUA|            CHIHUAHUA|
|CIUDAD DE |    CIUDAD DE MEXICO|     CIUDAD DE MEXICO|
|CLINICA DE|CLINICA DE INMUNO...| CLINICA DE INMUNO...|
|  COAHUILA|            COAHUILA|             COAHUILA|
|    COLIMA|              COLIMA|               COLIMA|
|   DURANGO|             DURANGO|              DURANGO|
|GUANAJUATO|          GUANAJUATO|           GUANAJUATO|
|  GUERRERO|            GUERRERO|             GUERRERO|
|   HIDALGO|             HIDALGO|              HIDALGO|
|HOSPITAL G|HOSPITAL GENERAL ...| HOSPITAL GENER

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Crear la sesión de Spark
spark = SparkSession.builder.appName("Clustering Example").getOrCreate()

# Suponiendo que tienes el DataFrame 'dfi' cargado con las columnas 'consumo_arv_mensual' y 'numero_pacientes'
dfi = dfi.select("consumo_arv_mensual", "numero_pacientes")

# Usar VectorAssembler para combinar las características en una sola columna
assembler = VectorAssembler(inputCols=["consumo_arv_mensual", "numero_pacientes"], outputCol="features")
dfi = assembler.transform(dfi)

# Crear el modelo de K-means (con k=3 clústeres)
kmeans = KMeans().setK(3).setSeed(1)
model = kmeans.fit(dfi)

# Predecir el clúster al que pertenece cada punto de datos
predictions = model.transform(dfi)

# Mostrar las predicciones
predictions.select("consumo_arv_mensual", "numero_pacientes", "prediction").show()

# Ver los centros de los clústeres
centers = model.clusterCenters()
print("Centros de los clústeres:")
for center in centers:
    print(center)


+-------------------+----------------+----------+
|consumo_arv_mensual|numero_pacientes|prediction|
+-------------------+----------------+----------+
|                0.0|               0|         0|
|              130.0|             130|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|               13.0|              13|         0|
|                0.0|               0|         0|
|                1.0|               1|         0|
|                4.0|               4|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|                1.0|               1|         0|
|                0.0|               0|         0|


In [None]:
from pyspark.ml.clustering import GaussianMixture
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
import numpy as np

# Crear la sesión de Spark
spark = SparkSession.builder.appName("Clustering Example").getOrCreate()

# Suponiendo que tienes el DataFrame 'dfi' cargado con las columnas 'consumo_arv_mensual' y 'numero_pacientes'
dfi = dfi.select("consumo_arv_mensual", "numero_pacientes")

# Usar VectorAssembler para combinar las características en una sola columna
assembler = VectorAssembler(inputCols=["consumo_arv_mensual", "numero_pacientes"], outputCol="features")
dfi = assembler.transform(dfi)

# Crear el modelo de GMM (con k=3 clústeres)
gmm = GaussianMixture().setK(3).setSeed(1)
model = gmm.fit(dfi)

# Predecir el clúster al que pertenece cada punto de datos
predictions = model.transform(dfi)

# Mostrar las predicciones
predictions.select("consumo_arv_mensual", "numero_pacientes", "prediction").show()

# Ver las estadísticas de los clústeres
summary = model.summary
print("Log-likelihood:", summary.logLikelihood)

# Número de puntos de datos
n = dfi.count()

# Número de características
d = len(dfi.columns) - 1  # menos la columna 'features'

# Número de parámetros en GMM: k*(d + d(d+1)/2) - 1
k = 3  # número de clústeres
num_parameters = k * (d + d * (d + 1) / 2) - 1

# Log-likelihood
log_likelihood = summary.logLikelihood

# Calcular AIC y BIC
aic = 2 * num_parameters - 2 * log_likelihood
bic = np.log(n) * num_parameters - 2 * log_likelihood

print("AIC:", aic)
print("BIC:", bic)

# Ver los centros de los clústeres
centers = model.gaussiansDF.select("mean").collect()
print("Centros de los clústeres:")
for center in centers:
    print(center)



+-------------------+----------------+----------+
|consumo_arv_mensual|numero_pacientes|prediction|
+-------------------+----------------+----------+
|                0.0|               0|         0|
|              130.0|             130|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|               13.0|              13|         0|
|                0.0|               0|         0|
|                1.0|               1|         0|
|                4.0|               4|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|                0.0|               0|         0|
|                1.0|               1|         0|
|                0.0|               0|         0|


In [None]:
# Mostrar todos los registros de la tabla temporal 'tabla_medicamentos'
spark.sql("SELECT * FROM tabla_medicamentos").show(truncate=False)

# Si prefieres un límite de filas para no llenar la consola:
spark.sql("SELECT * FROM tabla_medicamentos LIMIT 10").show(truncate=False)


+-----------------+---------------------+--------------------+-----+----------------------------------------------------------------------------+-------------+----------+------------+-------------------+----------------+
|clave_medicamento|establecimiento_salud|unidad_almacen      |corte|nombre_medicamento                                                          |unidad_medida|corte_anio|corte_mes   |consumo_arv_mensual|numero_pacientes|
+-----------------+---------------------+--------------------+-----+----------------------------------------------------------------------------+-------------+----------+------------+-------------------+----------------+
|010.000.4272.00  |ZACATECAS            |CAPASITS - Fresnillo|NULL |ABACAVIR SOLUCION                                                           |ml           |2024      |Noviembre 25|0.0                |0               |
|010.000.6203.00  |ZACATECAS            |CAPASITS - Fresnillo|NULL |BICTEGRAVIR 50 MG, EMTRICITABINA 200 MG, TENOFOV

In [None]:
dfi.show()

+-------------------+----------------+-------------+
|consumo_arv_mensual|numero_pacientes|     features|
+-------------------+----------------+-------------+
|                0.0|               0|    (2,[],[])|
|              130.0|             130|[130.0,130.0]|
|                0.0|               0|    (2,[],[])|
|                0.0|               0|    (2,[],[])|
|                0.0|               0|    (2,[],[])|
|                0.0|               0|    (2,[],[])|
|               13.0|              13|  [13.0,13.0]|
|                0.0|               0|    (2,[],[])|
|                1.0|               1|    [1.0,1.0]|
|                4.0|               4|    [4.0,4.0]|
|                0.0|               0|    (2,[],[])|
|                0.0|               0|    (2,[],[])|
|                0.0|               0|    (2,[],[])|
|                0.0|               0|    (2,[],[])|
|                0.0|               0|    (2,[],[])|
|                1.0|               1|    [1.0

In [None]:
a=127
b=140
c=a+b
print(c)

267
