### Youtube comments analysis

In this notebook, we have a dataset of user comments for youtube videos related to animals or pets. We will attempt to identify cat or dog owners based on these comments, find out the topics important to them, and then identify video creators with the most viewers that are cat or dog owners.

#### Data Exploring and Cleaning

#### Data Preprocess

In [4]:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
import numpy as np
from pyspark.sql.types import ArrayType, FloatType, StringType, BooleanType
from scipy.spatial import distance
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import RegexTokenizer
#Word2Vec
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import Word2VecModel

import string
# read data

class data_preprocess: 
  def __init__(self):
    
    '''
    Class Usage:
    Dprep = data_preprocess()
    df_clean = Dprep.load_data()
    df_filtered = Dprep.english_filter()
    Dprep.label_cat_dog()
    df_tokened = Dprep.tokenize()
    df_tokened = Dprep.redundant_filter()
    #Dprep.w2v_train(vectorSize = 6, seed = 1024, MinCount = 2)
    df_transformed= Dprep.w2v_transform()
    
    comment out the w2v_train function if you already have w2v model trained.
    '''
    
    self.df = None
    self.df_eng_filtered= None
    self.df_labeled = None
    self.df_tokened = None
    self.df_transformed = None
    # some user says 'My husky' isdead of 'My dog', 
    # cat/dog breeds dictionary is used to filter everything.
    self.cat_breeds = spark.read.load("/FileStore/tables/cats.csv", format='csv', header = True, inferSchema = True)
    self.dog_breeds = spark.read.load("/FileStore/tables/dogs.csv", format='csv', header = True, inferSchema = True)
    
    # params for w2v model.
    self.w2v_model = None 
    self.temp_path = '/FileStore/my-stuff'
    self.model_path = self.temp_path + "/word2vec-model"

  
  # load the data
  def load_data(self):
    # comments data load and describe
    df = spark.read.load("/FileStore/tables/animals_comments_csv-5aaff.gz", format='csv', header = True, inferSchema = True)        
    self.df = df.na.drop(subset=df.columns)
    self.df.describe().show()
    print('Loading complete!')
    return self.df
  
  
  # keep only english comments, drop all other rows
  def english_filter(self):
    # string.printable[10:62] ： alphabet
    # english comments only
    english_filter_udf = udf(lambda comment: any(char in string.printable[10:62] for char in comment), BooleanType())
    self.df_eng_filtered = self.df.withColumn('eng', english_filter_udf(col('comment')))
    self.df_eng_filtered = self.df_eng_filtered.where(self.df_eng_filtered['eng'] == True).drop('eng')
    # drop na rows
    self.df_eng_filtered = self.df_eng_filtered.na.drop(subset = self.df_eng_filtered.columns)
    
    # transformed to lower case
    lower_udf = udf(lambda comment: comment.lower(), StringType())
    self.df_eng_filtered = self.df_eng_filtered.withColumn('comments', lower_udf(col('comment'))).drop('comment')
    
    print('English filtering complete! df_filtered: ')
    self.df_eng_filtered.show(3)
    return self.df_eng_filtered
  

  # add columns: cat_dog, and no pet
  def label_cat_dog(self, col_name = 'comments'):
    
    cat_breeds = [str(i.breeds) for i in self.cat_breeds.select('breeds').collect()]
    dog_breeds = [str(i.breeds) for i in self.dog_breeds.select('breeds').collect()]
    
    # user defined filter to check any breeds of cat/dog indicators
    # call self.*** in udf may cause error
    cat_dog_filter_udf = udf(lambda comment:
                         any('my '+ word in comment for word in cat_breeds)|
                         any('my '+ word in comment for word in dog_breeds)|
                         any('I have a '+ word in comment for word in cat_breeds)|
                         any('I have a '+ word in comment for word in cat_breeds),
                         BooleanType())

    df_labeled = self.df_eng_filtered.withColumn('cat_dog',cat_dog_filter_udf(col(col_name)))
    # df_labeled = df_labeled.withColumn('dog',dog_filter_udf(col('comment')))                                 
    
    cond_no_pet = (~df_labeled[col_name].like("%my%") & ~df_labeled[col_name].like("%have%"))
    df_labeled = df_labeled.na.drop(subset=[col_name])
    self.df_labeled = df_labeled.withColumn('no_pet',cond_no_pet)
    print('Labeling complete! df_labeled: ')
    #print('totally '+ str(self.df_labeled.where(self.df_labeled['cat_dog'] == True).count())+ ' cat/dog owners.')
    #print('totally '+ str(self.df_labeled.where(self.df_labeled['no_pet'] == True).count())+ ' no pet users.')
    self.df_labeled.show(3)
    return self.df_labeled
  
  def tokenize(self):
    if not self.df_labeled:
      print('None type df_labeled.')
      return
    regexTokenizer = RegexTokenizer(inputCol="comments", outputCol="text", pattern="\\W")
    self.df_tokened = regexTokenizer.transform(self.df_labeled)
    print('Tokenizing complete! df_tokened:')
    self.df_tokened.show(3)
    return self.df_tokened
  
  '''
  redundant fiter tends to remove all meaningless rows/tokens.
  '''
  def redundant_filter(self):
    # drop all sentence less than 3 words and with words longer than 20 chars.
    # display how many short sentences to dump
    
    self.df_tokened = self.df_tokened.filter(size(col('text')) >=3)
    
    # word length filter
    word_length_udf = udf(lambda text: [word for word in text if len(word)<20], ArrayType(StringType()))
    self.df_tokened = self.df_tokened.withColumn('text2', word_length_udf(col('text'))).drop('text')
      
    # remove all non English symbles ',./$_____' e.g. from comments
    empty_token_udf = udf(lambda text: [token for token in text if all(char in string.printable[10:62] for char in token)], ArrayType(StringType()))
    self.df_tokened = self.df_tokened.withColumn('tokens', empty_token_udf(col('text2'))).drop('text2')
    
    # remove redundent words if half of words is repeated 
    redundent = udf(lambda text: ((len(text)/(len(set(text))+1))>1)|(len(set(text)) <= 2) , BooleanType())
    self.df_tokened = self.df_tokened.withColumn('redundent', redundent(col('tokens'))).filter(col('redundent') == False).drop('redundent')
    
    # remove all url comments:
    url_udf = udf(lambda text: any(token == 'https' for token in text)|any(token == 'http' for token in text), BooleanType())
    self.df_tokened = self.df_tokened.withColumn('url', url_udf(col('tokens'))).filter(col('url') == False).drop('url')             
    # set_udf = udf(lambda tokens: set(tokens), ArrayType(StringType()))
    print('redundant filtering complete! df_tokened: ')
    self.df_tokened.na.drop(subset = ['tokens'])
    self.df_tokened.show()
    return self.df_tokened
    
    
  def w2v_train(self, vectorSize = 6, seed = 1024, MinCount = 2):
    word2Vec = Word2Vec(vectorSize = vectorSize, seed = seed, inputCol = "tokens", outputCol='Word2Vector').setMinCount(MinCount)
    self.w2v_model = word2Vec.fit(self.df_tokened)
    print('vectors:')
    self.w2v_model.getVectors()
    self.w2v_model.write().overwrite().save(self.model_path)
    print('w2v training completed and saved.')
    return self.w2v_model
    
  def w2v_transform(self):
    if not self.df_tokened:
      print('None type df_tokened.')
      return
    self.w2v_model = Word2VecModel.load(self.model_path)
    self.df_transformed = self.w2v_model.transform(self.df_tokened)
    print('w2v transforming complete! df_transformed count:')
    print(self.df_transformed.count())
    return self.df_transformed

