In [1]:
import pyspark

In [2]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)

22/04/14 18:22:18 WARN Utils: Your hostname, Myungjongs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.6 instead (on interface en0)
22/04/14 18:22:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/04/14 18:22:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /Users/yoonjung/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [4]:
nltk.download('stopwords')

from nltk.corpus import stopwords

stop_en = stopwords.words('english')

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/yoonjung/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


# Step 1: Data Loading and pre-processing


In [None]:
import pandas as pd

pd_df = pd.read_csv('IMDB_Dataset.csv')
pd_df.

In [5]:
# ************ function to remove load our CSV into a dataframe ************
def load_csv_to_df(file_loc):
  # n limits # of rows loaded

  # File location and type
    file_location = file_loc
    file_type = "csv"

    # CSV options
    infer_schema = "true"
    first_row_is_header = "true"
    delimiter = ","

  # The applied options are for CSV files. For other file types, these will be ignored.
      df = spark.read.format(file_type) \
        .option("inferSchema", infer_schema) \
        .option("header", first_row_is_header) \
        .option("sep", delimiter) \
        .option("multiLine",True) \
        .option("quote", "\"") \
        .option("escape", "\"") \
        .load(file_location)
    return df

In [6]:
# ********** Function to preprocess our dataframe **********
from bs4 import BeautifulSoup
import re
from nltk.stem import SnowballStemmer
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf


@udf(returnType=ArrayType(StringType()))
def preprocess_body(body):
  # note: not HTML therefore beautiful soup not technically required
  body = BeautifulSoup(body)
  
  # remove any urls
    urls  =  body.find_all('a')
    if len(urls) > 0: body.a.clear()
  
    # remove code snippets
    codes = body.find_all('code')
    if len(codes) > 0: body.code.clear()

    # delete preformatted text
    pres = body.find_all('pre')
    if len(pres) > 0: body.pre.clear()

    # start with our list of words
    text = body.get_text()

    # blank list we will append to
    words = []

    # make lowercase and strip whitespace
    text = text.lower().strip()

    # remove punctuation
    text = re.sub(r'([^\s\w_]|_)+', '', text)

    # snowball stemmer object
    snowball = SnowballStemmer("english")


    # tokenize into sentences
    sents = nltk.sent_tokenize(text)
    for sent in sents:
    # tokenize each sentance into words
    for word in nltk.word_tokenize(sent):
        if word in stop_en: continue  # remove stopwords
        if len(word) < 3: continue  # remove words < 3 characters
        if not word.isalpha(): continue  # remove numbers

    words.append(snowball.stem(word))  # append stemmed version of word to list

    return words

In [7]:
from pyspark.ml.feature import StringIndexer

# convert sentiment into numerical values
def make_target_vector_numerical(df):
    sentiment_to_num = StringIndexer(inputCol='sentiment',outputCol='label')
    model = sentiment_to_num.fit(df)
    df = model.transform(df)
    return df
  

## Loading Dataset

In [15]:
# imdb dataset
file_location = "IMDB_Dataset.csv"
df_ori = load_csv_to_df(file_location)
df_ori.show()

+--------------------+---------+
|              review|sentiment|
+--------------------+---------+
|One of the other ...| positive|
|A wonderful littl...| positive|
|I thought this wa...| positive|
|Basically there's...| negative|
|Petter Mattei's "...| positive|
|Probably my all-t...| positive|
|I sure would like...| positive|
|This show was an ...| negative|
|Encouraged by the...| negative|
|If you like origi...| positive|
|Phil the Alien is...| negative|
|I saw this movie ...| negative|
|So im not a big f...| negative|
|The cast played S...| negative|
|This a fantastic ...| positive|
|Kind of drawn in ...| negative|
|Some films just s...| positive|
|This movie made i...| negative|
|I remember this f...| positive|
|An awful film! It...| negative|
+--------------------+---------+
only showing top 20 rows



In [16]:
# preprocess the body
df_ori = df_ori.withColumn('filtered_body', preprocess_body(df_ori['review'])) 

# convert sentiment into numerical values
df_ori = make_target_vector_numerical(df_ori)

## Step 2: Create Numerical Feature Vectors


Create term frequency vector using HashingTF


