# Set up

In [1]:
from dotenv import load_dotenv
from llama_index.core import Document
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.core import VectorStoreIndex, ServiceContext
from llama_index.core.settings import Settings
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.query_engine import RetrieverQueryEngine # para crear una query_engine con un retriever específico.
from llama_index.core.evaluation import RetrieverEvaluator
import pandas as pd
from llama_index.core import StorageContext, load_index_from_storage
from llama_index.core import get_response_synthesizer
from llama_index.llms.openai import OpenAI

# Carga del texto limpio
data_path = "../data/plain_text/plain_text.txt"

with open(data_path, "r", encoding = "utf-8") as f:
    content = f.read()

pdf_doc = Document(text=content.strip())

# Export boolean
export_csv = False

# Key
load_dotenv()

# Set llm model
llm = OpenAI(model="gpt-4o-mini", temperature=0)
Settings.llm = llm
# Set embedding model
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")

In [2]:
# Auxiliar functions
def run_query_and_inspect(query, query_engine, show_nodes=True):
    """
    Ejecuta la query sobre el query_engine dado, imprime la respuesta y los chunks recuperados.
    """
    response = query_engine.query(query)

    print(f"\nConsulta: {query}\n{'='*60}")
    print(f"\nRespuesta: {response.response}\n{'='*60}")

    if show_nodes:
        for i, node in enumerate(response.source_nodes):
            print(f"\n--- Nodo {i+1} ---")
            print(f"Score: {node.score:.4f}")
            print(node.node.get_content())
    
    return response  # Devolvemos también el objeto response si lo quieres seguir utilizando

## Get VectorStoreIndex w optimized chunk_size

In [3]:
storage_path = "../data/index_storage/"
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")
storage_context = StorageContext.from_defaults(persist_dir=storage_path)
index = load_index_from_storage(storage_context)

In [4]:
# Redefine retriever
retriever = index.as_retriever(similarity_top_k=5)

# Query Engine

In [5]:
response_synthesizer = get_response_synthesizer(response_mode="compact")
query_engine = RetrieverQueryEngine(retriever=retriever, response_synthesizer=response_synthesizer, node_postprocessors=[])

# System prompt template

In [6]:
from llama_index.core.prompts import PromptTemplate
from llama_index.core import get_response_synthesizer

system_prompt = """
Eres un experto en PySpark y Spark, especializado en el temario de la certificación Databricks Certified Associate Developer for Apache Spark.

Debes responder única y exclusivamente utilizando la información que se encuentra en el contexto. 

Si la respuesta a la pregunta no se puede obtener del contexto, responde exactamente: "No he encontrado información al respecto."
"""

""" PROMPT TEMPLATE INICIAL --------------------
qa_template_str = (
    "Contexto: {context_str}\n\n"
    "Pregunta: {query_str}\n"
    "Respuesta:"
)

------------------------------------------------
"""

qa_template_str = """
Contexto: {context_str}
Utiliza exclusivamente la información anterior para responder. 
Pregunta: {query_str}
Respuesta:
"""
qa_template = PromptTemplate(qa_template_str)

llm = OpenAI(
    model="gpt-4o-mini",
    temperature=0,
    system_prompt=system_prompt
)

response_synthesizer_2 = get_response_synthesizer(
    response_mode="tree_summarize",
    llm=llm,
    text_qa_template=qa_template
)

query_engine_2 = RetrieverQueryEngine(
    retriever=retriever, 
    response_synthesizer=response_synthesizer_2
    # ,
    # node_postprocessors=[]
)

# Ejemplo de pregunta
query = "How can we configure the Spark application?"
response = run_query_and_inspect(query = query, query_engine=query_engine_2)


Consulta: How can we configure the Spark application?

Respuesta: No he encontrado información al respecto.

--- Nodo 1 ---
Score: 0.6063
This capability, along with 
other features, makes Spark the tool of choice for any production-grade applications.

Understanding Apache Spark and Its Applications

