In [1]:
import os
import json
import requests
import numpy as np
import pandas as pd
import random
from typing import List
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, row_number,  col, when, rand
from pyspark.sql.window import Window
from datetime import datetime, timedelta
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StringType, StructType, StructField, IntegerType, FloatType, NumericType

### Generar ficheros con ciudades y países

In [2]:
def descargar_datos_covid(url_api, fecha):
    response = requests.get(url_api)
    
    if response.status_code == 200:
        datos_json = response.json()
        
        nombre_archivo = f"{fecha}.json"
        
        directorio = 'data'
        if not os.path.exists(directorio):
            os.makedirs(directorio)
        
        ruta_archivo = os.path.join(directorio, nombre_archivo)
        with open(ruta_archivo, 'w') as file:
            json.dump(datos_json, file, indent=4)
        
        print(f"Los datos de COVID descargados el {fecha}.json han sido guardados en '{ruta_archivo}'.")
    else:
        print("No se pudo descargar los datos. Estado de la solicitud:", response.status_code)

fecha = "20200602"
url_api = f"https://api.covidtracking.com/v1/us/{fecha}.json"

descargar_datos_covid(url_api, fecha)

Los datos de COVID descargados el 20200602.json han sido guardados en 'data\20200602.json'.


In [16]:
spark = SparkSession.builder \
    .appName("Generar CSV") \
    .getOrCreate()

In [4]:
def generar_csv(spark):
    cities = [
        Row(
            id=1,
            country_id=1,
            name="Valencia",
            lat="39.47010514497293",
            lng="-0.3766213363909406",
        ),
        Row(
            id=2,
            country_id=2,
            name="París",
            lat="48.85663074697171",
            lng="2.3518663102905877",
        ),
        Row(
            id=3,
            country_id=3,
            name="New York",
            lat="40.7128",
            lng="-74.0060",
        ),
        Row(
            id=4, country_id=4, name="Tokyo", lat="35.6895", lng="139.6917"
        ),
        Row(
            id=5, country_id=5, name="London", lat="51.5074", lng="-0.1278"
        ),
        Row(
            id=6,
            country_id=6,
            name="Sydney",
            lat="-33.8743710663433",
            lng="151.0535114434631",
        ),
    ]

    countries = [
        Row(
            id=1,
            name="España",
        ),
        Row(
            id=2,
            name="Francia",
        ),
        Row(
            id=3,
            name="Estados Unidos",
        ),
        Row(id=4, name="Japón"),
        Row(id=5, name="Inglaterra"),
        Row(
            id=6,
            name="Australia",
        ),
    ]

    cities_df = spark.createDataFrame(
        cities,
        StructType(
            [
                StructField("id", IntegerType(), True),
                StructField("country_id", IntegerType(), True),
                StructField("name", StringType(), True),
                StructField("lat", StringType(), True),
                StructField("lng", StringType(), True),
            ]
        ),
    )

    countries_df = spark.createDataFrame(
        countries,
        StructType(
            [
                StructField("id", IntegerType(), True),
                StructField("name", StringType(), True),
            ]
        ),
    )

    # Creación de los archivos de ciudades.csv y paises.csv
    directorio_script = os.getcwd()
    directorio_salida = os.path.join(directorio_script, "datos")

    if not os.path.exists(directorio_salida):
        os.makedirs(directorio_salida)

    ruta_salida_ciudades = os.path.join(directorio_salida, "ciudades.csv")
    ruta_salida_paises = os.path.join(directorio_salida, "paises.csv")

    ciudades_pandas_df = cities_df.toPandas()
    paises_pandas_df = countries_df.toPandas()

    ciudades_pandas_df.to_csv(ruta_salida_ciudades, index=False)
    paises_pandas_df.to_csv(ruta_salida_paises, index=False)


generar_csv(spark=spark)

In [5]:
def generar_fechas(fecha: int, n_veces: int) -> List[int]:
    año = fecha // 10000
    mes = (fecha // 100) % 100
    día = fecha % 100

    fecha_actual = datetime(año, mes, día)
    fechas = []

    for _ in range(n_veces):
        fecha_actual += timedelta(days=1)
        fechas.append(int(fecha_actual.strftime('%Y%m%d')))
            
    return fechas

def csv_a_parquet(ruta_csv: str | None = None, delimitador: str = ",") -> bool:
    try:
        if ruta_csv is None: 
            raise ValueError("Introduce la ruta del CSV")
        
        partes_ruta = ruta_csv.split("/")
        carpeta_ruta = "/".join(partes_ruta[:-1])
        nuevo_nombre = partes_ruta[-1].split(".")[0] + ".parquet"
        
        pd.read_csv(filepath_or_buffer=ruta_csv, delimiter=delimitador).to_parquet(path=f"{carpeta_ruta}/{nuevo_nombre}")
        print("¡Transformación realizada con éxito! :)")
        return True
    except Exception as e:
        print(f"Error: {e}")
        return False

In [18]:
def main():
    RUTA_CARPETA = os.getcwd()
    ruta_covid_diario = os.path.join(RUTA_CARPETA, "datos", f"{fecha}.json")
    df = spark.read.option("multiline", True).json(ruta_covid_diario)
    df.createOrReplaceTempView("json")

    columnas_numericas = [
        col.name for col in df.schema.fields if isinstance(col.dataType, NumericType)
    ]

    esquema = StructType(
        [
            *[
                StructField(nombre_col, df.schema[nombre_col].dataType, True)
                for nombre_col in [*columnas_numericas, "dateChecked"]
            ],
            StructField("id_ciudad", IntegerType(), True),
            StructField("factor_multiplicacion", FloatType(), True),
        ]
    )

    try:
        agregar_filas = []
        for i in range(6):
            N_VECES = 120
            ciudad = i
            filas = df.select(*columnas_numericas, "dateChecked").collect()[0]
            fila = filas.asDict()
            fechas = generar_fechas(fecha=fila["date"], n_veces=N_VECES)
            a_fila = {}

            for j in range(N_VECES):
                factor_m = random.uniform(0.5, 1.6)
                for clave in fila:
                    match clave:
                        case "date":
                            a_fila[clave] = fechas[j]
                        case "dateChecked":
                            a_fila[clave] = fila[clave]
                        case _:
                            nuevo_valor = int(np.floor(fila[clave] * factor_m))
                            a_fila[clave] = nuevo_valor

                a_fila["id_ciudad"] = ciudad+1
                a_fila["factor_multiplicacion"] = factor_m
                agregar_filas.append(Row(**a_fila))

        df_parquet = spark.createDataFrame(agregar_filas, esquema)
        df_parquet.createOrReplaceTempView("parquet")
        df_parquet.toPandas().to_parquet("./datos/info_covid.parquet")
        df_parquet.toPandas().to_csv("./datos/info_covid.csv")
        print("Parquet guardado, nombre: info_covid.parquet :)")
        
        csv_a_parquet(ruta_csv="../datos/datos_covid_transformados.csv")
    except Exception as e:
        print("Error al intentar guardar los datos en el archivo parquet: {}".format(e))

if __name__ == "__main__":
    main()

Parquet saved, name: covid_info.parquet :)
Error: [Errno 2] No such file or directory: '../data/transformed_covid_data.csv'
