## Ejemplo Completo para Indexar Documentos desde PySpark a Elasticsearch
Supongamos que tienes una colección de datos que deseas indexar en Elasticsearch. Aquí está el código completo que puedes ejecutar en un Jupyter Notebook:

In [1]:
from pyspark.sql import SparkSession

# Iniciar Spark Session
spark = SparkSession.builder \
    .appName("PySpark Elasticsearch Integration") \
    .config("spark.es.nodes", "elasticsearch") \
    .config("spark.es.port", "9200") \
    .config("spark.es.nodes.wan.only", "true") \
    .getOrCreate()

# Supongamos que tienes algunos datos de ejemplo
data = [("1", "Text for document one"), ("2", "Text for document two"), ("3", "Text for document three")]
columns = ["id", "doc_text"]

# Crear un DataFrame
df = spark.createDataFrame(data, schema=columns)

# Especificar el índice de Elasticsearch donde se indexarán los datos
index_name = "documents"

# Escribir el DataFrame a Elasticsearch
df.write.format("org.elasticsearch.spark.sql") \
    .option("es.resource", f"{index_name}/_doc") \
    .mode("append") \
    .save()

# Detener la sesión Spark
spark.stop()


## Leer Datos desde Elasticsearch en PySpark
Para leer datos desde Elasticsearch de vuelta a un DataFrame de PySpark, puedes usar el siguiente código:

In [2]:
from pyspark.sql import SparkSession

# Iniciar Spark Session
spark = SparkSession.builder \
    .appName("PySpark Read from Elasticsearch") \
    .config("spark.es.nodes", "elasticsearch") \
    .config("spark.es.port", "9200") \
    .config("spark.es.nodes.wan.only", "true") \
    .getOrCreate()

# Leer datos desde el índice de Elasticsearch
index_name = "documents"
df = spark.read.format("org.elasticsearch.spark.sql") \
    .option("es.resource", f"{index_name}/_doc") \
    .load()

# Mostrar los datos
df.show()

# Detener la sesión Spark
spark.stop()


+--------------------+---+
|            doc_text| id|
+--------------------+---+
|Text for document...|  2|
|Text for document...|  1|
|Text for document...|  3|
+--------------------+---+

