# Imports

In [1]:
import datetime
import pandas as pd
import sqlite3

from pymilvus import MilvusClient
from langchain_huggingface.embeddings import HuggingFaceEmbeddings

# Step 1: Data Ingestion

For this notebook, we will use a public dataset found on kaggle: https://www.kaggle.com/datasets/shivamb/disney-movies-and-tv-shows. This data set is a collection of titles on the Disney+ platform. This dataset is a mix of structured data (type, release_year, listed_in, etc.) and unstructured data (title, description).

## 1.1 Load Data

In [2]:
raw_disney_df = pd.read_csv('disney_plus_data.csv')

In [3]:
# Preview the data
raw_disney_df.head()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,s1,Movie,Duck the Halls: A Mickey Mouse Christmas Special,"Alonso Ramirez Ramos, Dave Wasson","Chris Diamantopoulos, Tony Anselmo, Tress MacN...",,"November 26, 2021",2016,TV-G,23 min,"Animation, Family",Join Mickey and the gang as they duck the halls!
1,s2,Movie,Ernest Saves Christmas,John Cherry,"Jim Varney, Noelle Parker, Douglas Seale",,"November 26, 2021",1988,PG,91 min,Comedy,Santa Claus passes his magic bag to a new St. ...
2,s3,Movie,Ice Age: A Mammoth Christmas,Karen Disher,"Raymond Albert Romano, John Leguizamo, Denis L...",United States,"November 26, 2021",2011,TV-G,23 min,"Animation, Comedy, Family",Sid the Sloth is on Santa's naughty list.
3,s4,Movie,The Queen Family Singalong,Hamish Hamilton,"Darren Criss, Adam Lambert, Derek Hough, Alexa...",,"November 26, 2021",2021,TV-PG,41 min,Musical,"This is real life, not just fantasy!"
4,s5,TV Show,The Beatles: Get Back,,"John Lennon, Paul McCartney, George Harrison, ...",,"November 25, 2021",2021,,1 Season,"Docuseries, Historical, Music",A three-part documentary from Peter Jackson ca...


## 1.2 Data Exploration

Here we will do some basic exploration of the data to see if there is anything of note

In [4]:
def get_distinct_elements(col_series):
    all_elems = set()
    for elems_str in col_series:
        if pd.isnull(elems_str):
            continue
        items = elems_str.split(",")
        for item in items:
            all_elems.add(item.strip())
    return all_elems

In [5]:
# Each show_id is unique
raw_disney_df['show_id'].value_counts().describe()

count    1450.0
mean        1.0
std         0.0
min         1.0
25%         1.0
50%         1.0
75%         1.0
max         1.0
Name: count, dtype: float64

In [6]:
# All of the types are "Movie" or "TV Show"
raw_disney_df['type'].unique()

array(['Movie', 'TV Show'], dtype=object)

In [7]:
# None of these ratings are out of the ordinary
raw_disney_df['rating'].unique()

array(['TV-G', 'PG', 'TV-PG', nan, 'PG-13', 'TV-14', 'G', 'TV-Y7', 'TV-Y',
       'TV-Y7-FV'], dtype=object)

In [8]:
# All of these countries are valid
get_distinct_elements(raw_disney_df['country'])

{'Angola',
 'Argentina',
 'Australia',
 'Austria',
 'Belgium',
 'Botswana',
 'Brazil',
 'Canada',
 'China',
 'Czech Republic',
 'Denmark',
 'Egypt',
 'France',
 'Germany',
 'Guatemala',
 'Hong Kong',
 'Hungary',
 'India',
 'Iran',
 'Ireland',
 'Japan',
 'Kazakhstan',
 'Luxembourg',
 'Malaysia',
 'Mexico',
 'Namibia',
 'New Zealand',
 'Norway',
 'Pakistan',
 'Panama',
 'Philippines',
 'Poland',
 'Russia',
 'Singapore',
 'Slovenia',
 'South Africa',
 'South Korea',
 'Spain',
 'Sweden',
 'Switzerland',
 'Syria',
 'Taiwan',
 'Tanzania',
 'Thailand',
 'United Arab Emirates',
 'United Kingdom',
 'United States'}