In [5]:
Dprep = data_preprocess()
df_clean = Dprep.load_data()
df_filtered = Dprep.english_filter()
Dprep.label_cat_dog()
df_tokened = Dprep.tokenize()
df_tokened = Dprep.redundant_filter()
#Dprep.w2v_train(vectorSize = 6, seed = 1024, MinCount = 2)
df_transformed = Dprep.w2v_transform()

#### Clustering Analysis

In [7]:
import numpy as np
import matplotlib.pyplot as plt
from pyspark.ml.clustering import KMeans
from pyspark.sql.functions import *
from pyspark.ml.feature import PCA
from pyspark.sql.window import Window
from pyspark.sql.types import ArrayType, FloatType, StringType, BooleanType
from scipy.spatial import distance
from pyspark.ml.linalg import Vectors, VectorUDT

class kmeans_analysis:
  def __init__(self, 
               dataframe, 
               featureCol = 'features',
               predictionCol = 'prediction',
               k = 10):
    
    self.df = dataframe
    self.df_ranked = None
    self.featureCol = featureCol
    self.predictionCol = predictionCol
    self.k = k
    self.cost = np.zeros(60)
    self.model = None
    
  def optimize_k(self):    
    for k in range(2,60):
      kmeans = KMeans(featuresCol = self.featureCol, predictionCol = self.predictionCol).setK(k).setSeed(1)
      model = kmeans.fit(self.df)
      self.cost[k] = model.computeCost(self.df)
      self._plot_k()
      
      
  def _plot_k(self):
    fig, ax = plt.subplots(1,1,figsize = (8,6))
    ax.plot(range(2,60),self.cost[2:60])
    ax.set_xlabel('k')
    ax.set_ylabel('cost')
    display(fig)
  
  def kmeans_fit(self):
    kmeans = KMeans(featuresCol = self.featureCol, predictionCol = self.predictionCol).setK(self.k).setSeed(1024)
    self.model = kmeans.fit(self.df)
    self.df = self.model.transform(self.df)
       
  #kmeans display using pca
  def kmeans_display(self):    
    if 'pca_features' in self.df.columns:
      self.df = self.df.drop('pca_features')
    if 'pca1' in self.df.columns:
      self.df = self.df.drop('pca1')
    if 'pca2' in self.df.columns:
      self.df = self.df.drop('pca2')
      
    pca = PCA(k=2, inputCol= self.featureCol, outputCol="pca_features")
    pca_model = pca.fit(self.df)
    self.df = pca_model.transform(self.df)
    firstelement= udf(lambda v:float(v[0]),FloatType())
    secondelement = udf(lambda v:float(v[1]),FloatType())
    self.df = self.df.withColumn('pca1', firstelement('pca_features')).withColumn('pca2', secondelement('pca_features'))
    display(self.df)
  
  
  # generate distance from each sentence to its cluster center
  def kmeans_distance(self):
    if not self.model:
      print('kmeans_fit first. No model found.')
      return    
    # centers to be a list of k centers from our model
    centers = self.model.clusterCenters()    
    # transfer each element to vector type
    for i in range(len(centers)):
      centers[i] = Vectors.dense(centers[i])    
      
    get_center_udf = udf(lambda pred: centers[pred], VectorUDT())
    distance_udf = udf(lambda local, cent: distance.euclidean(local, cent), FloatType())    
    if 'center' in self.df.columns:
      self.df = self.df.drop('center')      
    #Generate a column called 'center', based on column 'prediction'
    self.df = self.df.withColumn('center', get_center_udf(self.predictionCol))  
    if 'distance' in self.df.columns:
      self.df = self.df.drop('distance')    
    self.df = self.df.withColumn('distance', distance_udf(col(self.featureCol), col('center')))
    if 'distance' in self.df.columns:
      print('distance feature generated.')
      return self.df
    else:
      print('distance generate failed')
      return 
  
                             
  # n: number of points showed in return
  def k_closest(self, n = 5):    
    if 'prediction' not in self.df.columns:
      print('Predict using kmeans first.')
      return
    # Rank each observation based on distance to its cluster center of descending order. 
    # Get top K of these observations. 
    window = Window.partitionBy(self.df['prediction']).orderBy(self.df['distance'].desc())
    self.df_ranked = self.df.withColumn('rank', dense_rank().over(window))
    display(self.df_ranked.select('tokens').filter(col('rank') < n))
    return 
  