Multiple language support
Spark supports multiple languages for development such as Java, R, Scala, and Python. This gives 
users the flexibility to use any language of choice to build applications in Spark.
The components of Spark
Let’s talk about the different components Spark has. As you can see in Figure 1.1, Spark Core is the 
backbone of operations in Spark and spans across all the other components that Spark has. Other 
components that we’re going to discuss in this section are Spark SQL, Spark Streaming, Spark MLlib, 
and GraphX.
Figure 2.1: Spark components
Let’s look at the first component of Spark.
Spark Core
Spark Core is central to all the other components of Spark. It provi

In [7]:
queries = [
    "What does collect() function does in pyspark?",
    "How can we control the intern process in a Spark application?", 
    "What is the under-the-hood workflow in a Spark application?",
    "What is the role of the Driver component?",
    "How is the execution hierarchy organized in Spark?",
    "How can I get the number of rows in a Spark DataFrame?",
    "How can I create a DataFrame in PySpark?",
    "What is the optimized way to perform joins in PySpark?",
    "What does a 'broadcast join' means?",
    "How does persist() differ from cache() in PySpark?",
    "How can I select specific columns from a PySpark DataFrame?"
]

for query in queries: 
    response = run_query_and_inspect(query, query_engine_2, show_nodes=False)


Consulta: What does collect() function does in pyspark?

Respuesta: The collect() function retrieves all elements from the DataFrame or RDD and returns them as a list. It should be used with caution as it brings all data to the driver node, and if the driver doesn’t have enough memory to hold the data, it may result in out-of-memory errors.

Consulta: How can we control the intern process in a Spark application?

Respuesta: No he encontrado información al respecto.

Consulta: What is the under-the-hood workflow in a Spark application?

Respuesta: The under-the-hood workflow in a Spark application involves the following steps:

1. A user submits a spark-submit request to the Spark engine, creating a Spark application. An action performed will result in a job being created.
2. The request initiates communication with the cluster manager, which initializes the Spark driver to execute the main() method of the Spark application. A SparkSession is created for this execution.
3. The driver c

In [24]:
query = "How does persist() differ from cache() in PySpark?"
response = run_query_and_inspect(query, query_engine_2, True)


Consulta: How does persist() differ from cache() in PySpark?

Respuesta: No he encontrado información al respecto.

--- Nodo 1 ---
Score: 0.6865
Data persistence ensures that the intermediate results are 
available for reuse without recomputation.
•	 Caching versus persistence: Caching is a specific form of data persistence that stores data in 
memory, while persistence encompasses both in-memory and on-disk storage.
Caching data
Caching is a form of data persistence that stores DataFrames, RDDs, or datasets in memory for fast 
access. It is an essential optimization technique that improves the performance of Spark applications, 
particularly when dealing with iterative algorithms or repeated computations.
To cache a DataFrame or an RDD, you can use the .cache() or .persist() method while 
specifying the storage level:
•	 Memory-only: This option stores data in memory but does not replicate it for fault tolerance. 
Use .cache() or .persist(StorageLevel.MEMORY_ONLY).
•	 Memory-only, se

In [41]:
# Prueba con otros prompts también sigue dando el mismo resultado. 
# Comprobación de temperatura: 

llm = OpenAI(
    model="gpt-4o-mini",
    temperature=0.3,
    system_prompt=system_prompt
)

response_synthesizer_3 = get_response_synthesizer(
    response_mode="compact",
    llm=llm,
    text_qa_template=qa_template
)

query_engine_3 = RetrieverQueryEngine(
    retriever=retriever, 
    response_synthesizer=response_synthesizer_3,
    node_postprocessors=[]
)

query = "How does persist() differ from cache() in PySpark?"
response = run_query_and_inspect(query, query_engine_3, False)


Consulta: How does persist() differ from cache() in PySpark?

Respuesta: No he encontrado información al respecto.


In [10]:
# Prueba con otros prompts también sigue dando el mismo resultado. 
# Comprobación de response_mode
system_prompt = """
Eres un experto en PySpark y Spark, especializado en el temario de la certificación Databricks Certified Associate Developer for Apache Spark.

Debes responder única y exclusivamente utilizando la información que se encuentra en los documentos proporcionados como contexto. 

Si la respuesta a la pregunta no se obtiene claramente del contexto, intenta razonar con el contexto una respuesta útil para responder a la consulta.

Si la respuesta a la pregunta no se puede obtener del contexto ni se puede razonar a partir de él, responde exactamente: "No he encontrado información al respecto."
"""

