# ***Content-Based Recommender***



# ***Pyspark***

In [None]:
pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 67kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 34.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=4a3e2658254478b0dd9fa6d62336f9d33ae23c17bfcef66d6a76b6185b798fad
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Movies Recommendation").config("spark.driver.memory","15g").config("spark.executor.memory", "15g").getOrCreate()

In [None]:
credits = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/credits.csv',header=True, inferSchema=True)
keywords =spark.read.csv('/content/drive/MyDrive/Colab Notebooks/keywords.csv',header=True,inferSchema=True,escape='\"')
metadata = spark.read.csv('/content/drive/MyDrive/Colab Notebooks/movies_metadata.csv',header=True,inferSchema=True).limit(30000)

In [None]:
df = metadata.join(keywords,on=['id'])
df = df.join(credits,on=['id'])

In [None]:
print(metadata.count())
print(keywords.count())
print(credits.count())

30000
46419
45476


In [None]:
df.count()

18704

In [None]:
# df.toPandas()

In [None]:
from pyspark.sql.types import StringType,ArrayType
from pyspark.sql.functions import col, udf

In [None]:
# keywords.where(col("id")=="4975").collect()[0]["keywords"]

In [None]:
# df.where(col("id")=="1265").collect()[0]["keywords"]

In [None]:
def clean_data(x):
    if isinstance(x, list):
        return [str.lower(i.replace(" ", "")) for i in x]
    else:
        #Check if director exists. If not, return empty string
        if isinstance(x, str):
            return str.lower(x.replace(" ", ""))
        else:
            return ''

In [None]:
import numpy as np
def get_director(x):
    for i in eval(x):
        if i['job'] == 'Director':
            return clean_data(i['name'])
    return clean_data(np.nan)

In [None]:
def get_list(x):
    if isinstance(x, list):
        names = [i['name'] for i in x]
        #Check if more than 3 elements exist. If yes, return only first three. If no, return entire list.
        if len(names) > 3:
            names = names[:3]
        return clean_data(names)

    #Return empty list in case of missing/malformed data
    return clean_data([])

In [None]:
from ast import literal_eval
def literal_eval_convert(x):
    return get_list(eval(x))


In [None]:
convert_UDF = udf(lambda x: literal_eval_convert(x) if x is not None else None)
get_director_UDF = udf(lambda x: get_director(x) if x is not None else None)

In [None]:
from pyspark.sql.functions import col, udf
features = ['cast', 'crew', 'keywords', 'genres']
df_1=df.withColumn("cast",convert_UDF("cast"))

In [None]:
# df_1.collect()

In [None]:
df_2=df_1.withColumn("keywords",convert_UDF("keywords"))

In [None]:
# df_2.toPandas()

In [None]:
df_3=df_2.withColumn("genres",convert_UDF("genres"))

In [None]:
# df_3.collect()

In [None]:
df_4=df_3.withColumn("director",get_director_UDF("crew"))

In [None]:
# df_4.collect()

In [None]:
df_5 = df_4.select('cast', 'director', 'keywords', 'genres','title','id')

In [None]:
# df_5.collect()

In [None]:
def create_soup( cast,director,keywords,genres):
    # print(type( x['keywords']))
    return ' '.join(keywords.strip('][').split(', ')) + ' ' + ' '.join(cast.strip('][').split(', ')) + ' ' + director + ' ' + ' '.join(genres.strip('][').split(', '))

In [None]:
#  a=df_5.collect()[0]

In [None]:
# create_soup({"cast":a["cast"],"director":a["director"],"keywords":a["keywords"],"genres":a["genres"]})

In [None]:
create_soup_UDF = udf(lambda cast,director,keywords,genres: create_soup(cast,director,keywords,genres))

In [None]:
df_6 = df_5.withColumn("features",create_soup_UDF(col("cast"),col("director"),col("keywords"),col("genres")))

In [None]:
# df_6.collect()

In [None]:
from pyspark.sql.functions import monotonically_increasing_id,row_number
from pyspark.sql import Window
# This will return a new DF with all the columns + id
# df_7 = df_6.withColumn("id", monotonically_increasing_id())
window = Window.orderBy(col('id'))
df_7 = df_6.withColumn('index', row_number().over(window))

In [None]:
data = df_7.select("features").orderBy("index").rdd.flatMap(lambda x: x).collect()

In [None]:
# df_7.orderBy("index").show()

