In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
import nltk

In [2]:
# Reuse the SparkSession/SparkContext that a pyspark kernel already provides, or
# start one, so the notebook also survives Restart & Run All in a plain kernel.
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Canonical titles, parsed later with separate_title_year (title,year per line).
path_title_year_canonical = 'data/movie_titles_canonical.txt'
# Netflix titles, parsed later with separate_id_year_title (id,year,title per line).
path_id_title_year = 'data/netflix_movie_titles.txt'
title_year = sc.textFile(path_title_year_canonical)
id_title_year = sc.textFile(path_id_title_year)
print('title_year count : %d' % title_year.count())
print('title_year_distinct_count : %d ' % title_year.distinct().count())
print('id_title_year_count: %d' % id_title_year.count())
print('id_title_year_distinct_count: %d' % id_title_year.distinct().count())

title_year count : 12862
title_year_distinct_count : 12846 
id_title_year_count: 17770
id_title_year_distinct_count: 17770


### Remove duplicate lines

In [3]:
title_year = title_year.distinct()

### Helper functions that split a line into a tuple of fields
- Titles may themselves contain commas, so a line cannot simply be split on every ",".

In [4]:
def separate_id_year_title(line):
    """Split an ``id,year,title`` line into its three fields.

    Only the first two commas are delimiters; any further commas belong to
    the title.

    Args:
        line: one raw line of the id/year/title file.

    Returns:
        Tuple ``(id, year, title)`` of strings.

    Raises:
        ValueError: if the line contains fewer than two commas.
    """
    # maxsplit=2 keeps commas inside the title intact.
    fields = line.split(',', 2)
    if len(fields) != 3:
        raise ValueError('expected "id,year,title", got %r' % (line,))
    return tuple(fields)

def separate_title_year(line):
    """Split a ``title,year`` line at its last comma.

    Commas before the last one belong to the title.

    Args:
        line: one raw line of the canonical title/year file.

    Returns:
        Tuple ``(title, year)`` of strings.

    Raises:
        ValueError: if the line contains no comma at all.
    """
    title, sep, year = line.rpartition(',')
    # rpartition returns an empty separator when no comma is present.
    if not sep:
        raise ValueError('expected "title,year", got %r' % (line,))
    return title, year

### Create DataFrames and add a lower-cased `clean_title` column
DataFrames are used rather than RDDs because:
- DataFrames make it convenient to join on any column with any condition.
- DataFrame columns are named, which makes the code easier to write and understand.
- DataFrames make it easy to apply a function to a specific column.

In [5]:
def _netflix_line_to_row(line):
    """Parse an ``id,year,title`` line into a Row plus a lower-cased ``clean_title``."""
    movie_id, year, title = separate_id_year_title(line)
    return Row(id=movie_id, year=year, title=title, clean_title=title.lower())


def _canonical_line_to_row(line):
    """Parse a ``title,year`` line into a Row plus a lower-cased ``clean_title``."""
    real_title, year = separate_title_year(line)
    return Row(real_title=real_title, year=year, clean_title=real_title.lower())


idty_df = spark.createDataFrame(id_title_year.map(_netflix_line_to_row))
ty_df = spark.createDataFrame(title_year.map(_canonical_line_to_row))

### Replace non-numeric years (e.g. 'N/A', 'NULL') with an empty string

In [6]:
def _numeric_year_or_blank(year):
    """Return ``year`` when it consists only of digits, otherwise ''."""
    if year.isdigit():
        return year
    return ''


blank_non_numeric_year = UserDefinedFunction(_numeric_year_or_blank, StringType())
idty_df, ty_df = (
    idty_df.withColumn('year', blank_non_numeric_year(idty_df.year)),
    ty_df.withColumn('year', blank_non_numeric_year(ty_df.year)),
)

### First join: match on lower-cased title and year
For efficiency, the first join uses titles with minimal modification (lower-casing only). Only the titles that remain unmatched are then normalized more aggressively (stop-word removal and stemming) for a looser second match.

In [7]:
# Left outer join keeps every Netflix title; real_title stays null when no
# canonical title has the same year and lower-cased title.
same_year = idty_df.year == ty_df.year
same_clean_title = idty_df.clean_title == ty_df.clean_title
join_df1 = (
    idty_df.join(ty_df, on=[same_year, same_clean_title], how='leftouter')
           .select(idty_df.id, idty_df.title, idty_df.year,
                   idty_df.clean_title, ty_df.real_title)
)

### Split the join result into matched and unmatched titles

In [8]:
# Cache the full join before splitting it, so that neither df2 (unmatched) nor
# join_df1 (matched) has to recompute the join.
join_df1.cache()

