In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=a8ee7a6f894d3da9d08209052ad458c417da4dedd6426458721374990663b267
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col, lit
from pyspark.sql.functions import desc
from pyspark.sql.functions import udf
from google.colab import drive

In [3]:
spark=SparkSession.builder.appName('data_processing').getOrCreate()

In [4]:
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
df=spark.read.csv('/content/drive/My Drive/Covid19/Casos_positivos_de_COVID-19_en_Colombia-100K.csv',inferSchema=True,header=True)

In [6]:
df.columns

['fecha reporte web',
 'ID de caso',
 'Fecha de notificación',
 'Código DIVIPOLA departamento',
 'Nombre departamento',
 'Código DIVIPOLA municipio',
 'Nombre municipio',
 'Edad',
 'Unidad de medida de edad',
 'Sexo',
 'Tipo de contagio',
 'Ubicación del caso',
 'Estado',
 'Código ISO del país',
 'Nombre del país',
 'Recuperado',
 'Fecha de inicio de síntomas',
 'Fecha de muerte',
 'Fecha de diagnóstico',
 'Fecha de recuperación',
 'Tipo de recuperación',
 'Pertenencia étnica',
 'Nombre del grupo étnico']

In [7]:
df.printSchema()

root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable = true)
 |-- Recuperado: string (nullable = true)
 |-- Fecha de inicio de síntomas: string (nullable = true)
 |-- Fecha de muerte: string (nullable = true)
 |-- Fecha de diagnóstico: string (nullable = true)
 |-- Fecha de recuperación: string (nullable = true)
 |-- Tipo de r

In [8]:
df.select('Nombre municipio','sexo','Edad').show(15)

+----------------+----+----+
|Nombre municipio|sexo|Edad|
+----------------+----+----+
|          BOGOTA|   F|  19|
|            BUGA|   M|  34|
|        MEDELLIN|   F|  50|
|        MEDELLIN|   M|  55|
|        MEDELLIN|   M|  25|
|          ITAGUI|   F|  27|
|       CARTAGENA|   F|  85|
|          BOGOTA|   F|  22|
|          BOGOTA|   F|  28|
|          BOGOTA|   F|  36|
|          BOGOTA|   F|  42|
|           NEIVA|   F|  74|
|           NEIVA|   F|  68|
|         PALMIRA|   M|  48|
|   VILLAVICENCIO|   F|  30|
+----------------+----+----+
only showing top 15 rows



In [9]:
df =df.withColumnRenamed("fecha reporte web", "fecha_reporte_web")
df =df.withColumnRenamed("ID de caso", "id_caso")
df =df.withColumnRenamed("Fecha de notificación", "fecha_notificacion")
df =df.withColumnRenamed("Código DIVIPOLA departamento", "codigo_departamento")
df =df.withColumnRenamed("Nombre departamento", "departamento")
df =df.withColumnRenamed("Código DIVIPOLA municipio", "codigo_municipio")
df =df.withColumnRenamed("Nombre municipio", "municipio")
df =df.withColumnRenamed("Edad", "edad")
df =df.withColumnRenamed("Sexo", "sexo")
df =df.withColumnRenamed("Tipo de contagio", "tipo_contagio")
df =df.withColumnRenamed("Ubicación del caso", "ubicacion_caso")
df =df.withColumnRenamed("Estado", "estado")
df =df.withColumnRenamed("Código ISO del país", "codigo_pais")
df =df.withColumnRenamed("Nombre del país", "nombre_pais")
df =df.withColumnRenamed("Recuperado", "recuperado")
df =df.withColumnRenamed("Fecha de inicio de síntomas", "fecha_inicio_sintomas")
df =df.withColumnRenamed("Fecha de muerte", "fecha_muerte")
df =df.withColumnRenamed("Fecha de diagnóstico", "fecha_diagnostico")
df =df.withColumnRenamed("Fecha de recuperación", "fecha_recuperacion")
df =df.withColumnRenamed("Tipo de recuperación", "tipo_recuperacion")
df =df.withColumnRenamed("Pertenencia étnica", "pertenencia_etnica")
df =df.withColumnRenamed("Nombre del grupo étnico", "nombre_etnia")

In [10]:
df = df.withColumn("datos_basicos", concat(col("id_caso"), lit(", "), col("edad"), lit(", "), col("sexo")))
df.select("datos_basicos").show(3)