In [14]:
'''from pyspark.ml.feature import HashingTF, IDF, Tokenizer
# create term frequency vectors using HashingTF
# used to have numFeatures=20...
hashingTF = HashingTF(inputCol="filtered_body", outputCol="rawFeatures_htf")
df_ori = hashingTF.transform(df_ori)'''

### 2.1 Create term frequency vectors using CountVectorizer instead of HashingTF

In [17]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="filtered_body", outputCol="rawFeatures_cv")
cvModel_ori = cv.fit(df_ori)
df_ori = cvModel_ori.transform(df_ori).cache()

                                                                                

In [18]:
# apply the Inverse Document Frequency (IDF)
idf = IDF(inputCol="rawFeatures_cv", outputCol="features_idf")

idfModel_ori = idf.fit(df_ori)

df_ori = idfModel_ori.transform(df_ori).cache()

22/04/14 18:36:03 WARN DAGScheduler: Broadcasting large task binary with size 2.0 MiB
                                                                                

### 2.2 Create feature vector utilizing Word2Vec

In [19]:
from pyspark.ml.feature import Word2Vec

#Create Word2Vec Model with vector size = 100 and context size = 5
word2Vec = Word2Vec(vectorSize=100, windowSize=5, inputCol="filtered_body", outputCol="word2vec")

# creates word vectors
w2vModel_ori = word2Vec.fit(df_ori)

# average word vectors for each review into one review vector
df_ori = w2vModel_ori.transform(df_ori).cache()


22/04/14 18:41:45 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 18:41:47 WARN MemoryStore: Not enough space to cache rdd_88_0 in memory! (computed 260.1 MiB so far)
22/04/14 18:41:47 WARN BlockManager: Persisting block rdd_88_0 to disk instead.
22/04/14 18:41:54 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 18:41:57 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/04/14 18:41:57 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [20]:
# our 3 cleaned dataframes to work with
df_ori.cache()

22/04/14 18:44:33 WARN CacheManager: Asked to cache already cached data.


DataFrame[review: string, sentiment: string, filtered_body: array<string>, label: double, rawFeatures_cv: vector, features_idf: vector, word2vec: vector]

## Step 3: Test our features using some models

In this step, we run our feature vectors through different models

## Decision Tree Model

In [26]:
#using CountVector
dt_cv_df = df_ori.select(['label', 'rawFeatures_cv']).limit(1000).cache()

#using CountVector and Inverse Document Frequency
dt_idf_df = df_ori.select(['label', 'features_idf']).limit(1000).cache()

#using Word2Vec
dt_w2v_df = df_ori.select(['label', 'word2vec']).limit(1000).cache()

22/04/14 18:50:57 WARN CacheManager: Asked to cache already cached data.
22/04/14 18:50:57 WARN CacheManager: Asked to cache already cached data.
22/04/14 18:50:57 WARN CacheManager: Asked to cache already cached data.


In [29]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

def DecisionTreeAccuracy(df, label, feature, bins, depth):
    dt = DecisionTreeClassifier(labelCol=label, featuresCol=feature, maxBins=bins, maxDepth=depth, seed=42)
    (training, testing) = df.randomSplit([0.7,0.3], seed=42)
  
    # fit our model to the training data
    pred = dt.fit(training)

    # apply model to test data
    test_results = pred.transform(testing)
    acc_eval = MulticlassClassificationEvaluator()
    accuracy = acc_eval.evaluate(test_results)
    return accuracy

# determine accuracy of model using each type of feature
dt_cv_acc = DecisionTreeAccuracy(dt_cv_df, "label", "rawFeatures_cv", 10, 5)
dt_idf_acc = DecisionTreeAccuracy(dt_idf_df, "label", "features_idf", 10, 5)
dt_w2v_acc = DecisionTreeAccuracy(dt_w2v_df, "label", "word2vec", 10, 5)
print(f"Decision Tree count vectorizer accuracy: {dt_cv_acc}")
print(f"Decision Tree TF-IDF accuracy: {dt_cv_acc}")
print(f"Decision Tree Word2Vec accuracy: {dt_w2v_acc}")


