In [None]:
# --- Celda 1 (Versi√≥n Definitiva para MODO LOCAL) ---
import pyspark
from pyspark.sql import SparkSession
from elasticsearch import Elasticsearch

# Detiene cualquier sesi√≥n previa
try:
    spark.stop()
except:
    pass

# Construye la sesi√≥n en MODO LOCAL. No necesita el conector nativo.
spark = SparkSession.builder \
    .appName("AnalisisFlotaDrones_Local") \
    .master("local[*]") \
    .getOrCreate()

# Cliente para Elasticsearch (para borrar el √≠ndice despu√©s)
es_client = Elasticsearch("http://localhost:9200")

# Verificar conexiones
if es_client.ping():
    print("‚úÖ Conexi√≥n con Elasticsearch exitosa.")
else:
    print("‚ùå Error: No se pudo conectar a Elasticsearch.")

print("‚úÖ Sesi√≥n de Spark y clientes listos.")
spark

In [None]:
pip install pandas

In [None]:
# --- Celda 2 Mejorada: Generar o Cargar Datos de Drones ---
import pandas as pd
import os

csv_filename = 'drone_sensors_data.csv'

# Comprobar si el archivo ya existe en la carpeta local
if not os.path.exists(csv_filename):
    print(f"El archivo '{csv_filename}' no existe. Generando nuevos datos...")
    
    # --- Generar Datos de Drones con Pandas ---
    num_drones = 50
    data = {
        'drone_id': [f'DRN-{i:03}' for i in range(1, num_drones + 1)],
        'bateria_restante': [round(20 + 80 * os.urandom(1)[0] / 255, 2) for _ in range(num_drones)],
        'temperatura_motor': [round(60 + 40 * os.urandom(1)[0] / 255, 2) for _ in range(num_drones)],
        'vibracion_hz': [round(5 + 25 * os.urandom(1)[0] / 255, 2) for _ in range(num_drones)]
    }
    df_pandas = pd.DataFrame(data)

    # Guardar localmente en la carpeta de notebooks
    df_pandas.to_csv(csv_filename, index=False)
    
    print(f"‚úÖ Archivo '{csv_filename}' creado con {len(df_pandas)} registros.")

else:
    print(f"‚úÖ El archivo '{csv_filename}' ya existe. Cargando datos desde el archivo.")
    # Cargar los datos desde el CSV existente
    df_pandas = pd.read_csv(csv_filename)
    print(f"Cargados {len(df_pandas)} registros.")

# Mostrar las primeras 5 filas para verificar
df_pandas.head()

In [None]:
# --- Celda 3 Mejorada: Subir a HDFS (si es necesario) ---
from hdfs import InsecureClient
from hdfs.util import HdfsError

# Cliente para interactuar con HDFS
hdfs_client = InsecureClient('http://localhost:9870')

# Definir rutas
hdfs_path_raw = '/data/raw/drones'
hdfs_filepath = f'{hdfs_path_raw}/{csv_filename}'

try:
    # Intenta obtener el estado del archivo. Si no existe, lanzar√° una HdfsError.
    status = hdfs_client.status(hdfs_filepath)
    print(f"‚úÖ El archivo ya existe en HDFS en '{hdfs_filepath}'. No se necesita subir de nuevo.")
    
except HdfsError:
    # Si el archivo no existe, la excepci√≥n HdfsError es capturada.
    print(f"El archivo no existe en HDFS. Procediendo a la subida...")
    
    # Asegurarse de que el directorio base exista.
    hdfs_client.makedirs(hdfs_path_raw)
    print(f"Directorio '{hdfs_path_raw}' verificado/creado en HDFS.")
    
    # Subir el archivo, overwrite=True es seguro aqu√≠ porque ya sabemos que no existe,
    # pero es una buena pr√°ctica por si ocurre algo entre la comprobaci√≥n y la subida.
    hdfs_client.upload(hdfs_path_raw, csv_filename, overwrite=True)
    
    print(f"‚úÖ Archivo '{csv_filename}' subido exitosamente a HDFS en: '{hdfs_filepath}'")

finally:
    # En cualquier caso (exista o no), listar el contenido para confirmar.
    print("\nContenido actual en HDFS en el directorio /data/raw/drones:")
    print(hdfs_client.list(hdfs_path_raw))

In [None]:
# --- Leer desde HDFS y Procesar con Spark ---
from pyspark.sql.functions import col, when

df_spark = spark.read.option("header", "true").option("inferSchema", "true").csv(f"hdfs://localhost:9000{hdfs_filepath}")

print("Esquema inferido por Spark:")
df_spark.printSchema()

# Calcular un "√çndice de Riesgo"
# El riesgo aumenta si la bater√≠a es baja, la temperatura es alta o la vibraci√≥n es alta
df_analizado = df_spark.withColumn(
    "indice_riesgo",
    (
        when(col("bateria_restante") < 30, 1).otherwise(0) +
        when(col("temperatura_motor") > 85, 1).otherwise(0) +
        when(col("vibracion_hz") > 20, 1).otherwise(0)
    )
)

print("\nDataFrame con √çndice de Riesgo calculado:")
df_analizado.show()

# Filtrar solo los drones que necesitan mantenimiento (riesgo > 0)
drones_en_riesgo = df_analizado.filter(col("indice_riesgo") > 0).sort(col("indice_riesgo").desc())

print("\nüö® Drones que requieren atenci√≥n inmediata:")
drones_en_riesgo.show()

In [None]:
# --- Celda 5 (Tu Soluci√≥n): Convertir el resultado final a Pandas y Cargar ---

import json
from elasticsearch import Elasticsearch

# Re-creamos el cliente por si la sesi√≥n se reinici√≥
es_client = Elasticsearch("http://localhost:9200")
es_index_name = "drones_en_riesgo"

print(f"Preparando para enviar los resultados a Elasticsearch...")

# Borrar el √≠ndice si ya existe para una prueba limpia
if es_client.indices.exists(index=es_index_name):
    es_client.indices.delete(index=es_index_name)
    print(f"√çndice '{es_index_name}' antiguo borrado.")

# 1. PASO CLAVE: Convertir el DataFrame final de Spark a un DataFrame de Pandas
# Esta es la √∫nica acci√≥n que trae datos del entorno Spark al entorno Python.
print("Convirtiendo resultado de Spark ('drones_en_riesgo') a Pandas...")
df_pandas_final = drones_en_riesgo.toPandas()
print(f"Conversi√≥n completa. Se van a indexar {len(df_pandas_final)} drones.")

# 2. Convertir el DataFrame de Pandas a una lista de diccionarios
# (Este formato es ideal para el cliente de Elasticsearch)
documentos_para_es = df_pandas_final.to_dict(orient='records')

# 3. Indexar la lista de drones en Elasticsearch
print("Indexando documentos en Elasticsearch...")
for doc in documentos_para_es:
    # Usamos el cliente de python que ya sabemos que funciona
    es_client.index(index=es_index_name, document=doc, id=doc['drone_id'])

# 4. Refrescar el √≠ndice para que los datos est√©n disponibles para b√∫squeda
es_client.indices.refresh(index=es_index_name)

print(f"\n‚úÖ ¬°√âXITO! Datos indexados en Elasticsearch exitosamente.")