In [24]:
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Word2Vec
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import Normalizer
from pyspark.sql import SparkSession
from pyspark.sql import Window


In [2]:
import sys
sys.path.append(r"/home/aleksey/Документы/RecSystem")
from config import settings

spark = (SparkSession.builder
    .appName("DB")
    .config("spark.driver.extraClassPath","/usr/lib/spark-3.5.1/jars/postgresql-42.7.0.jar")
    .getOrCreate())

movie = (spark.read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", f"jdbc:postgresql://{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}")
    .option("dbtable", f'"{settings.DB_SCHEMA}"."Movie"')
    .option("user", settings.DB_USER)
    .option("password", settings.DB_PASS)
    .load())

genre_movie = (spark.read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", f"jdbc:postgresql://{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}")
    .option("dbtable", f'"{settings.DB_SCHEMA}"."GenreMovie"')
    .option("user", settings.DB_USER)
    .option("password", settings.DB_PASS)
    .load())

genre = spark.read.format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", f"jdbc:postgresql://{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}") \
    .option("dbtable", f'"{settings.DB_SCHEMA}"."Genre"') \
    .option("user", settings.DB_USER) \
    .option("password", settings.DB_PASS) \
    .load()

keyword_movie = (spark.read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", f"jdbc:postgresql://{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}")
    .option("dbtable", f'"{settings.DB_SCHEMA}"."KeywordMovie"')
    .option("user", settings.DB_USER)
    .option("password", settings.DB_PASS)
    .load())

keyword = (spark.read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", f"jdbc:postgresql://{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}")
    .option("dbtable", f'"{settings.DB_SCHEMA}"."Keyword"')
    .option("user", settings.DB_USER)
    .option("password", settings.DB_PASS)
    .load())

person = (spark.read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", f"jdbc:postgresql://{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}")
    .option("dbtable", f'"{settings.DB_SCHEMA}"."Person"')
    .option("user", settings.DB_USER)
    .option("password", settings.DB_PASS)
    .load())

crew = (spark.read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", f"jdbc:postgresql://{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}")
    .option("dbtable", f'"{settings.DB_SCHEMA}"."Crew"')
    .option("user", settings.DB_USER)
    .option("password", settings.DB_PASS)
    .load())

cast = (spark.read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", f"jdbc:postgresql://{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}")
    .option("dbtable", f'"{settings.DB_SCHEMA}"."Cast"')
    .option("user", settings.DB_USER)
    .option("password", settings.DB_PASS)
    .load())


/usr/lib/spark-3.5.1/conf/spark-env.sh: строка 1: !/usr/bin/bash: Нет такого файла или каталога
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/15 21:00:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


24/04/15 21:00:53 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [9]:
from pyspark.sql.types import FloatType, DoubleType
@F.udf(returnType=DoubleType())
def cos_sim(u, v):
  return float(u.dot(v) / (u.norm(2) * v.norm(2)))


In [123]:
def calculate_cosine_sim(dataframe, description_var):
    tokenizer = RegexTokenizer(inputCol=description_var, outputCol="words", pattern="\\W")
    remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
    hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features")
    idf = IDF(inputCol="raw_features", outputCol="features")
    normalizer = Normalizer(inputCol="features", outputCol="normalized_features")
    
    pipeline = Pipeline(stages=[tokenizer, remover, hashing_tf,idf,normalizer])
    model = pipeline.fit(dataframe)
    transformed_df = model.transform(dataframe)
    transformed_df = transformed_df.drop('filtered_words','raw_features','words','features')
    postfix = 'right'
    transformed_df = transformed_df.crossJoin(transformed_df
                                              .select([F.col(c).alias(f'{c}_{postfix}') for c in transformed_df.columns]))
    transformed_df = transformed_df.withColumn('cos_sim',cos_sim(F.col('normalized_features'),F.col(f'normalized_features_{postfix}')))
    return transformed_df

def content_based_recommender(dataframe, movie_id, similarity_matrix,top_n=10):
    similar_movies = (similarity_matrix.filter(F.col('id') == movie_id)
        .sort(F.col('cos_sim').desc())
        .limit(top_n + 1))
    similar_movie_indices = similar_movies.select('id_right').collect()[1:]
    similar_movie_indices = [row['id_right'] for row in similar_movie_indices]
    
    return (dataframe.filter(F.col("id").isin(similar_movie_indices)))


In [16]:
movie = movie.withColumn("combined_column", F.concat(movie["title"], F.lit(" "), movie["tagline"], F.lit(" "), movie["overview"]))

