Set up HDFS and Google credentials

In [1]:
sc


In [2]:
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="./imdb-e9e7ce7a779d.json"
os.environ["HDFSCLI_CONFIG"]="./.hdfscli.cfg"
os.environ["HADOOP_CONF_DIR"]="/opt/hadoop-3.1.0/etc/hadoop"
sc.environment["GOOGLE_APPLICATION_CREDENTIALS"]="/MovieScope-1bf4856cc738.json"

List filenames of reviews from HDFS and parallelize in preparation from processing

Parallelise the reviews and use Google NLP API to extract entities and related sentiment.

In [3]:
# Imports the Google Cloud client library
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
from functools import reduce

def collectEntities(x, y):
    # The first reduce call doesn't pass a list for x, so we need to check for that.
    if not isinstance(x, list):
        x=[x]
        

    xd = dict(x)
    #print(xd)
    
    if not isinstance(y, list):
        y = [y]
        
    for ye in y:
        if ye[0] in xd:
            try:
                xd[ye[0]] = (xd[ye[0]]+ye[1])/2
            except:
                Null
        else:
            xd[ye[0]] = ye[1]
    
    return [o for o in xd.items()]
        


In [41]:
orientation = "neg"
collection="reviews"
urlsCollection="train"

In [5]:
tf = sc.wholeTextFiles("hdfs://sp-master:8020/user/lmrd/"+collection+"/"+orientation)
tf = tf.repartition(5)

In [6]:
tf.take(5)

