# YouTube Comment Analysis in Apache Spark

## Outline

***Aim***

In this notebook, we use a dataset of user comments on YouTube videos related to animals or pets to identify cat or dog owners from their comments, find out which topics are important to them, and then identify the video creators whose audiences include the most cat or dog owners.

***Data Source***

The dataset provided for this coding test consists of comments on videos related to animals and/or pets. It is about 700 MB in size.

***Data Schema***

The dataset is a comma-separated file with a header line that defines the following fields:

* `creator_name`: name of the YouTube channel creator.
* `userid`: integer identifier of the user commenting on the channel.
* `comment`: text of the comment made by the user.

***Conclusion***

1.   Identified likely cat and dog owners from the comments in a 0.7 GB YouTube dataset and surfaced the topics that matter to them
2.   Built a Spark ML pipeline (RegexTokenizer + Word2Vec) to vectorize the comments, and rebalanced the imbalanced labels by undersampling the majority class
3.   Compared Logistic Regression, a Decision Tree and Gradient-Boosted Trees, tuning hyperparameters with Spark ML cross-validation
4.   Selected Gradient-Boosted Trees as the final classifier, which reached an area under the ROC curve of 0.9748 on the test set

## Content


*   Data exploration and cleaning
     1. Spark pipeline: RegexTokenizer, Word2Vec
     2. Dealing with imbalanced data
*   Data preprocessing and building the classifier
     1.   Logistic Regression
     2.   Decision Tree
     3.   Gradient-Boosted Trees
*   Classify all the users with the best model
     1.  Grid search over hyperparameters
*   Get insights into the users
*   Identify creators with cat and dog owners in the audience

## 0. Data Exploration and Cleaning

In [5]:
df_clean=spark.read.csv("/FileStore/tables/animals_comments.csv",inferSchema=True,header=True)
df_clean.show(10)

In [6]:
df_clean.count() 

In [7]:
df_clean = df_clean.na.drop(subset=["comment"])
df_clean.count()

In [8]:
df_clean.show()

In [9]:
# Label comments whose authors appear to own a dog or cat (1) versus all other comments (0)
from pyspark.sql.functions import when
from pyspark.sql.functions import col

# These keyword rules are one possible heuristic; you can substitute your own labeling logic.
# LIKE is case-sensitive, so "My dog" at the start of a sentence is not matched.
df_clean = df_clean.withColumn("label",
                               when(col("comment").like("%my dog%"), 1)
                               .when(col("comment").like("%I have a dog%"), 1)
                               .when(col("comment").like("%my cat%"), 1)
                               .when(col("comment").like("%I have a cat%"), 1)
                               .when(col("comment").like("%my puppy%"), 1)
                               .when(col("comment").like("%my pup%"), 1)
                               .when(col("comment").like("%my kitty%"), 1)
                               .when(col("comment").like("%my pussy%"), 1)
                               .otherwise(0))
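
The keyword rule amounts to a case-sensitive substring test on each comment. As a plain-Python sketch (not part of the Spark notebook; `OWNER_PHRASES` and `owner_label` are hypothetical names), it also shows two side effects of the rule: capitalized phrases are missed, and "my pup" also matches words such as "my pupil".

```python
# Hypothetical plain-Python mirror of the Spark labeling rule above:
# Spark's LIKE '%phrase%' is a case-sensitive substring match.
OWNER_PHRASES = ["my dog", "I have a dog", "my cat", "I have a cat",
                 "my puppy", "my pup", "my kitty", "my pussy"]

def owner_label(comment: str) -> int:
    """Return 1 if any owner phrase occurs in the comment, else 0."""
    return 1 if any(phrase in comment for phrase in OWNER_PHRASES) else 0

examples = [
    "I love my dog so much",      # contains "my dog"
    "My dog does this too",       # capitalized "My", so no match
    "Show me my pupil dilating",  # false positive through "my pup"
    "What a cute cat",            # no ownership phrase
]
print([owner_label(c) for c in examples])  # [1, 0, 1, 0]
```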

In [10]:
df_clean.show()

## 1. Data Preprocessing and Building the Classifier

In [12]:
from pyspark.ml.feature import RegexTokenizer, Word2Vec
from pyspark.ml.classification import LogisticRegression

# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="comment", outputCol="words", pattern="\\W")

word2Vec = Word2Vec(inputCol="words", outputCol="features")

In [13]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, word2Vec])

# Fit the tokenizer + Word2Vec pipeline on all comments (Word2Vec is unsupervised, so labels are not used)
pipelineFit = pipeline.fit(df_clean)
dataset = pipelineFit.transform(df_clean)
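
Spark's `Word2VecModel` represents each comment as the average of the vectors of its words. The sketch below mimics the tokenizer and that averaging step in plain Python with a toy two-dimensional vocabulary; the names `tokenize`, `average_vector` and `toy_vectors` are hypothetical, and treating out-of-vocabulary tokens as zero vectors is an assumption of this sketch rather than a documented detail.

```python
import re

def tokenize(text):
    # Mirrors RegexTokenizer(pattern="\\W") defaults: lowercase the text,
    # split on non-word characters and drop empty tokens.
    return [t for t in re.split(r"\W", text.lower()) if t]

def average_vector(tokens, vectors, dim):
    # Average the word vectors of one document; unknown tokens add zeros
    # (an assumption of this sketch).
    total = [0.0] * dim
    for tok in tokens:
        for i, v in enumerate(vectors.get(tok, [0.0] * dim)):
            total[i] += v
    return [x / len(tokens) for x in total] if tokens else total

# Toy 2-dimensional "embeddings"; Spark's Word2Vec uses 100 dimensions by default.
toy_vectors = {"my": [0.0, 1.0], "dog": [1.0, 0.0], "barks": [1.0, 1.0]}
tokens = tokenize("My dog barks!")
print(tokens)                                  # ['my', 'dog', 'barks']
print(average_vector(tokens, toy_vectors, 2))  # both entries are about 0.667
```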

In [14]:
dataset.show()

In [15]:
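# Rebalance the classes by undersampling the majority class (label 0):
# keep 0.5% of the label-0 comments for training and the other 99.5% for testing
(lable0_train,lable0_test)=dataset.filter(col('label')==0).randomSplit([0.005, 0.995],seed = 100)
# Use 30% of the label-1 comments for training; of the remaining 70%, keep 0.2% for testing
# and hold out the rest (lable1_ex2) for the classification step later on
(lable1_train, lable1_ex)=dataset.filter(col('label')==1).randomSplit([0.3, 0.7],seed = 100)
(lable1_test, lable1_ex2)=lable1_ex.randomSplit([0.002, 0.998],seed = 100)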

In [16]:
trainingData = lable0_train.union(lable1_train)
testData=lable0_test.union(lable1_test)
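
To see how the split fractions rebalance the classes, the expected sizes can be worked out by hand. This sketch uses made-up class counts (1,000,000 label-0 and 50,000 label-1 comments, not the real dataset figures), and `expected_split_sizes` is a hypothetical helper; `randomSplit` is random, so actual counts only approximate these values.

```python
def expected_split_sizes(n0, n1):
    """Expected row counts from the randomSplit fractions used above.

    n0, n1: numbers of label-0 and label-1 comments (hypothetical inputs).
    """
    rest1 = n1 * 0.7
    return {
        "train0": round(n0 * 0.005),
        "test0": round(n0 * 0.995),
        "train1": round(n1 * 0.3),
        "test1": round(rest1 * 0.002),
        "held_out1": round(rest1 * 0.998),  # corresponds to lable1_ex2
    }

# Hypothetical counts, for illustration only
print(expected_split_sizes(n0=1_000_000, n1=50_000))
# {'train0': 5000, 'test0': 995000, 'train1': 15000, 'test1': 70, 'held_out1': 34930}
```

With these numbers the training set is rebalanced toward label 1, while the test set stays dominated by label 0.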

In [17]:
print("Dataset Count: " + str(dataset.count()))
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

### Apply LogisticRegression

In [19]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [20]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
predictions = lrModel.transform(testData)

In [21]:
# Keep the label and prediction next to the original comment, its tokens and its features
selected = predictions.select("label", "prediction", "creator_name", "userid", "comment", "words", "features")

In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate the model; BinaryClassificationEvaluator defaults to areaUnderROC, not accuracy
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [23]:
evaluator.getMetricName()
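
`getMetricName()` confirms that the evaluator reports the area under the ROC curve rather than accuracy. A minimal pure-Python sketch of that metric, using its pairwise interpretation (the probability that a random positive is scored above a random negative, ties counting one half), is shown below; Spark computes it from the ROC curve instead, and `roc_auc` is a hypothetical helper.

```python
def roc_auc(labels, scores):
    """ROC AUC as the fraction of (positive, negative) pairs ranked correctly."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a correct ranking
    return wins / (len(pos) * len(neg))

print(roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
print(roc_auc([0, 0, 1, 1], [0.5, 0.5, 0.5, 0.5]))   # 0.5 for a constant score
```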

In [24]:
print(lr.explainParams())

### Parameter Tuning and K-fold cross-validation

In [26]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [27]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# This takes a while: 3 x 3 x 3 = 27 parameter combinations, each trained on 5 folds
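
The grid built by `ParamGridBuilder` is the Cartesian product of the listed values. This sketch enumerates the same values with `itertools.product` to count the fits; the final `+ 1` reflects that `CrossValidator` refits the best combination on the whole training set.

```python
from itertools import product

# The same grid as paramGrid above, written as plain Python lists
reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]
max_iters = [1, 5, 10]

grid = list(product(reg_params, elastic_net_params, max_iters))
num_folds = 5
# One fit per combination per fold, plus a final refit of the best combination
total_fits = len(grid) * num_folds + 1
print(len(grid), total_fits)  # 27 136
```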

In [28]:
# Score the test set with the best model found by cross-validation
predictions = cvModel.transform(testData)

In [29]:
# cvModel uses the best model found from the Cross Validation
# Evaluate best model
evaluator.evaluate(predictions)

In [30]:
print('Model Intercept: ', cvModel.bestModel.intercept)

In [31]:
weights = cvModel.bestModel.coefficients
weights = [(float(w),) for w in weights]  # convert numpy floats to Python floats wrapped in 1-tuples
weightsDF = spark.createDataFrame(weights, ["Feature Weight"])
display(weightsDF.take(8))

| Feature Weight |
|---|
| -1.039185419992263 |
| -0.7442855786512187 |
| -1.651323678024798 |
| 1.5853979854927644 |
| 2.2875601058150545 |
| -5.887394090676607 |
| -3.528712481862166 |
| -0.1042890401044868 |
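
Each weight above multiplies one Word2Vec dimension. Assuming the standard binary logistic-regression formulation with Spark's default threshold of 0.5, the weights and intercept turn a feature vector into a prediction as in this sketch; `lr_probability` and `lr_predict` are hypothetical helpers, and the 3-feature weights are made up, whereas the real model has one weight per dimension.

```python
import math

def lr_probability(weights, intercept, features):
    """P(label = 1) for binary logistic regression: sigmoid(w . x + b)."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-margin))

def lr_predict(weights, intercept, features, threshold=0.5):
    # Predict 1.0 when the probability of the positive class exceeds the threshold
    return 1.0 if lr_probability(weights, intercept, features) > threshold else 0.0

# Hypothetical 3-feature example
w = [1.5, -2.0, 0.5]
b = -0.25
x = [0.2, 0.1, 0.4]
print(lr_probability(w, b, x))  # sigmoid(0.05), slightly above 0.5
print(lr_predict(w, b, x))      # 1.0
```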


In [32]:
# View the best model's predictions next to the true labels
selected = predictions.select("label", "prediction")
display(selected.take(8))

| label | prediction |
|---|---|
| 0 | 0.0 |
| 0 | 0.0 |
| 0 | 0.0 |
| 0 | 0.0 |
| 0 | 0.0 |
| 0 | 0.0 |
| 0 | 0.0 |
| 0 | 0.0 |


### Decision Tree

In [34]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)

# Train model with Training Data
dtModel = dt.fit(trainingData)

In [35]:
print("numNodes = ", dtModel.numNodes)
print("depth = ", dtModel.depth)

In [36]:
display(dtModel)

| index | featureType | feature | threshold | prediction | categories | overflow |
|---|---|---|---|---|---|---|
| 5 | continuous | 41 | -0.011046313208860799 | null | null | false |
| 3 | continuous | 53 | -0.0691588376394727 | null | null | false |
| 1 | continuous | 83 | 0.03441051441784297 | null | null | false |
| 0 | null | null | null | 0.0 | null | false |
| 2 | null | null | null | 1.0 | null | false |
| 4 | null | null | null | 0.0 | null | false |
| 6 | null | null | null | 0.0 | null | false |


Decision trees perform poorly on imbalanced data, so it helps to oversample the minority class or undersample the majority class first. Without rebalancing, the tree degenerates to a single leaf (depth 0, one node) that predicts the majority class for every comment; such a model still reaches high accuracy, but only because almost all comments belong to that class.
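
The accuracy trap is easy to reproduce: a model that always predicts the majority class is right for every majority-class row. The sketch below uses a hypothetical 98/2 label split and a hypothetical helper `majority_baseline_accuracy`; such a constant predictor has an area under the ROC curve of only 0.5.

```python
def majority_baseline_accuracy(labels):
    """Return the majority label and the accuracy of always predicting it."""
    ones = sum(labels)
    majority = 1 if ones > len(labels) - ones else 0
    correct = sum(1 for y in labels if y == majority)
    return majority, correct / len(labels)

# Hypothetical imbalanced sample: 98 non-owners, 2 owners
labels = [0] * 98 + [1] * 2
print(majority_baseline_accuracy(labels))  # (0, 0.98)
```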

In [38]:
# Make predictions on test data using the Transformer.transform() method.
predictions = dtModel.transform(testData)

In [39]:
predictions.printSchema()

In [40]:
# View the model's predictions alongside the comment metadata
selected = predictions.select("label", "prediction", "creator_name", "userid", "comment")

In [41]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate the model; the default metric is areaUnderROC
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

Rebalancing the training data raised the area under the ROC curve from 0.5 (no better than chance) to 0.87.

In [43]:
dt.getImpurity()

### Gradient boosting

In [45]:
from pyspark.ml.classification import GBTClassifier

# declare the model
gbt = GBTClassifier()
# using cross validation to search best hyper-parameter
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxIter, [10, 20])
             .build())
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# running
cvModel = cv.fit(trainingData)
predictions = cvModel.transform(testData)
evaluator.evaluate(predictions)

In [46]:
# GBT gives the best test AUC (97.47%), higher than the 97.07% of the logistic-regression model
best_model = cvModel.bestModel
print(best_model)
print(best_model._java_obj.getMaxIter())
print(best_model._java_obj.getMaxDepth())

## 2. Classify All The Users

#### The best model is GBT with hyperparameters maxIter = 20, maxDepth = 6

In [49]:
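from pyspark.ml.classification import GBTClassifier

# Retrain GBT on the training data with the best hyperparameters found above
gbt = GBTClassifier(maxIter=20, maxDepth=6)
gbtModel = gbt.fit(trainingData)
# Note: lable1_ex2 holds only held-out comments already labeled 1 by the keyword rule;
# to score every comment in the dataset, transform `dataset` instead
predictions = gbtModel.transform(lable1_ex2)
# Show only rows whose creator_name is not null
predictions.na.drop(subset=["creator_name"]).show(10)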

In [50]:
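# Count each creator's comments and how many of them are predicted to come from cat/dog owners
from pyspark.sql.functions import *   # note: shadows Python built-ins such as sum, round and abs
from pyspark.sql.types import IntegerType

# UDF that converts the double-valued prediction into an integer so it can be summed
Int = udf(lambda x: int(x), IntegerType())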

result = predictions.groupBy('creator_name').agg(count('*').alias('total_comments'), sum(Int('prediction')).alias('follows'))
display(result.take(10))

| creator_name | total_comments | follows |
|---|---|---|
| Jonathan Seabolt | 1 | 0 |
| Parrot Wizard | 1 | 1 |
| Loki the Red Fox | 117 | 90 |
| Veronica Morris | 3 | 3 |
| Keystone Puppies | 2 | 1 |
| Aaron Rift | 9 | 7 |
| FROSTY Life | 114 | 97 |
| Shamama Hunting Owl | 2 | 2 |
| Miss Aww | 9 | 7 |
| Ashley Siemon | 1 | 1 |


## 3. Get Insights into the Users

In [52]:
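# Look at comments from users labeled as cat or dog owners to find the topics that matter to them
# (show() only prints and returns None, so its result is not assigned)
predictions.where(col('label') == 1).select('userid', 'comment').show(10)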
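
The cell above only lists comments. One simple way to surface candidate topics, swapped in here as an illustration rather than taken from the original analysis, is to count the most frequent non-stop-words in owner comments; `top_words`, the stop-word list and the sample comments are hypothetical. For a more thorough approach, Spark also ships a topic model, `pyspark.ml.clustering.LDA`.

```python
import re
from collections import Counter

# A small, hypothetical stop-word list; a real analysis would use a fuller one
STOP_WORDS = {"my", "i", "a", "the", "and", "is", "to", "so", "have", "it", "too"}

def top_words(comments, k=3):
    """Most frequent non-stop-words across the comments."""
    counts = Counter()
    for comment in comments:
        for token in re.split(r"\W", comment.lower()):
            if token and token not in STOP_WORDS:
                counts[token] += 1
    return counts.most_common(k)

owner_comments = [
    "My dog loves the vet",
    "I have a cat and my cat hates the vet",
    "my puppy is so cute",
]
print(top_words(owner_comments))  # [('vet', 2), ('cat', 2), ('dog', 1)]
```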

## 4. Identify Creators With Cat And Dog Owners In The Audience

In [54]:
result = predictions.groupBy('creator_name').agg(count('*').alias('total_comments'), sum(Int('prediction')).alias('follows')).orderBy('follows', ascending=False)
display(result.take(10))

| creator_name | total_comments | follows |
|---|---|---|
| The Dodo | 2872 | 2480 |
| Cole & Marmalade | 2008 | 1657 |
| Gohan The Husky | 1668 | 1412 |
| Zak Georges Dog Training rEvolution | 1532 | 1260 |
| Hope For Paws - Official Rescue Channel | 1314 | 1118 |
| Gone to the Snow Dogs | 1191 | 999 |
| stacyvlogs | 1078 | 991 |
| Vet Ranch | 1173 | 947 |
| Brian Barczyk | 1105 | 919 |
| Robin Seplut | 1091 | 900 |


In [55]:
# add column to record percent of cat and dog owners
result2 = predictions.groupBy('creator_name').agg(count('*').alias('total_comments'), sum(Int('prediction')).alias('follows')).withColumn("percent", round(col("follows")/col("total_comments"),5))

display(result2.take(10))

| creator_name | total_comments | follows | percent |
|---|---|---|---|
| Jonathan Seabolt | 1 | 0 | 0.0 |
| Parrot Wizard | 1 | 1 | 1.0 |
| Loki the Red Fox | 117 | 90 | 0.76923 |
| Veronica Morris | 3 | 3 | 1.0 |
| Keystone Puppies | 2 | 1 | 0.5 |
| Aaron Rift | 9 | 7 | 0.77778 |
| FROSTY Life | 114 | 97 | 0.85088 |
| Shamama Hunting Owl | 2 | 2 | 1.0 |
| Miss Aww | 9 | 7 | 0.77778 |
| Ashley Siemon | 1 | 1 | 1.0 |


In [56]:
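# calculate z score

from pyspark.sql.types import FloatType
import math

# Overall share of comments predicted to come from owners; this evaluates to 0.83637:
# display(result2.agg((sum('follows')/sum('total_comments')).alias('average_level')))
ap = 0.83637

# t, f, p stand for total_comments, follows, percent; Z = |p - ap| / sqrt(p * (1 - p) / t).
# The absolute value ranks creators by their deviation from the average in either direction.
def z_score(t, f, p):
    # The standard error is zero when p is 0 or 1, so those creators get 0.0
    if p == 0 or p == 1:
        return 0.0
    # math.fabs avoids the built-in abs, which the pyspark star import shadows
    return math.fabs((p - ap) / math.sqrt(((1 - p) * p) / t))

Z = udf(lambda t, f, p: z_score(t, f, p), FloatType())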

result3 = result2.withColumn('z_score', Z('total_comments','follows','percent'))
display(result3.take(10))

| creator_name | total_comments | follows | percent | z_score |
|---|---|---|---|---|
| Jonathan Seabolt | 1 | 0 | 0.0 | 0.0 |
| Parrot Wizard | 1 | 1 | 1.0 | 0.0 |
| Loki the Red Fox | 117 | 90 | 0.76923 | 1.7236794233322144 |
| Veronica Morris | 3 | 3 | 1.0 | 0.0 |
| Keystone Puppies | 2 | 1 | 0.5 | 0.9513980150222778 |
| Aaron Rift | 9 | 7 | 0.77778 | 0.4227900803089142 |
| FROSTY Life | 114 | 97 | 0.85088 | 0.4349283576011657 |
| Shamama Hunting Owl | 2 | 2 | 1.0 | 0.0 |
| Miss Aww | 9 | 7 | 0.77778 | 0.4227900803089142 |
| Ashley Siemon | 1 | 1 | 1.0 | 0.0 |
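
As a check on the formula, this plain-Python version of the z-score reproduces two rows of the table; `z_score_check` and `AVERAGE_PERCENT` are hypothetical names. Note that it uses the creator's own share `p` in the standard error (a Wald-style statistic); a textbook one-proportion z-test would use `ap` there instead, and for creators with only a handful of comments the normal approximation is unreliable.

```python
import math

AVERAGE_PERCENT = 0.83637  # the value of `ap` used in the notebook

def z_score_check(total_comments, percent, ap=AVERAGE_PERCENT):
    """|p - ap| / sqrt(p * (1 - p) / t); 0.0 when p is 0 or 1 (zero standard error)."""
    if percent in (0.0, 1.0):
        return 0.0
    se = math.sqrt(percent * (1 - percent) / total_comments)
    return abs(percent - ap) / se

print(round(z_score_check(117, 0.76923), 4))  # 1.7237 (Loki the Red Fox)
print(round(z_score_check(2, 0.5), 4))        # 0.9514 (Keystone Puppies)
```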


In [57]:
# Show the creator whose share of cat/dog owners deviates most significantly from the average
final_result = result3.orderBy('z_score', ascending=False)
final_result.show(1)