# Search Project for CST 495

> CMU Movie Summary Corpus
http://www.cs.cmu.edu/~ark/personas/

We will be using Spark, so the first step is to ensure we have installed the module.

In [1]:
! pip install findspark

[33mYou are using pip version 8.1.2, however version 9.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [2]:
# Tell findspark where the local Spark 1.6.0 distribution lives (under $HOME)
# and ask for the spark-csv package when the PySpark shell is launched.
import findspark
import os
findspark.init(os.getenv('HOME') + '/spark-1.6.0-bin-hadoop2.6')
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-csv_2.10:1.3.0 pyspark-shell'

# The SparkContext is the application's entry point to Spark.
# Keep an existing `sc` if the kernel already has one, otherwise create it;
# the context is printed in both cases.
import pyspark
try:
    sc
except NameError:
    sc = pyspark.SparkContext()
print(sc)

<pyspark.context.SparkContext object at 0x10b8be450>


# Resilient Distributed Dataset (RDD)

From the Spark documentation:

_"A Resilient Distributed Dataset (RDD), the basic abstraction in Spark, represents an immutable, partitioned collection of elements that can be operated on in parallel."_

_"Parallelized collections are created by calling SparkContext’s parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel."_ 

In [6]:
# creating an RDD
# The path is built from the kernel's current working directory, so the
# notebook must be started from the directory that contains data/.

rdd = sc.textFile(os.getcwd()+'/data/MovieSummaries/plot_summaries.txt')
print(rdd)

# NOTE(review): in the recorded run print(rdd) succeeded but take(3) failed
# with "Input path does not exist" -- the file was missing under the cwd.
rdd.take(3)

MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:-2


Py4JJavaError: An error occurred while calling o29.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/dustin/Dropbox/CST495/data/MovieSummaries/plot_summaries.txt
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
	at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:64)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:46)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
	at py4j.Gateway.invoke(Gateway.java:259)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:209)
	at java.lang.Thread.run(Thread.java:745)


> **Counting words**

In [None]:
# Token count of every line, keeping only lines with more than two tokens.
words_per_line = (
    rdd.map(lambda line: len(line.split()))
       .filter(lambda n_tokens: n_tokens > 2)
)

# Sum the surviving per-line counts.
total_words = words_per_line.reduce(lambda left, right: left + right)

print(total_words)

> Term Frequency

In [None]:
# use Spark API for optimization
import re

def normalizeWords(text):
    """Lower-case ``text`` and return its word tokens as a list.

    A token is a maximal run of word characters (letters, digits and
    underscore, Unicode-aware). Matching the tokens directly, instead of
    splitting on the separators, avoids the empty strings that a split
    produces when the text starts or ends with punctuation or whitespace,
    so no empty "word" is ever emitted.

    :param text: the string to tokenize.
    :return: list of lower-cased tokens; empty when ``text`` has no word
        characters.
    """
    return re.findall(r'\w+', text.lower(), re.UNICODE)

def toCSVLine(data):
    """Return the items of ``data`` rendered with ``str`` and joined by commas.

    No quoting or escaping is applied to the individual fields.
    """
    fields = map(str, data)
    return ','.join(fields)

rdd = sc.textFile(os.getcwd()+'/data/MovieSummaries/plot_summaries.txt')
rdd.cache()
words = rdd.flatMap(normalizeWords)

# Count each term with reduceByKey so the aggregation stays distributed;
# countByValue() would pull every count to the driver and is not needed here.
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
# Swap to (count, word) so sortByKey orders by frequency. The pair is indexed
# rather than unpacked in the lambda signature, which is Python-2-only syntax.
wordCountsSorted = wordCounts.map(lambda pair: (pair[1], pair[0])).sortByKey()
results = wordCountsSorted.collect()

import csv

# Only terms occurring more often than this are printed and written out
# (limits the size of the csv file for now).
MIN_COUNT = 10000

with open(os.getcwd() + '/data/MovieSummaries/plot_sum.csv', 'wb') as csvfile:
    # Plain comma-separated rows "count,word". The file is read back with a
    # comma delimiter, so no extra separator column or padding is written.
    spamwriter = csv.writer(csvfile)

    for count, word in results:
        # Drop non-ASCII characters; a word made only of them becomes empty.
        word = word.encode('ascii', 'ignore')

        if word and count > MIN_COUNT:
            print(word + ":\t\t" + str(count))
            spamwriter.writerow([count, word])
          

         
            

> Testing the Spark DataFrames API with the Data

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Read the header-less plot_sum.csv with spark-csv (schema inferred) and name
# its first two columns `id` and `words`.
df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='false', inferSchema='true')
    .load(os.getcwd() + '/data/MovieSummaries/plot_sum.csv')
    .selectExpr("C0 as id", "C1 as words")
)

