In [0]:
# Install NLTK into the notebook environment (no version is pinned on this line).
!pip install nltk
import nltk
import numpy as np
from nltk.corpus import stopwords
# Download the NLTK stop-word corpus that stopwords.words('english') reads from.
nltk.download('stopwords')

Collecting nltk
  Downloading nltk-3.7-py3-none-any.whl (1.5 MB)
[?25l[K     |▏                               | 10 kB 17.6 MB/s eta 0:00:01[K     |▍                               | 20 kB 8.7 MB/s eta 0:00:01[K     |▋                               | 30 kB 7.2 MB/s eta 0:00:01[K     |▉                               | 40 kB 6.6 MB/s eta 0:00:01[K     |█                               | 51 kB 4.7 MB/s eta 0:00:01[K     |█▎                              | 61 kB 5.5 MB/s eta 0:00:01[K     |█▌                              | 71 kB 5.4 MB/s eta 0:00:01[K     |█▊                              | 81 kB 6.1 MB/s eta 0:00:01[K     |██                              | 92 kB 6.7 MB/s eta 0:00:01[K     |██▏                             | 102 kB 5.7 MB/s eta 0:00:01[K     |██▍                             | 112 kB 5.7 MB/s eta 0:00:01[K     |██▋                             | 122 kB 5.7 MB/s eta 0:00:01[K     |██▉                             | 133 kB 5.7 MB/s eta 0:00:01[K     |██

In [0]:
# Plot summaries as raw text lines; they are split into fields in a later cell.
data = sc.textFile("/FileStore/tables/plot_summaries.txt")

# Movie metadata: every line is split on tabs into a list of column values.
movie_metadata = sc.textFile("/FileStore/tables/movie_metadata.tsv").map(
    lambda row: row.split("\t")
)

In [0]:
def remove_stop_words(words, stop_words=None):
    """Tokenize a piece of text and drop stop words.

    The text is lower-cased and split on whitespace; commas and periods are
    removed from every token. Tokens that end up empty (for example a lone
    "," or the gaps produced by repeated spaces) are discarded so they are
    never counted as words.

    Args:
        words: The text to tokenize, as a single string.
        stop_words: Optional collection of words to drop. When omitted, the
            English stop-word list from the NLTK corpus is used.

    Returns:
        A list of the remaining tokens, in their original order. Duplicates
        are kept.
    """
    if stop_words is None:
        stop_words = set(stopwords.words('english'))

    filtered_words = []
    # split() without an argument collapses runs of whitespace, so repeated
    # spaces do not yield empty tokens the way split(' ') does.
    for word in words.lower().split():
        word = word.replace(',', '').replace('.', '')
        # A token consisting only of punctuation is empty at this point.
        if word and word not in stop_words:
            filtered_words.append(word)
    return filtered_words

def get_score(tf_value, df_value, word, total_documents):
    """Return the tf-idf weight for one (document, word) pair.

    Args:
        tf_value: Term frequency of the word in the document.
        df_value: Number of documents that contain the word.
        word: The word being scored; not used in the computation.
        total_documents: Total number of documents in the collection.

    Returns:
        tf_value multiplied by the natural log of total_documents / df_value.
    """
    inverse_document_frequency = np.log(total_documents / df_value)
    return tf_value * inverse_document_frequency

def get_movie_titles(movieIds):
    """Look up movie titles for a ranked list of results.

    All IDs are resolved with a single pass over ``movie_metadata`` instead
    of one Spark job per ID. IDs that have no metadata row are skipped rather
    than raising an IndexError.

    Args:
        movieIds: Sequence of indexable entries whose first element is the
            movie ID (for example ``(movie_id, score)`` pairs).

    Returns:
        A list with the value of column 2 of the matching metadata row for
        each ID (presumably the movie title -- confirm against the TSV
        schema), in the same order as ``movieIds``.
    """
    wanted_ids = [entry[0] for entry in movieIds]
    if not wanted_ids:
        # Nothing to resolve; avoid launching a Spark job.
        return []

    wanted = set(wanted_ids)
    rows = (
        movie_metadata
        # Rows with fewer than three columns cannot provide column 2.
        .filter(lambda row: len(row) > 2 and row[0] in wanted)
        .map(lambda row: (row[0], row[2]))
        .collect()
    )

    titles = {}
    for movie_id, title in rows:
        # Keep the first row seen for an ID, as the per-ID lookup did.
        titles.setdefault(movie_id, title)

    return [titles[movie_id] for movie_id in wanted_ids if movie_id in titles]

def search(query, table, top_n=10):
    """Return the titles of the best-scoring movies for a single term.

    The query is stripped and lower-cased before the lookup because the
    indexed terms are produced from lower-cased text; an upper-case query
    would otherwise never match.

    Args:
        query: The single search term.
        table: RDD of ``(word, [(document, score), ...])`` entries.
        top_n: Maximum number of results to return. Defaults to 10.

    Returns:
        A list of movie titles as returned by ``get_movie_titles``, highest
        score first.
    """
    term = query.strip().lower()
    results = (
        table
        .filter(lambda entry: entry[0] == term)
        # Flatten the posting list of the matching word into (document, score) pairs.
        .flatMap(lambda entry: entry[1])
        # Negated score as key gives descending order without a full sort.
        .takeOrdered(top_n, key=lambda posting: -posting[1])
    )
    return get_movie_titles(results)

def get_total_words(query, document_words):
    """Return the distinct words that occur in the query or the document.

    Args:
        query: List of query words.
        document_words: List of document words.

    Returns:
        A list containing every distinct word once; the order is whatever
        order the underlying set yields.
    """
    vocabulary = set()
    vocabulary.update(query + document_words)
    return [*vocabulary]

def get_document_words(document_words):
    """Extract the first element of every entry.

    Args:
        document_words: Iterable of indexable entries, e.g. ``(word, count)``
            pairs.

    Returns:
        A list with the first element of each entry, in input order.
    """
    return [entry[0] for entry in document_words]

def get_query_vector(query, words):
    """Build a 0/1 indicator vector of the query over a vocabulary.

    Args:
        query: Container of query words; membership is tested with ``in``.
        words: Ordered vocabulary; one vector position per entry.

    Returns:
        A float NumPy array with 1 where the vocabulary word is in the query
        and 0 elsewhere.
    """
    flags = [1.0 if term in query else 0.0 for term in words]
    return np.array(flags, dtype=float)

def get_document_vector(words, document_words):
    """Build a 0/1 indicator vector of a document over a vocabulary.

    Args:
        words: Ordered vocabulary; one vector position per entry.
        document_words: Collection of the words present in the document.

    Returns:
        A float NumPy array with 1 where the vocabulary word occurs in the
        document and 0 elsewhere.
    """
    try:
        # Set membership keeps the loop linear instead of scanning the
        # document word list once per vocabulary entry.
        present = set(document_words)
    except TypeError:
        # Unhashable entries: fall back to plain containment checks.
        present = document_words
    flags = [1.0 if term in present else 0.0 for term in words]
    return np.array(flags, dtype=float)

def get_similarity(query, document):
    """Cosine similarity between a query and a document on 0/1 word vectors.

    The query is lower-cased and split on whitespace so that it matches
    lower-cased document words and so that repeated spaces do not create
    empty query terms.

    Args:
        query: The query as a single string of space-separated terms.
        document: Indexable entry whose element 1 is an iterable of
            ``(word, ...)`` entries for the document.

    Returns:
        The cosine similarity of the two indicator vectors, or 0.0 when
        either vector is all zeros (empty query or document without words),
        which would otherwise divide by zero and yield NaN.
    """
    query_terms = query.lower().split()
    document_words = get_document_words(document[1])
    total_words = get_total_words(query_terms, document_words)
    query_vector = get_query_vector(query_terms, total_words)
    document_vector = get_document_vector(total_words, document_words)

    denominator = np.linalg.norm(query_vector) * np.linalg.norm(document_vector)
    if denominator == 0:
        # No terms on one side: define the similarity as zero instead of NaN.
        return 0.0
    return np.dot(query_vector, document_vector) / denominator

def search_multiple_terms(query, document_map, top_n=10):
    """Return the titles of the documents most similar to a multi-term query.

    Args:
        query: The query as a single string of space-separated terms.
        document_map: RDD of ``(document, [(word, count), ...])`` entries.
        top_n: Maximum number of results to return. Defaults to 10.

    Returns:
        A list of movie titles as returned by ``get_movie_titles``, most
        similar document first.
    """
    scored = document_map.map(lambda doc: (doc[0], get_similarity(query, doc)))
    # Negated similarity as key gives descending order without sorting the whole RDD.
    best = scored.takeOrdered(top_n, key=lambda pair: -pair[1])
    return get_movie_titles(best)

In [0]:
# Number of input lines, counted before `data` is re-bound below.
total_documents = data.count()

# Re-bind `data` to (document, terms) pairs: split each line on tabs, keep
# field 0 as the document key and tokenize field 1 with stop words removed.
data = (
    data
    .map(lambda raw_line: raw_line.split("\t"))
    .map(lambda fields: (fields[0], remove_stop_words(fields[1])))
)

In [0]:
# Expand every (document, terms) pair into one ((document, word), no_of_words)
# record per term occurrence, where no_of_words is the document's term count.
document_vs_word_map = data.flatMap(
    lambda doc: [((doc[0], term), len(doc[1])) for term in doc[1]]
)

# Turn each occurrence into its share of the document:
# ((document, word), no_of_words) -> ((document, word), 1 / no_of_words)
document_word_vs_freq_map = document_vs_word_map.map(
    lambda record: (record[0], 1 / record[1])
)

# Term frequency: sum the shares for every (document, word) key.
tf = document_word_vs_freq_map.reduceByKey(lambda left, right: left + right)

In [0]:
# Build (document, [(word, count), ...]): count the occurrences of each
# (document, word) key, re-key by document and gather the pairs into a list.
document_map = (
    document_vs_word_map
    .map(lambda record: (record[0], 1))
    .reduceByKey(lambda left, right: left + right)
    .map(lambda counted: (counted[0][0], (counted[0][1], counted[1])))
    .groupByKey()
    .mapValues(list)
)

In [0]:
# Re-key the term frequencies by word: [(word, (document, tf))]
word_vs_freq_map = tf.map(
    lambda item: (item[0][1], (item[0][0], item[1]))
)

# Gather the postings of each word: [(word, [(document, tf), ...])]
df = word_vs_freq_map.groupByKey().mapValues(lambda postings: list(postings))

In [0]:
# tf-idf per word: [(word, [(document, score), ...])]. The length of a word's
# posting list is used as its document frequency.
word_score = df.map(
    lambda entry: (
        entry[0],
        [
            (doc_id, get_score(tf_value, len(entry[1]), entry[0], total_documents))
            for doc_id, tf_value in entry[1]
        ],
    )
)

In [0]:
# Single-term lookup against the tf-idf table; prints the matching movie titles.
print(search('katniss', word_score))

['The Hunger Games', 'The Hunger Games: Catching Fire']


In [0]:
# Multi-term query ranked by get_similarity against each document's words.
print(search_multiple_terms('dark stormy', document_map))

['Ryokunohara Meikyuu', 'The Showdown', 'Il mistero di Oberwald', 'Burlesque', 'The Big Fall', 'Escapee', 'Odd Thomas', 'Corrections', 'Dark Rainbow', 'Watercolor Painting in a Rainy Day 2']