In [8]:
KMA = kmeans_analysis(df_transformed, featureCol = 'Word2Vector')
KMA.kmeans_fit()
KMA.kmeans_display()
KMA.kmeans_distance()

creator_name,userid,comments,cat_dog,no_pet,tokens,Word2Vector,prediction,pca_features,pca1,pca2
Doug The Pug,87.0,i shared this to my friends and mom the were lol,False,False,"List(i, shared, this, to, my, friends, and, mom, the, were, lol)","List(1, 6, List(), List(-0.3201561488888481, -0.03205943514000286, -0.044752599840814415, 0.07024491292593832, -0.19831698252396152, 0.5348494974049655))",4,"List(1, 2, List(), List(0.3785119241208788, 0.020154190450322178))",0.37851194,0.020154191
bulletproof,530.0,stop saying get em youre literally dumb . have some common sense or dont own this kind of dog. fucking retarded i swear,False,False,"List(stop, saying, get, em, youre, literally, dumb, have, some, common, sense, or, dont, own, this, kind, of, dog, fucking, retarded, i, swear)","List(1, 6, List(), List(-0.4307098683308471, 0.09237943803468211, 0.038621348667551174, -0.17921919587322258, -0.06691320071166212, 0.4112489667128433))",2,"List(1, 2, List(), List(0.3044013708958779, -0.08107833780968862))",0.30440137,-0.081078336
Meu Zoológico,670.0,tenho uma jiboia e um largato,False,True,"List(tenho, uma, jiboia, e, um, largato)","List(1, 6, List(), List(-0.4519168728341659, -0.41367907325426734, -1.030401716629664, 0.30368262141322094, 0.5453239679336548, -0.7784908960262934))",3,"List(1, 2, List(), List(-1.3183306959894745, 0.3396536024627873))",-1.3183306,0.3396536
ojatro,1031.0,i wanna see what happened to the pigs after that please,False,True,"List(i, wanna, see, what, happened, to, the, pigs, after, that, please)","List(1, 6, List(), List(-0.4353814511136575, 0.26644242758100684, 0.03670473130081188, -0.09481648139824922, 0.024751239943064073, 0.4750547165220434))",2,"List(1, 2, List(), List(0.3173595795056212, 0.07611243122250286))",0.31735957,0.076112434
Tingle Triggers,1212.0,well shit now im hungry,False,True,"List(well, shit, now, im, hungry)","List(1, 6, List(), List(-0.44204549491405487, 0.009815030172467232, -0.03198935240507126, -0.08084417004138232, -0.37253583669662477, 0.34343627691268924))",7,"List(1, 2, List(), List(0.4032054023548983, -0.1177516129665596))",0.4032054,-0.11775161
Hope For Paws - Official Rescue Channel,2036.0,holy crap. that is quite literally the most adorable pup ive ever seen.,False,True,"List(holy, crap, that, is, quite, literally, the, most, adorable, pup, ive, ever, seen)","List(1, 6, List(), List(-0.4741069720341609, -0.27302189400562876, 0.13620598370639178, -0.07352973977462031, -0.32502602270016306, 0.29902670790369695))",5,"List(1, 2, List(), List(0.41631847563055924, -0.17050284093442866))",0.41631848,-0.17050284
Brian Barczyk,2698.0,call the teddy larry,False,True,"List(call, the, teddy, larry)","List(1, 6, List(), List(-0.3901917636394501, -0.28775641322135925, 0.27719611674547195, -0.04181652161059901, 0.05713285505771637, 0.23385472502559423))",5,"List(1, 2, List(), List(0.24152831137798686, -0.03201563484017274))",0.24152832,-0.032015637
Hope For Paws - Official Rescue Channel,2911.0,that mother cat looks like my own im guessing she is a russian blue due to her looks and unusual coping skills.,False,False,"List(that, mother, cat, looks, like, my, own, im, guessing, she, is, a, russian, blue, due, to, her, looks, and, unusual, coping, skills)","List(1, 6, List(), List(-0.4058401674370874, -0.2874150675594468, 0.10263016345826062, -0.08500934287241597, -0.19557340317194097, 0.32157727872783487))",5,"List(1, 2, List(), List(0.3197478580404739, -0.16155733957532734))",0.31974787,-0.16155735
Hope For Paws - Official Rescue Channel,2911.0,its people like hope for paws who truly make the world a better place <3,False,True,"List(its, people, like, hope, for, paws, who, truly, make, the, world, a, better, place)","List(1, 6, List(), List(-0.18379263433494736, 0.004865771259314248, 0.009815264360180922, -0.18924807728866913, -0.14915816485881805, 0.5048231386712619))",4,"List(1, 2, List(), List(0.33626850498563765, -0.1972438681183716))",0.3362685,-0.19724387
Talking Kitty Cat,2911.0,steve: no wet food for a month!:cats immediately stop fighting:,False,True,"List(steve, no, wet, food, for, a, month, cats, immediately, stop, fighting)","List(1, 6, List(), List(-0.27378980815410614, -0.06392708424986764, 0.03711289641532031, -0.10594939880750397, 0.014484082263979046, 0.4332600222392516))",4,"List(1, 2, List(), List(0.2301858865932369, -0.07512625439199543))",0.23018588,-0.07512625


