### Spark Hw3: YouTube Comments Analysis (recommended for students with strong hands-on skills; for LaiOffer students only)
#### Write a Spark program to analyze the text data.
#### Optional project, soft deadline: 03/20/2019. Publish your notebook and send the link to mike@laioffer.com

In this notebook, we analyze a dataset of user comments on YouTube videos related to animals or pets. We will attempt to identify cat or dog owners based on these comments, find the topics important to them, and then identify the video creators whose audiences include the most cat or dog owners.

The dataset provided for this coding test consists of comments on videos related to animals and/or pets. It is 240MB compressed; please download the file using this Google Drive link:
https://drive.google.com/file/d/1o3DsS3jN_t2Mw3TsV0i7ySRmh9kyYi1a/view?usp=sharing

The dataset file is comma-separated, with a header line defining the following fields:
- creator_name: name of the YouTube channel creator.
- userid: integer identifier of the user commenting on the channel.
- comment: text of the comment made by the user.

Please use a recent version of PySpark (version 2.2 or higher) to analyze the data. Do not use
any external libraries; just use the native methods from pyspark.sql and pyspark.ml. (Do not
use pyspark.mllib as this has been deprecated.) Keep your code clean and efficient, with
enough documentation so that the grader can easily follow your train of thought. Summarize
the key results from each step. Explain how to execute your code from a command line
interface.

Step 1: Identify Cat And Dog Owners
Find the users who are cat and/or dog owners.

Step 2: Build And Evaluate Classifiers
Build classifiers for the cat and dog owners and measure the performance of the classifiers.

Step 3: Classify All The Users
Apply the cat/dog classifiers to all the users in the dataset. Estimate the fraction of all users
who are cat/dog owners.

Step 4: Extract Insights About Cat And Dog Owners
Find topics important to cat and dog owners.

Step 5: Identify Creators With Cat And Dog Owners In The Audience
Find creators with the most cat and/or dog owners. Find creators with the highest statistically
significant percentages of cat and/or dog owners.

In [3]:
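# Dataset link: https://drive.google.com/file/d/1o3DsS3jN_t2Mw3TsV0i7ySRmh9kyYi1a/view?usp=sharing
# Note: wget on this share link fetches the Google Drive preview page, not the CSV file.
# Download the file in a browser and upload it to DBFS (the next cell reads /FileStore/tables/animals_comments.csv).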

#### 0. Data Exploration and Cleaning

In [5]:
df_clean=spark.read.csv("/FileStore/tables/animals_comments.csv",inferSchema=True,header=True)
df_clean.show(10)

In [6]:
df_clean.count() 

In [7]:
df_clean = df_clean.na.drop(subset=["comment"])
df_clean.count()

In [8]:
df_clean.show()

In [9]:
# Label comments written by likely cat/dog owners with simple keyword rules.
# Labels are assigned per comment (one user can write many comments); you can use your own rules.
# Note: LIKE is case-sensitive, so e.g. "My dog" at the start of a sentence is not matched,
# and "%my pup%" already covers "%my puppy%".
from pyspark.sql.functions import when, col

df_clean = df_clean.withColumn(
    "label",
    when(col("comment").like("%my dog%"), 1)
    .when(col("comment").like("%I have a dog%"), 1)
    .when(col("comment").like("%my cat%"), 1)
    .when(col("comment").like("%I have a cat%"), 1)
    .when(col("comment").like("%my puppy%"), 1)
    .when(col("comment").like("%my pup%"), 1)
    .when(col("comment").like("%my kitty%"), 1)
    .when(col("comment").like("%my pussy%"), 1)
    .otherwise(0),
)

# (large,small)=df_clean.filter(col('label')==1).randomSplit([0.99, 0.01],seed = 100)
# df_clean = small
df_clean.createOrReplaceTempView('iden_owner')
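To see what these rules catch and miss, here is a plain-Python sketch of the same check. It assumes that `LIKE '%phrase%'` acts as a case-sensitive substring test, which holds for these patterns since they contain no `_` or escape characters. The `case_sensitive=False` option is a hypothetical variant that mimics wrapping the column in `pyspark.sql.functions.lower` and lowercasing the patterns; the notebook does not do this.

```python
# Owner phrases copied from the withColumn("label", ...) rules above
OWNER_PHRASES = [
    "my dog", "I have a dog", "my cat", "I have a cat",
    "my puppy", "my pup", "my kitty", "my pussy",
]


def label_comment(comment: str, case_sensitive: bool = True) -> int:
    """Return 1 if the comment contains any owner phrase, else 0.

    case_sensitive=True mirrors col("comment").like("%phrase%");
    case_sensitive=False is a hypothetical variant that lowercases both sides.
    """
    text = comment if case_sensitive else comment.lower()
    for phrase in OWNER_PHRASES:
        needle = phrase if case_sensitive else phrase.lower()
        if needle in text:
            return 1
    return 0


print(label_comment("My dog does this every morning"))         # 0: capital "M" is not matched
print(label_comment("My dog does this every morning", False))  # 1
print(label_comment("Check my category of cat videos"))        # 1: false positive via "my cat"
```

The last line shows that substring rules also produce false positives such as "my category"; matching whole words, for example with `Column.rlike` and `\b` word boundaries, is one way to tighten them.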

In [10]:
%sql

Select label,count(*)
From iden_owner
Group by 1


label,count(1)
1,40225
0,5778759


In [11]:
df_clean.show()

#### 1. Data Preprocessing and Building the Classifiers

In [13]:
from pyspark.ml.feature import RegexTokenizer, Word2Vec, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

# Split each comment into lowercase tokens on non-word characters (\W)
regexTokenizer = RegexTokenizer(inputCol="comment", outputCol="words", pattern="\\W")

# Drop common English stop words (default list)
remover = StopWordsRemover(inputCol="words", outputCol="filtered")

# Alternative bag-of-words features:
# countvec = CountVectorizer(inputCol='filtered', outputCol='features')
# Word2Vec represents each comment as the average of its 20-dimensional word vectors
word2Vec = Word2Vec(inputCol="filtered", outputCol="features", vectorSize=20)

In [14]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[regexTokenizer, remover, word2Vec])

# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(df_clean)
dataset = pipelineFit.transform(df_clean)
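A plain-Python sketch of what the three stages do to a single comment, under simplifying assumptions: tokenization approximates RegexTokenizer's defaults (lowercase, split on `\W`, drop empty tokens), the stop-word set is a tiny hypothetical stand-in for Spark's default English list, and the 2-dimensional word vectors are made up (the notebook learns 20-dimensional ones). Word2VecModel turns a document into the average of its word vectors; every token here has a vector, which sidesteps how out-of-vocabulary words are handled.

```python
import re
from typing import Dict, List

# Hypothetical stand-in for StopWordsRemover's default English list
STOP_WORDS = {"i", "m", "is", "my", "the", "a", "so"}


def tokenize(comment: str) -> List[str]:
    """Approximate RegexTokenizer(pattern="\\W"): lowercase, split on non-word chars, drop empties."""
    # re.ASCII makes \W match Java's default (ASCII-only) definition
    return [t for t in re.split(r"\W", comment.lower(), flags=re.ASCII) if t]


def remove_stop_words(tokens: List[str]) -> List[str]:
    return [t for t in tokens if t not in STOP_WORDS]


def average_vector(tokens: List[str], vectors: Dict[str, List[float]], dim: int) -> List[float]:
    """Average the word vectors of the tokens (zero vector for an empty document)."""
    if not tokens:
        return [0.0] * dim
    total = [0.0] * dim
    for t in tokens:
        for k, v in enumerate(vectors[t]):
            total[k] += v
    return [x / len(tokens) for x in total]


# Made-up 2-d embeddings, for illustration only
toy_vectors = {"sure": [0.0, 1.0], "dog": [1.0, 0.0], "rex": [1.0, 1.0], "happy": [0.0, 1.0]}

words = tokenize("I'm sure my dog, Rex, is so happy")
filtered = remove_stop_words(words)
features = average_vector(filtered, toy_vectors, dim=2)
print(words)
print(filtered, features)
```

The tokenizer turns "I'm" into "i" and "m", so tokens such as "im" and "dont" that show up in the LDA topics later in the notebook come from comments typed without apostrophes.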

In [15]:
dataset.show()

In [16]:
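# Note the naming: label0_* holds the positive comments (label == 1), label1_* the negatives (label == 0).
# Positives: 70/30 train/test split.
(label0_train,label0_test)=dataset.filter(col('label')==1).randomSplit([0.7, 0.3],seed = 100)
# Negatives are heavily downsampled (0.5% for training, then 0.2% of the rest for testing) to balance the classes.
(label1_train, label1_ex)=dataset.filter(col('label')==0).randomSplit([0.005, 0.995],seed = 100)
(label1_test, label1_ex2)=label1_ex.randomSplit([0.002, 0.998],seed = 100)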

In [17]:
trainingData = label0_train.union(label1_train)
# Keep only half of the balanced training set to shorten cross-validation time
train1, train2 = trainingData.randomSplit([0.5, 0.5],seed = 100)
trainingData = train1
testData=label0_test.union(label1_test)
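The sampling fractions can be sanity-checked with a little arithmetic. Starting from the keyword counts (40,225 positive and 5,778,759 negative comments), the expected sizes below are close to the per-label counts reported by the `train` and `test` queries (14,233/14,294 and 11,937/11,402). `randomSplit` assigns rows to splits at random, so the real counts only match in expectation.

```python
# Keyword-label counts from the iden_owner query
positives = 40_225
negatives = 5_778_759

# Positives: 70% train / 30% test
pos_train = 0.7 * positives
pos_test = 0.3 * positives

# Negatives: 0.5% to training, then 0.2% of the remaining 99.5% to test
neg_train = 0.005 * negatives
neg_test = 0.002 * 0.995 * negatives

# Only half of the combined training set is kept (train1)
train_pos, train_neg = 0.5 * pos_train, 0.5 * neg_train

print(f"train: {train_pos:,.0f} positive, {train_neg:,.0f} negative")
print(f"test:  {pos_test:,.0f} positive, {neg_test:,.0f} negative")
print(f"positive rate overall: {positives / (positives + negatives):.2%}")
```

The balanced training set makes the classifier's job easier, but the real positive rate by the keyword rule is only about 0.7%, which matters once the model is applied to every comment.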

In [18]:
trainingData.createOrReplaceTempView('train')
testData.createOrReplaceTempView('test')

In [19]:
%sql

Select label,count(*)
From train
group by 1

-- number of training examples per label

label,count(1)
1,14233
0,14294


In [20]:
%sql

Select label,count(*)
From test
group by 1

-- number of test examples per label

label,count(1)
1,11937
0,11402


In [21]:
# print("Dataset Count: " + str(dataset.count()))
# print("Training Dataset Count: " + str(trainingData.count()))
# print("Test Dataset Count: " + str(testData.count()))

##### Logistic Regression

In [23]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=8, regParam=0.05, elasticNetParam=0.8)
lrModel = lr.fit(trainingData)

print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

##### Parameter Tuning and K-fold cross-validation

In [25]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder()\
            .addGrid(lr.regParam,[0.05,0.1,0.2,0.4])\
            .addGrid(lr.maxIter,[8,10,12,14])\
            .build()
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5,
                          seed = 42)
cvModel = crossval.fit(trainingData)


In [26]:
# best_model = cvModel.bestModel
# best_model.extractParamMap()

In [27]:
# from pyspark.ml.evaluation import BinaryClassificationEvaluator

# evaluator=BinaryClassificationEvaluator()
# evaluator.evaluate(cvModel.transform(trainingData))

##### Random Forest

In [29]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='label', featuresCol='features', seed=42)
rfModel = rf.fit(trainingData)
# Scored on the training data, so this AUC is optimistic
prediction = rfModel.transform(trainingData)

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()  # default metric: areaUnderROC
evaluator.evaluate(prediction)

In [30]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder()\
            .addGrid(rf.numTrees,[50,100])\
            .addGrid(rf.maxDepth,[6,8,10])\
            .build()
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5,
                          seed = 42)
cv_rfModel = crossval.fit(trainingData)


In [31]:
# best_model = cv_rfModel.bestModel
# best_model.extractParamMap()

##### Gradient Boosting

In [33]:
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol='label', featuresCol='features', seed=42)

paramGrid = ParamGridBuilder()\
            .addGrid(gbt.stepSize,[0.05,0.1])\
            .addGrid(gbt.maxIter,[6,8,10])\
            .addGrid(gbt.maxDepth,[5,6])\
            .build()
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5,
                          seed = 42)
cv_gbtModel = crossval.fit(trainingData)
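Cross-validation cost grows with the grid: `ParamGridBuilder` produces the Cartesian product of the listed values, and `CrossValidator` trains one model per combination per fold, then refits the best combination on the full training set. A quick plain-Python count for the three grids above:

```python
from itertools import product

NUM_FOLDS = 5

# Value lists copied from the three ParamGridBuilder calls above
grids = {
    "LogisticRegression": [[0.05, 0.1, 0.2, 0.4], [8, 10, 12, 14]],
    "RandomForest": [[50, 100], [6, 8, 10]],
    "GBT": [[0.05, 0.1], [6, 8, 10], [5, 6]],
}

fits = {}
for name, values in grids.items():
    combos = len(list(product(*values)))
    # one fit per combination per fold, plus the final refit of the best combination
    fits[name] = combos * NUM_FOLDS + 1
    print(f"{name}: {combos} combinations -> {fits[name]} model fits")
```

Since Spark 2.3, `CrossValidator` also accepts a `parallelism` argument to train several combinations concurrently, which can shorten these runs on a cluster with spare capacity.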

In [34]:
best_model = cv_gbtModel.bestModel
best_model.extractParamMap()

#### Compare the Tuned Models on the Test Set

In [36]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()  # areaUnderROC by default; each CV model predicts with its best parameters

rf_predict = cv_rfModel.transform(testData)
evaluator.evaluate(rf_predict)

In [37]:
lr_predict = cvModel.transform(testData)
evaluator.evaluate(lr_predict)

In [38]:
gbt_predict = cv_gbtModel.transform(testData)
evaluator.evaluate(gbt_predict)
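`BinaryClassificationEvaluator()` defaults to `metricName="areaUnderROC"`, so the three numbers above are ROC AUC values rather than accuracies. AUC is the probability that a randomly chosen positive is scored above a randomly chosen negative; the sketch below computes it exactly that way (ties count half) on toy scores. It is an O(n²) illustration, not how Spark computes the metric.

```python
from typing import Sequence


def roc_auc(scores: Sequence[float], labels: Sequence[int]) -> float:
    """Fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Toy scores: predicted probability of label 1 from some classifier
print(roc_auc([0.9, 0.8, 0.4, 0.3], [1, 0, 1, 0]))  # 0.75
```

To report accuracy instead, `MulticlassClassificationEvaluator(metricName="accuracy")` from `pyspark.ml.evaluation` can be applied to the same prediction DataFrames.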

In [39]:
rf_predict.createOrReplaceTempView('rf')

In [40]:
%sql

Select label,prediction,count(*)
From rf
Where label!=prediction
Group by 1,2

label,prediction,count(1)
1,0.0,1099
0,1.0,2133
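The two misclassification counts, combined with the test-set sizes from the `test` query (11,937 positives, 11,402 negatives), are enough to derive the usual metrics for the tuned random forest. The last lines add an assumption: that the same error rates hold on the full data, where only about 0.69% of comments match the keyword rules, and ask what precision would then be.

```python
# Test-set sizes (from the `test` view) and Random Forest errors (from the `rf` view)
n_pos, n_neg = 11_937, 11_402
fn, fp = 1_099, 2_133          # label 1 -> predicted 0, label 0 -> predicted 1
tp, tn = n_pos - fn, n_neg - fp

accuracy = (tp + tn) / (n_pos + n_neg)
precision = tp / (tp + fp)
recall = tp / n_pos            # true positive rate
fpr = fp / n_neg               # false positive rate

# Assumption: the same recall/FPR hold at the full-data keyword rate
prevalence = 40_225 / (40_225 + 5_778_759)
precision_at_prevalence = recall * prevalence / (recall * prevalence + fpr * (1 - prevalence))

print(f"accuracy={accuracy:.3f} precision={precision:.3f} recall={recall:.3f} fpr={fpr:.3f}")
print(f"precision at {prevalence:.2%} prevalence: {precision_at_prevalence:.3f}")
```

That estimate (roughly 3%) is in line with the full-data counts in the next section, where 37,233 of the 1,102,959 comments predicted positive carry a keyword label (about 3.4%). Both figures measure agreement with the keyword rule, which misses owners who never write phrases like "my dog".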


#### 2. Classify All The Users

In [42]:
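# Score every comment; `dataset` also contains the training rows, so this AUC is not a clean holdout estimate
prediction = cv_rfModel.transform(dataset)
evaluator.evaluate(prediction)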

In [43]:
prediction.createOrReplaceTempView('predict')

In [44]:
%sql

Select label,prediction,count(*)
From predict
Where label!=prediction
Group by 1,2

-- number of comments where the prediction disagrees with the keyword label

label,prediction,count(1)
1,0.0,2992
0,1.0,1065726


#### 3. Get Insights About the Users

In [46]:
%sql

Select label,prediction,count(*)
From predict
Group by 1,2

label,prediction,count(1)
1,0.0,2992
0,0.0,4713033
1,1.0,37233
0,1.0,1065726


In [47]:
%sql

with total as
(
Select count(*) as total
From predict
),
par as
(
Select prediction,count(*) as num
From predict
Group by 1
)

Select prediction, num, num/total as frac
From par cross join total

-- fraction of all comments (not distinct users) in each predicted class

prediction,num,frac
0.0,4716025,0.8104550553842389
1.0,1102959,0.1895449446157611
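The 19% figure is the share of comments the model flags. The `rf` query shows 2,133 of 11,402 test negatives flagged (about 18.7%), so nearly all of those flags would be expected even if almost no comment were positive. One standard correction from the quantification literature, "adjusted classify and count", solves observed = TPR·p + FPR·(1 − p) for the underlying rate p. The sketch plugs in the test-set rates; it treats the keyword labels as ground truth, which they are not, so read the result as a rough, assumption-laden estimate.

```python
def adjusted_rate(observed: float, tpr: float, fpr: float) -> float:
    """Solve observed = tpr * p + fpr * (1 - p) for p, clipped to [0, 1]."""
    if tpr <= fpr:
        raise ValueError("tpr must exceed fpr for the correction to be defined")
    p = (observed - fpr) / (tpr - fpr)
    return min(1.0, max(0.0, p))


# Assumes keyword labels are ground truth and test-set rates transfer to all comments
observed = 1_102_959 / 5_818_984      # fraction of comments predicted 1
tpr = 1 - 1_099 / 11_937              # test-set recall of the random forest
fpr = 2_133 / 11_402                  # test-set false positive rate

print(f"raw: {observed:.2%}, adjusted: {adjusted_rate(observed, tpr, fpr):.2%}")
```

Because the observed rate and the FPR are nearly equal, the adjusted estimate is very sensitive to small changes in the FPR; a user-level estimate would additionally need user-level error rates.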


#### Topic Modeling: Use CountVectorizer and LDA to Find the Top Words in Each Topic

In [49]:
from pyspark.ml.feature import CountVectorizer

# Bag-of-words counts over the positive (pet-owner) training comments
countvec = CountVectorizer(inputCol='filtered', outputCol='vectors')
count_model = countvec.fit(label0_train)
train = count_model.transform(label0_train)

In [50]:
from pyspark.ml.clustering import LDA

lda = LDA(maxIter=10,k=20,featuresCol='vectors',seed=2)
model = lda.fit(train)

In [51]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType

vocab = count_model.vocabulary

def trans(x):
  """Map a list of term indices to the corresponding vocabulary words."""
  return [vocab[i] for i in x]

idx2word = udf(trans, ArrayType(StringType()))

In [52]:
topics = model.describeTopics(10).withColumn("terms",idx2word("termIndices"))
display(topics)

topic,termIndices,termWeights,terms
0,"List(1, 0, 2, 15, 3, 7, 2682, 5, 137, 81)","List(0.0010836911664018245, 7.246053847748408E-4, 6.960238701490542E-4, 4.2934145975010883E-4, 3.1549689608651615E-4, 2.6107524508118917E-4, 2.556864588178471E-4, 2.504254899292929E-4, 2.4009823968146882E-4, 2.1220520684696114E-4)","List(cat, dog, like, looks, dogs, im, brownie, cats, many, keep)"
1,"List(0, 2, 5, 15, 7, 1, 171, 95, 122, 22)","List(2.4609383813925414E-4, 2.3496217190483349E-4, 1.940620414078316E-4, 1.7135488277154073E-4, 1.486646498112639E-4, 1.4668509942329324E-4, 1.329662979274939E-4, 1.2344544851862718E-4, 1.039618757091071E-4, 1.0383387830179859E-4)","List(dog, like, cats, looks, im, cat, gave, made, kitten, video)"
2,"List(4, 0, 61, 10, 53, 5, 1, 34, 6, 2)","List(4.85000151050246E-4, 3.681647506006394E-4, 3.0652336331612545E-4, 2.875380973149959E-4, 2.3429075444645202E-4, 2.2904835832170847E-4, 2.2656171808059592E-4, 2.256605215662432E-4, 2.2435609319293077E-4, 2.090062243967697E-4)","List(love, dog, hope, know, animals, cats, cat, videos, one, like)"
3,"List(0, 2, 1, 10, 7, 3, 9, 4, 21, 54)","List(0.0035910772957740227, 0.0013737863861962357, 0.001357556013977024, 0.0012229348662404502, 8.246955528033576E-4, 7.837567653044138E-4, 6.931469342851252E-4, 6.651085661713792E-4, 6.187643085770144E-4, 6.030979381659583E-4)","List(dog, like, cat, know, im, dogs, dont, love, people, feel)"
4,"List(0, 1, 5, 2, 6, 4, 19, 51, 91, 6039)","List(7.645693020858014E-4, 6.84344339082834E-4, 5.738714738964939E-4, 4.2541193550508805E-4, 3.594318911644296E-4, 2.6318185934928756E-4, 2.2184165679738546E-4, 2.113573963591227E-4, 2.0107263834237538E-4, 1.9924303837836203E-4)","List(dog, cat, cats, like, one, love, even, little, found, frodo)"
5,"List(3, 2, 6, 0, 12, 44, 323, 2029, 5, 7)","List(3.3200992064487975E-4, 2.8469356741428974E-4, 1.9381313369709423E-4, 1.6565065463659853E-4, 1.505064438403149E-4, 1.2673606580980817E-4, 1.2143562913664653E-4, 1.1981821522952478E-4, 1.1970116228658028E-4, 1.1697450653945935E-4)","List(dogs, like, one, dog, time, cute, brother, account, cats, im)"
6,"List(0, 1, 45, 2, 7, 3, 53, 8, 6, 4949)","List(0.0010217601289505564, 7.93476045221303E-4, 4.406112780851706E-4, 3.8980850339124205E-4, 3.694899459959033E-4, 3.6494420473999446E-4, 3.535436578937345E-4, 3.238541845992745E-4, 3.079767776972558E-4, 3.043498255644679E-4)","List(dog, cat, food, like, im, dogs, animals, get, one, lease)"
7,"List(0, 3, 1, 2, 5, 4, 27, 6, 11, 9)","List(0.0041295584170933704, 0.0038608489256848717, 0.0026360296289947927, 0.002047400418488415, 0.0017881671163066377, 0.0014409864902449748, 0.0011661277103107283, 0.0011131663594914905, 0.0011067603156637144, 0.001101583548190007)","List(dog, dogs, cat, like, cats, love, think, one, got, dont)"
8,"List(3241, 0, 3, 5, 13, 7, 12, 2, 34, 31)","List(6.272396691920975E-4, 6.25476125831048E-4, 4.0298631353893964E-4, 3.3779398969765176E-4, 3.174455766853994E-4, 3.062433735239554E-4, 2.985741656529036E-4, 2.757408986380282E-4, 2.674177635904797E-4, 2.5992034126861945E-4)","List(kss, dog, dogs, cats, really, im, time, like, videos, make)"
9,"List(0, 1, 2, 4, 3, 17, 18, 8, 38, 11)","List(0.0016513476060225523, 0.0014509651605466196, 7.126682073795536E-4, 6.559557367547349E-4, 4.747546057564457E-4, 3.4318587492710286E-4, 3.230425670633077E-4, 3.0903518419565226E-4, 2.981861932816422E-4, 2.892777790679338E-4)","List(dog, cat, like, love, dogs, want, go, get, died, got)"


In [53]:
# arrays_zip requires Spark 2.4 or later
from pyspark.sql.functions import explode, arrays_zip

topics_words = topics.withColumn("termWithProb", explode(arrays_zip("terms", "termWeights")))
topics_words.createOrReplaceTempView("topics")
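`arrays_zip` pairs the i-th term with the i-th weight, and `explode` emits one row per pair. The same reshaping in plain Python, using the first three terms of topic 0 from the `describeTopics` output above:

```python
# First three terms and weights of topic 0, copied from the describeTopics output
topic = 0
terms = ["cat", "dog", "like"]
weights = [0.0010836911664018245, 7.246053847748408e-4, 6.960238701490542e-4]

# arrays_zip -> list of (term, weight) pairs; explode -> one output row per pair
rows = [(topic, term, weight) for term, weight in zip(terms, weights)]
for row in rows:
    print(row)
```

Unlike Python's `zip`, which stops at the shorter list, `arrays_zip` pads a shorter array with nulls; here both arrays always have the same length.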


In [54]:
%sql

Select topic, termWithProb["terms"] as term, termWithProb["termWeights"] as probability
From topics

-- top words in each topic with their weights

topic,term,probability
0,cat,0.0010836911664018
0,dog,0.0007246053847748408
0,like,0.0006960238701490542
0,looks,0.0004293414597501088
0,dogs,0.0003154968960865161
0,im,0.0002610752450811892
0,brownie,0.0002556864588178471
0,cats,0.0002504254899292929
0,many,0.0002400982396814688
0,keep,0.00021220520684696117


In [55]:
# from pyspark.ml.clustering import LDA

# lda = LDA(maxIter=5,seed=2)
# model = lda.fit(train1)


In [56]:
# topics = model.describeTopics(3)
# print("The topics described by their top-weighted terms:")
# topics.show(truncate=False)

In [57]:
# topics = model.topicsMatrix()

# for topic in range(3):
#     print("Topic " + str(topic) + ":")
#     for word in range(model.vocabSize()):
#         print(" " + str(topics[word,topic]))

#### 4. Identify Creators With Cat And Dog Owners In The Audience

In [59]:

top = spark.sql(
      """Select creator_name, count(*) as num
      From predict
      Where prediction=1
      Group by 1
      Order by 2 DESC
      limit 20""")

display(top)

# top 20 creators by number of comments predicted as coming from pet owners (comments, not distinct users)

creator_name,num
The Dodo,108175
Brave Wilderness,74341
Robin Seplut,58568
Taylor Nicole Dean,58075
Hope For Paws - Official Rescue Channel,43909
Brian Barczyk,41152
Gohan The Husky,36110
Vet Ranch,32227
Gone to the Snow Dogs,26969
Viktor Larkhill,26764


In [60]:
top.createOrReplaceTempView("top_creators")

In [61]:
%sql

with total as
  (Select creator_name, count(*) as sum_n
  From predict
  Group by 1)

Select t.creator_name, num/sum_n as fraction
From top_creators as t
Inner join total on t.creator_name=total.creator_name
Order by 2 DESC

-- fraction of each top creator's comments that are predicted pet-owner comments

creator_name,fraction
Zak Georges Dog Training rEvolution,0.6211439588688946
Robin Seplut,0.5025268775687062
Gone to the Snow Dogs,0.4882327383323074
Paws Channel,0.484881699296999
stacyvlogs,0.4716986183999143
Cole & Marmalade,0.4564152402863999
Gohan The Husky,0.3626375833534185
Think Like A Horse,0.3599107946640478
Viktor Larkhill,0.3505986533574367
Hope For Paws - Official Rescue Channel,0.3438018729055091


In [62]:
%sql

with filter as 
  (Select creator_name, count(*) as num
  From predict
  Group by 1
  Having num>=10000
  ),
  part as 
  (Select pre.creator_name, prediction, count(*) as part_num
  From predict as pre
  Inner join filter as f on pre.creator_name=f.creator_name
  Group by 1,2
  Order by 1)

Select p.creator_name, part_num, part_num/num as fraction
From part as p
Inner join filter as f on p.creator_name=f.creator_name
Where p.prediction = 1
Order by 3 DESC

-- creators with at least 10,000 comments, ranked by fraction of predicted pet-owner comments (a minimum-volume filter, not a formal significance test)

creator_name,part_num,fraction
Zak Georges Dog Training rEvolution,11598,0.6211439588688946
MaxluvsMya,8378,0.5654315988391713
Lennon The Bunny,5802,0.5399218313791178
Robin Seplut,58568,0.5025268775687062
Kitten Academy,9419,0.4911612869583355
Gone to the Snow Dogs,26969,0.4882327383323074
Paws Channel,12829,0.484881699296999
meow meow,7892,0.4767427812009182
Cat Man Chris,6029,0.4740898010537076
stacyvlogs,17617,0.4716986183999143
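The 10,000-comment threshold keeps small channels from topping the list by chance, but it is not itself a significance test. One common alternative is to rank creators by the lower bound of a 95% Wilson score interval for their fraction, which penalizes small samples smoothly, and to keep only those significantly above the overall rate of 18.95% (a one-sided z-test). Below is a plain-Python sketch meant to be applied to the small per-creator table after collecting it; the total of 18,672 comments for Zak Georges is back-computed from `part_num / fraction` in the output above, and the 8-of-10 channel is hypothetical.

```python
import math


def wilson_lower_bound(successes: int, n: int, z: float = 1.96) -> float:
    """Lower end of the Wilson score interval for a binomial proportion."""
    if n == 0:
        return 0.0
    p = successes / n
    denom = 1 + z * z / n
    centre = p + z * z / (2 * n)
    margin = z * math.sqrt(p * (1 - p) / n + z * z / (4 * n * n))
    return (centre - margin) / denom


def z_score(successes: int, n: int, p0: float) -> float:
    """One-sample z statistic for H0: the true fraction equals p0."""
    p = successes / n
    return (p - p0) / math.sqrt(p0 * (1 - p0) / n)


baseline = 1_102_959 / 5_818_984  # overall fraction of predicted pet-owner comments

# Zak Georges: 11,598 of 18,672 comments (total back-computed from the output above)
print(wilson_lower_bound(11_598, 18_672), z_score(11_598, 18_672, baseline))
# A hypothetical tiny channel with 8 of 10 comments flagged ranks lower despite its 80%
print(wilson_lower_bound(8, 10))
```

With many creators tested at once, a multiple-comparison correction (for example Bonferroni, dividing the significance level by the number of creators) is a sensible addition; the fractions themselves still inherit the classifier's high false positive rate.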


In [63]:
# %sql

# with filter as 
#   (Select creator_name, count(*) as num
#   From predict
#   Group by 1
#   Having num>=10000
#   ),
#   part as 
#   (Select t.creator_name, label, count(*) as part_num
#   From predict as t
#   Inner join filter as f on t.creator_name=f.creator_name
#   Group by 1,2
#   Order by 1)

# Select p.creator_name, label, part_num, part_num/num as fraction
# From part as p
# Inner join filter as f on p.creator_name=f.creator_name

creator_name,label,part_num,fraction
Jonathan Seabolt,0,1951,0.9994877049180328
Jonathan Seabolt,1,1,0.0005122950819672131
Parrot Wizard,0,2046,0.9990234375
Parrot Wizard,1,2,0.0009765625
DanceMomsFan43,0,2481,1.0
Sandy G,0,1531,1.0
Keystone Puppies,0,1221,0.9975490196078431
Keystone Puppies,1,3,0.0024509803921568
Loki the Red Fox,0,22418,0.9933094067083168
Loki the Red Fox,1,151,0.0066905932916832


#### 5. Analysis and Future work

##### Summary

1. Analyzed comments on YouTube videos related to pets.
2. Labeled pet-owner comments with keyword rules (e.g. "my dog", "I have a cat") and built a preprocessing pipeline (RegexTokenizer, StopWordsRemover, Word2Vec) that turns each comment into a feature vector.
3. Built and tuned (5-fold cross-validation) Logistic Regression, Random Forest and Gradient Boosting classifiers to predict whether a comment comes from a pet owner.
4. Chose the best model, Random Forest (reported score: 93%; its misclassification counts on the balanced test set, 3,232 errors out of 23,339 comments, correspond to about 86% accuracy), and used it to estimate the fraction of pet-owner comments (about 19% predicted positive).
5. Extracted the main topics of pet-owner comments using Latent Dirichlet Allocation (LDA).
6. Identified the top video creators for pet-owner audiences, ranked by the number and by the percentage of predicted pet-owner comments.

In [66]:
%md
Turning the project into a CV entry:
1. Project overview
2. Data cleaning and modeling
3. Data analysis
4. Building the ML models
5. Recommendations based on the model results
