In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
!wget -q -O  /opt/spark-3.3.2-bin-hadoop2.tgz https://dlcdn.apache.org/spark/spark-3.3.2/spark-3.3.2-bin-hadoop2.tgz
!tar xf /opt/spark-3.3.2-bin-hadoop2.tgz

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = '/content/spark-3.3.2-bin-hadoop2'
!export JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64"
!export SPARK_HOME="/content/spark-3.3.2-bin-hadoop2"

In [6]:
!pip install -q findspark
import findspark
findspark.init()
findspark.find()

'/content/spark-3.3.2-bin-hadoop2'

In [10]:
from pyspark.sql import SparkSession

In [11]:
spark=SparkSession.builder.appName('data_processing').getOrCreate()

In [7]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [16]:
df=spark.read.csv('/content/drive/MyDrive/covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv',inferSchema=True,header=True)

In [17]:
#2.1 columnas
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [18]:
# 2.2 tipos de datos
df.printSchema()

root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: string (nullable = true)
 |-- Fecha de muerte: string (nullable = true)
 |-- Fecha de diagnóstico: string (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |-- Tipo de r

In [19]:
#   2.3 seleccionar algunas columnas
df.select('ID de caso','Nombre departamento','Edad').show(10)

+----------+-------------------+----+
|ID de caso|Nombre departamento|Edad|
+----------+-------------------+----+
|         1|             BOGOTA|  19|
|         2|              VALLE|  34|
|         3|          ANTIOQUIA|  50|
|         4|          ANTIOQUIA|  55|
|         5|          ANTIOQUIA|  25|
|         6|          ANTIOQUIA|  27|
|         7|          CARTAGENA|  85|
|         8|             BOGOTA|  22|
|         9|             BOGOTA|  28|
|        10|             BOGOTA|  36|
+----------+-------------------+----+
only showing top 10 rows



In [20]:
# 2.4 Renombrar columnas
df =df.withColumnRenamed("fecha reporte web", "fecha_reporte")
df =df.withColumnRenamed("ID de caso", "id")
df =df.withColumnRenamed("Fecha de notificación", "fecha_notificacion")
df =df.withColumnRenamed("Código DIVIPOLA departamento", "codigo_departamento")
df =df.withColumnRenamed("Nombre departamento", "departamento")
df =df.withColumnRenamed("Código DIVIPOLA municipio", "codigo_municipio")
df =df.withColumnRenamed("Nombre municipio", "municipio")
df =df.withColumnRenamed("Edad", "edad")
df =df.withColumnRenamed("Sexo", "sexo")
df =df.withColumnRenamed("Tipo de contagio", "tipo_contagio")
df =df.withColumnRenamed("Ubicación del caso", "ubicacion_caso")
df =df.withColumnRenamed("Estado", "estado")
df =df.withColumnRenamed("Código ISO del país", "codigo_pais")
df =df.withColumnRenamed("Nombre del país", "nombre_pais")
df =df.withColumnRenamed("Recuperado", "recuperado")
df =df.withColumnRenamed("Fecha de inicio de síntomas", "fecha_inicio_sintomas")
df =df.withColumnRenamed("Fecha de muerte", "fecha_muerte")
df =df.withColumnRenamed("Fecha de diagnóstico", "fecha_diagnostico")
df =df.withColumnRenamed("Fecha de recuperación", "fecha_recuperacion")
df =df.withColumnRenamed("Fecha de recuperación", "fecha_recuperacion")
df =df.withColumnRenamed("Tipo de recuperación", "tipo_recuperacion")
df =df.withColumnRenamed("Pertenencia étnica", "pertenencia_etnica")
df =df.withColumnRenamed("Nombre del grupo étnico", "nombre_etnia")
df.columns

['fecha_reporte',
 'id',
 'fecha_notificacion',
 'codigo_departamento',
 'departamento',
 'codigo_municipio',
 'municipio',
 'edad',
 'Unidad de medida de edad',
 'sexo',
 'tipo_contagio',
 'ubicacion_caso',
 'estado',
 'codigo_pais',
 'nombre_pais',
 'recuperado',
 'fecha_inicio_sintomas',
 'fecha_muerte',
 'fecha_diagnostico',
 'fecha_recuperacion',
 'tipo_recuperacion',
 'pertenencia_etnica',
 'nombre_etnia']

In [21]:
# 2.5 Agregar columnas
from pyspark.sql.functions import concat, col, lit
df = df.withColumn("ubicacion_completa", concat(col("departamento"), lit(", "), col("municipio")))
df.select('ubicacion_completa','departamento','municipio').show(5)

+-------------------+------------+---------+
| ubicacion_completa|departamento|municipio|
+-------------------+------------+---------+
|     BOGOTA, BOGOTA|      BOGOTA|   BOGOTA|
|        VALLE, BUGA|       VALLE|     BUGA|
|ANTIOQUIA, MEDELLIN|   ANTIOQUIA| MEDELLIN|
|ANTIOQUIA, MEDELLIN|   ANTIOQUIA| MEDELLIN|
|ANTIOQUIA, MEDELLIN|   ANTIOQUIA| MEDELLIN|
+-------------------+------------+---------+
only showing top 5 rows



In [22]:
# 2.6 borrar columnas
df = df.drop("pertenencia_etnica","nombre_etnia")
df.columns

['fecha_reporte',
 'id',
 'fecha_notificacion',
 'codigo_departamento',
 'departamento',
 'codigo_municipio',
 'municipio',
 'edad',
 'Unidad de medida de edad',
 'sexo',
 'tipo_contagio',
 'ubicacion_caso',
 'estado',
 'codigo_pais',
 'nombre_pais',
 'recuperado',
 'fecha_inicio_sintomas',
 'fecha_muerte',
 'fecha_diagnostico',
 'fecha_recuperacion',
 'tipo_recuperacion',
 'ubicacion_completa']

In [23]:
# 2.7 Filtrar datos
df.select('id','edad').filter(df['edad'] >50).show(10)

+---+----+
| id|edad|
+---+----+
|  4|  55|
|  7|  85|
| 12|  74|
| 13|  68|
| 16|  61|
| 17|  73|
| 18|  54|
| 19|  54|
| 29|  55|
| 31|  65|
+---+----+
only showing top 10 rows



In [24]:
df.select('id','departamento').filter(df['departamento'] == 'ANTIOQUIA').show(5)

+---+------------+
| id|departamento|
+---+------------+
|  3|   ANTIOQUIA|
|  4|   ANTIOQUIA|
|  5|   ANTIOQUIA|
|  6|   ANTIOQUIA|
| 20|   ANTIOQUIA|
+---+------------+
only showing top 5 rows



In [25]:
# 2.8 ejecutar alguna función UDF o lambda sobre alguna columna creando una nueva.
from pyspark.sql.functions import udf
fun_lambda = udf(lambda column_value: column_value * 2)
df = df.withColumn("doubled_age", fun_lambda(df["edad"]))
df.select('edad','doubled_age').show(5)

+----+-----------+
|edad|doubled_age|
+----+-----------+
|  19|         38|
|  34|         68|
|  50|        100|
|  55|        110|
|  25|         50|
+----+-----------+
only showing top 5 rows



In [None]:
# 3. contestar las siguientes preguntas de negocio sobre los datos de covid:

In [None]:
#    3.1 Los 10 departamentos con más casos de covid en Colombia ordenados de mayor a menor.

In [26]:
# Dataframes
from pyspark.sql.functions import desc
departamentos_casos = df.groupBy("departamento").count()
departamentos_casos = departamentos_casos.orderBy(desc("count"))
top_10_departamentos = departamentos_casos.limit(10)
top_10_departamentos.show()

+------------+-----+
|departamento|count|
+------------+-----+
|      BOGOTA|30016|
|BARRANQUILLA|13065|
|   ATLANTICO|10994|
|       VALLE|10404|
|   CARTAGENA| 8333|
|   ANTIOQUIA| 4554|
|      NARIÑO| 3520|
|CUNDINAMARCA| 2827|
|    AMAZONAS| 2317|
|       CHOCO| 1636|
+------------+-----+



In [27]:
# SparkSQL
df.createOrReplaceTempView("covid_data")
top_10_departamentos = spark.sql("SELECT departamento, COUNT(*) AS casos FROM covid_data GROUP BY departamento ORDER BY casos DESC LIMIT 10")
top_10_departamentos.show()

+------------+-----+
|departamento|casos|
+------------+-----+
|      BOGOTA|30016|
|BARRANQUILLA|13065|
|   ATLANTICO|10994|
|       VALLE|10404|
|   CARTAGENA| 8333|
|   ANTIOQUIA| 4554|
|      NARIÑO| 3520|
|CUNDINAMARCA| 2827|
|    AMAZONAS| 2317|
|       CHOCO| 1636|
+------------+-----+



In [None]:
#      3.2 Las 10 ciudades con más casos de covid en Colombia ordenados de mayor a menor.

In [28]:
# Dataframes
ciudades_casos = df.groupBy("municipio").count()
ciudades_casos = ciudades_casos.orderBy(desc("count"))
top_10_ciudades = ciudades_casos.limit(10)
top_10_ciudades.show()

+------------+-----+
|   municipio|count|
+------------+-----+
|      BOGOTA|30016|
|BARRANQUILLA|13065|
|   CARTAGENA| 8333|
|        CALI| 7747|
|     SOLEDAD| 6233|
|     LETICIA| 2194|
|    MEDELLIN| 2137|
|      TUMACO| 1501|
|BUENAVENTURA| 1453|
|      QUIBDO| 1367|
+------------+-----+



In [29]:
# SparkSQL
top_10_ciudades = spark.sql("SELECT municipio, COUNT(*) AS casos FROM covid_data GROUP BY municipio ORDER BY casos DESC LIMIT 10")
top_10_ciudades.show()

+------------+-----+
|   municipio|casos|
+------------+-----+
|      BOGOTA|30016|
|BARRANQUILLA|13065|
|   CARTAGENA| 8333|
|        CALI| 7747|
|     SOLEDAD| 6233|
|     LETICIA| 2194|
|    MEDELLIN| 2137|
|      TUMACO| 1501|
|BUENAVENTURA| 1453|
|      QUIBDO| 1367|
+------------+-----+



In [None]:
#        3.3 Los 10 días con más casos de COVID-19 en Colombia ordenados de mayor a menor.

In [30]:
# Dataframes
dias_casos = df.groupBy("fecha_reporte").count()
dias_casos = dias_casos.orderBy(desc("count"))
top_10_dias = dias_casos.limit(10)
top_10_dias.show()

+-----------------+-----+
|    fecha_reporte|count|
+-----------------+-----+
|27/6/2020 0:00:00| 4149|
|26/6/2020 0:00:00| 3843|
|24/6/2020 0:00:00| 3541|
|25/6/2020 0:00:00| 3486|
|29/6/2020 0:00:00| 3274|
|28/6/2020 0:00:00| 3178|
|18/6/2020 0:00:00| 3171|
|19/6/2020 0:00:00| 3059|
|21/6/2020 0:00:00| 3019|
|30/6/2020 0:00:00| 2803|
+-----------------+-----+



In [31]:
#SparkSQL
top_10_dias = spark.sql("SELECT fecha_reporte, COUNT(*) AS casos FROM covid_data GROUP BY fecha_reporte ORDER BY casos DESC LIMIT 10")
top_10_dias.show()

+-----------------+-----+
|    fecha_reporte|casos|
+-----------------+-----+
|27/6/2020 0:00:00| 4149|
|26/6/2020 0:00:00| 3843|
|24/6/2020 0:00:00| 3541|
|25/6/2020 0:00:00| 3486|
|29/6/2020 0:00:00| 3274|
|28/6/2020 0:00:00| 3178|
|18/6/2020 0:00:00| 3171|
|19/6/2020 0:00:00| 3059|
|21/6/2020 0:00:00| 3019|
|30/6/2020 0:00:00| 2803|
+-----------------+-----+



In [None]:
#    3.4 Distribución de casos por edades de COVID-19 en Colombia.

In [32]:
# Dataframes
edades_casos = df.groupBy("edad").count()
edades_casos = edades_casos.orderBy(desc("count"))
edades_casos.show()

+----+-----+
|edad|count|
+----+-----+
|  30| 2735|
|  29| 2611|
|  31| 2569|
|  28| 2540|
|  27| 2494|
|  26| 2436|
|  33| 2371|
|  32| 2362|
|  25| 2335|
|  34| 2310|
|  35| 2292|
|  24| 2214|
|  36| 2175|
|  37| 2132|
|  38| 2098|
|  40| 2050|
|  23| 2041|
|  39| 1985|
|  22| 1879|
|  41| 1783|
+----+-----+
only showing top 20 rows



In [33]:
# SparkSQL
edades_casos = spark.sql("SELECT edad, COUNT(*) AS casos FROM covid_data GROUP BY edad ORDER BY casos DESC")
edades_casos.show()

+----+-----+
|edad|casos|
+----+-----+
|  30| 2735|
|  29| 2611|
|  31| 2569|
|  28| 2540|
|  27| 2494|
|  26| 2436|
|  33| 2371|
|  32| 2362|
|  25| 2335|
|  34| 2310|
|  35| 2292|
|  24| 2214|
|  36| 2175|
|  37| 2132|
|  38| 2098|
|  40| 2050|
|  23| 2041|
|  39| 1985|
|  22| 1879|
|  41| 1783|
+----+-----+
only showing top 20 rows



In [None]:
#  3.5 Realice la pregunta de negocio que quiera sobre los datos y responda con la correspondiente programación en Spark.
## Pregunta de negocio: Top 10 departamentos con mayor numero de fallecidos

In [34]:
#Dataframes
fallecidos_departamentos = df.filter(df['estado'] == 'Fallecido')
departamentos_fallecidos = fallecidos_departamentos.groupBy("departamento").count()
departamentos_fallecidos = departamentos_fallecidos.orderBy(desc("count"))
top_10_departamentos_fallecidos = departamentos_fallecidos.limit(10)
top_10_departamentos_fallecidos.show()

+------------+-----+
|departamento|count|
+------------+-----+
|      BOGOTA|  962|
|BARRANQUILLA|  926|
|   ATLANTICO|  662|
|       VALLE|  565|
|   CARTAGENA|  392|
|      NARIÑO|  137|
|       SUCRE|  103|
|   MAGDALENA|  100|
|       CHOCO|   93|
|    AMAZONAS|   93|
+------------+-----+



In [35]:
#SparkSQL
df.createOrReplaceTempView("covid_data")
top_10_departamentos_fallecidos = spark.sql("SELECT departamento, COUNT(*) AS fallecidos FROM covid_data WHERE estado = 'Fallecido' GROUP BY departamento ORDER BY fallecidos DESC LIMIT 10")
top_10_departamentos_fallecidos.show()

+------------+----------+
|departamento|fallecidos|
+------------+----------+
|      BOGOTA|       962|
|BARRANQUILLA|       926|
|   ATLANTICO|       662|
|       VALLE|       565|
|   CARTAGENA|       392|
|      NARIÑO|       137|
|       SUCRE|       103|
|   MAGDALENA|       100|
|       CHOCO|        93|
|    AMAZONAS|        93|
+------------+----------+



In [37]:
# 4. salve los datos del numeral 3 en Drive
# Guardar los resultados en formato CSV en Google Drive
ruta_destino = '/content/drive/MyDrive/resultados/'
top_10_departamentos.coalesce(1).write.csv(ruta_destino + 'top_10_departamentos.csv', mode='overwrite')
top_10_ciudades.coalesce(1).write.csv(ruta_destino + 'top_10_ciudades.csv', mode='overwrite')
top_10_dias.coalesce(1).write.csv(ruta_destino + 'top_10_dias.csv', mode='overwrite')
edades_casos.coalesce(1).write.csv(ruta_destino + 'distribucion_edades.csv', mode='overwrite')
top_10_departamentos_fallecidos.coalesce(1).write.csv(ruta_destino + 'top_10_departamentos_fallecidos.csv', mode='overwrite')