### Generación de archivos

Vamos a usar algunos datasets por defecto por practicidad, primero realizamos los import del notebook

In [0]:
import time
import datetime
import random

In [0]:
organization = 'FarmIA' # Folder raíz en el que quedaran los files

account = spark.conf.get("adls.account.name")
landing_path = f"abfss://landingcaso@{account}.dfs.core.windows.net"

print(landing_path)

Reutilizaremos la misma funcion para enviar en la misma estructura de año, mes y día los archivos a cada ruta

In [0]:
def land_file(df,datasource,dataset,format='json'):
  """
    Guarda un DataFrame en un sistema de archivos distribuido con una estructura de directorios basada en la fecha actual,
    utilizando un formato específico (por defecto, JSON). La función escribe el DataFrame en una ubicación temporal,
    lo mueve a una ruta final organizada por fuente de datos, conjunto de datos y marca de tiempo, y luego elimina el
    directorio temporal.

    Parámetros:
        df (pyspark.sql.DataFrame): El DataFrame de Spark que se desea guardar.
        datasource (str): Nombre o identificador de la fuente de datos, usado para organizar la ruta final.
        dataset (str): Nombre o identificador del conjunto de datos, usado para organizar la ruta final.
        format (str, opcional): Formato en el que se guardará el archivo. Por defecto es 'json'. 
                                Otros formatos soportados dependen de Spark (e.g., 'parquet', 'csv').

    Comportamiento:
        1. Escribe el DataFrame en una carpeta temporal (`tmp_path`) usando el formato especificado, coalesciendo los datos en un solo archivo.
        2. Genera una ruta final basada en la fecha actual (`YYYY/MM/DD`), el nombre de la fuente de datos, el conjunto de datos y una marca de tiempo.
        3. Mueve el archivo generado desde la carpeta temporal a la ruta final.
        4. Imprime la ruta final del archivo guardado.
        5. Elimina la carpeta temporal.

    Variables externas utilizadas:
        - landing_path (str): Ruta base del sistema de archivos donde se almacenan los datos. Debe estar definida globalmente.
        - dbutils.fs: Utilidad de Databricks para manipular el sistema de archivos (ls, mv, rm).
        - datetime: Módulo de Python para manejar fechas y marcas de tiempo.

    Ejemplo:
        save_file(mi_dataframe, "ventas", "diarias", format="parquet")
        # Salida esperada: "dbfs:/landing/ventas/diarias/2025/03/14/ventas_diarias_20250314123045.parquet"

    Notas:
        - La función asume que está ejecutándose en un entorno compatible con Databricks (por el uso de `dbutils.fs`).
        - Si el formato especificado no es compatible con Spark, se generará un error.
    """
  tmp_path = f'{landing_path}/tmp/'
  df.coalesce(1).write.format(format).mode("overwrite").save(tmp_path)
  now = datetime.datetime.utcnow()
  date_path = now.strftime("%Y/%m/%d")
  timestamp = now.strftime("%Y%m%d%H%M%S") 
  for file in dbutils.fs.ls(tmp_path):
    if file.name.endswith(f'.{format}'):
      final_path = file.path.replace('tmp',f'{datasource}/{dataset}')
      final_path = final_path.replace(file.name, f'{date_path}/{datasource}-{dataset}-{timestamp}.{format}')
      dbutils.fs.mv(file.path, final_path)
      print(final_path)
  dbutils.fs.rm(tmp_path, True)

## Datos de sensores de IoT

In [0]:
# Dataset origen: dbfs:/databricks-datasets/iot-stream/
# display(dbutils.fs.ls(f"/databricks-datasets/iot-stream/data-device/"))

iot_df = spark.read.json(f"/databricks-datasets/iot-stream/data-device/")
# iot_df.printSchema()

# Generar archivos de id con valores random cada vez y cargarlo a landing

min_value = random.randint(0, 999999)
max_value = min_value + random.randint(0, 1000)

print(f"Mínimo: {min_value}, Máximo: {max_value}")

file = iot_df.where(f"id between {min_value} and {max_value}").coalesce(1)

land_file(file,'FarmIA','iot-devices')
time.sleep(5)

## Datos de ventas de clientes

In [0]:
datasource = "retail-org"
dataset = "sales_orders"

# Lectura de dataset de sales order

df = spark.read.json(f"/databricks-datasets/{datasource}/{dataset}/")
# df.printSchema()

# Generar archivos de sales_oder con valores random cada vez y cargarlo a landing

min_value = random.randint(0, 90000000)
max_value = min_value + random.randint(0, 10000)

print(f"Mínimo: {min_value}, Máximo: {max_value}")

file = df.where(f"customer_id between {min_value} and {max_value}").coalesce(1)

land_file(file,'FarmIA','sales_orders')
time.sleep(5)