llm = OpenAI(
    model="gpt-4o-mini",
    temperature=0.0,
    system_prompt=system_prompt
)

from llama_index.core.prompts import PromptTemplate
from llama_index.core import get_response_synthesizer

qa_template_str = """
{context_str}
Utiliza exclusivamente la información anterior para responder. 
Pregunta: {query_str}
Si no encuentras suficiente información para dar una respuesta clara, responde: 'No he encontrado información al respecto.'
Respuesta:
"""

qa_template = PromptTemplate(qa_template_str)

response_synthesizer_3 = get_response_synthesizer(
    response_mode="compact",
    llm=llm,
    text_qa_template=qa_template
)

query_engine_3 = RetrieverQueryEngine(
    retriever=retriever, 
    response_synthesizer=response_synthesizer_3,
    node_postprocessors=[]
)

query = "How does persist() differ from cache() in PySpark?"
response = run_query_and_inspect(query, query_engine_3, False)


Consulta: How does persist() differ from cache() in PySpark?

Respuesta: La diferencia entre `persist()` y `cache()` en PySpark radica en que `cache()` es una forma específica de `persist()` que almacena datos en memoria. Por otro lado, `persist()` permite especificar diferentes niveles de almacenamiento, que pueden incluir tanto almacenamiento en memoria como en disco. En resumen, `cache()` es un método que utiliza el nivel de almacenamiento `MEMORY_ONLY`, mientras que `persist()` puede utilizar varios niveles de almacenamiento, incluyendo opciones como `MEMORY_ONLY`, `MEMORY_ONLY_SER`, `MEMORY_AND_DISK`, y `DISK_ONLY`.


In [None]:
response_synthesizer_3 = get_response_synthesizer(
    response_mode="simple_summarize",
    llm=llm,
    text_qa_template=qa_template
)

In [50]:
for query in queries:
    response = run_query_and_inspect(query, query_engine_3, False)


Consulta: What does collect() function does in pyspark?

Respuesta: La función collect() en PySpark recupera todos los elementos del DataFrame o RDD y los devuelve como una lista. Debe usarse con precaución, ya que trae todos los datos al nodo del driver, lo que puede causar errores de falta de memoria si el driver no tiene suficiente memoria para contener los datos procesados.

Consulta: How can we control the intern process in a Spark application?

Respuesta: No he encontrado información al respecto.

Consulta: What is the under-the-hood workflow in a Spark application?

Respuesta: El flujo de trabajo en una aplicación Spark, desde la presentación de un trabajo hasta la liberación de recursos, se puede resumir en los siguientes pasos:

1. El proceso comienza cuando un usuario envía una solicitud `spark-submit` al motor de Spark, creando así una aplicación Spark. Una vez que se realiza una acción, se genera un trabajo.
2. Spark Core controla todas las funcionalidades y característica

In [49]:
# Probar en la otra consulta sin respuesta: 
query = "How can we control the intern process in a Spark application?"
response = run_query_and_inspect(query, query_engine_3, True)


Consulta: How can we control the intern process in a Spark application?

Respuesta: No he encontrado información al respecto.

--- Nodo 1 ---
Score: 0.5720
This access is 
governed and controlled by a process known as the cluster manager. It is the responsibility of the cluster 
manager to allocate computing resources for the Spark application when the application execution 
starts. These resources become available at the request of the application master. In the Apache Spark 
ecosystem, the application master plays a crucial role in managing and coordinating the execution 
of Spark applications within a distributed cluster environment. It’s an essential component that’s 
responsible for negotiating resources, scheduling tasks, and monitoring the application’s execution.
Once the resources are available, the driver is made aware of those resources. It’s the responsibility of 
the driver to manage these resources based on tasks that need to be executed by the Spark application. 
Once t

