In [1]:
import random
from pathlib import Path

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, to_date, col, monotonically_increasing_id, explode, array, rand, floor, date_add
from pyspark.sql.types import IntegerType, LongType, DoubleType

from data_download import descargar_y_guardar_json
from generar_json import generar_pais_ciudad

In [2]:
# Create (or reuse) the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("Transformación de datos COVID 2")
    .getOrCreate()
)

In [3]:
def expand_df(df_entrada, num_copies=6, seed=None):
    """Replicate every row of ``df_entrada`` and randomise its numeric columns.

    Each input row is emitted ``num_copies`` times; the copies are tagged with
    a new leading ``id`` column holding 1..``num_copies``. Every column whose
    Spark type is Double, Integer or Long (except ``date``) is multiplied by a
    random factor in [1, 2.1) and floored to a whole number. If a ``date``
    column is present, it is cast to string and parsed with the ``yyyyMMdd``
    pattern into a Spark date.

    Args:
        df_entrada: Source Spark DataFrame.
        num_copies: Number of copies to emit per input row (default 6, the
            value that was previously hard-coded).
        seed: Optional integer base seed for the random factors. Each numeric
            column uses ``seed + <position among numeric columns>`` so columns
            do not share the same random stream. ``None`` (default) keeps the
            unseeded behaviour.

    Returns:
        A new Spark DataFrame with ``id`` first, followed by the original
        columns in their original order.

    Raises:
        ValueError: If ``num_copies`` is smaller than 1.
    """
    if num_copies < 1:
        raise ValueError("num_copies must be >= 1")

    columnas_originales = df_entrada.columns

    # One array literal [1..num_copies] per row; explode turns it into one row per id
    ids = array([lit(i) for i in range(1, num_copies + 1)])
    tabla_duplicada = (
        df_entrada
        .withColumn("id", explode(ids))
        .select("id", *columnas_originales)
    )

    # Only Double/Integer/Long columns are perturbed; 'date' is numeric in the
    # raw feed (e.g. 20200506) but must stay intact for the parsing step below
    columnas_numericas = [
        campo.name
        for campo in df_entrada.schema.fields
        if isinstance(campo.dataType, (DoubleType, IntegerType, LongType))
        and campo.name != "date"
    ]

    for posicion, col_name in enumerate(columnas_numericas):
        semilla_columna = None if seed is None else seed + posicion
        # rand() is in [0, 1), so the factor is in [1, 2.1); floor drops the decimals
        tabla_duplicada = tabla_duplicada.withColumn(
            col_name,
            floor(col(col_name) * (1 + rand(semilla_columna) * 1.1)),
        )

    # Convert the numeric yyyyMMdd 'date' column into a real date
    if "date" in columnas_originales:
        tabla_duplicada = tabla_duplicada.withColumn(
            "date", to_date(col("date").cast("string"), "yyyyMMdd")
        )

    return tabla_duplicada

    Descargar y guardar el JSON de datos COVID

In [4]:
# Day to request, written as a yyyyMMdd integer
fecha = 20200506
# Local file the downloaded JSON is written to
ruta_archivo = "./downloads/datos_covid.json"
# Endpoint for the US figures of the selected day
url = f"https://api.covidtracking.com/v1/us/{fecha}.json"

descargar_y_guardar_json(url, ruta_archivo)

Datos guardados exitosamente en ./downloads/datos_covid.json


    Leer el archivo JSON como DataFrame Spark

In [5]:
# Read the file written by the download cell; reuse its path variable so the
# location is defined in a single place
json_entrada = spark.read.json(ruta_archivo)

In [6]:
# Print a preview of the raw downloaded data
json_entrada.show()

+--------+--------------------+-----+-------------+--------------------+------------+----------------------+---------------------+--------------------+---------------+--------------+--------------------+--------+----------------+----------------------+---------------------+-------+------+--------+----------------+---------+------+-----+----------------+------------------------+
|    date|         dateChecked|death|deathIncrease|                hash|hospitalized|hospitalizedCumulative|hospitalizedCurrently|hospitalizedIncrease|inIcuCumulative|inIcuCurrently|        lastModified|negative|negativeIncrease|onVentilatorCumulative|onVentilatorCurrently|pending|posNeg|positive|positiveIncrease|recovered|states|total|totalTestResults|totalTestResultsIncrease|
+--------+--------------------+-----+-------------+--------------------+------------+----------------------+---------------------+--------------------+---------------+--------------+--------------------+--------+----------------+---------

    Multiplicar las filas asignando un 'id' a cada copia

