In [13]:
# Instalar Java y Spark

!apt-get install openjdk-17-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-4.0.1/spark-4.0.1-bin-hadoop3.tgz
!tar xf spark-4.0.1-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-4.0.1-bin-hadoop3"

In [14]:
!wget -q https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar
!wget -q https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.262/aws-java-sdk-bundle-1.12.262.jar

In [15]:
import findspark
findspark.init()

In [16]:
# Montar Google Drive

from google.colab import drive
drive.mount('/content/drive')

# Ruta en Drive

drive_csv_path = '/content/drive/MyDrive/st0263-252/bigdata/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-1K.csv'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [17]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('Colab-PySpark-S3-Drive-covid') \
    .config('spark.driver.memory','4g') \
    .config('spark.hadoop.fs.s3a.impl','org.apache.hadoop.fs.s3a.S3AFileSystem') \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider','org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .config('spark.hadoop.fs.s3a.multipart.size','104857600') \
    .config('spark.hadoop.fs.s3a.fast.upload','true') \
    .config('spark.hadoop.fs.s3a.path.style.access','true') \
    .getOrCreate()

print('Spark session creada:', spark)

Spark session creada: <pyspark.sql.session.SparkSession object at 0x7da7fc574b60>


In [19]:
# Rutas y lectura de CSV
s3_path = 's3://gmcalleh-datalake/datasets/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-1K.csv'
drive_path = drive_csv_path

print('S3:', s3_path)
print('Drive:', drive_path)

# Leer desde S3
try:
    df_s3 = spark.read.csv(s3_path, header=True, inferSchema=True)
    print('Leído desde S3 — filas:', df_s3.count())
except Exception as e:
    print('Error leyendo desde S3')

# Leer desde Drive
try:
    df_drive = spark.read.csv(drive_path, header=True, inferSchema=True)
    print('Leído desde Drive — filas:', df_drive.count())
except Exception as e:
    print('Error leyendo desde Drive:', e)

# tocó trabajar con la versión leída desde drive porque esa si está disponible
df = df_drive

print('Schema:')
df.printSchema()
df.show(5)