+-------------+
|datos_basicos|
+-------------+
|     1, 19, F|
|     2, 34, M|
|     3, 50, F|
+-------------+
only showing top 3 rows



In [11]:
df = df.drop("codigo_departamento","codigo_municipio")
df.columns

['fecha_reporte_web',
 'id_caso',
 'fecha_notificacion',
 'departamento',
 'municipio',
 'edad',
 'Unidad de medida de edad',
 'sexo',
 'tipo_contagio',
 'ubicacion_caso',
 'estado',
 'codigo_pais',
 'nombre_pais',
 'recuperado',
 'fecha_inicio_sintomas',
 'fecha_muerte',
 'fecha_diagnostico',
 'fecha_recuperacion',
 'tipo_recuperacion',
 'pertenencia_etnica',
 'nombre_etnia',
 'datos_basicos']

In [12]:
df.select("id_caso", "sexo","fecha_inicio_sintomas","fecha_diagnostico").filter(df["sexo"] == "M").show(5)

+-------+----+---------------------+-----------------+
|id_caso|sexo|fecha_inicio_sintomas|fecha_diagnostico|
+-------+----+---------------------+-----------------+
|      2|   M|     4/3/2020 0:00:00| 9/3/2020 0:00:00|
|      4|   M|     6/3/2020 0:00:00|11/3/2020 0:00:00|
|      5|   M|     8/3/2020 0:00:00|11/3/2020 0:00:00|
|     14|   M|     7/3/2020 0:00:00|13/3/2020 0:00:00|
|     18|   M|     7/3/2020 0:00:00|14/3/2020 0:00:00|
+-------+----+---------------------+-----------------+
only showing top 5 rows



In [13]:
lambda_function = udf(lambda column_value: "sexo: " + column_value)
df = df.withColumn("tag", lambda_function(df["sexo"]))
df.select('sexo','tag').show(5)

+----+-------+
|sexo|    tag|
+----+-------+
|   F|sexo: F|
|   M|sexo: M|
|   F|sexo: F|
|   M|sexo: M|
|   M|sexo: M|
+----+-------+
only showing top 5 rows



In [14]:
departamentos = df.groupBy("departamento").count().orderBy(desc("count")).limit(10)
departamentos.show()

+------------+-----+
|departamento|count|
+------------+-----+
|      BOGOTA|30016|
|BARRANQUILLA|13065|
|   ATLANTICO|10994|
|       VALLE|10404|
|   CARTAGENA| 8333|
|   ANTIOQUIA| 4554|
|      NARIÑO| 3520|
|CUNDINAMARCA| 2827|
|    AMAZONAS| 2317|
|       CHOCO| 1636|
+------------+-----+



In [15]:
df.createOrReplaceTempView("data")
departamentos = spark.sql("SELECT departamento, COUNT(*) AS casos FROM data GROUP BY departamento ORDER BY casos DESC LIMIT 10")
departamentos.show()

+------------+-----+
|departamento|casos|
+------------+-----+
|      BOGOTA|30016|
|BARRANQUILLA|13065|
|   ATLANTICO|10994|
|       VALLE|10404|
|   CARTAGENA| 8333|
|   ANTIOQUIA| 4554|
|      NARIÑO| 3520|
|CUNDINAMARCA| 2827|
|    AMAZONAS| 2317|
|       CHOCO| 1636|
+------------+-----+



In [16]:
municipios = df.groupBy("municipio").count().orderBy(desc("count")).limit(10)
municipios.show()

+------------+-----+
|   municipio|count|
+------------+-----+
|      BOGOTA|30016|
|BARRANQUILLA|13065|
|   CARTAGENA| 8333|
|        CALI| 7747|
|     SOLEDAD| 6233|
|     LETICIA| 2194|
|    MEDELLIN| 2137|
|      TUMACO| 1501|
|BUENAVENTURA| 1453|
|      QUIBDO| 1367|
+------------+-----+



In [17]:
municipios = spark.sql("SELECT municipio, COUNT(*) AS casos FROM data GROUP BY municipio ORDER BY casos DESC LIMIT 10")
municipios.show()