22/04/14 18:59:24 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 18:59:25 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 18:59:25 WARN DAGScheduler: Broadcasting large task binary with size 6.4 MiB
22/04/14 18:59:39 WARN DAGScheduler: Broadcasting large task binary with size 7.8 MiB
22/04/14 18:59:40 WARN MemoryStore: Not enough space to cache rdd_3653_0 in memory! (computed 276.7 MiB so far)
22/04/14 18:59:40 WARN BlockManager: Persisting block rdd_3653_0 to disk instead.
22/04/14 18:59:42 WARN MemoryStore: Not enough space to cache rdd_3653_0 in memory! (computed 416.2 MiB so far)
22/04/14 18:59:42 WARN DAGScheduler: Broadcasting large task binary with size 7.8 MiB
22/04/14 18:59:43 WARN MemoryStore: Not enough space to cache rdd_3653_0 in memory! (computed 416.2 MiB so far)
22/04/14 18:59:44 WARN DAGScheduler: Broadcasting large task binary with size 7.8 MiB
22/04/14 18:59:44 WARN MemoryStore: Not enough space to cache rdd_

Decision Tree count vectorizer accuracy: 0.6725432109764791
Decision Tree TF-IDF accuracy: 0.6725432109764791
Decision Tree Word2Vec accuracy: 0.7184921144324226


22/04/14 19:00:12 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB


**We see a slight edge with Word2Vec - since these models train quickly we can attempt some further tuning**


we can try to apply different tuning in order to get best Decision Tree --> 

In [None]:
'''
# using cross validation and Word2Vec
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

train, test = dt_w2v_df.randomSplit([0.7,0.3], seed=42)

dt = DecisionTreeClassifier(labelCol="label", featuresCol="word2vec", seed=42)
  
paramGrid = ParamGridBuilder()\
    .addGrid(dt.impurity, ["entropy", "gini"])\
    .addGrid(dt.maxBins, [5, 10, 15])\
    .addGrid(dt.maxDepth, [5, 10, 15, 20])\
    .build()

crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=3, seed=42)

cvModel = crossval.fit(train)

bestModel = cvModel.bestModel
 
print("Impurity parameter: ")
print(bestModel._java_obj.getImpurity())
print("maxDepth best param: ")
print(bestModel._java_obj.getMaxDepth())

'''

In [32]:
# function used to give the accuracy, f1 score, precision, recall, and confusion matrix for a model on a specified dataframe. 
from pyspark.mllib.evaluation import MulticlassMetrics

def model_evaluation(df, model, dataSourceName):
    (training, testing) = df.randomSplit([0.7,0.3], seed=42)
  
    # fit our model to the training data
    pred = model.fit(training)

    # apply model to test data
    test_results = pred.transform(testing)
  
    # print out accuracy
    acc_eval = MulticlassClassificationEvaluator()
    accuracy = acc_eval.evaluate(test_results)

    # other metrics
    predictionAndLabel = test_results.select("prediction", "label").rdd
    multiMetrics = MulticlassMetrics(predictionAndLabel)
    precision = multiMetrics.weightedPrecision
    recall = multiMetrics.weightedRecall
    f1 = multiMetrics.weightedFMeasure()
    matrix = multiMetrics.confusionMatrix().toArray()
  
    print(f"Using model on {dataSourceName} data")
    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1: {f1}")
    print("Confustion Matrix: ")
    print(matrix)
    print("-------------------")


In [33]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
dt_tuned_w2v = DecisionTreeClassifier(labelCol="label", featuresCol="word2vec", maxBins=16, maxDepth=10, seed=42, impurity="entropy")
dt_df_ori = df_ori.select(['label', 'word2vec']).cache()
 
model_evaluation(dt_df_ori, dt, "report") 

22/04/14 19:05:16 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 19:05:18 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 19:05:19 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 19:05:21 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 19:05:21 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 19:05:21 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 19:05:22 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 19:05:22 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 19:05:22 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 19:05:23 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
22/04/14 19:05:25 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB


Using model on report data
Accuracy: 0.7634236158029044
Precision: 0.7634395444323038
Recall: 0.7634322373696872
F1: 0.7634236158029044
Confustion Matrix: 
[[5644. 1800.]
 [1740. 5780.]]
-------------------


                                                                                