In [1]:
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings("ignore")
from sklearn.feature_extraction.text import TfidfVectorizer
from scipy.sparse import coo_matrix, hstack
from sklearn.metrics.pairwise import linear_kernel
import numpy as np



In [2]:
data = pd.read_csv("./clean_df/clean_df.csv")

In [3]:
data = data.drop(['Unnamed: 0'],axis = 1)

In [4]:
data = data.dropna(subset=['overview','director','runtime','year']).reset_index(drop=True)

In [5]:
data = data.fillna('')

In [6]:
text_data = ['title','director','actor','overview','genres_list','key','country']
data[text_data] = data[text_data].astype(str)

In [7]:
data["key"] = data["key"].str.encode('ascii', 'ignore').str.decode('ascii')

In [8]:
def to_dummy(col,num = None):
    li = set()
    for i in range(len(data[col])):
        if num is None:
            try:
                num = len(data[col][i].split(','))
            except:
                print(data[col][i])
        for act in data[col][i].split(',')[:num]:
            li.add(act)
    li = list(li)
    for element in li:
        data[element] = data[col].astype(str).str.contains(element, case=False).astype(int)

In [9]:
dum = ['country','director','actor','genres_list','key']
for d in dum:
    if d == 'actor':
        to_dummy(d,num = 4)
    else:
        to_dummy(d)
    print(d)

country
director
actor
genres_list
key


### sklearn

In [None]:
X = data.drop(['id','title','director','actor', 'overview','genres_list','key','country'], axis=1)

In [11]:
scaler = StandardScaler()
X = scaler.fit_transform(X)

In [12]:
X.shape

(4770, 19229)

In [13]:
tfidf = TfidfVectorizer(stop_words = 'english')  # initialising the TF-IDF Vector object
tfidf_matrix = tfidf.fit_transform(data['overview'])  # Constructing the TF-IDF Matrix (no. of movies x every word in vocabulary)
tfidf_matrix.shape

(4770, 20876)

In [14]:
X = hstack([X,tfidf_matrix]).toarray()

In [15]:
cosine_sim = linear_kernel(X, X)  # Constructing the Cosine Similarity Matrix (no. of movies x no. of movies)
cosine_sim.shape

(4770, 4770)

In [16]:
cosine_sim

array([[ 1.95256851e+04, -1.22054500e+01, -4.22517965e+00, ...,
        -4.97278741e+01, -3.84939272e+01, -3.74299288e+01],
       [-1.22054500e+01,  1.73025379e+04,  3.82652490e+02, ...,
        -2.18727192e+01, -2.94114679e+01, -3.92261041e+01],
       [-4.22517965e+00,  3.82652490e+02,  1.45797755e+04, ...,
        -3.10945839e+01, -1.83495339e+01, -2.80929736e+01],
       ...,
       [-4.97278741e+01, -2.18727192e+01, -3.10945839e+01, ...,
         2.06418193e+04, -9.02643636e-01, -9.89766374e+00],
       [-3.84939272e+01, -2.94114679e+01, -1.83495339e+01, ...,
        -9.02643636e-01,  8.04462451e+03,  5.49807696e+00],
       [-3.74299288e+01, -3.92261041e+01, -2.80929736e+01, ...,
        -9.89766374e+00,  5.49807696e+00,  1.23084216e+04]])

In [17]:
with open('cosine_sim.npy', 'wb') as f:
    np.save(f, cosine_sim)

In [18]:
with open('cosine_sim.npy', 'rb') as f:
    cosine_sim = np.load(f)

In [19]:
indices = pd.Series(data.index, index = data['title']).drop_duplicates()

In [20]:
# Function that inputs movie titles and outputs top 10 movies similar to it

def get_recommendations(title, cosine_sim = cosine_sim):
  idx = indices[title]
  
  sim_scores = list(enumerate(cosine_sim[idx]))  # Get the similarity scores of all movies wrt input movie
  sim_scores = sorted(sim_scores, key = lambda x : x[1], reverse = True)
  sim_scores = sim_scores[1:11]
  
  movie_indices = [i[0] for i in sim_scores]
  
  return data['title'].iloc[movie_indices]

In [21]:
get_recommendations('The Dark Knight Rises')

65                     The Dark Knight
119                      Batman Begins
2207                         12 Rounds
303                           Catwoman
95                        Interstellar
708     Maze Runner: The Scorch Trials
933                   Shanghai Knights
96                           Inception
744                     The Lego Movie
2721                 Seven Psychopaths
Name: title, dtype: object

### Use pyspark

In [22]:
spark = SparkSession.builder.getOrCreate()

22/12/16 06:35:05 WARN Utils: Your hostname, codespaces-80813b resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
22/12/16 06:35:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/16 06:35:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [23]:
data_spark=spark.createDataFrame(data)

In [24]:
from pyspark.ml.feature import Word2Vec
from pyspark.sql.functions import split
from pyspark.sql.functions import lower, col
data_spark = data_spark.withColumn("overview_splitted", split(lower(col("overview")), " "))

In [25]:
word2Vec = Word2Vec(vectorSize=100, minCount=0, maxIter=100, inputCol="overview_splitted", outputCol="features")
model = word2Vec.fit(data_spark)

22/12/16 06:50:46 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/12/16 06:50:46 WARN TaskSetManager: Stage 0 contains a task of very large size (11315 KiB). The maximum recommended task size is 1000 KiB.


[Stage 0:>                                                        (0 + 16) / 16]

22/12/16 06:50:48 ERROR Executor: Exception in task 9.0 in stage 0.0 (TID 9)
java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.reflect.Array.newArray(Native Method)
	at java.base/java.lang.reflect.Array.newInstance(Array.java:78)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2121)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2157)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at

Py4JJavaError: An error occurred while calling o48.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 0.0 failed 1 times, most recent failure: Lost task 9.0 in stage 0.0 (TID 9) (cf9082fa-5906-43c7-86cb-03df5beff200.internal.cloudapp.net executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.reflect.Array.newArray(Native Method)
	at java.base/java.lang.reflect.Array.newInstance(Array.java:78)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2121)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2157)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:647)
	at org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$readObject$1(ParallelCollectionRDD.scala:73)
	at org.apache.spark.rdd.ParallelCollectionPartition$$Lambda$2351/0x00000008018e8458.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1470)
	at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:69)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1100)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2423)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.mllib.feature.Word2Vec.learnVocab(Word2Vec.scala:191)
	at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:312)
	at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:182)
	at org.apache.spark.ml.feature.Word2Vec.fit(Word2Vec.scala:121)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.lang.reflect.Array.newArray(Native Method)
	at java.base/java.lang.reflect.Array.newInstance(Array.java:78)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2121)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
	at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2157)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1721)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:647)
	at org.apache.spark.rdd.ParallelCollectionPartition.$anonfun$readObject$1(ParallelCollectionRDD.scala:73)
	at org.apache.spark.rdd.ParallelCollectionPartition$$Lambda$2351/0x00000008018e8458.apply$mcV$sp(Unknown Source)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1470)
	at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:69)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1100)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2423)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
	at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)


In [None]:
result = model.transform(data_spark)

In [None]:
result

In [None]:
from pyspark.ml.feature import VectorAssembler
assemble=VectorAssembler(inputCols=[
 'popularity',
 'vote_average',
 'year'], outputCol='feature')
assembled_data=assemble.transform(data_spark)