In [None]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("TP2")


sc = SparkContext(conf = conf)

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("TP2-TDVII") \
    .getOrCreate()

# 2. Map Reduce Spark

### 1. Para cada provincia, calcular cuantos efectores de salud hay

In [None]:
import pandas as pd

url = 'https://raw.githubusercontent.com/Dafydd8/TP2-TDVII/main/data/csvs/efectordesalud.csv'
df_efectordesalud = pd.read_csv(url, delimiter=',')

In [None]:
efectoresdesalud = []

for index, row in df_efectordesalud.iterrows():
  cod_refes = row['cod_refes']
  nombre = row['nombre']
  id_prov = row['id_prov']
  nivel = row['nivel']
  efectoresdesalud.append((cod_refes, nombre, id_prov, nivel))

In [None]:
url = 'https://raw.githubusercontent.com/Dafydd8/TP2-TDVII/main/data/csvs/provincia.csv'
df_provincia = pd.read_csv(url, delimiter=',')

In [None]:
provincias = []

for index, row in df_provincia.iterrows():
  id_prov = row['id_prov']
  nombre = row['nombre']
  poblacion_total = row['poblacion_total']
  provincias.append((id_prov, nombre, poblacion_total))

In [None]:
rdd_efectores = sc.parallelize(efectoresdesalud)
rdd_provincias = sc.parallelize(provincias)

efectores_por_prov = rdd_efectores.map(lambda x: (x[2], 1)) # (id_prov, 1)
prov_nombre = rdd_provincias.map(lambda x: (x[0], x[1])) # (id_prov, nombre)
red = efectores_por_prov.reduceByKey(lambda a, b: a + b) # (id_prov, cantidad de efectores)

joined = red.join(prov_nombre) # (id_prov, (cantidad de efectores, nombre))
res = joined.map(lambda x: (x[1][1], x[1][0])) # (nombre, cantidad de efectores)

print("Provincia, Cantidad de efectores de salud")
for k, v in res.collect():
  print(f"{k}, {v}")

Provincia, Cantidad de efectores de salud
CABA, 3
Santa Fe, 3
Tucumán, 2
Entre Ríos, 4
Santiago del Estero, 3
Neuquén, 2
San Juan, 1
Formosa, 4
La Rioja, 1
Chaco, 4
La Pampa, 1
Tierra del Fuego, 1
Buenos Aires, 4
Córdoba, 3
Mendoza, 1
Salta, 4
Corrientes, 3
Misiones, 3
Río Negro, 2
San Luis, 3
Jujuy, 5
Catamarca, 1
Chubut, 5
Santa Cruz, 2


### 2. Promedio de edad por provincia

In [None]:
url = 'https://raw.githubusercontent.com/Dafydd8/TP2-TDVII/main/data/csvs/persona.csv'
df_persona = pd.read_csv(url, delimiter=',')

In [None]:
personas = []

for index, row in df_persona.iterrows():
  id_prov = row['id_provincia']
  fecha = row['fecha_nacimiento']
  personas.append((id_prov, fecha))

In [None]:
from datetime import datetime

rdd = sc.parallelize(personas)

hoy = datetime.now()

edades = rdd.map(lambda x: (
    x[0],  # id_provincia
    (hoy.year - datetime.strptime(x[1], "%Y-%m-%d").year -
    ((hoy.month, hoy.day) < (datetime.strptime(x[1], "%Y-%m-%d").month, datetime.strptime(x[1], "%Y-%m-%d").day)), 1)
))

datos = edades.reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) # sumo por un lado las edades, y por el otro las apariciones

avg = datos.mapValues(lambda x: x[0] / x[1])
joined = avg.join(rdd_provincias)
avg_final = joined.map(lambda x: (x[1][1], x[1][0]))

for nombre, edad in sorted(avg_final.collect()):
    print(f"{nombre}: {edad}")

