### Youtube Comment Analysis
In this notebook, we have a dataset of user comments for youtube videos related to animals or pets. We will attempt to identify cat or dog owners based on these comments, find out the topics important to them, and then identify video creators with the most viewers that are cat or dog owners.

#### 0. Data Exploration and Cleaning

In [3]:
# read data
df_raw = spark.read.load("/FileStore/tables/animals_comments.csv", format='csv', header = True, inferSchema = True)
df_raw.show(10)

In [4]:
df, df_rest= df_raw.randomSplit([0.05, 0.95])

In [5]:
# find user with preference of dog and cat
# note: please propose your own approach and rule to label data 
#cond = (df_clean["comment"].like("%my dog%") | df_clean["comment"].like("%I have a dog%")\
#        | df_clean["comment"].like("%my cat%") | df_clean["comment"].like("%I have a cat%"))

cond = (df["comment"].like("%my dog%") | df["comment"].like("%I have a dog%") | df["comment"].like("%my dogs%") | df["comment"].like("%I have dog%")
        | df["comment"].like("%my cat%") | df["comment"].like("%my cats%") | df["comment"].like("%I have a cat%") | df["comment"].like("%I have cat%") 
        | df["comment"].like("%my puppy%") | df["comment"].like("%my puppies%") | df["comment"].like("%my kitty%") | df["comment"].like("%my kitties%") 
        | df["comment"].like("%I have a kitty%") | df["comment"].like("%I have kitties%") | df["comment"].like("%I have a puppy%") | df["comment"].like("%I have puppies%"))

df_clean = df.withColumn('dog_cat',  cond)

# find user do not have 
df_clean = df_clean.withColumn('no_pet', ~df_clean["comment"].like("%my%") & ~df_clean["comment"].like("%have%")) 
df_clean = df_clean.withColumn('label', col("dog_cat").cast(IntegerType()).cast('double'))

df_clean.show()

In [6]:
for colume in df_clean.columns:
  df_clean=df_clean.filter(df_clean[colume].isNotNull())

In [7]:
from pyspark.ml.feature import StopWordsRemover

# Define a list of stop words or use default list
remover = StopWordsRemover()
stopwords = remover.getStopWords() 

# Display some of the stop words
stopwords[:10]

In [8]:
# data preprocessing 
from pyspark.ml.feature import RegexTokenizer

regexTokenizer = RegexTokenizer(inputCol="comment", outputCol="text", pattern="\\W")
df_clean= regexTokenizer.transform(df_clean)
 

In [9]:

remover.setInputCol("text")
remover.setOutputCol("vector_no_stopw")
df_clean = remover.transform(df_clean)
df_clean.show(10)


#### 1. Build the classifier 
In order to train a model against the comments, you can use RegexTokenizer to split each comment into a list of words and then use Word2Vec or other model to convert the list to a word vector. What Word2Vec does is to map each word to a unique fixed-size vector and then transform each document into a vector using the average of all words in the document.

In [11]:
%sh /home/ubuntu/databricks/python/bin/pip install nltk

In [12]:
from nltk.stem.porter import *

# Instantiate stemmer object
stemmer = PorterStemmer()

# Create stemmer python function
def stem(in_vec):
    out_vec = []
    for t in in_vec:
        t_stem = stemmer.stem(t)
        if len(t_stem) > 2:
            out_vec.append(t_stem)       
    return out_vec

# Create user defined function for stemming with return type Array<String>
from pyspark.sql.types import *
stemmer_udf = udf(lambda x: stem(x), ArrayType(StringType()))

# Create new column with vectors containing the stemmed tokens 
df_clean = df_clean.withColumn("vector_stemmed", stemmer_udf("vector_no_stopw"))

df_clean.show()

In [13]:
from pyspark.ml.feature import HashingTF, IDF
from pyspark.sql.functions import col,size,count,when,isnan
from pyspark.sql import *
from functools import reduce

df_clean.na.drop()
hashingTF = HashingTF(inputCol="vector_stemmed", outputCol="tf", numFeatures=200)
featurizedData = hashingTF.transform(df_clean)
featurizedData.na.drop()

featurizedData.withColumn('userid', col('userid').cast('float').cast(IntegerType()))

featurizedData.show()

In [14]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [15]:
idf = IDF(inputCol="tf", outputCol="features")
idfModel = idf.fit(featurizedData)


In [16]:
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()