In [9]:
# Check to see what the options for duration are. They all end in 'min', 'Season' or 'Seasons
get_distinct_elements(raw_disney_df['duration'])

{'1 Season',
 '1 min',
 '10 Seasons',
 '10 min',
 '100 min',
 '101 min',
 '102 min',
 '103 min',
 '104 min',
 '105 min',
 '106 min',
 '107 min',
 '108 min',
 '109 min',
 '11 min',
 '110 min',
 '111 min',
 '112 min',
 '113 min',
 '114 min',
 '115 min',
 '116 min',
 '117 min',
 '118 min',
 '119 min',
 '12 min',
 '120 min',
 '121 min',
 '122 min',
 '123 min',
 '124 min',
 '125 min',
 '126 min',
 '127 min',
 '128 min',
 '129 min',
 '13 min',
 '130 min',
 '131 min',
 '132 min',
 '134 min',
 '135 min',
 '136 min',
 '137 min',
 '138 min',
 '139 min',
 '14 min',
 '140 min',
 '142 min',
 '143 min',
 '144 min',
 '145 min',
 '147 min',
 '148 min',
 '15 min',
 '150 min',
 '151 min',
 '152 min',
 '154 min',
 '16 Seasons',
 '160 min',
 '162 min',
 '169 min',
 '170 min',
 '175 min',
 '18 min',
 '180 min',
 '182 min',
 '183 min',
 '19 Seasons',
 '19 min',
 '2 Seasons',
 '2 min',
 '20 min',
 '21 min',
 '22 min',
 '23 min',
 '24 min',
 '25 min',
 '26 min',
 '27 min',
 '3 Seasons',
 '3 min',
 '30 min',
 

In [10]:
# All of these categories seem reasonable
get_distinct_elements(raw_disney_df['listed_in'])

{'Action-Adventure',
 'Animals & Nature',
 'Animation',
 'Anime',
 'Anthology',
 'Biographical',
 'Buddy',
 'Comedy',
 'Coming of Age',
 'Concert Film',
 'Crime',
 'Dance',
 'Disaster',
 'Documentary',
 'Docuseries',
 'Drama',
 'Family',
 'Fantasy',
 'Game Show / Competition',
 'Historical',
 'Kids',
 'Lifestyle',
 'Medical',
 'Movies',
 'Music',
 'Musical',
 'Mystery',
 'Parody',
 'Police/Cop',
 'Reality',
 'Romance',
 'Romantic Comedy',
 'Science Fiction',
 'Series',
 'Soap Opera / Melodrama',
 'Sports',
 'Spy/Espionage',
 'Superhero',
 'Survival',
 'Talk Show',
 'Thriller',
 'Travel',
 'Variety',
 'Western'}

In [11]:
# The largest description is 102 characters, so not too large
max(raw_disney_df['description'].map(lambda x: len(x) if not pd.isnull(x) else 0))

102

## 1.3 Create Database Tables

Setting up a database properly and creating a good structure can help optimize search speed, ease of access, ensure data consistency, and maintain data connections. Setting up the correct data types can help ensure clean data and also ease an analysts job of converting the datatypes themselves. Here we will create integers for fields that need integers and also create foreign keys to make sure data dependencies are maintained. A relational database should be sufficient for stardard querying and metrics.

As for other structural considerations, we split the dataset into several different tables. The benefit is that an analyst or an application may likely want to filter based on Movies or TV shows, so we will populate the two main tables 'disney_movie' and 'disney_country'. After that, due to the list-based nature of several fields (director, cast, country, listed_in), separate lookup tables are created and are linked on the show_id dimension.

Another design choice was made to separate the "duration" field into two fields "duration_min" (which represents the number of minutes a movie has) and "seasons" to number of season for a tv show. Both of these ar converted to integers to be more useful for later analysis and trending.

In [12]:
# Connect to database
con = sqlite3.connect("disney.db")
cur = con.cursor()

In [13]:
# Movie Tables
create_movies_stmt = """CREATE TABLE IF NOT EXISTS disney_movie (
	show_id INTEGER PRIMARY KEY,
   	type TEXT DEFAULT NULL,
	title TEXT DEFAULT NULL,
	date_added TEXT DEFAULT NULL,
	release_year INTEGER DEFAULT NULL,
	rating TEXT DEFAULT NULL, 
	duration_min INTEGER DEFAULT NULL,
	description TEXT DEFAULT NULL
) STRICT"""

create_movie_country_stmt = """CREATE TABLE IF NOT EXISTS movie_country (
    show_id INTEGER,
    country TEXT DEFAULT NULL,
    FOREIGN KEY(show_id) REFERENCES disney_movie(show_id)
) STRICT"""

create_movie_directors_stmt = """CREATE TABLE IF NOT EXISTS movie_director (
    show_id INTEGER,
    director TEXT DEFAULT NULL,
    FOREIGN KEY(show_id) REFERENCES disney_movie(show_id)
) STRICT"""

create_movie_cast_stmt = """CREATE TABLE IF NOT EXISTS movie_person (
    show_id INTEGER,
    person TEXT DEFAULT NULL,
    FOREIGN KEY(show_id) REFERENCES disney_movie(show_id)
) STRICT"""

create_movie_listed_in_stmt = """CREATE TABLE IF NOT EXISTS movie_category (
    show_id INTEGER,
    category TEXT DEFAULT NULL,
    FOREIGN KEY(show_id) REFERENCES disney_movie(show_id)
) STRICT"""

In [14]:
# TV Tables
create_tv_stmt = """CREATE TABLE IF NOT EXISTS disney_tv (
	show_id INTEGER PRIMARY KEY,
   	type TEXT DEFAULT NULL,
	title TEXT DEFAULT NULL,
	date_added TEXT DEFAULT NULL,
	release_year INTEGER DEFAULT NULL,
	rating TEXT DEFAULT NULL, 
	num_seasons INTEGER DEFAULT NULL,
	description TEXT DEFAULT NULL
) STRICT"""

create_tv_country_stmt = """CREATE TABLE IF NOT EXISTS tv_country (
    show_id INTEGER,
    country TEXT DEFAULT NULL,
    FOREIGN KEY(show_id) REFERENCES disney_tv(show_id)
) STRICT"""

create_tv_directors_stmt = """CREATE TABLE IF NOT EXISTS tv_director (
    show_id INTEGER,
    director TEXT DEFAULT NULL,
    FOREIGN KEY(show_id) REFERENCES disney_tv(show_id)
) STRICT"""

create_tv_cast_stmt = """CREATE TABLE IF NOT EXISTS tv_person (
    show_id INTEGER,
    person TEXT DEFAULT NULL,
    FOREIGN KEY(show_id) REFERENCES disney_tv(show_id)
) STRICT"""

create_tv_listed_in_stmt = """CREATE TABLE IF NOT EXISTS tv_category (
    show_id INTEGER,
    category TEXT DEFAULT NULL,
    FOREIGN KEY(show_id) REFERENCES disney_tv(show_id)
) STRICT"""

In [15]:
cur.execute(create_movies_stmt)
cur.execute(create_movie_country_stmt)
cur.execute(create_movie_directors_stmt)
cur.execute(create_movie_cast_stmt)
cur.execute(create_movie_listed_in_stmt)


<sqlite3.Cursor at 0x10378f040>

In [16]:
cur.execute(create_tv_stmt)
cur.execute(create_tv_country_stmt)
cur.execute(create_tv_directors_stmt)
cur.execute(create_tv_cast_stmt)
cur.execute(create_tv_listed_in_stmt)

<sqlite3.Cursor at 0x10378f040>

# Step 2: Preprocess Data for Database

## 2.1 Preprocess Fields

In order to get the data ready, all of the strings in the original dataset are converted to their respective datatypes and the list-based fields are exploded into a one-to-many relationship. We also strip all of the fields in case there are leading or trailing spaces since those are not typically included on purpose.

In [17]:
def get_duration(duration_str):
    elements = duration_str.split(" ")

    unit = elements[1].lower()
    if unit != "min" and unit != "seasons" and unit != "season":
        return None
    else:
        return int(elements[0])

def col_to_list(list_str):
    if list_str == None:
        return None
    
    elems = list_str.split(",")
    return [elem.strip() for elem in elems]

In [18]:
disney_df  = raw_disney_df.copy()

# Strip out trailing or leading whitespace
for col in disney_df.columns:
    disney_df[col] = disney_df[col].map(lambda x: x.strip() if isinstance(x, str) else x)

# Change Datatypes
disney_df['show_id'] = disney_df.index
disney_df['date_added'] = disney_df['date_added'].astype('datetime64[ns]')
disney_df['release_year'] = disney_df['release_year'].astype('int64')
disney_df['duration'] = disney_df['duration'].map(get_duration)

# Make dates human-readable
disney_df['date_added'] = disney_df['date_added'].map(lambda dt: None if pd.isnull(dt) else dt.strftime("%Y-%m-%d"))

# Use None instead of NaN and NaT
for col in disney_df.columns:
    disney_df[col] = disney_df[col].map(lambda x: None if pd.isnull(x) else x)

# Create lists for columns that are lists
disney_df['country'] = disney_df['country'].map(col_to_list)
disney_df['director'] = disney_df['director'].map(col_to_list)
disney_df['cast'] = disney_df['cast'].map(col_to_list)
disney_df['listed_in'] = disney_df['listed_in'].map(col_to_list)

## 2.2 Preprocessing Considerations:

For this dataset, null values are not a big issue as there is no practical impact of having values be null. However, we will assume certain things for this dataset. We will check for null values and for values out of range. For demonstration purposes, we will default null-values in the country field to be for the United States and will assume the release year to be between 1950 and today. If there are any past today's date, we will assume it is for this year. 

For other datasets, null values may have a negative impact on downstream usage of the database. Null values and data issues will need more investigation since the data may not be missing at random. Based on the upstream reasons for null values and the downstream affects, we may want to forwardfill, backfill, impute, or leave as null. 

For example, perhaps the release years past today are mistakes, or perhaps there is a common mistake that we can find. Maybe 2081 really means 2018 and there was manual entry error where an individual swapped the 1 and 8; in this case, it would not be approprate to assume the current year as we are assuming now. There are also other fields that could have data entry issues, commonly names are spelled wrong in datasets, and we can use text similarity to predict if two names are really the same person. Similarly, there could be a consistency issue such as J. Smith being ths ame person as John Smith; this is not always the case but for some datasets, we may be able to make this assumption if a last name is very unique or if the data set is limited to a known set of fields.

We may also consider removing non-ASCII characters too, however, that is not an issue with this dataset.

In [19]:
def get_clean_release_year(title_year):
    current_year = datetime.date.today().year

    if pd.isnull(title_year):
        return None
    else:
        return max(min(current_year, title_year), 1950)

disney_df['country'] = disney_df['country'].fillna('United States')
disney_df['release_year'] = disney_df['release_year'].map(get_clean_release_year) 

In [20]:
# Separate the dataframe after preprocessing
movies_df = disney_df[disney_df['type'] == 'Movie']
tv_df = disney_df[disney_df['type'] == 'TV Show']

## 2.3 Populate Database

The following functions then populate the database.

In [21]:
def clean_row(row):
    def clean_elem(elem):
        cleaned_elem = str(elem) if not pd.isnull(elem) else elem
        return cleaned_elem
    return [clean_elem(elem) for elem in row]


def create_sql_insert_list(values_df):
    cleaned_values = [clean_row(row) for row in values_df]
    return cleaned_values

def populate_table(con, cursor, tablename, df):
    num_values = len(df.columns)
    placeholder_str = ", ".join(["?"]*num_values)
    values_list = create_sql_insert_list(df.values.tolist())   
    cursor.executemany(f"INSERT INTO {tablename} VALUES ({placeholder_str})", values_list)
    con.commit()
    
def populate_all_tables(con, cursor, df, label):
    titles_df = df[['show_id', 'type', 'title', 'date_added', 'release_year', 'rating', 'duration', 'description']]
    country_df = df[['show_id','country']].dropna(subset='country')
    directors_df = df[['show_id','director']].dropna(subset='director')
    cast_df = df[['show_id', 'cast']].dropna(subset='cast')
    category_df = df[['show_id', 'listed_in']].dropna(subset='listed_in')

    populate_table(con, cursor, f'disney_{label}', titles_df)

    populate_table(con, cursor, f'{label}_country', country_df.explode('country'))
    populate_table(con, cursor, f'{label}_director', directors_df.explode('director'))
    populate_table(con, cursor, f'{label}_person', cast_df.explode('cast'))
    populate_table(con, cursor, f'{label}_category', category_df.explode('listed_in'))


In [22]:
populate_all_tables(con, cur, movies_df, 'movie')
populate_all_tables(con, cur, tv_df, 'tv')

## 2.4 Retrive Data

Below are examples of how one may query the database for the data. The "get_all_titles" function provides functionality to return all titles similar to the original dataset.

In [23]:
# Execute a SELECT query
cur.execute("""SELECT * FROM disney_movie""")
column_names = [description[0] for description in cur.description]
disney_movie_df = pd.DataFrame(cur.fetchall(), columns=column_names)

cur.execute("SELECT * FROM movie_director")
column_names = [description[0] for description in cur.description]
movie_director_df = pd.DataFrame(cur.fetchall(), columns=column_names)

cur.execute("SELECT * FROM movie_person")
column_names = [description[0] for description in cur.description]
movie_person_df = pd.DataFrame(cur.fetchall(), columns=column_names)

cur.execute("SELECT * FROM disney_tv")
column_names = [description[0] for description in cur.description]
disney_tv_df = pd.DataFrame(cur.fetchall(), columns=column_names)

cur.execute("SELECT * FROM tv_director")
column_names = [description[0] for description in cur.description]
tv_director_df = pd.DataFrame(cur.fetchall(), columns=column_names)

From the above, an analyst could join on these tables and do basic statistics and filter the data based on country, director, etc. The analyst may also analyze the data to see which categories of content produce the highest ratings. With some NLP techniques, a data scientist may also be able to predict the ratings of any new titles that Disney is planning on releasing based on the categories, descriptions, and titles.

In [24]:
def get_titles(cursor, label):
    cursor.execute(f"""
        SELECT disney_{label}.*, 
            GROUP_CONCAT(director, ', ') AS directors, 
            GROUP_CONCAT(person, ', ') AS cast, 
            GROUP_CONCAT(category, ', ') AS listed_in
        FROM disney_{label}
        LEFT JOIN {label}_director ON disney_{label}.show_id = {label}_director.show_id
        LEFT JOIN {label}_person ON disney_{label}.show_id = {label}_person.show_id
        LEFT JOIN {label}_category ON disney_{label}.show_id = {label}_category.show_id
        GROUP BY disney_{label}.show_id
        ORDER BY disney_{label}.show_id asc
        """)
    column_names = [description[0] for description in cur.description]
    res_df = pd.DataFrame(cur.fetchall(), columns=column_names)
    return res_df

def get_all_titles(cursor):
    tv_titles_df = get_titles(cursor, 'tv')
    movie_titles_df = get_titles(cursor, 'movie')
    return pd.concat([tv_titles_df, movie_titles_df])

    

In [25]:
disney_titles_df = get_all_titles(cur)

Creating a similar structure to the original dataset is useful, however, joining in this way makes it more difficult to trend on the list-based fields.

# Step 3: Vectorization

We will use Huggingface for Embeddings and Milvus for the Vector database as they are open-source, and actively being developed. A vector database allows us to provide quick vector similarity search and is a scalable solution as the number of documents stored and vectorized becomes larger. The descriptions hold the most information, so we will want be storing this information into the vector database.

In [26]:
milvus_client = MilvusClient("./milvus_disney.db")

In [27]:
milvus_client.create_collection(
    collection_name="disney_collection",
    dimension=384
)

In [28]:
embeddings = HuggingFaceEmbeddings(model_name="paraphrase-MiniLM-L3-v2")

  from tqdm.autonotebook import tqdm, trange


For documents with larger texts, we will chunk each of the documents. There are several ways to do this such as Fixed Size Chunking, Recursive Chunking, Document Chunking, etc. Here we will use recursive chunking where we are chunking based on a space with a max chunk size of 100. The generator should help with memory for larger texts.

In [29]:
CHUNK_SIZE = 100

# Generator that creates chunks based on the max_chunk size, splitting by " "
def get_chunks(s, max_chunk_size=CHUNK_SIZE):
    start = 0
    end = 0
    while start + max_chunk_size  < len(s) and end != -1: # Loop and find all words that fit in the chunk
        end = s.rfind(" ", start, start + max_chunk_size + 1)
        yield s[start:end]
        start = end + 1
    yield s[start:] # The remaining words

Here we are going to insert into the Milvus vector database using a batch size of 500. This dataset isn't too big, but batch updating can help for larger datasets

In [30]:
BATCH_SIZE = 500

collection_data = []
for i, row in disney_titles_df.iterrows():
    description = row['description']
    for chunk in get_chunks(description):
        chunk_vector = embeddings.embed_query(chunk)
        collection_element = {"id": i, 
              "vector": chunk_vector, 
              "text": chunk, 
              "type": row['type'],
              "rating": row['rating'],
              "num_seasons": row['num_seasons'],
              "duration": row['duration_min'],
        }
        collection_data.append(collection_element)

        # Insert embedding data into Milvus when batch size hit
        if len(collection_data) % BATCH_SIZE == 0:
            res = milvus_client.insert(
                collection_name="disney_collection",
                data=collection_data
            )
            collection_data = []

# Insert remaining embedding data into Milvus
res = milvus_client.insert(
    collection_name="disney_collection",
    data=collection_data
)

We did not do so here, but creating multiple partitions may make search even faster and improve query performance as it can limit the search field if specifying a partition. For example, we could partition based on movie vs. tv show or partition baased on country. 

# Step 4: Query and Retrieve

After adding the embeddings to the collection, we can then search effectively and help generate relevant context for an LLM. Below you can see how this could work in practice. Say we want to know who the Avengers are, we can use the Milvus client to search and return the top 10 embeddings regarding the Avengers.

In [31]:
query = "Who are the Avengers?"
query_embedding = embeddings.embed_query(query)


res = milvus_client.search(
    collection_name="disney_collection", # Replace with the actual name of your collection
    data=[query_embedding],
    limit=10, # Max. number of search results to return
    output_fields=['text']
)

In [32]:
search_results = [elem['entity']['text'] for elem in res[0]]
search_results

['A new team of heroes joins the Avengers.',
 'The Avengers must be willing to sacrifice all to defeat Thanos.',
 'Captain America and Iron Man clash, causing the Avengers to choose sides.',
 'The epic finale to the Infinity Saga, this dramatic showdown pits the Avengers against Thanos.',
 'Avengers Iron Man and Hulk team up to fight an energy-devouring monster.',
 'The greatest heroes on the planet unite to face the greatest villains in the Super Hero Squad.',
 'Marvel’s mightiest heroes combine their power.',
 'Marvel Studios’ Captain Marvel launches the MCU’s most powerful hero.',
 'The teenage son in a superhero family anxiously awaits his super powers.',
 'Marvel’s young powered heroes join forces to protect the Universe.']

From the above search query, we can see that the search results correctly found movie and tv descriptions mentioning the Avengers and Marvel. In a RAG system, this will provide a narrower context to the LLM, allowing it to more quickly and accurately create a response to  the original query "Who are the Avengers?"