Buenos Aires: 42.388888888888886
CABA: 51.6
Catamarca: 49.642857142857146
Chaco: 36.526315789473685
Chubut: 39.0
Corrientes: 48.36842105263158
Córdoba: 46.705882352941174
Entre Ríos: 61.1
Formosa: 59.94117647058823
Jujuy: 55.833333333333336
La Pampa: 50.294117647058826
La Rioja: 40.0
Mendoza: 54.375
Misiones: 39.5
Neuquén: 43.42857142857143
Río Negro: 48.388888888888886
Salta: 34.09090909090909
San Juan: 39.0
San Luis: 40.13333333333333
Santa Cruz: 47.25
Santa Fe: 36.611111111111114
Santiago del Estero: 49.54545454545455
Tucumán: 48.15384615384615


### 3. Cuántos HCE gestionan recetas digitales

In [None]:
url = 'https://raw.githubusercontent.com/Dafydd8/TP2-TDVII/main/data/csvs/hce.csv'
df_hce = pd.read_csv(url, delimiter=',')

In [None]:
hce = []

for index, row in df_hce.iterrows():
  g = row['generacion_receta_digital']
  hce.append(g)


In [None]:
rdd = sc.parallelize(hce)
mapeo = rdd.map(lambda x: (x, 1))
conteo = mapeo.reduceByKey(lambda a, b: a+b)
for i, j in sorted(conteo.collect()):
    print(f"{i}: {j}")

Genera: 24
No genera: 21


### 4. Proporción de personas registradas en el sistema de salud

In [None]:
# Cantidad de personas por provincia
personas_rdd = sc.parallelize(personas)
registradas = personas_rdd.map(lambda x: (x[0], 1)) # (id_prov, 1)
conteo = registradas.reduceByKey(lambda a, b: a+b) # (id_prov, cantidad de personas)

prov_rdd = sc.parallelize(provincias)
prov_poblacion_total = prov_rdd.map(lambda x: (x[0], x[2])) # (id_prov, poblacion_total)
prov_nombre = prov_rdd.map(lambda x: (x[0], x[1])) # (id_prov, nombre)
join_rdd = prov_poblacion_total.join(conteo) # (id_prov, (poblacion total, cantidad de personas))

proporcion_rdd = join_rdd.mapValues(lambda x: round(x[1] / x[0], 6))  # proporción con 6 decimales
proporcion_join = proporcion_rdd.join(prov_nombre) # (id_prov, (proporcion, nombre))
res = proporcion_join.map(lambda x: (x[1][1], x[1][0])) # (nombre, proporcion)

print(f"Provincia - Proporción registrada")
for prov, prop in res.collect():
    print(f"{prov} - {prop*100}%")

print("\nNOTA: Hay pocas personas registradas respecto a la poblacion de las provicias, por eso los numeros tan chicos")

Provincia - Proporción registrada
Tucumán - 0.0007999999999999999%
Neuquén - 0.0019000000000000002%
La Rioja - 0.0042%
Córdoba - 0.00039999999999999996%
Corrientes - 0.0017%
San Luis - 0.0028%
Chubut - 0.0019000000000000002%
Santa Fe - 0.0005%
Santiago del Estero - 0.001%
Formosa - 0.003%
La Pampa - 0.004699999999999999%
Buenos Aires - 9.999999999999999e-05%
Salta - 0.0007999999999999999%
Río Negro - 0.0024000000000000002%
Catamarca - 0.0033000000000000004%
CABA - 0.0005%
Entre Ríos - 0.0007%
San Juan - 0.0021%
Chaco - 0.0015999999999999999%
Mendoza - 0.0007999999999999999%
Misiones - 0.0014%
Jujuy - 0.0015%
Santa Cruz - 0.0059%

NOTA: Hay pocas personas registradas respecto a la poblacion de las provicias, por eso los numeros tan chicos




---


# 3. Spark


### Cargamos las tablas

In [None]:
import requests
path = 'https://raw.githubusercontent.com/Dafydd8/TP2-TDVII/main/data/csvs/problema.csv'
req = requests.get(path)
url_content = req.content