#### 2. Classify All The Users
We can now apply the cat/dog classifiers to all the other users in the dataset.

Comments from Cats and Dogs owners contain different features. However, there might be multiple communities share similar contexts among dog owners and cat owners. Each of them indicates different lifestyle and trends in pet raising. Thus, we would like to cluster the users based on their word vectors to find features of multiple communities. After that, we would be able to label users as cat owners, dog owners or users with both kinds.

##### Clustering

Unbalanced data cat_dog: 38833, no_pets: 3069234

##### The kmeans clustering shows unique feature of each cluster. However, each cluster don't have to be of single type of pet. Different of promotions could be sent based on these user features.

### Logistic Regression Model with ML Pipeline

In [15]:
df_transformed.show()

In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

class lr_pipeline:
  def __init__(self,
               data_frame,
               feature_name = 'Word2Vector'
              ):
    self.df = data_frame
    self.df_train = None
    self.lr = LogisticRegression(featuresCol = feature_name, labelCol = 'label', maxIter = 100)
    self.paramGrid = ParamGridBuilder()\
                     .addGrid(self.lr.regParam, [0.1,0.01]) \
                     .addGrid(self.lr.maxIter, [100, 200]) \
                     .build()
    
    self.evaluator = BinaryClassificationEvaluator()
    self.crossval = CrossValidator(estimator = self.lr,
                          estimatorParamMaps= self.paramGrid,
                          evaluator= self.evaluator,
                          numFolds=10)  
    
  def labeling(self):
    label_udf = udf(lambda cat_dog, no_pet: 1.0 if cat_dog == True else 0.0 if no_pet == True else -1.0, FloatType())
    self.df_train = self.df.withColumn('label',label_udf(col('cat_dog'), col('no_pet')))\
                             .select(['creator_name','userid','comments','Word2Vector','label'])\
                             .filter(col('label')>=0)
    print('Finished Labeling')
  
  def balancing(self, type = 'normal'):
    try:
      if not self.df_train:
        num_y1 = self.df.filter(col('label') == 1).count()
        num_y0 = self.df.filter(col('label') == 0).count()
        self.df_train = self.df_train.sampledBy('label', fractions = {0: num_y1/num_y0, 1:1.0}, seed = 1024)
        print('Finished Balancing.')
    except:
      print('Add labels first.')
    
  def cross_validating(self):
    cv_model = self.crossval.fit(self.df_train)
    result = cv_model.transform(self.df_train)                   
    print('cv best evaluate score:'+ str(self.evaluator.evaluate(result)))
    return result    
          