cosine_sim = calculate_cosine_sim(movie.select('id','combined_column'), "combined_column")

                                                                                

In [14]:
content_based_recommender(movie, 4232, cosine_sim).show()

24/04/15 20:43:37 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
[Stage 14:>                 (0 + 1) / 1][Stage 17:>                 (0 + 1) / 1]

+-------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------+------------------+----------+------------------+--------------------+
|     id|               title|             tagline|            overview|         poster_path|original_language|release_date|runtime|        popularity|vote_count|      vote_average|     combined_column|
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------+------------------+----------+------------------+--------------------+
|   4233|            Scream 2|Someone has taken...|Away at college, ...|/isdgZMoH1QMpfvzM...|               en|  1997-12-12|    120| 71.05599975585938|      3926|               6.5|Scream 2 Someone ...|
|   4234|            Scream 3|The most terrifyi...|While Sidney Pres...|/qpH8ToZVlFD1bakL...|               en|  2000-02-04|    117| 46.49399948120117|      3379|               6.0|Scream 

                                                                                

[Stage 14:>                                                         (0 + 1) / 1]

In [120]:
def combination_tto_genres_keywords_actors_directors(movie):
    joined_df = movie.join(genre_movie, movie["id"] == genre_movie["movie_id"], "left")

    movie_genre = joined_df.join(genre, genre_movie["genre_id"] == genre["id"], "left")

    movie_genre = (movie_genre
                .select(movie.id, movie.title,movie.tagline,movie.overview, genre.name)
                .groupBy(movie.id, movie.title,movie.tagline,movie.overview).agg(F.collect_list(genre.name).alias("genres")))
    
    movie_keyword = movie_genre.join(keyword_movie, movie["id"] == keyword_movie["movie_id"], "left")
    movie_keyword = movie_keyword.join(keyword,movie_keyword["keyword_id"] == keyword["id"],"left")
    movie_keyword = (movie_keyword
                    .select(movie_genre.id,movie_genre.title,movie_genre.tagline,movie_genre.overview,movie_genre.genres,keyword.name.alias("keyword_name"))
                    .groupBy(movie_genre.id,movie_genre.title,movie_genre.tagline,movie_genre.overview,movie_genre.genres).agg(F.collect_list('keyword_name').alias("keywords"))
                    )
    
    joined_data = crew.join(person, crew["person_id"] == person["id"], "left")
    directors = joined_data.filter(F.col("job") == "Director").select(crew.movie_id,person.name,).groupBy("movie_id").agg(
        F.collect_list("name").alias("directors")
    )

    joined_data = cast.join(person, cast["person_id"] == person["id"], "left")
    window_spec = Window.partitionBy("movie_id").orderBy(F.desc("popularity"))
    top_actors = (joined_data.withColumn("rank", F.row_number().over(window_spec))
                .filter(F.col("rank") <= 3)
                .select(cast.movie_id,person.name.alias("actor_name")))
    top_actors = (top_actors.
                groupBy("movie_id").agg(
                F.collect_list("actor_name").alias("actors")
    ))

    movie_actors = movie_keyword.join(top_actors, top_actors["movie_id"] == movie_keyword["id"], "left")
    movie_actors_directors = movie_actors.join(directors,directors['movie_id'] == movie_actors['id'],'left')
    
    movie_actors_directors = movie_actors_directors.select('id','title','tagline','overview','genres','keywords','actors','directors')
    movie_actors_directors = (movie_actors_directors
                                    .withColumn("combined_column",
                                      F.concat_ws(" ", *[F.col(c) for c in movie_actors_directors.columns if c != 'id'])))
    return movie_actors_directors.select('id','combined_column')


In [76]:
movie_genres_keywords = combination_tto_genres_keywords_actors_directors(movie)
cosine_sim = calculate_cosine_sim(movie_genres_keywords.select('id','combined_column'), "combined_column")

                                                                                

In [77]:
content_based_recommender(movie, 4232, cosine_sim).show()

24/04/15 17:47:36 WARN DAGScheduler: Broadcasting large task binary with size 4.3 MiB

+------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------+------------------+----------+-----------------+--------------------+
|    id|               title|             tagline|            overview|         poster_path|original_language|release_date|runtime|        popularity|vote_count|     vote_average|     combined_column|
+------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------+------------------+----------+-----------------+--------------------+
|   948|           Halloween|The Night He Came...|Fifteen years aft...|/wijlZ3HaYMvlDTPq...|               en|  1978-10-24|     91| 78.03600311279297|      5286|7.557000160217285|Halloween The Nig...|
|  4233|            Scream 2|Someone has taken...|Away at college, ...|/isdgZMoH1QMpfvzM...|               en|  1997-12-12|    120| 71.05599975585938|      3926|              6.5|Scream 2 Someone 

                                                                                