csv_file_name = 'problema.csv'
csv_file = open(csv_file_name, 'wb')

csv_file.write(url_content)
csv_file.close()

df_problema = spark.read.csv(csv_file_name, header=True, inferSchema=True)
df_problema.createOrReplaceTempView("problema")
# df_problema.printSchema()
df_problema.show(5)

+------------+---------+-----------------+--------------------+
|cod_problema|cod_refes|             tipo|         descripcion|
+------------+---------+-----------------+--------------------+
|           1|       11|  Infraestructura|Techo con filtrac...|
|           2|       21|  Infraestructura|Baños en mal esta...|
|           3|        3|     Conectividad|Internet inestabl...|
|           4|       14|FaltaDispositivos|Falta de termómet...|
|           5|        9|     Conectividad|No hay señal Wi-F...|
+------------+---------+-----------------+--------------------+
only showing top 5 rows



In [None]:
path = 'https://raw.githubusercontent.com/Dafydd8/TP2-TDVII/main/data/csvs/efectortercernivel.csv'
req = requests.get(path)
url_content = req.content

csv_file_name = 'efectortercernivel.csv'
csv_file = open(csv_file_name, 'wb')

csv_file.write(url_content)
csv_file.close()

df_efectortercernivel = spark.read.csv(csv_file_name, header=True, inferSchema=True)

df_efectortercernivel.createOrReplaceTempView("efectortercernivel")
df_efectortercernivel.show(5)

+---------+--------------------+
|cod_refes|                tipo|
+---------+--------------------+
|        3|Instituto Especia...|
|        8|     Alta Tecnología|
|        9|     Alta Tecnología|
|       14|     Alta Tecnología|
|       15|Instituto Especia...|
+---------+--------------------+
only showing top 5 rows



In [None]:
path = 'https://raw.githubusercontent.com/Dafydd8/TP2-TDVII/main/data/csvs/efectordesalud.csv'
req = requests.get(path)
url_content = req.content

csv_file_name = 'efectordesalud.csv'
csv_file = open(csv_file_name, 'wb')

csv_file.write(url_content)
csv_file.close()

df_efectordesalud = spark.read.csv(csv_file_name, header=True, inferSchema=True)

df_efectordesalud.createOrReplaceTempView("efectordesalud")
df_efectordesalud.show(5)

+---------+--------------------+-------+-------------+
|cod_refes|              nombre|id_prov|        nivel|
+---------+--------------------+-------+-------------+
|        1|  Centro Médico Roig|      1| Primer Nivel|
|        2|Posta Sanitaria C...|      2| Primer Nivel|
|        3|Posta Sanitaria T...|      2| Tercer Nivel|
|        4|Posta Sanitaria M...|      3|Segundo Nivel|
|        5|    Hospital Benítez|      3|Segundo Nivel|
+---------+--------------------+-------+-------------+
only showing top 5 rows



In [None]:
path = 'https://raw.githubusercontent.com/Dafydd8/TP2-TDVII/main/data/csvs/provincia.csv'
req = requests.get(path)
url_content = req.content

csv_file_name = 'provincia.csv'
csv_file = open(csv_file_name, 'wb')

csv_file.write(url_content)
csv_file.close()

df_prov = spark.read.csv(csv_file_name, header=True, inferSchema=True)

df_prov.createOrReplaceTempView("provincia")
df_prov.show(5)

+-------+------------+---------------+
|id_prov|      nombre|poblacion_total|
+-------+------------+---------------+
|      1|Buenos Aires|       17523996|
|      2|        CABA|        3121707|
|      3|     Córdoba|        3840905|
|      4|    Santa Fe|        3544908|
|      5|     Mendoza|        2014533|
+-------+------------+---------------+
only showing top 5 rows



### **Consultas**

In [None]:
query = "\
  select *\
  from problema\
"

res = spark.sql(query)
res.show(5)