In [7]:
# Replicate each input row under a new 'id' column and randomise the numeric values
json_mult = expand_df(json_entrada)

In [8]:
# Print a preview of the expanded data
json_mult.show()

+---+----------+--------------------+------+-------------+--------------------+------------+----------------------+---------------------+--------------------+---------------+--------------+--------------------+--------+----------------+----------------------+---------------------+-------+------+--------+----------------+---------+------+-----+----------------+------------------------+
| id|      date|         dateChecked| death|deathIncrease|                hash|hospitalized|hospitalizedCumulative|hospitalizedCurrently|hospitalizedIncrease|inIcuCumulative|inIcuCurrently|        lastModified|negative|negativeIncrease|onVentilatorCumulative|onVentilatorCurrently|pending|posNeg|positive|positiveIncrease|recovered|states|total|totalTestResults|totalTestResultsIncrease|
+---+----------+--------------------+------+-------------+--------------------+------------+----------------------+---------------------+--------------------+---------------+--------------+--------------------+--------+-----

    Generar e importar ciudades y países

In [9]:
# Generate the city and country data files (helper imported from generar_json)
generar_pais_ciudad()

Datos de ciudades y países guardados exitosamente en la carpeta 'g_datos'


In [10]:
# Load the generated JSON files with pandas.
# NOTE(review): the recorded run of this cell failed with FileNotFoundError for
# ./d_generados/ciudades.json, while the generator cell reports saving into
# 'g_datos' — confirm which folder generar_pais_ciudad() actually writes to.
pandas_ciudades = pd.read_json("./d_generados/ciudades.json")
pandas_paises = pd.read_json("./d_generados/paises.json")

# Turn both pandas DataFrames into Spark DataFrames
spark_ciudades = spark.createDataFrame(pandas_ciudades)
spark_paises = spark.createDataFrame(pandas_paises)

# Pair each country with the city that shares its 'id', keeping a single id column
condicion_union = spark_paises["id"] == spark_ciudades["id"]
paises_y_ciudades = spark_paises.join(spark_ciudades, condicion_union)
paises_ciudades_tabla = paises_y_ciudades.select(
    spark_paises["id"],
    spark_paises["id_pais"],
    spark_ciudades["id_ciudad"],
    spark_paises["Pais"],
    spark_ciudades["Ciudad"],
    spark_ciudades["Coordenadas"],
)

FileNotFoundError: File ./d_generados/ciudades.json does not exist

In [None]:
# Print a preview of the combined countries/cities table
paises_ciudades_tabla.show()

+---+-------+---------+-----------+---------+------------------+
| id|id_pais|id_ciudad|       Pais|   Ciudad|       Coordenadas|
+---+-------+---------+-----------+---------+------------------+
|  1|    201|      101|     España|   Madrid|[40.4168, -3.7038]|
|  2|    202|      102|    Francia|Barcelona| [41.3851, 2.1734]|
|  3|    203|      103|   Alemania| Valencia|[39.4699, -0.3763]|
|  4|    204|      104|     Italia|    Paris| [48.8566, 2.3522]|
|  5|    205|      105|Reino Unido|   Berlin|   [52.52, 13.405]|
|  6|    206|      106|   Portugal|     Roma|[41.9028, 12.4964]|
+---+-------+---------+-----------+---------+------------------+



    Unir por 'id' el JSON de entrada con la tabla de países y ciudades.

In [None]:
# Right join: keep every row of json_mult and attach the country/city that
# shares its 'id'. Sort by the joined frame's own 'id' column rather than by a
# column object of the upstream spark_paises frame, which is not part of this
# frame's output and would be null for unmatched rows.
tabla_casi_final = (
    paises_ciudades_tabla
    .join(json_mult, on="id", how="right")
    .orderBy(col("id").asc())
)

In [None]:
# Print a preview of the joined table
tabla_casi_final.show()

+---+-------+---------+-----------+---------+------------------+----------+--------------------+------+-------------+--------------------+------------+----------------------+---------------------+--------------------+---------------+--------------+--------------------+--------+----------------+----------------------+---------------------+-------+------+--------+----------------+---------+------+-----+----------------+------------------------+
| id|id_pais|id_ciudad|       Pais|   Ciudad|       Coordenadas|      date|         dateChecked| death|deathIncrease|                hash|hospitalized|hospitalizedCumulative|hospitalizedCurrently|hospitalizedIncrease|inIcuCumulative|inIcuCurrently|        lastModified|negative|negativeIncrease|onVentilatorCumulative|onVentilatorCurrently|pending|posNeg|positive|positiveIncrease|recovered|states|total|totalTestResults|totalTestResultsIncrease|
+---+-------+---------+-----------+---------+------------------+----------+--------------------+------+---