+--------------------+--------------------+--------------------+--------------------+--------------------+------+--------------------+-----+
|                cast|            director|            keywords|              genres|               title|    id|            features|index|
+--------------------+--------------------+--------------------+--------------------+--------------------+------+--------------------+-----+
|                  []|       sergiocabrera|[roommate, pastor...|     [comedy, drama]|La estrategia del...| 10000|roommate pastor s...|    1|
|[kaorufutaba, mic...|      hiroshishimizu|       [countryside]|             [drama]|       Mr. Thank You|100033|countryside kaoru...|    2|
|[jamesmitchum, ro...|         eddymatalon|[terror, blackout...|  [action, thriller]|            Blackout|100063|terror blackout c...|    3|
|[michaelj.pagan, ...|         gregorydark|[hotel, eyeball, ...|  [horror, thriller]|         See No Evil| 10007|hotel eyeball mur...|    4|
|[josefinacem

In [None]:
print(data)

['roommate pastor squatter  sergiocabrera comedy drama', 'countryside kaorufutaba michikokuwano takashiishiyama hiroshishimizu drama', 'terror blackout criminal jamesmitchum robertcarradine belindamontgomery eddymatalon action thriller', 'hotel eyeball murder michaelj.pagan samanthanoble glennthomasjacobs gregorydark horror thriller', 'womandirector josefinacembrero leonormediavilla victoriatoro martaarribas ', 'blaxploitation jimbrown martinlandau brendasykes roberthartford-davis action thriller crime', 'femalenudity cyborg nazis dominiqueswain jakebusey trevorkuhn josephj.lawson adventure horror action', 'brotherbrotherrelationship grizzlybear inuit joaquinphoenix jeremysuarez jasonraize aaronblaise adventure animation family', 'femalenudity cave bigfoot mattmccoy haleyjoel christientinsley ryanschifrin horror thriller', ' felemartínez maríaesteve adriàcollado álvarofernándezarmero horror thriller foreign', 'gay dancing fire robertenglund markpatton kimmyers jacksholder horror', 'str

In [None]:
# df_6.toPandas()

In [None]:
from sklearn.feature_extraction.text import CountVectorizer

count = CountVectorizer(stop_words='english')
count_matrix = count.fit_transform(data)

In [None]:
from sklearn.metrics.pairwise import cosine_similarity

cosine_sim = cosine_similarity(count_matrix,count_matrix)

In [None]:
# print(cosine_sim.shape)

In [None]:
# ('dict', list, 'series', 'split', 'records', 'index')

In [None]:
# dict_data= df.select('id','title').toPandas().set_index('title').T.to_dict("list")

  """Entry point for launching an IPython kernel.


In [None]:
# df_final.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+------+--------------------+-----+
|                cast|            director|            keywords|              genres|               title|    id|            features|index|
+--------------------+--------------------+--------------------+--------------------+--------------------+------+--------------------+-----+
|                  []|       sergiocabrera|[roommate, pastor...|     [comedy, drama]|La estrategia del...| 10000|roommate pastor s...|    1|
|[kaorufutaba, mic...|      hiroshishimizu|       [countryside]|             [drama]|       Mr. Thank You|100033|countryside kaoru...|    2|
|[jamesmitchum, ro...|         eddymatalon|[terror, blackout...|  [action, thriller]|            Blackout|100063|terror blackout c...|    3|
|[michaelj.pagan, ...|         gregorydark|[hotel, eyeball, ...|  [horror, thriller]|         See No Evil| 10007|hotel eyeball mur...|    4|
|[josefinacem

In [None]:
# dict_data= df_7.select('id','title')

In [None]:
# dict_data.show()

+---+--------------------+
| id|               title|
+---+--------------------+
|  0|The Age of Innocence|
|  1|Common Threads: S...|
|  2|           C'mon Man|
|  3|    Brown of Harvard|
|  4|     Golden Earrings|
|  5|Kids of the Round...|
|  6|            Released|
|  7|      Donovan's Echo|
|  8|             Impulse|
|  9|         Calm at Sea|
| 10|            14782676|
| 11|                  41|
| 12|           Bongwater|
| 13|           Chop Shop|
| 14|             Big Sur|
| 15|Die Hard: With a ...|
| 16|   May in the Summer|
| 17|   Love Comes Softly|
| 18|One Direction: Th...|
| 19|        Bag of Bones|
+---+--------------------+
only showing top 20 rows



In [None]:
from pyspark.sql.types import IntegerType
# Function that takes in movie title as input and outputs most similar movies
def get_recommendations(title, cosine_sim=cosine_sim):
    index= df_7.where(col("title")==title).collect()[0]["index"]
    sim_scores = list(enumerate(cosine_sim[int(index)]))
    sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
    sim_scores = sim_scores[1:11]
    movie_indices = [i[0] for i in sim_scores]
    recommendations = spark.createDataFrame(movie_indices, IntegerType())
    # recommendations.show()
    df_7.join(recommendations,recommendations.value==df_7.index).select(col('id'),col('title')).show()


In [None]:
get_recommendations('Jumanji')

+-----+
|value|
+-----+
| 9692|
|15070|
|18377|
| 5157|
| 6133|
| 8956|
| 2731|
| 3166|
| 5056|
|10632|
+-----+

+------+--------------------+
|    id|               title|
+------+--------------------+
| 18468|         Momma's Man|
| 35119|   A Cry in the Dark|
| 60573|     The Burning Bed|
| 14358|                null|
| 18242|The Secret Advent...|
|308027|People, Places, T...|
| 96239|Remington and the...|
|284581|Back Issues: The ...|
|137217|      Donovan's Echo|
|  2072|           Cyberjack|
+------+--------------------+



In [None]:
df_final.count()

18704