+------------+---------+-----------------+--------------------+
|cod_problema|cod_refes|             tipo|         descripcion|
+------------+---------+-----------------+--------------------+
|           1|       11|  Infraestructura|Techo con filtrac...|
|           2|       21|  Infraestructura|Baños en mal esta...|
|           3|        3|     Conectividad|Internet inestabl...|
|           4|       14|FaltaDispositivos|Falta de termómet...|
|           5|        9|     Conectividad|No hay señal Wi-F...|
+------------+---------+-----------------+--------------------+
only showing top 5 rows



#### 1. Cantidad de efectores de salud que tienen problemas por nivel.

In [None]:
query = """
  SELECT e.nivel, COUNT(DISTINCT e.cod_refes) as cant_efectores_con_problemas
  FROM efectordesalud e
    JOIN problema p ON e.cod_refes = p.cod_refes
  GROUP BY e.nivel
  ORDER BY cant_efectores_con_problemas DESC
"""
result = spark.sql(query)
result.show()


+-------------+----------------------------+
|        nivel|cant_efectores_con_problemas|
+-------------+----------------------------+
|Segundo Nivel|                           7|
| Primer Nivel|                           4|
| Tercer Nivel|                           3|
+-------------+----------------------------+



#### 2. Cantidad de problemas de cada tipo por provincia

In [None]:
query = """
  SELECT
    pr.nombre provincia
    , p.tipo
    , COUNT(*) cantidad
  FROM problema p
    JOIN efectordesalud e ON p.cod_refes = e.cod_refes
      JOIN provincia pr ON e.id_prov = pr.id_prov
  GROUP BY pr.nombre, p.tipo
  ORDER BY provincia, cantidad DESC
"""
result = spark.sql(query)
result.show()

+------------+-----------------+--------+
|   provincia|             tipo|cantidad|
+------------+-----------------+--------+
|Buenos Aires|     Conectividad|       3|
|Buenos Aires|  Infraestructura|       2|
|        CABA|     Conectividad|       1|
|     Córdoba|  Infraestructura|       2|
|  Entre Ríos|FaltaDispositivos|       1|
|  Entre Ríos|  Infraestructura|       1|
|     Mendoza|  Infraestructura|       1|
|       Salta|FaltaDispositivos|       2|
|       Salta|     Conectividad|       1|
|    Santa Fe|FaltaDispositivos|       1|
|     Tucumán|     Conectividad|       2|
|     Tucumán|  Infraestructura|       1|
|     Tucumán|FaltaDispositivos|       1|
+------------+-----------------+--------+



#### 3. Cantidad de problemas por provincia, por habitante

In [None]:
query = """
  SELECT pr.nombre provincia,
        COUNT(p.cod_problema) cantidad_problemas,
        pr.poblacion_total,
        (ROUND(COUNT(p.cod_problema) / pr.poblacion_total, 6)) problemas_por_habitante
  FROM problema p
  JOIN efectordesalud e ON p.cod_refes = e.cod_refes
  JOIN provincia pr ON e.id_prov = pr.id_prov
  GROUP BY pr.nombre, pr.poblacion_total
  ORDER BY problemas_por_habitante DESC
"""
result = spark.sql(query)
result.show()

+------------+------------------+---------------+-----------------------+
|   provincia|cantidad_problemas|poblacion_total|problemas_por_habitante|
+------------+------------------+---------------+-----------------------+
|       Salta|                 3|        1440672|                 2.0E-6|
|     Tucumán|                 4|        1703186|                 2.0E-6|
|  Entre Ríos|                 2|        1385961|                 1.0E-6|
|     Córdoba|                 2|        3840905|                 1.0E-6|
|    Santa Fe|                 1|        3544908|                    0.0|
|Buenos Aires|                 5|       17523996|                    0.0|
|        CABA|                 1|        3121707|                    0.0|
|     Mendoza|                 1|        2014533|                    0.0|
+------------+------------------+---------------+-----------------------+