df.show()

In [None]:
# Inspect the column names and inferred types of `df`.
df.schema

In [None]:
# Make `df` queryable through SQL under the table name 'plotTerms'.
sqlContext.registerDataFrameAsTable(df,'plotTerms')
# This result is not displayed: it is not the last expression of the cell.
sqlContext.tableNames()

# Show the first 20 rows of the table, ordered by id.
sqlContext.sql("select * from plotTerms order by id limit 20").show()

In [None]:
# List the tables currently registered with this SQLContext.
sqlContext.tableNames()

In [None]:
# Fetch the first 10 rows of `df` to the driver for a quick look.
df.take(10)

> **Inverted Index**

In [None]:
# One (word, id) pair per space-separated token of column 1, each tagged
# with the row's column 0 value.
index = df.flatMap(
    lambda record: [(token, record[0]) for token in record[1].strip().split(' ')]
)
index.take(20)

In [None]:
# Same (word, id) pairs, now grouped so each word maps to all of its ids.
index = (
    df.flatMap(lambda record: [(token, record[0]) for token in record[1].split(' ')])
      .groupByKey()
)
index.take(10)

In [None]:
# Inverted index: word -> list of ids, then look up a single word.
index = (
    df.flatMap(lambda record: [(token, record[0]) for token in record[1].split(' ')])
      .groupByKey()
      .map(lambda entry: (entry[0], list(entry[1])))
)
index.filter(lambda entry: entry[0] == 'father').collect()

In [None]:
# Inverted index: word -> list of ids, then look up a single word.
index = (
    df.flatMap(lambda record: [(token, record[0]) for token in record[1].split(' ')])
      .groupByKey()
      .map(lambda entry: (entry[0], list(entry[1])))
)
index.filter(lambda entry: entry[0] == 'mother').collect()

# Spark DataFrames API

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Read the header-less, tab-separated movie metadata (schema inferred) and
# keep the columns of interest under readable names.
mov_df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(delimiter='\t', header='false', inferSchema='true')
    .load(os.getcwd() + '/data/MovieSummaries/movie.metadata.tsv')
    .selectExpr(
        "C0 as wiki_id",
        "C2 as movie_title",
        "C3 as release_date",
        "C4 as box_office_rev",
        "C5 as runtime",
        "C6 as languages",
        "C7 as countries",
    )
)

mov_df.show()

In [None]:
# Read the header-less, tab-separated plot summaries (schema inferred) as
# two columns: wiki_id and plot.
plot_df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .options(delimiter="\t", header='false', inferSchema='true')
    .load(os.getcwd() + '/data/MovieSummaries/plot_summaries.txt')
    .selectExpr("C0 as wiki_id", "C1 as plot")
)

plot_df.show()

In [None]:
# turn movie data frame into table
# Make `mov_df` queryable through SQL under the table name 'movieMeta'.
sqlContext.registerDataFrameAsTable(mov_df,'movieMeta')
# This result is not displayed: it is not the last expression of the cell.
sqlContext.tableNames()

# Show 20 rows sorted by release_date, descending.
sqlContext.sql("select * from movieMeta order by release_date desc limit 20").show()

In [None]:
# Register `plot_df` (columns wiki_id, plot) under the name 'plotTerms'.
# The same table name was used earlier in the notebook for the word-count
# frame, so from here on 'plotTerms' refers to the plot summaries.
sqlContext.registerDataFrameAsTable(plot_df,'plotTerms')
# This result is not displayed: it is not the last expression of the cell.
sqlContext.tableNames()

# Show the first 20 plot rows ordered by wiki_id.
sqlContext.sql("select * from plotTerms order by wiki_id limit 20").show()

In [None]:
# Spot-check the metadata rows whose wiki_id lies strictly between 4725 and 4731.
sqlContext.sql("select * from movieMeta where wiki_id >4725 and wiki_id <4731 limit 20").show()

In [None]:
# Attach each movie's plot text to its title by joining on the shared wiki_id.
# - An outer join needs an explicit ON condition to pair the right rows.
# - The text column of 'plotTerms' (registered from plot_df) is `plot`; it is
#   aliased to `words` so the result keeps a (movie_title, words) layout.
new_df = sqlContext.sql(
    "select m.movie_title, p.plot as words "
    "from movieMeta m left outer join plotTerms p "
    "on m.wiki_id = p.wiki_id"
)

sqlContext.registerDataFrameAsTable(new_df,'titleWord')
sqlContext.tableNames()

In [None]:
# Rebind `rdd` to the row RDD behind `plot_df` (it previously held the raw
# text-file RDD), then peek at the first row.
rdd = plot_df.rdd