# Unmatched titles go on to the normalization steps below.
df2 = (join_df1.where(join_df1.real_title.isNull())
               .select(join_df1.id, join_df1.title, join_df1.year, join_df1.clean_title))

# NOTE: join_df1 is rebound to its matched subset, so this cell is not
# idempotent. Re-running it without re-running the join cell leaves df2 empty.
join_df1 = join_df1.where(join_df1.real_title.isNotNull())

DataFrame[id: string, title: string, year: string, clean_title: string, real_title: string]

In [9]:
for label, frame in [('unmatched title', df2), ('matched title', join_df1)]:
    print('%s %d' % (label, frame.count()))

unmatched title 14139
matched title 3631


### Remove stop words
- NLTK's English stop-word list was tried first, but it is too aggressive for titles: a title like "Who Am I" is reduced to an empty string. To avoid this, only the three articles "the", "a" and "an" are treated as stop words.

In [10]:
# NLTK's full list (stopwords.words("english")) empties titles such as
# "Who Am I", so only the three articles below are used.
from nltk.corpus import stopwords
cachedStopWords = frozenset(['the', 'a', 'an'])


def remove_stop_words(title):
    """Drop the words in ``cachedStopWords`` from a lower-cased title.

    If every word is a stop word (e.g. a title that is just "the"), the words
    are kept. An empty key would otherwise match every other all-stop-word
    title of the same year in the second join.
    """
    words = title.split()
    kept = [word for word in words if word not in cachedStopWords]
    return ' '.join(kept or words)


remove_stop_words_udf = UserDefinedFunction(remove_stop_words, StringType())
df2 = df2.withColumn('clean_title', remove_stop_words_udf(df2.clean_title))
ty_df = ty_df.withColumn('clean_title', remove_stop_words_udf(ty_df.clean_title))

### Stemming words
- NLTK's Snowball stemmer reduces each word to its stem, e.g. boys -> boy, running -> run.

In [11]:
from nltk.stem.snowball import SnowballStemmer

sbs = SnowballStemmer('english', ignore_stopwords=False)


def _stem_title(title):
    """Replace each whitespace-separated word of ``title`` with its Snowball stem."""
    return ' '.join(map(sbs.stem, title.split()))


# NOTE: re-running this cell stems the already-stemmed clean_title columns again.
stem_udf = UserDefinedFunction(_stem_title, StringType())
df2, ty_df = (
    df2.withColumn('clean_title', stem_udf(df2.clean_title)),
    ty_df.withColumn('clean_title', stem_udf(ty_df.clean_title)),
)

In [12]:
#remove un alphabetic
#udf = UserDefinedFunction(lambda title: " ".join([word for word in title.split() if word.isalpha()]) ,StringType())
#df2 = df2.withColumn('clean_title',udf(df2.clean_title))
#ty_df = ty_df.withColumn('clean_title',udf(ty_df.clean_title))

### Second join on the normalized title and year

In [13]:
# Same left outer join as the first pass, now on the stop-word-free, stemmed
# titles. The result is cached because it is split into two DataFrames next.
same_year = df2.year == ty_df.year
same_clean_title = df2.clean_title == ty_df.clean_title
join_df2 = (
    df2.join(ty_df, on=[same_year, same_clean_title], how='leftouter')
       .select(df2.id, df2.title, df2.year, df2.clean_title, ty_df.real_title)
)
join_df2.cache()

DataFrame[id: string, title: string, year: string, clean_title: string, real_title: string]

In [14]:
#filter unmatch title
df3 = join_df2.where(join_df2.real_title.isNull()).select(join_df2.id,join_df2.title,join_df2.year,join_df2.clean_title)
join_df2 = join_df2.where(join_df2.real_title.isNotNull())

In [15]:
for label, frame in [('unmatched title', df3), ('matched title', join_df2)]:
    print('%s %d' % (label, frame.count()))

unmatched title 14087
matched title 52


In [16]:
# Both inputs share the column order (id, title, year, clean_title, real_title),
# so the positional union lines up.
append = join_df1.union(join_df2)

# (id, canonical title) pairs for every title matched in either join.
rdd1 = (append.select(append.id, append.real_title)
              .rdd
              .map(lambda row: (row.id, row.real_title)))

# Only matched titles are kept. Still-unmatched ones (df3) could be added with
# their original Netflix title by unioning df3's (id, title) pairs here.
rdd = rdd1

### Broadcast the (id, canonical title) map to all workers

In [None]:
b_id_title = sc.broadcast({k:v for (k,v) in rdd.collect()})

In [None]:
rdd.saveAsPickleFile('data/id_title_mapping1')