In [12]:
queries_2 = [
    "How does the Catalyst optimizer improve query execution in Spark SQL?",
    "What is the difference between repartitioning and coalescing in Spark?",
    "How does watermarking help with late data in Structured Streaming?",
    "What are the key stages of the machine learning life cycle in Spark ML?",
    "What are the different storage levels available when persisting data in Spark?",
    "What is the difference between narrow and wide transformations in Spark?"
]

for query in queries_2:
    response = run_query_and_inspect(query, query_engine_3, True)


Consulta: How does the Catalyst optimizer improve query execution in Spark SQL?

Respuesta: El Catalyst optimizer mejora la ejecución de consultas en Spark SQL mediante el uso de técnicas avanzadas de optimización que transforman el plan de consulta lógico en un plan físico más eficiente. Esto se logra a través de optimizaciones basadas en reglas, que aplican un conjunto de reglas específicas para mejorar aspectos como el empuje de predicados, la plegadura de constantes y la poda de columnas. Además, el optimizador utiliza optimización basada en costos, que estima el costo de diferentes planes de ejecución considerando factores como la distribución de datos, estrategias de unión y recursos disponibles. Esto permite a Spark elegir el plan más eficiente basado en las características reales de ejecución.

--- Nodo 1 ---
Score: 0.7645
Advanced Operations and Optimizations in Spark

Apache Spark is well-known for its powerful optimization capabilities, which significantly enhance 
the perf

# Pruebas

In [18]:
retriever= index.as_retriever(similarity_top_k =5)
query = "What does collect() function do in PySpark?"
response = query_engine.query(query)

# 6️⃣ Mostrar los chunks devueltos
print(f"\nConsulta: {query}\n{'='*60}")

for i, node in enumerate(response.source_nodes):
    print(f"\n--- Nodo {i+1} ---")
    print(f"Score: {node.score:.4f}")
    print(node.node.get_content())


Consulta: What does collect() function do in PySpark?

--- Nodo 1 ---
Score: 0.5924
0 |string_test_3|
+-------+-------+-------+-------------+
Now, let’s take a look at the collect statement.
Collecting the data
A collect statement is used when we want to get all the data that is being processed in different clusters 
back to the driver. When using a collect statement, we need to make sure that the driver has enough 
memory to hold the processed data. If the driver doesn’t have enough memory to hold the data, we 
will get out-of-memory errors.
This is how you show the collect statement:
data_df.collect()
This statement will then show result as follows:
[Row(col_1=100, col_2=200.0, col_3='string_test_1', col_4=datetime.

--- Nodo 2 ---
Score: 0.5830
especially in cases where data is sorted in 
descending order
•	 It performs a more expensive operation compared to head(n) as it may involve scanning 
the entire dataset
In summary, take and collect are used to retrieve data elements, with 

In [9]:
queries = [
    "What does collect() function does in pyspark?",
    "How can we control the intern process in a Spark application?", 
    "What is the under-the-hood workflow in a Spark application?",
    "What is the role of the Driver component?",
    "How is the execution hierarchy organized in Spark?",
    "How can I get the number of rows in a Spark DataFrame?",
    "How can I create a DataFrame in PySpark?",
    "What is the optimized way to perform joins in PySpark?",
    "What does a 'broadcast join' means?",
    "How does persist() differ from cache() in PySpark?",
    "How can I select specific columns from a PySpark DataFrame?"
]

for query in queries: 
    print(f"Query: {query}")
    print("\n")
    print(f"Response: {query_engine.query(query)}")
    print("\n\n")

Query: What does collect() function does in pyspark?


Response: The collect() function retrieves all elements from a DataFrame or RDD and returns them as a list. It should be used with caution, as it brings all data to the driver node, which requires sufficient memory to avoid out-of-memory errors.



Query: How can we control the intern process in a Spark application?


Response: The internal process in a Spark application can be controlled through the application master and the driver. The application master is responsible for managing and coordinating the execution of the application, negotiating resources, scheduling tasks, and monitoring execution. The driver, which can run on the master node or as an independent process, manages the allocated resources and divides the application into smaller tasks for execution. Additionally, the SparkSession serves as the main entry point for interaction with Spark, allowing for the execution of queries and managing the overall application pro

KeyboardInterrupt: 