In [17]:
pet = rescaledData.filter("label=1.0")
pet_train, pet_test = pet.randomSplit([0.8, 0.2])
nopet = rescaledData.filter("label=0.0")
sampleRatio = float(pet.count()) / float(nopet.count())
sample_nopet = nopet.sample(False, sampleRatio)
df_sample = pet.unionAll(sample_nopet)
sample_nopet_train, sample_nopet_test = sample_nopet.randomSplit([0.8, 0.2])

df_train = pet_train.unionAll(sample_nopet_train)
df_test = pet_test.unionAll(sample_nopet_test)
print ('training size',df_train.count())
print ('testing size',df_test.count())

In [18]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

lr = LogisticRegression(maxIter=10,featuresCol='features', labelCol='label')

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01,0.1]) \
    .build()

evaluator=BinaryClassificationEvaluator()
crossval = CrossValidator(estimator = lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)


cvModel = crossval.fit(df_train)
best_model = cvModel.bestModel
trainingSummary = best_model.summary

In [19]:
path = "/FileStore/tables/"

best_model.save(path + 'best_model')

In [20]:
prediction_train = best_model.transform(df_train)
prediction_test = best_model.transform(df_test)
accuracy_train = prediction_train.filter(prediction_train.label == prediction_train.prediction).count()/float(df_train.count())
accuracy_test = prediction_test.filter(prediction_test.label == prediction_test.prediction).count()/float(df_test.count())

print('Training set areaUnderROC: ' + str(evaluator.evaluate(prediction_train)))
print('Testing set areaUnderROC ' + str(evaluator.evaluate(prediction_test)))
print('Training set accuracy: ' + str(accuracy_train))
print('Testing set accuracy ' + str(accuracy_test))

#### 2. Classify All The Users
We can now apply the cat/dog classifiers to all the other users in the dataset.

In [22]:
prediction = best_model.transform(rescaledData)

total_pet_owner = prediction.filter("prediction = 1.0").count()
total_population = df.select("userid").distinct().count()
pet_owner_ratio = float(total_pet_owner)/float(total_population)
print('total_pet_owner :',total_pet_owner)
print('total_population :',total_population)
print('pet_owner_ratio :',pet_owner_ratio)

#### 3. Get insigts of Users

In [24]:
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel

pet_owner = prediction.filter("prediction = 1.0").select('userid','vector_stemmed')

cv = CountVectorizer(inputCol="vector_stemmed", outputCol="features",
                     minTF=2, # minium number of times a word must appear in a document
                     minDF=4) # minimun number of documents a word must appear in

countVectorModel = cv.fit(pet_owner)

countVectors = (countVectorModel
                .transform(pet_owner)
                .select("userid", "features").cache())

print(len(countVectorModel.vocabulary))  # how many documents, vocab size

numTopics = 10 # number of topics

lda = LDA(k = numTopics,
          maxIter = 50 # number of iterations
          )

ldaModel = lda.fit(countVectors)


# Print topics and top-weighted terms
topics = ldaModel.describeTopics(maxTermsPerTopic=20)
vocabArray = countVectorModel.vocabulary

ListOfIndexToWords = udf(lambda wl: list([vocabArray[w] for w in wl]))
FormatNumbers = udf(lambda nl: ["{:1.4f}".format(x) for x in nl])

topics.select(ListOfIndexToWords(topics.termIndices).alias('words')).show(truncate=False, n=numTopics)

#### 4. Identify Creators With Cat And Dog Owners In The Audience

In [26]:
from pyspark.sql.functions import countDistinct
tmp = prediction.filter("prediction = 1.0")
tmp.groupBy('creator_name').agg(countDistinct('userid')).sort('count(DISTINCT userid)',ascending= False).show()

#### 5. Analysis and Future work

In [28]:

According to the work (using only 5% of data because the databricks crashed often with the whole data), around 13% of the total user who commented on Youtube in this dataset are dog or cat owners. The potential topics they are interested in are include vet, white, eye, boo, half, lion, zoo, sugar, ferre, etc. Videos related to these topics could be promoted to these cat or dog owners. Also, 'The Dodo', 'brave wilderness' and 'Robin Seplut' are the top three creators with largest distinct cat or dog owner audience population (based only on the 5% data). Ads targeting cat or dog owners will potentially have the biggest payback cooperating with these creators.

For future work, this work could be improved in the following aspects: 
  1. Use all of the data.
  2. Specify the dog and cat owners based on more information.
