## Data preprocessing

In [24]:
import pandas as pd

In [25]:
df = pd.read_csv('netflix_titles.csv')
df.head()

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,s1,TV Show,3%,,"João Miguel, Bianca Comparato, Michel Gomes, R...",Brazil,"August 14, 2020",2020,TV-MA,4 Seasons,"International TV Shows, TV Dramas, TV Sci-Fi &...",In a future where the elite inhabit an island ...
1,s2,Movie,7:19,Jorge Michel Grau,"Demián Bichir, Héctor Bonilla, Oscar Serrano, ...",Mexico,"December 23, 2016",2016,TV-MA,93 min,"Dramas, International Movies",After a devastating earthquake hits Mexico Cit...
2,s3,Movie,23:59,Gilbert Chan,"Tedd Chan, Stella Chung, Henley Hii, Lawrence ...",Singapore,"December 20, 2018",2011,R,78 min,"Horror Movies, International Movies","When an army recruit is found dead, his fellow..."
3,s4,Movie,9,Shane Acker,"Elijah Wood, John C. Reilly, Jennifer Connelly...",United States,"November 16, 2017",2009,PG-13,80 min,"Action & Adventure, Independent Movies, Sci-Fi...","In a postapocalyptic world, rag-doll robots hi..."
4,s5,Movie,21,Robert Luketic,"Jim Sturgess, Kevin Spacey, Kate Bosworth, Aar...",United States,"January 1, 2020",2008,PG-13,123 min,Dramas,A brilliant group of students become card-coun...


In [26]:
from nltk.tokenize import WordPunctTokenizer
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords

def clean_doc(doc: str, lemmatize: bool=True) -> str:
    """ Performs basic cleaning of text data. Takes in a sentence, tokenizes it, removes stopwords, and lemmatizes it.

    Args:
        doc (str): document to be cleaned
        lemmatize (bool, optional): whether to lemmatize or not. Defaults to True.

    Returns:
        str: cleaned tokens as a string with ' ' as a delimiter
    """
    if type(doc) != str: return " "
    
    tokenizer = WordPunctTokenizer()
    tokens = tokenizer.tokenize(doc)
    tokens = [word.lower() for word in tokens]

    # Remove stopwords
    stopword = set(stopwords.words('english'))
    tokens = [word for word in tokens if word not in stopword]

    # Remove punctuations
    tokens = [word for word in tokens if word.isalpha() or word.isnumeric()]
    
    if lemmatize:
        # Lemmatize
        lemmatizer = WordNetLemmatizer()
        tokens = [lemmatizer.lemmatize(word) for word in tokens]

    return " ".join(tokens)


In [27]:
# Clean the data required for feature extraction
df["processed_description"] = df["description"].apply(clean_doc)
df["cleaned_cast"] = df["cast"].apply(clean_doc, args=(False,))
df["cleaned_title"] = df["title"].apply(clean_doc, args=(False,))
df["cleaned_director"] = df["director"].apply(clean_doc, args=(False,))

In [28]:
# Concatenate all the columns into one feature column
df["feature"] = df["cleaned_title"].astype(str) + " " + df["cleaned_director"].astype(str) + " " + df["cleaned_cast"].astype(str) + " " + df["processed_description"].astype(str)


## Word Embedding
Generate a vector for a sentence

In [29]:
# all-mpnet-base-v2 -> best general purpose quality
# all-MiniLM-L6-v2 -> 5 times fater but still offers good quality
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')


In [30]:
print(f"There is total {sum(df.isna().sum())} NaN values.")
df.fillna("", inplace=True)
print(f"There is total {sum(df.isna().sum())} NaN values after replacement.")

There is total 3631 NaN values.
There is total 0 NaN values after replacement.


Converting required columns into a dictionary to be later consumed by elastic search bulk API

In [31]:
encoded = []
for i, row in df.iterrows():
    dict_ = {
        "title" : row["title"],
        "type" : row["type"],
        "director": row["director"],
        "cast": row["cast"],
        "rating": row["rating"],
        "description": row["description"],
        "release_year": row["release_year"],
        "feature_vector" :model.encode(row["feature"])
    }
    encoded.append(dict_)

In [32]:
len(encoded)

7787

### Creating Index in Elastic Search

In [33]:
from elasticsearch import helpers

In [34]:

from ElasticUtil import ElasticUtil

eu = ElasticUtil("http://localhost:9200")
es = eu.get_elastic_client()

In [35]:
# Creating a custom mapping to store the feature vector as a dense vector
settings = {
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1,
    },
    "mappings": {
        "properties": {
            "title": {
                "type" : "text",
            },
            "type": {
                "type" : "text",
            },
            "director": {
                "type" : "text",
            },
            "cast": {
                "type" : "text",
            },
            "rating": {
                "type" : "text",
            },
            "description": {
                "type" : "text",
            },
            "release_year": {
                "type" : "integer",
            },
            "feature_vector":{
                "type" : "dense_vector",
                "dims" : 384 
            }
        }
    }
    
}

In [36]:
# Deleting the index if it exists
es.indices.delete(index="netflix", ignore=[400, 404])

{'acknowledged': True}

In [37]:
my = es.indices.create(index="netflix", body=settings, ignore=400)
my


{'acknowledged': True, 'shards_acknowledged': True, 'index': 'netflix'}

In [38]:

def generator(df: pd.DataFrame):
    """Creates a generator for the dataframe to be indexed into Elasticsearch

    Args:
        df (pd.DataFrame): Dataframe to be indexed

    Raises:
        StopIteration: When the generator is exhausted

    Yields:
        dict : Dictionary containing the data to be indexed
    """
    for i, line in enumerate(df):
        yield {
            "_index": "netflix",
            "_id": i,
            "_source": {
                "title" : line['title'],
                "type" : line['type'],
                "director": line['director'],
                "cast": line['cast'],
                "rating": line['rating'],
                "description": line['description'],
                "release_year": line['release_year'],    
                "feature_vector" : line['feature_vector'],
            }
        }
    raise StopIteration

In [39]:
# Creating the generator
gen = generator(encoded)

In [40]:
# Feeding into the bulk API to index the data
try:
    res = helpers.bulk(es, gen)
except Exception as e:
    print(e)
    print("Data indexing complete!")

generator raised StopIteration
Data indexing complete!