In [121]:
def combination_genres_keywords(movie):
  joined_df = movie.join(genre_movie, movie["id"] == genre_movie["movie_id"], "left")
  
  movie_genre = joined_df.join(genre, genre_movie["genre_id"] == genre["id"], "left")

  movie_genre = (movie_genre
              .select(movie.id, genre.name)
              .groupBy(movie.id).agg(F.collect_list(genre.name).alias("genres")))
  
  movie_keyword = movie_genre.join(keyword_movie, movie["id"] == keyword_movie["movie_id"], "left")
  movie_keyword = movie_keyword.join(keyword,movie_keyword["keyword_id"] == keyword["id"],"left")
  movie_keyword = (movie_keyword
                  .select(movie_genre.id,movie_genre.genres,keyword.name.alias("keyword_name"))
                  .groupBy(movie_genre.id,movie_genre.genres).agg(F.collect_list('keyword_name').alias("keywords"))
                  )
  
  movie_genres_keywords = (movie_keyword
                                  .withColumn("combined_column",
                                    F.concat_ws(" ", *[F.col(c) for c in movie_keyword.columns if c != 'id'])))
  return movie_genres_keywords.select('id','combined_column')

In [124]:
movie_genres_keywords = combination_genres_keywords(movie)
cosine_sim = calculate_cosine_sim(movie_genres_keywords.select('id','combined_column'), "combined_column")

                                                                                

In [125]:
content_based_recommender(movie, 218, cosine_sim).show()

24/04/15 23:38:19 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/04/15 23:38:20 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB


+------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------+------------------+----------+-----------------+--------------------+
|    id|               title|             tagline|            overview|         poster_path|original_language|release_date|runtime|        popularity|vote_count|     vote_average|     combined_column|
+------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------+------------------+----------+-----------------+--------------------+
| 50472|Anplagghed al cinema|                    |A queue at the AT...|/eUese3BGFCRFPV8r...|               it|  2006-11-26|     95| 6.021999835968018|       327|              7.0|Anplagghed al cin...|
| 67677|Golkonda High School|                    |Set on the backdr...|/eRrjxoAD6IGrcmxt...|               te|  2011-01-14|    130|  2.74399995803833|         3|              7.0|Golkonda High Sch

                                                                                

In [119]:
def combination_tto_keywords_actors_directors(movie):
    movie_keyword = movie.join(keyword_movie, movie["id"] == keyword_movie["movie_id"], "left")
    movie_keyword = movie_keyword.join(keyword,movie_keyword["keyword_id"] == keyword["id"],"left")
    movie_keyword = (movie_keyword
                    .select(movie.id,movie.title,movie.tagline,movie.overview,keyword.name.alias("keyword_name"))
                    .groupBy(movie.id,movie.title,movie.tagline,movie.overview).agg(F.collect_list('keyword_name').alias("keywords"))
                    )
    
    joined_data = crew.join(person, crew["person_id"] == person["id"], "left")
    directors = joined_data.filter(F.col("job") == "Director").select(crew.movie_id,person.name,).groupBy("movie_id").agg(
        F.collect_list("name").alias("directors")
    )

    joined_data = cast.join(person, cast["person_id"] == person["id"], "left")
    window_spec = Window.partitionBy("movie_id").orderBy(F.desc("popularity"))
    top_actors = (joined_data.withColumn("rank", F.row_number().over(window_spec))
                .filter(F.col("rank") <= 3)
                .select(cast.movie_id,person.name.alias("actor_name")))
    top_actors = (top_actors.
                groupBy("movie_id").agg(
                F.collect_list("actor_name").alias("actors")
    ))

    movie_actors = movie_keyword.join(top_actors, top_actors["movie_id"] == movie_keyword["id"], "left")
    movie_actors_directors_keywords = movie_actors.join(directors,directors['movie_id'] == movie_actors['id'],'left')

    movie_actors_directors_keywords = movie_actors_directors_keywords.select('id','title','tagline','overview','keywords','actors','directors')

    movie_actors_directors_keywords = (movie_actors_directors_keywords
                                    .withColumn("combined_column",
                                      F.concat_ws(" ", *[F.col(c) for c in movie_keyword.columns if c != 'id'])))
    
    return movie_actors_directors_keywords.select('id','combined_column')

