::: {.callout-note appearance="simple"}
### Licensing Notice
Text and media: [CC BY 4.0](https://creativecommons.org/licenses/by/4.0/)
Code and snippets: [Apache License 2.0](http://www.apache.org/licenses/LICENSE-2.0)
:::

## Motivation and Meaning

**R**etrieval **A**ugmented **G**eneration (RAG) is inference-time context injection: retrieve **relevant** external data and condition generation on it. This *grounds* responses in authoritative sources while keeping the LLM's parametric knowledge (its weights) frozen.

#### Motivation
- **Knowledge limited to training data**: Your proprietary or domain-specific data isn't in it.
- **Fixed knowledge cut-off date**: The model can't answer questions about recent events. Without RAG, it will either refuse to answer or *hallucinate*.

##### Alternatives & Trade-offs
- Why not fine-tune?
    - Fine-tuning excels at teaching *task formats* (SQL generation, JSON output) and *reasoning styles*, not injecting factual knowledge.
    - **Catastrophic forgetting**: Updating knowledge can degrade performance on other tasks as model weights get overwritten.
    - **Inefficient**: Requires separate fine-tuned checkpoints per domain, use case, or document. Even with LoRA/QLoRA, it is tricky to learn new knowledge without regressing on old knowledge.
- OK, why not just stuff everything into the context window?
    - **Recall degradation**: While 1M+ token models ace "single-needle" tests, recall can drop significantly (to ~60-70%) when the answer depends on multiple facts distributed across the context.
    - **Cost & Latency**: Processing massive contexts is computationally expensive and slow compared to vector search. Retrieval remains necessary for corpora exceeding the window size.
- RAG is a reasonable pattern:
    - When you need fresh, attributable knowledge with minimal model changes and can tolerate added latency.

#### The RAG Pipeline
RAG acts as a filter to inject only *relevant* context. A typical production pipeline looks like this:

0. **Ingestion & Indexing**: Chunk documents, generate embeddings, and upsert into a vector database. *Note: In production, this is a continuous sync pipeline, not a one-time setup.*
1. **Retrieval**: For a user query, search your indexed corpus (vector/keyword) and pull the top‑k relevant chunks, often followed by a *re-ranking* step for precision.
2. **Augmentation (prompt)**: Inject the selected chunks into the system prompt or user message with appropriate metadata (source citations).
3. **Generation**: The LLM is instructed to answer *strictly* from the provided context, which minimizes leakage from its parametric knowledge.
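
The four steps above can be sketched end to end in a few lines. This is a minimal illustration under loud assumptions, not a production design: the "embedding" is a bag-of-words count standing in for a real embedding model, the index is a plain Python list instead of a vector database, and the document names and contents are made up for the example. The function names (`chunk`, `embed`, `build_index`, `retrieve`, `build_prompt`) are hypothetical.

```python
import math
import re
from collections import Counter


def chunk(text, max_words=40):
    """Split a document into fixed-size word windows (a deliberately naive chunker)."""
    words = text.split()
    return [" ".join(words[i:i + max_words]) for i in range(0, len(words), max_words)]


def embed(text):
    """Toy 'embedding': lowercase bag-of-words counts (stand-in for a real model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a if t in b)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def build_index(docs):
    """Step 0: chunk each document and store (source, chunk, vector) records."""
    return [(source, c, embed(c)) for source, text in docs.items() for c in chunk(text)]


def retrieve(index, query, k=2):
    """Step 1: rank chunks by similarity to the query and keep the top k."""
    q = embed(query)
    ranked = sorted(index, key=lambda rec: cosine(q, rec[2]), reverse=True)
    return [(source, text) for source, text, _ in ranked[:k]]


def build_prompt(query, hits):
    """Step 2: inject the retrieved chunks, tagged with their source, ahead of the question."""
    context = "\n".join(f"[{source}] {text}" for source, text in hits)
    return (
        "Answer using only the context below and cite sources in brackets.\n\n"
        f"Context:\n{context}\n\nQuestion: {query}"
    )


# Made-up corpus for illustration only
docs = {
    "hr_policy.md": "Employees accrue 25 days of paid leave per year.",
    "it_policy.md": "Laptops are refreshed every three years.",
}
question = "How many days of paid leave do employees get?"

index = build_index(docs)
hits = retrieve(index, question, k=1)
prompt = build_prompt(question, hits)
print(prompt)  # Step 3 would send this prompt to the LLM of your choice
```

Keeping retrieval and prompt building as separate functions mirrors the pipeline stages, so a re-ranker or a different retriever could be swapped in without touching the prompt template.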

```{mermaid}
%%| label: fig-mermaid-ragv1
%%| fig-cap: "RAG pipeline flowchart showing ingestion pipeline, query processing, retrieval, augmentation and LLM generation."

flowchart TB
    n2["LLM"] L_n2_n4_0@-- generates grounded answer --> n4["Answer"]
    n3["Document Corpus"] L_n3_n5_0@<-- ingestion pipeline<br>(chunk + embed) --> n5["Hybrid index<br>(inverted keywords<br>+ <br>vector embeddings)<br><br>"]
    n5 L_n5_n6_0@-- "top-k" --> n6["Retrieval &amp; Re-ranking"]
    n6 L_n6_n7_0@-- "top-k re-ranked chunks + citation metadata" --> n7["Prompt Builder"]
    n7 L_n7_n2_0@-- "system prompt (use only given context) + user query + <br>top-k re-ranked chunks &amp; citation metadata" --> n2
    n1["User Query"] L_n1_n8_0@--> n8["Query processing <br>&amp; embedding"]
    n1 L_n1_n7_0@-- user query --> n7
    n8 L_n8_n6_0@-- text + query expansions + embeddings --> n6

    n3@{ shape: docs}
    n5@{ shape: cyl}
    n6@{ shape: rect}
    n7@{ shape: rect}
    n1@{ shape: rect}
    n8@{ shape: rect}

    L_n2_n4_0@{ animation: slow } 
    L_n3_n5_0@{ animation: none } 
    L_n5_n6_0@{ animation: slow } 
    L_n6_n7_0@{ animation: slow } 
    L_n7_n2_0@{ animation: slow } 
    L_n1_n8_0@{ animation: slow } 
    L_n1_n7_0@{ animation: slow } 
    L_n8_n6_0@{ animation: slow }
```
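
The diagram shows a hybrid index feeding a single top-k list into re-ranking, but it does not say how the keyword and vector results are merged. One commonly used option is reciprocal rank fusion (RRF); the sketch below assumes that choice, and the chunk ids are hypothetical. The constant `k=60` is a conventional default, not something the source specifies.

```python
from collections import defaultdict


def reciprocal_rank_fusion(rankings, k=60):
    """Fuse several ranked lists of chunk ids into one ranking.

    Each list contributes 1 / (k + rank) per chunk (rank starts at 1),
    so chunks ranked highly by several retrievers rise to the top.
    """
    scores = defaultdict(float)
    for ranking in rankings:
        for rank, chunk_id in enumerate(ranking, start=1):
            scores[chunk_id] += 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)


keyword_hits = ["c3", "c1", "c7"]  # hypothetical ids from the inverted (keyword) index
vector_hits = ["c1", "c4", "c3"]   # hypothetical ids from the vector index
fused = reciprocal_rank_fusion([keyword_hits, vector_hits])
print(fused)
```

RRF only needs ranks, not scores, which is convenient because keyword scores and vector similarities are not on a comparable scale.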

In [4]:
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (SparkSession.builder
           .appName("LocalDatabricksPrep")
           .master("local[*]")
           # Enable Delta Lake SQL extensions and the Delta catalog
           .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
           .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip sets spark.jars.packages to the Delta jar that
# matches the pip-installed delta-spark version, so the jar is downloaded automatically
spark = configure_spark_with_delta_pip(builder).getOrCreate()

print("Spark + Delta session created!")

Spark + Delta session created!


In [6]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import pyspark.sql.types as T
import pandas as pd
from pyspark import SparkFiles


url = "https://raw.githubusercontent.com/rfordatascience/tidytuesday/master/data/2021/2021-04-20/netflix_titles.csv"

# Download the raw CSV into Spark's local file cache
spark.sparkContext.addFile(url)

# Read the cached copy, taking column names from the header row
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("file://" + SparkFiles.get("netflix_titles.csv")))
df.show(truncate=False, n=5)


+-------+-------+-----+-----------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+-----------------+------------+------+---------+--------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
|show_id|type   |title|director         |cast                                                                                                                                                                      |country      |date_added       |release_year|rating|duration |listed_in                                               |description                                                                                                                                          |
+-------+-------+-----+-------------

In [7]:
df.printSchema()

root
 |-- show_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



In [8]:
# The task:
# 1. From netflix_bronze (loaded above as df), select title, release_year, and date_added.
# 2. Parse date_added (a string such as "September 25, 2021") into a real date.
# 3. Compute days_diff = date_added - release_year, treating release_year as January 1 of that year.

sub_df = df.select("title", "release_year", "date_added")

# "MMMM d, yyyy" matches a full month name, a day, and a four-digit year
sub_df = sub_df.withColumn("date_added_clean", F.to_date(F.col("date_added"), "MMMM d, yyyy"))
# Parsing only a year yields January 1 of that year
sub_df = sub_df.withColumn("release_year_clean", F.to_date(F.col("release_year"), "y"))
# date_diff(end, start) returns the number of days from start to end
sub_df = sub_df.withColumn("days_diff", F.date_diff(F.col("date_added_clean"), F.col("release_year_clean")))

sub_df.show(n=5)



+-----+------------+-----------------+----------------+------------------+---------+
|title|release_year|       date_added|date_added_clean|release_year_clean|days_diff|
+-----+------------+-----------------+----------------+------------------+---------+
|   3%|        2020|  August 14, 2020|      2020-08-14|        2020-01-01|      226|
| 7:19|        2016|December 23, 2016|      2016-12-23|        2016-01-01|      357|
|23:59|        2011|December 20, 2018|      2018-12-20|        2011-01-01|     2910|
|    9|        2009|November 16, 2017|      2017-11-16|        2009-01-01|     3241|
|   21|        2008|  January 1, 2020|      2020-01-01|        2008-01-01|     4383|
+-----+------------+-----------------+----------------+------------------+---------+
only showing top 5 rows
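
As a sanity check on the `days_diff` column, the same arithmetic can be reproduced with the standard library. This is a small sketch, assuming an English locale for the month name; `days_since_release_year` is a hypothetical helper, not part of the notebook.

```python
from datetime import date, datetime


def days_since_release_year(date_added: str, release_year: int) -> int:
    """Days between January 1 of the release year and the date the title was added."""
    added = datetime.strptime(date_added, "%B %d, %Y").date()
    return (added - date(release_year, 1, 1)).days


print(days_since_release_year("August 14, 2020", 2020))    # 226, matching the first row
print(days_since_release_year("December 20, 2018", 2011))  # 2910, matching the third row
```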


In [9]:
# Get a simple list of movies for "Brad Pitt".

df.filter(F.lower(F.col("cast")).contains("brad pitt")).select(F.col("title")).show()

+--------------------+
|               title|
+--------------------+
|A Stoning in Fulh...|
|               Babel|
|          By the Sea|
|Inglourious Basterds|
| Killing Them Softly|
|    Ocean's Thirteen|
|      Ocean's Twelve|
|         War Machine|
+--------------------+



In [10]:
# "Who are the top 5 most frequent actors in the dataset?"
temp_df = (df
    .withColumn("cast_split", F.split(F.col("cast"), ","))
    .withColumn("cast_exploded", F.explode(F.col("cast_split")))
)
(temp_df
    .groupBy("cast_exploded")
    .agg({"cast_exploded": "count"})
    .select("cast_exploded", F.col("count(cast_exploded)").alias("count"))
    .orderBy(F.col("count").desc())
    .show(n=5)
)

+-----------------+-----+
|    cast_exploded|count|
+-----------------+-----+
|      Anupam Kher|   38|
| Takahiro Sakurai|   28|
|          Om Puri|   27|
|   Shah Rukh Khan|   27|
|      Boman Irani|   25|
+-----------------+-----+
only showing top 5 rows


In [11]:
temp_df.groupBy(
    F.col("cast_exploded")
).agg(
    F.count("*").alias("count")
).orderBy(F.col("count").desc()).show(5)

+-----------------+-----+
|    cast_exploded|count|
+-----------------+-----+
|      Anupam Kher|   38|
| Takahiro Sakurai|   28|
|          Om Puri|   27|
|   Shah Rukh Khan|   27|
|      Boman Irani|   25|
+-----------------+-----+
only showing top 5 rows
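
One caveat with splitting on `","` alone: every name after the first keeps its leading space, so the same actor can be counted under two different keys. The sketch below shows the effect in plain Python on made-up rows (the cast strings are illustrative, not taken from the dataset).

```python
from collections import Counter

# Made-up cast strings; None stands for a title with no cast listed
rows = [
    "Brad Pitt, Cate Blanchett",
    "George Clooney, Brad Pitt",
    None,
]

# Split only: "Brad Pitt" and " Brad Pitt" become separate keys
untrimmed = Counter(name for cast in rows if cast for name in cast.split(","))

# Split and strip: both occurrences land on the same key
trimmed = Counter(name.strip() for cast in rows if cast for name in cast.split(","))

print(untrimmed["Brad Pitt"], untrimmed[" Brad Pitt"])
print(trimmed["Brad Pitt"])
```

Trimming before grouping is therefore the safer default whenever a delimited string is exploded into individual values.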


In [13]:
# Split the cast string on commas, then trim whitespace from every name in the array
temp_df = (df
    .withColumn("cast_split", F.split(F.col("cast"), ","))
    .withColumn("cast_split_clean", F.transform(F.col("cast_split"), lambda x: F.trim(x)))
)
temp_df.show()

+-------+-------+------+--------------------+--------------------+--------------------+-----------------+------------+------+---------+--------------------+--------------------+--------------------+--------------------+
|show_id|   type| title|            director|                cast|             country|       date_added|release_year|rating| duration|           listed_in|         description|          cast_split|    cast_split_clean|
+-------+-------+------+--------------------+--------------------+--------------------+-----------------+------------+------+---------+--------------------+--------------------+--------------------+--------------------+
|     s1|TV Show|    3%|                NULL|João Miguel, Bian...|              Brazil|  August 14, 2020|        2020| TV-MA|4 Seasons|International TV ...|In a future where...|[João Miguel,  Bi...|[João Miguel, Bia...|
|     s2|  Movie|  7:19|   Jorge Michel Grau|Demián Bichir, Hé...|              Mexico|December 23, 2016|        2016| T

In [46]:
# “For the top 10 actors by movie count, what is the average gap in years between their consecutive movies?”
# One row per (actor, title); drop empty names left over from the split
exploded_temp_df = (temp_df
    .withColumn("actor", F.explode(F.col("cast_split_clean")))
    .filter(F.col("actor") != "")
    .select("actor", "title", "release_year")
)

# Count titles per actor (no filter on type, so TV shows are counted too) and keep the 10 most frequent
actor_counts = exploded_temp_df.groupBy("actor").agg(F.count("*").alias("count")).orderBy(F.col("count").desc())
top10 = actor_counts.limit(10)

# Join back to keep only the top-10 actors' titles; top10 is tiny, so broadcast it
top10_actors = exploded_temp_df.join(F.broadcast(top10), on="actor", how="inner")

# Order each actor's titles by release year
w = Window.partitionBy("actor").orderBy("release_year")

# Previous title's year via lag, then the gap. release_year is a string here, so datediff
# casts each year to January 1 of that year and returns days; dividing by 365 gives
# approximate years (a span containing a leap day comes out as ~1.0027 rather than 1.0)
gaps_df = (top10_actors
    .withColumn("prev_year", F.lag("release_year").over(w))
    .filter(F.col("prev_year").isNotNull())
    .withColumn("gap_years", F.datediff(F.col("release_year"), F.col("prev_year")) / F.lit(365.0))
)

gaps_df.show()


+------------+--------------------+------------+-----+---------+------------------+
|       actor|               title|release_year|count|prev_year|         gap_years|
+------------+--------------------+------------+-----+---------+------------------+
|Akshay Kumar|Mujhse Shaadi Karogi|        2004|   29|     2004|               0.0|
|Akshay Kumar|             Bewafaa|        2005|   29|     2004|1.0027397260273974|
|Akshay Kumar|               Insan|        2005|   29|     2005|               0.0|
|Akshay Kumar|         Bhagam Bhag|        2006|   29|     2005|               1.0|
|Akshay Kumar|Humko Deewana Kar...|        2006|   29|     2006|               0.0|
|Akshay Kumar|Jaan-E-Mann: Let'...|        2006|   29|     2006|               0.0|
|Akshay Kumar|     Phir Hera Pheri|        2006|   29|     2006|               0.0|
|Akshay Kumar|     Bhool Bhulaiyaa|        2007|   29|     2006|               1.0|
|Akshay Kumar|     Namastey London|        2007|   29|     2007|            

In [48]:
gaps_df.groupBy("actor").agg(F.avg(F.col("gap_years")).alias("avg_gap_year")).orderBy(F.col("avg_gap_year").asc()).show()

+----------------+-------------------+
|           actor|       avg_gap_year|
+----------------+-------------------+
|       Yuki Kaji|0.38482613277133826|
|Takahiro Sakurai| 0.4288649706457926|
|    Akshay Kumar| 0.5361056751467711|
|     Boman Irani| 0.6158061116965227|
|     Anupam Kher| 0.7077848312729702|
|  Shah Rukh Khan| 0.7946817082997581|
|    Paresh Rawal|  1.116122233930453|
|         Om Puri|  1.207746811525744|
|Naseeruddin Shah| 1.2422295701464334|
|Amitabh Bachchan| 1.6934668071654373|
+----------------+-------------------+
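
The fractional `gap_years` values such as 1.0027 come from counting days between two January 1sts and dividing by 365, which overshoots whenever the span contains a leap day. The sketch below reproduces that arithmetic in plain Python and contrasts it with subtracting the years directly; both helper names are hypothetical, and whether whole-year gaps are preferable depends on the question being asked.

```python
from datetime import date


def gap_via_dates(prev_year: int, year: int) -> float:
    """Mimic the day-based calculation: days between the two January 1sts, divided by 365."""
    return (date(year, 1, 1) - date(prev_year, 1, 1)).days / 365.0


def gap_via_years(prev_year: int, year: int) -> int:
    """Alternative: subtract the years directly, which always yields whole years."""
    return year - prev_year


print(gap_via_dates(2004, 2005))  # 2004 is a leap year, so slightly more than 1
print(gap_via_dates(2005, 2006))  # no leap day in the span, so exactly 1.0
print(gap_via_years(2004, 2005))
```

In PySpark the second variant would correspond to casting both year columns to integers and subtracting them, which would also change the averages shown above slightly.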




In [21]:
# Here df is a pandas DataFrame of the same dataset, not the Spark DataFrame used above
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7787 entries, 0 to 7786
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   show_id       7787 non-null   object
 1   type          7787 non-null   object
 2   title         7787 non-null   object
 3   director      5398 non-null   object
 4   cast          7069 non-null   object
 5   country       7280 non-null   object
 6   date_added    7777 non-null   object
 7   release_year  7787 non-null   int64 
 8   rating        7780 non-null   object
 9   duration      7787 non-null   object
 10  listed_in     7787 non-null   object
 11  description   7787 non-null   object
dtypes: int64(1), object(11)
memory usage: 730.2+ KB


In [22]:
df['release_year'] = df['release_year'].astype('category')

In [23]:
help(df.describe)

Help on method describe in module pandas.core.generic:

describe(percentiles=None, include=None, exclude=None) -> 'Self' method of pandas.core.frame.DataFrame instance
    Generate descriptive statistics.

    Descriptive statistics include those that summarize the central
    tendency, dispersion and shape of a
    dataset's distribution, excluding ``NaN`` values.

    Analyzes both numeric and object series, as well
    as ``DataFrame`` column sets of mixed data types. The output
    will vary depending on what is provided. Refer to the notes
    below for more detail.

    Parameters
    ----------
    percentiles : list-like of numbers, optional
        The percentiles to include in the output. All should
        fall between 0 and 1. The default is
        ``[.25, .5, .75]``, which returns the 25th, 50th, and
        75th percentiles.
    include : 'all', list-like of dtypes or None (default), optional
        A white list of data types to include in the result. Ignored
        fo

In [24]:
df

| | show_id | type | title | director | cast | country | date_added | release_year | rating | duration | listed_in | description |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | s1 | TV Show | 3% | | João Miguel, Bianca Comparato, Michel Gomes, R... | Brazil | August 14, 2020 | 2020 | TV-MA | 4 Seasons | International TV Shows, TV Dramas, TV Sci-Fi &... | In a future where the elite inhabit an island ... |
| 1 | s2 | Movie | 7:19 | Jorge Michel Grau | Demián Bichir, Héctor Bonilla, Oscar Serrano, ... | Mexico | December 23, 2016 | 2016 | TV-MA | 93 min | Dramas, International Movies | After a devastating earthquake hits Mexico Cit... |
| 2 | s3 | Movie | 23:59 | Gilbert Chan | Tedd Chan, Stella Chung, Henley Hii, Lawrence ... | Singapore | December 20, 2018 | 2011 | R | 78 min | Horror Movies, International Movies | When an army recruit is found dead, his fellow... |
| 3 | s4 | Movie | 9 | Shane Acker | Elijah Wood, John C. Reilly, Jennifer Connelly... | United States | November 16, 2017 | 2009 | PG-13 | 80 min | Action & Adventure, Independent Movies, Sci-Fi... | In a postapocalyptic world, rag-doll robots hi... |
| 4 | s5 | Movie | 21 | Robert Luketic | Jim Sturgess, Kevin Spacey, Kate Bosworth, Aar... | United States | January 1, 2020 | 2008 | PG-13 | 123 min | Dramas | A brilliant group of students become card-coun... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 7782 | s7783 | Movie | Zozo | Josef Fares | Imad Creidi, Antoinette Turk, Elias Gergi, Car... | Sweden, Czech Republic, United Kingdom, Denmar... | October 19, 2020 | 2005 | TV-MA | 99 min | Dramas, International Movies | When Lebanon's Civil War deprives Zozo of his ... |
| 7783 | s7784 | Movie | Zubaan | Mozez Singh | Vicky Kaushal, Sarah-Jane Dias, Raaghav Chanan... | India | March 2, 2019 | 2015 | TV-14 | 111 min | Dramas, International Movies, Music & Musicals | A scrappy but poor boy worms his way into a ty... |
| 7784 | s7785 | Movie | Zulu Man in Japan | | Nasty C | | September 25, 2020 | 2019 | TV-MA | 44 min | Documentaries, International Movies, Music & M... | In this documentary, South African rapper Nast... |
| 7785 | s7786 | TV Show | Zumbo's Just Desserts | | Adriano Zumbo, Rachel Khoo | Australia | October 31, 2020 | 2019 | TV-PG | 1 Season | International TV Shows, Reality TV | Dessert wizard Adriano Zumbo looks for the nex... |