In [17]:
lr = lr_pipeline(df_transformed)
lr.labeling()
lr.balancing()
result = lr.cross_validating()

In [18]:
lr.df_train

In [19]:

'''
Example CV in Decision Tree:

from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def vectorizeData(data):
    return data.map(lambda r: [r[-1], Vectors.dense(r[:-1])]).toDF(['label','features'])

vectorized_CV_data = vectorizeData(stratified_CV_data)

# Index labels, adding metadata to the label column
labelIndexer = StringIndexer(inputCol='label',
                             outputCol='indexedLabel').fit(vectorized_CV_data)

# Automatically identify categorical features and index them
featureIndexer = VectorIndexer(inputCol='features',
                               outputCol='indexedFeatures',
                               maxCategories=2).fit(vectorized_CV_data)

# Train a DecisionTree model
dTree = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='indexedFeatures')

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dTree])

# Search through decision tree's maxDepth parameter for best model
paramGrid = ParamGridBuilder().addGrid(dTree.maxDepth, [2,3,4,5,6,7]).build()

# Set F-1 score as evaluation metric for best model selection
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',
                                              predictionCol='prediction', metricName='f1')    

# Set up 3-fold cross validation
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

CV_model = crossval.fit(vectorized_CV_data)

# Fetch best model
tree_model = CV_model.bestModel.stages[2]
print tree_model


vectorized_test_data = vectorizeData(final_test_data)

transformed_data = CV_model.transform(vectorized_test_data)
print evaluator.getMetricName(), 'accuracy:', evaluator.evaluate(transformed_data)

predictions = transformed_data.select('indexedLabel', 'prediction', 'probability')
predictions.toPandas().head()


df_train = df_transformed.filter((df_transformed['cat'] == True)|(df_transformed['dog'] == True) \
                                 &~((df_transformed['cat'] == True) & (df_transformed['dog']==True)))
#df_valid = df_transformed.filter((df_transformed['no_pet']) == True)
def generate_label(cat):
  if cat is True:
    return 1.0
  else:
    return 0.0
label_udf = udf(lambda cat: generate_label(cat), FloatType())
df_train = df_train.withColumn('label', label_udf('cat'))

# this version used w2v as feature
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol = 'label', featuresCol = 'outputCol',maxIter = 400, regParam = 0.2)
lr_model = lr.fit(df_train)
lr_summary = lr_model.summary
# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
display(lr_summary.roc)
print("areaUnderROC: " + str(lr_summary.areaUnderROC))
'''

###Fit the lr model and Check the performance

##Pipeline Model with Ngram

#### 3. Get insigts of Users

##### Due to the amount of users who claims to be a cat/dog owners, most of users are merely watching videos instead of owning a pet. Thus, pet adoption ads could be sent to those who are potential pet adoptors.

#### 4. Identify Creators With Cat And Dog Owners In The Audience

##### The Creators identity can be predicted by the average of its audiences categories.

In [26]:
from pyspark.sql import functions as F
df_prediction = pipeline_model.transform(df_transformed)

In [27]:
df_prediction.select('*').groupBy('creator_name').agg(F.round(F.mean('prediction'))).show()

#### 5. Analysis and Future work

##### Since the performance of the model remains to be refined. Better representations could provide better output. A Named Entity Recognition could be a candidate to extract word dependencies which would indicate the sentiments towards different objects. Different of breeds of pet could also be detected by NER model to make finer user classfication.