In [1]:
import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark.sql.functions as func
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import mean, min, max


In [2]:
conf = SparkConf().set("spark.ui.port", "4050")
spark = SparkSession.builder.getOrCreate()



In [3]:
spark



In [4]:
df_movies = spark.read.load('/home/peder/dev/big-data/movie-recommendation-system/data/movies_metadata.csv',
                           format="csv",
                           sep=",",
                           inferSchema="true",
                           header="true")

In [5]:
df_movies.printSchema()


root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nu

In [6]:
print("The shape of the dataset is {:d} rows by {:d} columns".format(df_movies.count(), len(df_movies.columns)))

The shape of the dataset is 45572 rows by 24 columns


In [7]:
print(df_movies.filter(col("vote_average").isNull()).count())
print(df_movies.filter(col("vote_count").isNull()).count())
print(df_movies.filter(col("overview").isNull()).count())

498
389
985


In [8]:
df_movies = df_movies.na.drop(subset=["vote_average"])
df_movies = df_movies.na.drop(subset=["vote_count"])
df_movies = df_movies.na.drop(subset=["overview"])


In [9]:
df_movies = df_movies.withColumn("vote_average", df_movies["vote_average"].cast("double"))
df_movies = df_movies.withColumn("vote_count", df_movies["vote_count"].cast("int"))

In [10]:
df_movies = df_movies.filter((df_movies.vote_average >=0) & (df_movies.vote_average<=10))

In [11]:
df_movies.select(['vote_average']).describe().show()


+-------+------------------+
|summary|      vote_average|
+-------+------------------+
|  count|             40786|
|   mean| 5.612511975530867|
| stddev|1.9231620784205472|
|    min|               0.0|
|    max|              10.0|
+-------+------------------+



In [12]:
df_movies.select(['vote_count']).describe().show()


+-------+-----------------+
|summary|       vote_count|
+-------+-----------------+
|  count|            40760|
|   mean|112.1123405299313|
| stddev|490.1629388596058|
|    min|                0|
|    max|            12269|
+-------+-----------------+



In [13]:
df_movies.select(['overview']).show(truncate=False, n=1)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|overview                                                                                                                                                                                                                                                                                                       |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Led by Woody, Andy's toys live happily in his room until Andy's birthday brings B

In [18]:
def clean_text(df, column_name="content"):
    """
    This fucntion takes the raw text data and apply a standard NLP preprocessing pipeline consisting of the following steps:
      - Text cleaning
      - Tokenization
      - Stopwords removal
      - Stemming (Snowball stemmer)

    parameter: dataframe
    returns: the input dataframe along with the `cleaned_content` column as the results of the NLP preprocessing pipeline

    """
    from pyspark.sql.functions import udf, col, lower, trim, regexp_replace, concat_ws
    from pyspark.ml.feature import Tokenizer, StopWordsRemover
    from nltk.stem.snowball import SnowballStemmer

    # Text preprocessing pipeline
    print("***** Text Preprocessing Pipeline *****\n")

    # 1. Text cleaning
    print("# 1. Text Cleaning\n")
    # 1.a Case normalization
    print("1.a Case normalization:")
    lower_case_news_df = df.select("id", lower(col(column_name)).alias(column_name))
    lower_case_news_df.show(10)
    # 1.b Trimming
    print("1.b Trimming:")
    trimmed_news_df = lower_case_news_df.select("id", trim(col(column_name)).alias(column_name))
    trimmed_news_df.show(10)
    # 1.c Filter out punctuation symbols
    print("1.c Filter out punctuation:")
    no_punct_news_df = trimmed_news_df.select("id", (regexp_replace(col(column_name), "[^a-zA-Z\\s]", "")).alias(column_name))
    no_punct_news_df.show(10)
    # 1.d Filter out any internal extra whitespace
    print("1.d Filter out extra whitespaces:")
    cleaned_news_df = no_punct_news_df.select("id", trim(regexp_replace(col(column_name), " +", " ")).alias(column_name))

    # 2. Tokenization (split text into tokens)
    print("# 2. Tokenization:")
    tokenizer = Tokenizer(inputCol=column_name, outputCol="tokens")
    tokens_df = tokenizer.transform(cleaned_news_df)

    print("# 3. Stopwords removal:")
    stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="terms")
    terms_df = stopwords_remover.transform(tokens_df)

    print("# 4. Stemming:")
    stemmer = SnowballStemmer(language="english")
    stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
    terms_stemmed_df = terms_df.withColumn("terms_stemmed", stemmer_udf("terms"))
    
    print("# 5. untokenize")
    terms_joined_df = terms_stemmed_df.withColumn("terms_join", concat_ws(" ", "terms_stemmed"))
    return terms_joined_df

In [50]:
from pyspark.sql.functions import udf, col, lower, trim, regexp_replace, concat_ws
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer

def clean_text_exp(df):    
    lower_df = df.withColumn("overview_lower", lower(col("overview")))
    trim_df = lower_df.withColumn("overview_trim", trim(col("overview_lower")))
    no_punct_df = trim_df.withColumn("overview_no_punct", regexp_replace(col("overview_trim"), "[^a-zA-Z\\s]", ""))
    no_whitespace_df = no_punct_df.withColumn("overview_no_whitespace", trim(regexp_replace(col("overview_no_punct"), " +", " ")))
    return no_whitespace_df                  
    
def tokenize_text(df, column_name):
    tokenizer = Tokenizer(inputCol=column_name, outputCol="tokens")
    return tokenizer.transform(df)

def stem_tokens(df, column_name):
    stemmer = SnowballStemmer(language="english")
    stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
    terms_stemmed_df = terms_df.withColumn("terms_stemmed", stemmer_udf("terms"))

In [51]:
df_clean = clean_text(df_movies, 'overview')
df_clean
#from pyspark.ml.feature import Tokenizer
#tokenizer = Tokenizer(inputCol="overview", outputCol="terms_stemmed")
#tokens_df = tokenizer.transform(df_movies)
#df_clean= tokens_df

***** Text Preprocessing Pipeline *****

# 1. Text Cleaning

1.a Case normalization:
+-----+--------------------+
|   id|            overview|
+-----+--------------------+
|  862|led by woody, and...|
| 8844|when siblings jud...|
|15602|a family wedding ...|
|11862|just when george ...|
|  949|obsessive master ...|
|11860|an ugly duckling ...|
|45325|a mischievous you...|
| 9091|international act...|
|  710|james bond must u...|
| 9087|widowed u.s. pres...|
+-----+--------------------+
only showing top 10 rows

1.b Trimming:
+-----+--------------------+
|   id|            overview|
+-----+--------------------+
|  862|led by woody, and...|
| 8844|when siblings jud...|
|15602|a family wedding ...|
|11862|just when george ...|
|  949|obsessive master ...|
|11860|an ugly duckling ...|
|45325|a mischievous you...|
| 9091|international act...|
|  710|james bond must u...|
| 9087|widowed u.s. pres...|
+-----+--------------------+
only showing top 10 rows

1.c Filter out punctuation:
+-----+--

DataFrame[id: string, overview: string, tokens: array<string>, terms: array<string>, terms_stemmed: array<string>, terms_join: string]

In [52]:
df_clean_test = clean_text_exp(df_movies)
df_clean_test.select("overview_no_whitespace").show()


+----------------------+
|overview_no_whitespace|
+----------------------+
|  led by woody andy...|
|  when siblings jud...|
|  a family wedding ...|
|  just when george ...|
|  obsessive master ...|
|  an ugly duckling ...|
|  a mischievous you...|
|  international act...|
|  james bond must u...|
|  widowed us presid...|
|  when a lawyer sho...|
|  an outcast halfwo...|
|  an allstar cast p...|
|  morgan adams and ...|
|  the life of the g...|
|  rich mr dashwood ...|
|  its ted the bellh...|
|  summoned from an ...|
|  a vengeful new yo...|
|  an agoraphobic ps...|
+----------------------+
only showing top 20 rows



In [38]:
df_clean = df_clean.withColumn("overview_lower", lower(col("overview")))
df_clean.select("overview_lower").show()

+--------------------+
|      overview_lower|
+--------------------+
|led by woody andy...|
|when siblings jud...|
|a family wedding ...|
|just when george ...|
|obsessive master ...|
|an ugly duckling ...|
|a mischievous you...|
|international act...|
|james bond must u...|
|widowed us presid...|
|when a lawyer sho...|
|an outcast halfwo...|
|an allstar cast p...|
|morgan adams and ...|
|the life of the g...|
|rich mr dashwood ...|
|its ted the bellh...|
|summoned from an ...|
|a vengeful new yo...|
|an agoraphobic ps...|
+--------------------+
only showing top 20 rows



In [45]:
df_clean.select('overview_no_whitespace').show(truncate=False, n=1)