[('hdfs://sp-master:8020/user/lmrd/test_reviews/neg/10036_1.txt',
  "Are you kidding me? This is quite possibly the worst, amateur movie I've ever seen. The casting was horrible, the acting was worse than horrible and I'm sorry, the guy at the picnic speed loading his plate full of food was somewhere near pointless and the demonic turd and chamber pot chasing Drew around was nothing more than comical. When I herd about the Bell Witch, I wanted to believe. I read some literature on it and thought it sounded like it was possible a plausible story. But this movie just destroyed that. Ric White (Director, Writer, Lead Actor, etc) takes himself a bit too seriously and I think he gives himself a little more credit than he deserves....Do yourself a favor....skip this one."),
 ('hdfs://sp-master:8020/user/lmrd/test_reviews/neg/10037_1.txt',
  'In addition to the fact that this is just an abysmally made film (imagine giving a camcorder to the average high school drama club) the people who think

In [7]:
import re
import time

from pyspark.sql.types import *

def checkSentimentValue(x):
    try:
        f = float(x)
        
        return f
    
    except:
        print("Wrong sentiment value ", f)
        return 0
    
def extractEntitiesSetiment2(fileObj):
    # Instantiates a client
    client = language.LanguageServiceClient()
    
    review_contents = fileObj[1]
        
    #print(review_contents)
    document = types.Document(content = review_contents, 
                             type=enums.Document.Type.PLAIN_TEXT, language="en-US")
    
    tries=1
    
    while tries < 5:
        try:
            entities = client.analyze_entity_sentiment(document=document, encoding_type="UTF8")
            break
        except:
            f = open("/home/etienne/sparklog.txt", mode="a")
            f.write(""+str(fileObj[0])+"\n")
            f.write(""+str(entities)+"\n")
            f.close()
            time.sleep(1)
            
            tries +=1
    
    
    # Make sure we have no duplicate entities. If we do, average their sentiment.
    justLetters = re.compile("[^a-z ]")
    response = [o for o in zip([justLetters.sub("", entity.name.lower()) for entity in entities.entities], 
                               [checkSentimentValue(entity.sentiment.score) * checkSentimentValue(entity.sentiment.magnitude) 
                                    for entity in entities.entities])]
    
    response = sorted(response, key=lambda x: x[0])
    if (len(response)>1):
        response = reduce(collectEntities, response)
    
        
    #print(fileObj[0], response)
    try:
        fid = int(fileObj[0])
    except:
        fid=0
    
    return (fid, response)

def extractOrdering(rec):
    filenameRegexp = ".*/([0-9]*)_.*\.txt$"
    r = re.search(filenameRegexp, rec[0])

    return (int(r.groups()[0])+1, rec[1])
    #hdfs://localhost:9000/user/lmrd/reviews/pos/3467_7.txt


#sc.broadcast(filenameRegexp)
filesRdd = tf.map(extractOrdering)
filesRdd = filesRdd.repartition(5)

schema1 = StructType([
    StructField("ID", IntegerType(), False),
    StructField("ENTITY_SENTIMENT", ArrayType(
            StructType([StructField("ENTITY", StringType(), False), 
                        StructField("SENTIMENT", FloatType(), False)])), nullable=True)])


entity_documents_info = filesRdd.map(extractEntitiesSetiment2)

entity_documents_info.cache()
#entity_documents_info.saveAsTextFile("hdfs://sp-master:8020/user/lmrd/reviews/temp_pos3.txt")


entity_documents_info = spark.createDataFrame(filesRdd.map(extractEntitiesSetiment2), schema1)#schema=["ID", "ENTITIY_SENTIMENT"])
#entity_documents_info = entity_documents_info.rdd.repartition(5)

In [8]:
entity_documents_info.rdd.getNumPartitions()

5

In [9]:
#entity_documents_info = spark.createDataFrame(filesRdd.map(extractEntitiesSetiment2), schema=["ID", "ENTITIY_SENTIMENT"])

entity_documents_info.write.parquet("hdfs://spark-master:8020/user/lmrd/"+collection+"/"+orientation+"_doc_info.pq", mode="overwrite")

In [42]:
# Make sure we don't trigger Google Cloud API again
entity_documents_info = spark.read.parquet("hdfs://spark-master:8020/user/lmrd/"+collection+"/"+orientation+"_doc_info.pq")
entity_documents_info.show(5)

+-----+--------------------+
|   ID|    ENTITY_SENTIMENT|
+-----+--------------------+
|10127|[[adventure, 0.48...|
|10128|[[back garden, -0...|
|10129|[[acting, -0.09],...|
|10130|[[acting, -0.9799...|
| 1013|[[actors, 0.0], [...|
+-----+--------------------+
only showing top 5 rows



Load genre information from file (previously collected using IMDB API)

In [43]:
import pickle
import pandas as pd
import base64
from functools import reduce
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

def decodeGenre(x):
    try: 
        g = pickle.loads(base64.b64decode(x[2:-1]), encoding="bytes") 
        if (len(g)==0):
            return ["NA"]
        else:
            return g
    except:
        return ["NA"]    
        
        
genres = pd.read_csv("Data/genres_"+urlsCollection+"_urls_"+orientation+".csv", sep="\t", index_col=0, usecols=[1, 2, 3])
#print(genres.head())
genres = genres.fillna(value="b''")
genres["GENRE"] = genres["GENRE"].apply(decodeGenre) 

# Get list of unique genre values
#unique_genres = set(reduce(lambda x, y: x+y, genres["GENRE"].values))
#print(unique_genres)

#print(genres.head())
#print(genres[["ID", "GENRE"]])
#z = zip(genres["ID"], genres["GENRE"])


#genres_rdd = sc.parallelize([(int(k)-1, v[0], v[1]) for (k, v) in genres.iteritems()])

schema = StructType([
    StructField("FILM_ID", IntegerType(), True),
    StructField("GENRE", ArrayType(StringType(), containsNull=True), True)])

genres_df = spark.createDataFrame(genres, schema)

from pyspark.sql.functions import monotonically_increasing_id

# This will return a new DF with all the columns + id
genres_df = genres_df.withColumn("ID_TEMP", monotonically_increasing_id())#.limit(10)

genres_df = genres_df.withColumn("ID",F.row_number().over(W.orderBy("ID_TEMP"))).select(["FILM_ID", "GENRE", "ID"])#.limit(10)

#df1.withColumn("idx", F.row_number())
genres_df.show(5)
#genres_rdd.collect()

+-------+--------------------+---+
|FILM_ID|               GENRE| ID|
+-------+--------------------+---+
|  64354|            [Comedy]|  1|
| 100680|    [Drama, Romance]|  2|
| 100680|    [Drama, Romance]|  3|
| 100680|    [Drama, Romance]|  4|
|  47200|[Horror, Mystery,...|  5|
+-------+--------------------+---+
only showing top 5 rows



In [44]:
entity_documents_info = entity_documents_info.alias("df1").join(genres_df.alias("df2"), entity_documents_info.ID == genres_df.ID)#.select(["df1.*", "df2.FILM_ID", "df2.GENRE"])

entity_documents_info.show(5)

+---+--------------------+-------+--------------------+---+
| ID|    ENTITY_SENTIMENT|FILM_ID|               GENRE| ID|
+---+--------------------+-------+--------------------+---+
|  1|[[chantings, -0.0...|  64354|            [Comedy]|  1|
|  2|[[book, 0.0], [ca...| 100680|    [Drama, Romance]|  2|
|  3|[[acting, 0.48999...| 100680|    [Drama, Romance]|  3|
|  4|[[adaptation, -0....| 100680|    [Drama, Romance]|  4|
|  5|[[another, 0.0100...|  47200|[Horror, Mystery,...|  5|
+---+--------------------+-------+--------------------+---+
only showing top 5 rows



Zip the document-entity-sentiment rdd with the genre rdd.
There should be exactly the same number of reviews as records in the genres rdd.

Group documents by genre

In [45]:
def separateGenres(rec):
    print(len(rec))
    return [[genre, rec[0]] for genre in rec[1][1]]

def separateGenres2(rec):
    return [[genre, e, s] for (e, s) in rec[0] for genre in rec[1][1]]

def separateGenres3(rec):
    print(rec)
    return [[genre, e, s] for (e, s) in rec.ENTITY_SENTIMENT for genre in rec.GENRE]
    
#grouped_entities = entity_documents_info.flatMap(separateGenres).reduceByKey(collectEntities)
grouped_entities = entity_documents_info.rdd.flatMap(separateGenres3)
grouped_entities.repartition(5)
grouped_entities_df = spark.createDataFrame(data=grouped_entities, schema=["genre", "entity", "sentiment"])
#grouped_entities_df.show()
grouped_entities_df.cache()



DataFrame[genre: string, entity: string, sentiment: double]

In [46]:
#grouped_entities_df.show(5)
grouped_entities_df.write.parquet("hdfs://spark-master:8020/user/lmrd/"+urlsCollection+"_"+orientation+"_grouped_entities.pq", mode="overwrite")

In [16]:
from pyspark.sql import Row
from pyspark.sql.functions import collect_list

grouped_entities_df = spark.read.parquet("hdfs://spark-master:8020/user/lmrd/"+urlsCollection+"_"+orientation+"_grouped_entities.pq")

grouped_entity_words = grouped_entities_df.select(["genre", "entity"]).groupBy("genre").agg(collect_list("entity").alias("entities"))
grouped_sentiment = grouped_entities_df.select(["genre", "sentiment"]).groupBy("genre").agg(collect_list("sentiment").alias("sentiment"))
#grouped_entity_words.show()
#grouped_sentiment.show()

In [17]:
grouped_sentiment.show()

+-----------+--------------------+
|      genre|           sentiment|
+-----------+--------------------+
|      Crime|[0.0, 0.010000000...|
|    Romance|[0.0, 0.0, 0.8099...|
|   Thriller|[-0.0100000007078...|
|  Adventure|[-0.0400000028312...|
|         NA|[0.0, 0.0, -0.489...|
|      Drama|[-0.0400000028312...|
|        War|[0.0, 0.0, 0.8099...|
|Documentary|[0.0, 0.0, 0.0, 0...|
| Reality-TV|[0.0, 0.0, 0.0, 0...|
|     Family|[0.0, 0.0, -0.040...|
|    Fantasy|[0.01000000070780...|
|  Game-Show|[0.0, 0.0, 0.0, 0...|
|      Adult|[-0.25, -0.25, -0...|
|    History|[0.0, 0.0, 0.0, 0...|
|    Mystery|[-0.0400000028312...|
|    Musical|[0.0, 0.0, 0.8099...|
|  Animation|[0.0, 0.0, -0.040...|
|      Music|[0.0, -0.04000000...|
|  Film-Noir|[0.0, 0.0, 0.0, 0...|
|      Short|[0.0, 0.0, -0.040...|
+-----------+--------------------+
only showing top 20 rows



In [48]:
from pyspark.ml.feature import CountVectorizer, IDF

# remove sentiment info for use by hashingTF/tfif

# Load documents (one per line).

countVec = CountVectorizer(inputCol="entities", outputCol="tf")
cvmodel = countVec.fit(grouped_entity_words)

tf = cvmodel.transform(grouped_entity_words)
tf.show()
#sc.broadcast(hashingTF)

# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF(inputCol="tf", outputCol="tfidf").fit(tf)
tfidf = idf.transform(tf)
tfidf.show()
# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
# idfIgnore = IDF(minDocFreq=2).fit(tf)
# tfidfIgnore = idfIgnore.transform(tf)

+-----------+--------------------+--------------------+
|      genre|            entities|                  tf|
+-----------+--------------------+--------------------+
|      Crime|[all, anger, best...|(74427,[0,1,2,3,4...|
|    Romance|[book, caricature...|(74427,[0,1,2,3,4...|
|   Thriller|[another, cast, e...|(74427,[0,1,2,3,4...|
|  Adventure|[action, artists,...|(74427,[0,1,2,3,4...|
|         NA|[any, attachment,...|(74427,[0,1,2,3,4...|
|      Drama|[book, caricature...|(74427,[0,1,2,3,4...|
|        War|[actors, addition...|(74427,[0,1,2,3,4...|
|Documentary|[absurd, action, ...|(74427,[0,1,2,3,4...|
| Reality-TV|[all, ass, compan...|(74427,[2,3,4,6,7...|
|     Family|[action, artists,...|(74427,[0,1,2,3,4...|
|    Fantasy|[anything, barrag...|(74427,[0,1,2,3,4...|
|  Game-Show|[all, ass, compan...|(74427,[2,3,6,7,1...|
|      Adult|[adult movies, be...|(74427,[0,1,2,3,4...|
|    History|[aaron sherritt, ...|(74427,[0,1,2,3,4...|
|    Mystery|[another, cast, e...|(74427,[0,1,2,

In [49]:
from pyspark.sql import Row
from pyspark.sql.functions import explode
import numpy as np

vocab = tfidf.select(["genre", "tfidf"])
genreVocabs = dict()

for genre in vocab.collect():
    genreName = genre.genre
    
    t=genre.tfidf
    genreVocabs[genreName] = t
    
globalVocab = list(cvmodel.vocabulary)
    
sc.broadcast(globalVocab)
sc.broadcast(genreVocabs)

def remapEntitiesByTfidf(row):
    tfidfMappings = genreVocabs[row.genre]
    tfIndex = globalVocab.index(row.entity)
    tfidf = tfidfMappings[tfIndex]
    
    return Row(genre=row.genre, entity=row.entity, tfidf=float(tfidf), vocabIndex=int(tfIndex))
    
genreCorpora=dict()

for genre in genreVocabs.keys():
    genreEntities = tfidf.where(tfidf.genre==genre).select("genre", explode("entities").alias("entity"))
    
    #genreEntities.show()
    
    #data = genreEntities.rdd.map(remapEntitiesByTfidf)

    entitiesByTfidf = spark.createDataFrame(data=genreEntities.rdd.map(remapEntitiesByTfidf), schema=["entity", "genre", "tfidf", "vocabIndex"])
    #entitiesByTfidf.show()
    entitiesByTfidf = entitiesByTfidf.join(grouped_entities_df, on=["genre", "entity"], how="inner" ).groupBy(["genre", "entity", "tfidf", "vocabIndex"]).avg("sentiment").sort("tfidf", ascending=False)
    
    genreCorpora[genre] = entitiesByTfidf
    


In [50]:
from pyspark.sql.functions import *

for (genreName, corpus) in genreCorpora.items():
    print(genreName)
    df = corpus.select(col("genre"), col("entity"), col("tfidf"), col("vocabIndex"), col("avg(sentiment)").alias("sentiment"))

    df.write.parquet("hdfs://spark-master:8020/user/lmrd/"+genreName+"_"+orientation+"_tfidf.pq", mode="overwrite")

Crime
Romance
Thriller
Adventure
NA
Drama
War
Documentary
Reality-TV
Family
Fantasy
Game-Show
Adult
History
Mystery
Musical
Animation
Music
Film-Noir
Horror
Short
Western
Biography
Comedy
Action
Sport
Talk-Show
Sci-Fi
News


In [3]:
crime_df = spark.read.parquet("hdfs://spark-master:8020/user/lmrd/Crime_pos_tfidf.pq")

In [4]:
crime_df.show(5)

+-----+---------------+-----------------+----------+--------------------+
|genre|         entity|            tfidf|vocabIndex|           sentiment|
+-----+---------------+-----------------+----------+--------------------+
|Crime|bullfight scene|1.791759469228055|     27869|-0.09000000357627869|
|Crime|     dan kolton|1.791759469228055|     29876|                 0.0|
|Crime|     family dog|1.791759469228055|     26937|-0.04000000283122063|
|Crime|    go hk films|1.791759469228055|     36170|                 0.0|
|Crime| grandpa walton|1.791759469228055|     31916|                 0.0|
+-----+---------------+-----------------+----------+--------------------+
only showing top 5 rows



In [5]:
crime_df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

In [8]:
crime_df.orderBy("tfidf", ascending=False).show(200)

+-----+--------------------+------------------+----------+--------------------+
|genre|              entity|             tfidf|vocabIndex|           sentiment|
+-----+--------------------+------------------+----------+--------------------+
|Crime|                film|155.26441535013046|         1|0.029679202453845893|
|Crime|               movie|114.10543845742593|         0| 0.07445113833786685|
|Crime|     richard widmark|62.577351002094204|      1134|-0.00279069650744976|
|Crime|                plot| 60.34843529879897|        14|-0.00158043935778...|
|Crime|               story| 60.16085444061885|         3| 0.03526734755270105|
|Crime|                role|54.149502367804516|        26| 0.04536823003089545|
|Crime|        dana andrews| 50.37257551355662|      2154| -0.3444000040739775|
|Crime|         jean peters| 50.37257551355662|      2228| 0.04440000131726265|
|Crime|       thelma ritter| 50.37257551355662|      2241| 0.05919999949634075|
|Crime|         dirty harry| 46.67369946