In [26]:
movie_actors_directors = combination_tto_keywords_actors_directors(movie)
# cosine_sim = calculate_cosine_sim(movie_actors_directors_keywords.select('id','combined_column'), "combined_column")

In [27]:
movie_actors_directors.show(5)

                                                                                

+---+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| id|         title|             tagline|            overview|            keywords|              actors|           directors|     combined_column|
+---+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| 27|       9 Songs|2 lovers, one sum...|Matt, a young gla...|[blowjob, small b...|[Margo Stilley, K...|[Michael Winterbo...|9 Songs 2 lovers,...|
| 28|Apocalypse Now|  This is the end...|At the height of ...|[anti war, milita...|[Laurence Fishbur...|[Francis Ford Cop...|Apocalypse Now Th...|
| 65|        8 Mile|Every Moment Is A...|For Jimmy Smith, ...|[battle rap, 1990...|[Anthony Mackie, ...|     [Curtis Hanson]|8 Mile Every Mome...|
| 76|Before Sunrise|Can the greatest ...|A young man and w...|[romantic, vienna...|[Ethan Hawke, Ada...| [Richard Link

In [74]:
content_based_recommender(movie, 4232, cosine_sim).show()

24/04/15 17:46:23 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/04/15 17:46:23 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB

+------+---------------+--------------------+--------------------+--------------------+-----------------+------------+-------+------------------+----------+-----------------+--------------------+
|    id|          title|             tagline|            overview|         poster_path|original_language|release_date|runtime|        popularity|vote_count|     vote_average|     combined_column|
+------+---------------+--------------------+--------------------+--------------------+-----------------+------------+-------+------------------+----------+-----------------+--------------------+
|   948|      Halloween|The Night He Came...|Fifteen years aft...|/wijlZ3HaYMvlDTPq...|               en|  1978-10-24|     91| 78.03600311279297|      5286|7.557000160217285|Halloween The Nig...|
|  4233|       Scream 2|Someone has taken...|Away at college, ...|/isdgZMoH1QMpfvzM...|               en|  1997-12-12|    120| 71.05599975585938|      3926|              6.5|Scream 2 Someone ...|
|  4234|       Screa

                                                                                

In [73]:
movie.filter(F.col('title') == "Scream").show()

+------+------+--------------------+--------------------+--------------------+-----------------+------------+-------+-----------------+----------+-----------------+--------------------+
|    id| title|             tagline|            overview|         poster_path|original_language|release_date|runtime|       popularity|vote_count|     vote_average|     combined_column|
+------+------+--------------------+--------------------+--------------------+-----------------+------------+-------+-----------------+----------+-----------------+--------------------+
|  4232|Scream|Someone's Taken T...|After a series of...|/3O3klyyYpAZBBE4n...|               en|  1996-12-20|    112| 66.1709976196289|      6578|7.421000003814697|Scream Someone's ...|
|646385|Scream|It's always someo...|Twenty-five years...|/1m3W6cpgwuIyjtg5...|               en|  2022-01-12|    114|71.06199645996094|      3047|6.699999809265137|Scream It's alway...|
+------+------+--------------------+--------------------+-------------

In [129]:
class PopularityRecommender:
    def __init__(self, movies):
        self.movies = movies

    def weighted_rating(self, v, R, m, C):
        return (v / (v + m) * R) + (m / (m + v) * C)

    def recommend(self, top_n=10):
        C = self.movies.agg(F.avg('vote_average')).collect()[0][0]
        m = self.movies.approxQuantile('vote_count', [0.9], 0.01)[0]
        one_year_ago = F.current_date() - F.expr("INTERVAL 1 YEAR")
        filtered_movies = self.movies.filter((F.col('vote_count') >= m) & (F.col('release_date') >= one_year_ago))
        score_movies = filtered_movies.withColumn('score_rating',
                                                   self.weighted_rating(F.col('vote_count'), F.col('vote_average'), m, C))
        score_movies = score_movies.orderBy(F.col('score_rating').desc()).limit(top_n)
        return score_movies

In [130]:
recommender = PopularityRecommender(movie)

recommended_movies = recommender.recommend()
recommended_movies.show()


+-------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------+------------------+----------+------------------+--------------------+------------------+
|     id|               title|             tagline|            overview|         poster_path|original_language|release_date|runtime|        popularity|vote_count|      vote_average|     combined_column|      score_rating|
+-------+--------------------+--------------------+--------------------+--------------------+-----------------+------------+-------+------------------+----------+------------------+--------------------+------------------+
| 569094|Spider-Man: Acros...|It's how you wear...|After reuniting w...|/8Vt6mWEReuy4Of61...|               en|  2023-05-31|    140| 227.3280029296875|      6083| 8.399999618530273|Spider-Man: Acros...| 8.230673860792432|
| 872585|         Oppenheimer|The world forever...|The story of J. R...|/8Gxv8gSFCU0XGDyk...|               en| 

In [114]:
from pyspark.sql.types import StringType
def combine_actor_character(actor, character):
    return f"{actor} playing {character}"

def combination_tto_characters_actors_directors(movie):
    joined_data = crew.join(person, crew["person_id"] == person["id"], "left")
    directors = joined_data.filter(F.col("job") == "Director")
    directors = directors.withColumn("directors_text", F.concat_ws(" ", F.lit("director filma"), person.name))
    directors = directors.select(crew.movie_id,'directors_text').groupBy("movie_id").agg(
        F.collect_list("directors_text").alias("directors")
    )

    joined_data = cast.join(person, cast["person_id"] == person["id"], "left")
    window_spec = Window.partitionBy("movie_id").orderBy(F.desc("popularity"))
    top_actors = (joined_data.withColumn("rank", F.row_number().over(window_spec))
                .filter(F.col("rank") <= 3)
                .select(cast.movie_id,cast.character,person.name.alias("actor_name")))
    combine_actor_character_udf = F.udf(combine_actor_character, StringType())
    top_actors = top_actors.withColumn("actor_character_pairs", combine_actor_character_udf("actor_name", "character"))
    top_actors = (top_actors.
                groupBy("movie_id").agg(
                F.collect_list("actor_character_pairs").alias("actors")
    ))

    movie_actors = movie.join(top_actors, top_actors["movie_id"] == movie["id"], "left")
    movie_actors_directors = movie_actors.join(directors,directors['movie_id'] == movie_actors['id'],'left')

    movie_actors_directors = movie_actors_directors.select('id','title','tagline','overview','actors','directors')

    movie_actors_directors = (movie_actors_directors
                                    .withColumn("combined_column",
                                      F.concat_ws(" ", *[F.col(c) for c in movie_actors_directors.columns if c != 'id'])))
    
    return movie_actors_directors.select('id','combined_column')

In [115]:
movie_actors_characters_directors = combination_tto_characters_actors_directors(movie)

In [117]:
class SimilaritySearch:
    def __init__(self, description_var,spark):
        self.spark = spark
        self.description_var = description_var
        self.tokenizer = RegexTokenizer(inputCol=description_var, outputCol="words", pattern="\\W")
        self.remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
        self.hashing_tf = HashingTF(inputCol="filtered_words", outputCol="raw_features")
        self.idf = IDF(inputCol="raw_features", outputCol="features")
        self.normalizer = Normalizer(inputCol="features", outputCol="normalized_features")
        self.pipeline = Pipeline(stages=[self.tokenizer, self.remover, self.hashing_tf, self.idf, self.normalizer])
        self.postfix = 'right'
    
    def fit_transform(self, dataframe):
        self.model = self.pipeline.fit(dataframe)
        self.dataframe = self.transform(dataframe)
        
    
    def transform(self, dataframe):
        return self.model.transform(dataframe).drop('filtered_words', 'raw_features', 'words', 'features')
    
    def search(self,description_user:str,top_n = 5):
        row = [{self.description_var: description_user}]
        single_row_df = self.spark.createDataFrame(row)
        transformed_single_row_df = self.transform(single_row_df)
        
        transformed_df = transformed_single_row_df.crossJoin(self.dataframe
                                              .select([F.col(c).alias(f'{c}_{self.postfix}') for c in self.dataframe.columns]))
        
        transformed_df = transformed_df.withColumn('cos_sim', cos_sim(F.col('normalized_features'), F.col(f'normalized_features_{self.postfix}')))
        
        return transformed_df.orderBy(F.desc('cos_sim')).limit(top_n).select(F.col('id_right').alias('movie_id'))

In [118]:
similarity_search = SimilaritySearch("combined_column",spark)
similarity_search.fit_transform(movie_actors_characters_directors)
desc_terminator = "Tobey Maguire plays peter parker. Kirsten Duns plays merry jane. Peter Parker is bitten by a radioactive spider."

similar_movies = similarity_search.search(description_user=desc_terminator)
similar_movies.show()

24/04/15 23:34:20 WARN DAGScheduler: Broadcasting large task binary with size 4.2 MiB

+--------+
|movie_id|
+--------+
|  324857|
|  569094|
|     557|
|  225925|
|  225914|
+--------+



                                                                                