AnalysisException: "cannot resolve '`overview_no_whitespace`' given input columns: [overview, terms, id, terms_join, tokens, terms_stemmed];;\n'Project ['overview_no_whitespace]\n+- Project [id#15, overview#1825, tokens#1828, terms#1832, terms_stemmed#1838, concat_ws( , terms_stemmed#1838) AS terms_join#1844]\n   +- Project [id#15, overview#1825, tokens#1828, terms#1832, <lambda>(terms#1832) AS terms_stemmed#1838]\n      +- Project [id#15, overview#1825, tokens#1828, UDF(tokens#1828) AS terms#1832]\n         +- Project [id#15, overview#1825, UDF(overview#1825) AS tokens#1828]\n            +- Project [id#15, trim(regexp_replace(overview#1811,  +,  ), None) AS overview#1825]\n               +- Project [id#15, regexp_replace(overview#1797, [^a-zA-Z\\s], ) AS overview#1811]\n                  +- Project [id#15, trim(overview#1783, None) AS overview#1797]\n                     +- Project [id#15, lower(overview#19) AS overview#1783]\n                        +- Filter ((vote_average#249 >= cast(0 as double)) && (vote_average#249 <= cast(10 as double)))\n                           +- Project [adult#10, belongs_to_collection#11, budget#12, genres#13, homepage#14, id#15, imdb_id#16, original_language#17, original_title#18, overview#19, popularity#20, poster_path#21, production_companies#22, production_countries#23, release_date#24, revenue#25, runtime#26, spoken_languages#27, status#28, tagline#29, title#30, video#31, vote_average#249, cast(vote_count#33 as int) AS vote_count#274]\n                              +- Project [adult#10, belongs_to_collection#11, budget#12, genres#13, homepage#14, id#15, imdb_id#16, original_language#17, original_title#18, overview#19, popularity#20, poster_path#21, production_companies#22, production_countries#23, release_date#24, revenue#25, runtime#26, spoken_languages#27, status#28, tagline#29, title#30, video#31, cast(vote_average#32 as double) AS vote_average#249, vote_count#33]\n                                 +- Filter AtLeastNNulls(n, overview#19)\n                                    +- Filter AtLeastNNulls(n, vote_count#33)\n                                       +- Filter AtLeastNNulls(n, vote_average#32)\n                                          +- Relation[adult#10,belongs_to_collection#11,budget#12,genres#13,homepage#14,id#15,imdb_id#16,original_language#17,original_title#18,overview#19,popularity#20,poster_path#21,production_companies#22,production_countries#23,release_date#24,revenue#25,runtime#26,spoken_languages#27,status#28,tagline#29,title#30,video#31,vote_average#32,vote_count#33] csv\n"

In [None]:
#df_clean.select('terms_join').show(truncate=False, n=2)



In [None]:
from pyspark.ml.feature import HashingTF, CountVectorizer, IDF
from pyspark.ml import Pipeline

cv = CountVectorizer(inputCol="terms_stemmed", outputCol="tf_features", vocabSize=20000, minDF=2)
idf = IDF(inputCol="tf_features", outputCol="features")

pipeline = Pipeline(stages=[cv, idf])
features = pipeline.fit(df_clean)
tf_idf_features_df = features.transform(df_clean)

In [None]:
@udf("long")
def num_nonzeros(v):
    return v.numNonzeros()

In [None]:
print("Total n. of zero-length vectors: {:d}".
      format(tf_idf_features_df.where(num_nonzeros("features") == 0).count()))

In [None]:
tf_idf_features_df = tf_idf_features_df.where(num_nonzeros("features") > 0)

In [None]:
print("Total n. of zero-length vectors (after removal): {:d}".
      format(tf_idf_features_df.where(num_nonzeros("features") == 0).count()))

In [None]:
tf_idf_features_df.select('features').count()

In [None]:
def cosine_similarity_vec(a, b):
    return 1 - a.dot(b)/(a.norm(2)*b.norm(2))

def cosine_similarity(id1, id2):
    df_1 = tf_idf_features_df.select("features").where(tf_idf_features_df.id == id1).first()
    df_2 = tf_idf_features_df.select("features").where(tf_idf_features_df.id == id2).first()
    return cosine_similarity_vec(df_1.features, df_2.features)

cosine_similarity("238","240") # toy story 1 and 2

In [None]:
tf_idf_features_df.select(["id","title"]).filter(lower(tf_idf_features_df.title).like('%toy%')).show(truncate=False,n=3)

In [None]:
tf_idf_features_df.select(["id","title"]).filter(lower(tf_idf_features_df.title).like('%batman%')).show(truncate=False, n=5)


In [None]:
def cos_sim_dataset(movie_vec):
    udf_func = udf(lambda df:str(cosine_similarity_vec(movie_vec.features, df)))
    return tf_idf_features_df.withColumn("cos_sim", udf_func("features"))

In [None]:
df_vec_batman_forever = tf_idf_features_df.select("features").where(tf_idf_features_df.id == "414").first()
 = cos_sim_dataset(df_vec_batman_forever)
df_sim.select(["cos_sim", "id"]).show(truncate=False, n=5)

In [None]:
#df_movies.select(["title","id"]).where(df_movies.id == "8844").first()

In [None]:
#df_movies.select(["id","title"]).filter(lower(df_movies.title).like('%the godfather%')).show()


In [None]:
df_sim = df_sim.withColumn("cos_sim", df_sim["cos_sim"].cast("double"))


In [None]:
df_sim.orderBy('cos_sim', ascending=False).select(["cos_sim", "id"]).show()

In [None]:
df_sim.groupby(["cos_sim"]).count().sort("cos_sim", ascending=False).show(10)