+------------+-----+
|   municipio|casos|
+------------+-----+
|      BOGOTA|30016|
|BARRANQUILLA|13065|
|   CARTAGENA| 8333|
|        CALI| 7747|
|     SOLEDAD| 6233|
|     LETICIA| 2194|
|    MEDELLIN| 2137|
|      TUMACO| 1501|
|BUENAVENTURA| 1453|
|      QUIBDO| 1367|
+------------+-----+



In [18]:
dias = df.groupBy("fecha_reporte_web").count().orderBy(desc("count")).limit(10)
dias.show()

+-----------------+-----+
|fecha_reporte_web|count|
+-----------------+-----+
|27/6/2020 0:00:00| 4149|
|26/6/2020 0:00:00| 3843|
|24/6/2020 0:00:00| 3541|
|25/6/2020 0:00:00| 3486|
|29/6/2020 0:00:00| 3274|
|28/6/2020 0:00:00| 3178|
|18/6/2020 0:00:00| 3171|
|19/6/2020 0:00:00| 3059|
|21/6/2020 0:00:00| 3019|
|30/6/2020 0:00:00| 2803|
+-----------------+-----+



In [19]:
dias = spark.sql("SELECT fecha_reporte_web as Fecha, COUNT(*) AS Casos FROM data GROUP BY fecha_reporte_web ORDER BY casos DESC LIMIT 10")
dias.show()

+-----------------+-----+
|            Fecha|Casos|
+-----------------+-----+
|27/6/2020 0:00:00| 4149|
|26/6/2020 0:00:00| 3843|
|24/6/2020 0:00:00| 3541|
|25/6/2020 0:00:00| 3486|
|29/6/2020 0:00:00| 3274|
|28/6/2020 0:00:00| 3178|
|18/6/2020 0:00:00| 3171|
|19/6/2020 0:00:00| 3059|
|21/6/2020 0:00:00| 3019|
|30/6/2020 0:00:00| 2803|
+-----------------+-----+



In [20]:
edad = df.groupBy("edad").count().orderBy(desc("count")).limit(10)
edad.show()

+----+-----+
|edad|count|
+----+-----+
|  30| 2735|
|  29| 2611|
|  31| 2569|
|  28| 2540|
|  27| 2494|
|  26| 2436|
|  33| 2371|
|  32| 2362|
|  25| 2335|
|  34| 2310|
+----+-----+



In [21]:
edad = spark.sql("SELECT edad, COUNT(*) AS casos FROM data GROUP BY edad ORDER BY casos DESC LIMIT 10")
edad.show()

+----+-----+
|edad|casos|
+----+-----+
|  30| 2735|
|  29| 2611|
|  31| 2569|
|  28| 2540|
|  27| 2494|
|  26| 2436|
|  33| 2371|
|  32| 2362|
|  25| 2335|
|  34| 2310|
+----+-----+



In [22]:
etnia = df.groupBy("nombre_etnia").count().orderBy(desc("count")).limit(10)
etnia.show()

+------------+-----+
|nombre_etnia|count|
+------------+-----+
|        null|94342|
| Por definir| 3603|
|      MOKANA|  373|
|      TIKUNA|  357|
|        ZENU|  273|
|       PASTO|  218|
|       PIJAO|  116|
|MURUI_UITOTO|   96|
|   NASA_PAÉZ|   82|
|       WAYUU|   62|
+------------+-----+



In [24]:
etnia = spark.sql("SELECT nombre_etnia as Etnia, COUNT(*) AS casos FROM data GROUP BY nombre_etnia ORDER BY casos DESC LIMIT 10")
etnia.show()

+------------+-----+
|       Etnia|casos|
+------------+-----+
|        null|94342|
| Por definir| 3603|
|      MOKANA|  373|
|      TIKUNA|  357|
|        ZENU|  273|
|       PASTO|  218|
|       PIJAO|  116|
|MURUI_UITOTO|   96|
|   NASA_PAÉZ|   82|
|       WAYUU|   62|
+------------+-----+



In [29]:
departamentos.write.csv("/content/drive/My Drive/Covid19/Resultados/departamentos.csv")
municipios.write.csv("/content/drive/My Drive/Covid19/Resultados/municipios.csv")
dias.write.csv("/content/drive/My Drive/Covid19/Resultados/dias.csv")
edad.write.csv("/content/drive/My Drive/Covid19/Resultados/edad.csv")
etnia.write.csv("/content/drive/My Drive/Covid19/Resultados/etnia.csv")