In [None]:
def expand_and_transform_json(df_entrada, num_copies, seed=None):
    """Cross-join ``df_entrada`` with ``num_copies`` synthetic copies and scale its numbers.

    Copy ``k`` (0-based) carries a multiplier of ``1 + 0.1 * k``. Every column
    of type Double, Integer or Long, other than ``date``, ``id``, ``id_pais``
    and ``id_ciudad``, is multiplied by that multiplier and by a random factor
    in [1, 1.5], then cast to integer. The random factor is drawn once per
    column in Python, so it is the same for all rows and copies of that column.
    If a ``date`` column exists, copy ``k`` has its date shifted forward by
    ``k`` days. The helper columns and ``id`` are dropped from the result.

    Relies on the module-level ``spark`` session to build the copies table.

    Args:
        df_entrada: Source Spark DataFrame.
        num_copies: Number of copies to generate per input row.
        seed: Optional seed for the per-column random factors. ``None``
            (default) keeps the output non-reproducible, as before.

    Returns:
        The expanded Spark DataFrame without ``multiplier``, ``copy_id`` and ``id``.
    """
    # Private generator: reproducible when seeded, and leaves the global
    # `random` state untouched
    rng = random.Random(seed)
    columnas_excluidas = ('date', 'id', 'id_pais', 'id_ciudad')
    tipos_numericos = (DoubleType, IntegerType, LongType)

    # One row per copy: growth multiplier plus an integer copy index
    multiplier_df = spark.range(num_copies).withColumn("multiplier", lit(1) + col("id") * lit(0.1))
    multiplier_df = multiplier_df.withColumn("copy_id", col("id").cast("integer")).drop("id")

    # Pair every input row with every copy
    expanded_df = df_entrada.crossJoin(multiplier_df)

    # Scale the numeric measures and truncate them back to integers
    for campo in df_entrada.schema.fields:
        if campo.name in columnas_excluidas or not isinstance(campo.dataType, tipos_numericos):
            continue
        random_factor = rng.uniform(1, 1.5)
        expanded_df = expanded_df.withColumn(
            campo.name,
            (col(campo.name) * col("multiplier") * random_factor).cast("integer"),
        )

    # Shift the date forward by the copy index so each copy is a later day
    if 'date' in df_entrada.columns:
        expanded_df = expanded_df.withColumn('date', date_add(col('date'), col('copy_id')))

    return expanded_df.drop('multiplier', 'copy_id', 'id')

In [None]:
# Generate data for 120 days, which is roughly 4 months
tabla_final = expand_and_transform_json(tabla_casi_final, 120)

In [None]:
# Print a preview of the final expanded table
tabla_final.show()

+-------+---------+-----------+--------+------------------+----------+--------------------+------+-------------+--------------------+------------+----------------------+---------------------+--------------------+---------------+--------------+--------------------+--------+----------------+----------------------+---------------------+-------+------+--------+----------------+---------+------+-----+----------------+------------------------+
|id_pais|id_ciudad|       Pais|  Ciudad|       Coordenadas|      date|         dateChecked| death|deathIncrease|                hash|hospitalized|hospitalizedCumulative|hospitalizedCurrently|hospitalizedIncrease|inIcuCumulative|inIcuCurrently|        lastModified|negative|negativeIncrease|onVentilatorCumulative|onVentilatorCurrently|pending|posNeg|positive|positiveIncrease|recovered|states|total|totalTestResults|totalTestResultsIncrease|
+-------+---------+-----------+--------+------------------+----------+--------------------+------+-------------+----

In [None]:
# Convert the final Spark DataFrame into a pandas DataFrame
tabla_final_panda = tabla_final.toPandas()

In [None]:
# Create the output folder first so the write does not fail on a fresh checkout
ruta_parquet = Path("datos_generados/tabla_datos_mult.parquet")
ruta_parquet.parent.mkdir(parents=True, exist_ok=True)
tabla_final_panda.to_parquet(ruta_parquet)

In [None]:
# Shut down the Spark session now that the results have been written
spark.stop()