rdd.take(1)

In [None]:
# One (word, wiki_id) pair per space-separated token of the plot column.
index = rdd.flatMap(
    lambda record: [(token, record[0]) for token in record[1].split(' ')]
)
index.take(50)

In [None]:
# Group the (word, wiki_id) pairs so each word maps to all of its wiki_ids.
index = (
    rdd.flatMap(lambda record: [(token, record[0]) for token in record[1].split(' ')])
       .groupByKey()
)
index.take(10)

In [None]:
# Inverted index over the plots: word -> list of wiki_ids. Cached because the
# following cells query it.
index = (
    rdd.flatMap(lambda record: [(token, record[0]) for token in record[1].split(' ')])
       .groupByKey()
       .map(lambda entry: (entry[0], list(entry[1])))
       .cache()
)

In [None]:
# Look up the index entry for the word 'green'.
indices = index.filter(lambda x : x[0] == 'green').take(10)
# Keep the ids of the first matching entry as a tuple.
# NOTE(review): indices[0] raises IndexError when the word is not in the index.
tup = tuple(indices[0][1])

In [None]:
# Build the SQL IN-list by hand instead of relying on str(tup): a one-element
# tuple renders as "(123,)" and Python 2 longs render as "123L", neither of
# which is valid SQL. int() also ensures that only numeric literals are
# spliced into the query text.
id_list = ", ".join(str(int(wiki_id)) for wiki_id in tup)
sqlContext.sql("select movie_title from movieMeta where wiki_id in (" + id_list + ") order by wiki_id ").show()

# Let's try this again

> This time we will go back to basics: we normalise the text by removing unwanted characters and converting it to lowercase.

In [None]:
import csv
import re

# Load the tab-separated plot file into `docs`:
#   key   = the digit runs of column 0, joined by spaces
#   value = the alphabetic words of column 1, joined by spaces and lower-cased
# NOTE(review): the Spark cells read plot_summaries.txt while this opens
# plot_summaries.tsv -- confirm the file exists under this name.
with open("data/MovieSummaries/plot_summaries.tsv") as f:
    r = csv.reader(f, delimiter='\t', quotechar='"')
    tag = re.compile(r'\b[0-9]+\b')
    rgx = re.compile(r'\b[a-zA-Z]+\b')
    docs = {}
    for i, x in enumerate(r):
        # The first two rows (i = 0 and i = 1) are not loaded.
        if i <= 1:
            continue
        doc_key = ' '.join(tag.findall(x[0])).lower()
        docs[doc_key] = ' '.join(rgx.findall(x[1])).lower()
#print(docs[0][0], docs[0][1])    

#item_t = [ d[0] for d in docs ] # item titles
#tem_d = [ d[1] for d in docs ] # item description
#item_i = range(0 , len(item_t)) # item id





> Now normalize the movie metadata in the same way, so that each movie title can be looked up by the same ID key used for the plots above.

**Just the basics for now, to get the index; the genre can be added later if needed.**

In [None]:
import csv
import re

# Load the tab-separated metadata file into `docs2`:
#   key   = the digit runs of column 0, joined by spaces
#   value = the alphabetic words of column 2, joined by spaces and lower-cased
with open("data/MovieSummaries/movie.metadata.tsv") as f:
    r = csv.reader(f, delimiter='\t', quotechar='"')
    tag = re.compile(r'\b[0-9]+\b')
    rgx = re.compile(r'\b[a-zA-Z]+\b')
    docs2 = {}
    for i, x in enumerate(r):
        # The first two rows (i = 0 and i = 1) are not loaded.
        if i <= 1:
            continue
        doc_key = ' '.join(tag.findall(x[0])).lower()
        docs2[doc_key] = ' '.join(rgx.findall(x[2])).lower()

print(docs2)

> now is the time to join the docs together

In [None]:
# Pair every plot text with the title stored under the same key in docs2.
# Keys with no title (missing or empty) are left out.
doc = []
for key, text in docs.items():
    title = docs2.get(key)
    if title:
        doc.append((title, text))

# Quick look at the first joined record: title, then plot text.
print(doc[0][0] + ' ' + doc[0][1])

items_t = [pair[0] for pair in doc]  # item titles
items_d = [pair[1] for pair in doc]  # item description
items_i = range(len(items_t))        # item id



# term freq

In [None]:
# Work on a small corpus: the first 25 titles.
corpus = items_t[:25]
print(corpus)

>> Start by computing the term frequency over the entire corpus.

In [None]:
# Count how often each whitespace-separated word occurs across the corpus.
tf = {}
for doc in corpus:
    for word in doc.split():
        tf[word] = tf.get(word, 0) + 1
print(tf)

>> Now that we have normalised the data, we can compute the term frequency.