S3: s3://gmcalleh-datalake/datasets/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-1K.csv
Drive: /content/drive/MyDrive/st0263-252/bigdata/datasets/covid19/Casos_positivos_de_COVID-19_en_Colombia-1K.csv
Error leyendo desde S3
Leído desde Drive — filas: 1000
Schema:
root
 |-- fecha reporte web: string (nullable = true)
 |-- ID de caso: integer (nullable = true)
 |-- Fecha de notificación: string (nullable = true)
 |-- Código DIVIPOLA departamento: integer (nullable = true)
 |-- Nombre departamento: string (nullable = true)
 |-- Código DIVIPOLA municipio: integer (nullable = true)
 |-- Nombre municipio: string (nullable = true)
 |-- Edad: integer (nullable = true)
 |-- Unidad de medida de edad: integer (nullable = true)
 |-- Sexo: string (nullable = true)
 |-- Tipo de contagio: string (nullable = true)
 |-- Ubicación del caso: string (nullable = true)
 |-- Estado: string (nullable = true)
 |-- Código ISO del país: integer (nullable = true)
 |-- Nombre del país: string (nullable

In [31]:
# 8) SparkSQL: crear vista temporal y ejecutar las mismas consultas en SQL
# Registrar vista
view_name = 'covid'
df.createOrReplaceTempView(view_name)

print('Análisis de datos del Covid 19 en Colombia')

# Contar total de registros
print('Total de registros:')
spark.sql('SELECT count(*) as total FROM {}'.format(view_name)).show()

print('Departamentos')
sql_top_depart = spark.sql(f"""
SELECT `Nombre departamento`, COUNT(*) as total
FROM {view_name}
GROUP BY `Nombre departamento`
ORDER BY total DESC
LIMIT 10
""")
sql_top_depart.show(truncate=False)

print('Municipios')
sql_top_municipios = spark.sql(f"""
SELECT `Nombre municipio`, COUNT(*) as total
FROM {view_name}
GROUP BY `Nombre municipio`
ORDER BY total DESC
LIMIT 10
""")
sql_top_municipios.show(truncate=False)

print('Grupos de edad')
sql_age_dist = spark.sql(f"""
SELECT
  CASE
    WHEN Edad < 20 THEN '<20'
    WHEN Edad >= 20 AND Edad < 40 THEN '20-39'
    WHEN Edad >= 40 AND Edad < 60 THEN '40-59'
    WHEN Edad >= 60 AND Edad < 80 THEN '60-79'
    ELSE '80+'
  END as age_group,
  COUNT(*) as total
FROM {view_name}
WHERE Edad IS NOT NULL
GROUP BY age_group
ORDER BY age_group
""")
sql_age_dist.show()

print('Sexo')
sql_sex_dist = spark.sql(f"""
SELECT Sexo, COUNT(*) as total
FROM {view_name}
WHERE Sexo IS NOT NULL
GROUP BY Sexo
ORDER BY total DESC
""")
sql_sex_dist.show()

print('Estados')
sql_estado_dist = spark.sql(f"""
SELECT Estado, COUNT(*) as total
FROM {view_name}
WHERE Estado IS NOT NULL
GROUP BY Estado
ORDER BY total DESC
""")
sql_estado_dist.show()

print('Tipo de contagio')
sql_contagio_dist = spark.sql(f"""
SELECT `Tipo de contagio`, COUNT(*) as total
FROM {view_name}
WHERE `Tipo de contagio` IS NOT NULL
GROUP BY `Tipo de contagio`
ORDER BY total DESC
""")
sql_contagio_dist.show(truncate=False)

print('Ubicación del caso')
sql_ubicacion_dist = spark.sql(f"""
SELECT `Ubicación del caso`, COUNT(*) as total
FROM {view_name}
WHERE `Ubicación del caso` IS NOT NULL
GROUP BY `Ubicación del caso`
ORDER BY total DESC
""")
sql_ubicacion_dist.show()

print('Estado de recuperación')
sql_recuperado_dist = spark.sql(f"""
SELECT Recuperado, COUNT(*) as total
FROM {view_name}
WHERE Recuperado IS NOT NULL
GROUP BY Recuperado
ORDER BY total DESC
""")
sql_recuperado_dist.show()

print('Departamentos con estadísticas de recuperación')
sql_recuperados_depto = spark.sql(f"""
SELECT
  `Nombre departamento`,
  COUNT(*) as total_casos,
  SUM(CASE WHEN Recuperado = 'Recuperado' THEN 1 ELSE 0 END) as recuperados,
  ROUND(SUM(CASE WHEN Recuperado = 'Recuperado' THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 2) as porcentaje_recuperacion
FROM {view_name}
WHERE `Nombre departamento` IS NOT NULL
GROUP BY `Nombre departamento`
ORDER BY total_casos DESC
LIMIT 10
""")
sql_recuperados_depto.show(truncate=False)

print('Combinado: Departamento, Sexo y Grupo de Edad')
sql_combined = spark.sql(f"""
SELECT
  `Nombre departamento`,
  Sexo,
  CASE
    WHEN Edad < 20 THEN '<20'
    WHEN Edad >= 20 AND Edad < 40 THEN '20-39'
    WHEN Edad >= 40 AND Edad < 60 THEN '40-59'
    WHEN Edad >= 60 AND Edad < 80 THEN '60-79'
    ELSE '80+'
  END as age_group,
  COUNT(*) as total
FROM {view_name}
WHERE `Nombre departamento` IS NOT NULL
  AND Sexo IS NOT NULL
  AND Edad IS NOT NULL
GROUP BY `Nombre departamento`, Sexo, age_group
ORDER BY total DESC
LIMIT 15
""")
sql_combined.show(truncate=False)


Análisis de datos del Covid 19 en Colombia
Total de registros:
+-----+
|total|
+-----+
| 1000|
+-----+

Departamentos
+-------------------+-----+
|Nombre departamento|total|
+-------------------+-----+
|BOGOTA             |401  |
|VALLE              |148  |
|ANTIOQUIA          |106  |
|CUNDINAMARCA       |49   |
|CARTAGENA          |39   |
|RISARALDA          |34   |
|BARRANQUILLA       |31   |
|HUILA              |30   |
|QUINDIO            |23   |
|NORTE SANTANDER    |19   |
+-------------------+-----+

Municipios
+----------------+-----+
|Nombre municipio|total|
+----------------+-----+
|BOGOTA          |401  |
|CALI            |101  |
|MEDELLIN        |63   |
|CARTAGENA       |39   |
|BARRANQUILLA    |31   |
|NEIVA           |27   |
|PEREIRA         |25   |
|PALMIRA         |22   |
|VALLEDUPAR      |16   |
|ARMENIA         |15   |
+----------------+-----+

Grupos de edad
+---------+-----+
|age_group|total|
+---------+-----+
|    20-39|  441|
|    40-59|  336|
|    60-79|  141|
|   