In [None]:
from collections import Counter

def get_tf(corpus):
    """Return a Counter of word occurrences over every document in ``corpus``.

    Each document is split on whitespace; the counts are totals across all
    documents.
    """
    return Counter(token for text in corpus for token in text.split())

# Same corpus-wide count as the manual loop, now via the Counter-based helper.
tf = get_tf(corpus)
print(tf)
    

# doc freq
> 

In [None]:
import collections

def get_tf(document):
    """Return the term frequencies of a single document.

    Note that this definition replaces the corpus-level ``get_tf`` defined
    earlier in the notebook: it takes one document string, not a list.

    The counter is referenced as ``collections.Counter`` so the cell depends
    only on its own ``import collections`` rather than on a name imported by
    an earlier cell.

    :param document: text whose whitespace-separated words are counted.
    :return: ``collections.Counter`` mapping each word to its number of
        occurrences; empty for an empty or all-whitespace document.
    """
    return collections.Counter(document.split())

def get_dtf(corpus):
    """Map each document's position in ``corpus`` to ``get_tf`` of that document."""
    return {position: get_tf(text) for position, text in enumerate(corpus)}

# Per-document term frequencies of the item titles.
dtf = get_dtf(items_t)
# Inspect one document by position.
# NOTE(review): raises KeyError if there are fewer than 343 titles.
dtf[342]

> compute dtf for item descriptions

In [None]:
# Per-document term frequencies of the item descriptions (overwrites the
# title-based `dtf` from the previous cell).
dtf = get_dtf(items_d)
# Inspect one document by position.
dtf[12]

# term freq matrix

In [None]:
def get_lexicon(corpus):
    """Return the distinct whitespace-separated words of ``corpus`` as a list.

    The words are de-duplicated through a set, so the order of the returned
    list is not defined.
    """
    vocabulary = {token for text in corpus for token in text.split()}
    return list(vocabulary)

# Try the lexicon builder on a three-document toy corpus.
test_corpus = ['mountain bike', 'road bike carbon', 'bike helmet']
lexicon = get_lexicon(test_corpus)
print(lexicon)

> with the lexicon we are able to compute the term freq matrix

In [None]:
def get_tfm(corpus):
    """Build the term-frequency matrix of ``corpus``.

    :param corpus: iterable of document strings; each is split on whitespace.
    :return: ``(tfm, lexicon)`` where ``lexicon`` is the list of distinct
        words (order not defined) and ``tfm`` holds one row per document,
        with ``tfm[d][c]`` the number of times ``lexicon[c]`` occurs in
        document ``d``. An empty corpus yields ``([], [])``.
    """
    # Tokenize once so the corpus is traversed a single time (this also
    # makes one-shot iterables work).
    tokenized = [doc.split() for doc in corpus]

    lexicon = list({term for terms in tokenized for term in terms})

    # Constant-time term -> column lookup; calling lexicon.index(term) for
    # every token would rescan the whole lexicon each time.
    column_of = {term: column for column, term in enumerate(lexicon)}

    tfm = []
    for terms in tokenized:
        tfv = [0] * len(lexicon)
        for term in terms:
            tfv[column_of[term]] += 1
        tfm.append(tfv)

    return tfm, lexicon

# Try the matrix builder on the toy corpus: print the column labels first,
# then the per-document count rows.
test_corpus = ['mountain bike', 'road bike carbon', 'bike helmet']
tfm, lexicon = get_tfm(test_corpus)
print(lexicon)
print(tfm)


    

# sparsity of term frequency matrix

In [None]:
! pip install bokeh

In [None]:
import pandas as pd
from bokeh.plotting import figure, output_notebook, show, vplot

# sparsity as a function of document count
# Sparsity is the share of matrix cells that are zero. It is measured against
# ALL cells: counting only the cells equal to 0 or 1 would leave cells with a
# frequency of 2 or more out of the denominator and overstate the sparsity.
n = []  # corpus sizes (number of documents)
s = []  # sparsity of the term frequency matrix at each size
for i in range(100, 1000, 100):
    corpus = items_t[0:i]
    tfm, lexicon = get_tfm(corpus)
    n_zero = sum(row.count(0) for row in tfm)
    n_cells = sum(len(row) for row in tfm)
    s.append(float(n_zero) / n_cells)
    n.append(i)
    
# Render bokeh output inline in the notebook.
output_notebook(hide_banner=True)
# Sparsity (s) against number of documents (n): a line with a circle marker
# on every measured point.
p = figure(x_axis_label='Documents', y_axis_label='Sparsity', plot_width=400, plot_height=400)
p.line(n, s, line_width=2)
p.circle(n, s, fill_color="white", size=8)
show(p)