#### 0. Data Exploration and Cleaning

In [2]:
# Load the gzipped comments CSV; the first row is the header and column types are inferred.
csv_path = "/FileStore/tables/animals_comments_csv-5aaff.gz"
read_options = dict(format='csv', header=True, inferSchema=True)
df = spark.read.load(csv_path, **read_options)
df.show(10)

In [3]:
df.dtypes

In [4]:
# Total number of rows in the raw dataset.
row_count = df.count()
print("Number of rows in df:", row_count)


In [5]:
# Count null values in each column of interest.
for column_name in ('creator_name', 'userid', 'comment'):
  null_rows = df.filter(df[column_name].isNull())
  print('Number of null values in {}: '.format(column_name), null_rows.count())

In [6]:
# drop out rows with no comment and no userid

def pre_process(df):
  """Drop rows lacking a comment or a userid, then remove duplicate rows.

  Prints the number of remaining rows and returns the cleaned DataFrame.
  """
  kept = df
  # Apply the not-null filters one column at a time, comment first.
  for required in ('comment', 'userid'):
    kept = kept.filter(kept[required].isNotNull())
  deduped = kept.dropDuplicates()

  remaining = str(deduped.count())
  print('After dropping, we have ', remaining, 'row in dataframe')
  return deduped

df_drop = pre_process(df)

In [7]:
import pyspark.sql.functions as F
# Lower-case the comment text so the keyword rules below are case-insensitive.
comment_lower = F.lower(F.col('comment'))
df_clean = df_drop.withColumn('comment', comment_lower)

In [8]:
df_clean.show()

##### This is an unlabeled dataset, and we want to train a classifier to identify cat and dog owners. The first step is therefore to label the comments.
1. Label a comment as positive when its author says they have a dog or a cat.   
2. Label a comment as negative when its author does not appear to have a dog or a cat.   
3. Combine 1 and 2 to form the training dataset; the rest of the dataset is the data we predict on.   
4. Ownership is decided purely by keyword matching (for example, "i have a dog"). Without ground-truth labels, there is no better signal available.

In [10]:
# find user with preference of dog and cat
# note: please propose your own approach and rule to label data
owner_patterns = [
    "%my dog%", "%i have a dog%",
    "%my cat%", "%i have a cat%",
    "%my dogs%", "%my cats%",
    "%my puppies%", "%i have dogs%",
    "%i have cats%", "%my puppy%",
    "%my kitten%", "%i have a puppy%",
    "%i have puppies%",
]

# OR the LIKE tests together, left to right.
comment_col = df_clean["comment"]
cond = comment_col.like(owner_patterns[0])
for pattern in owner_patterns[1:]:
  cond = cond | comment_col.like(pattern)

df_clean = df_clean.withColumn('dog_cat', cond)

# find user do not have dog or cat:
# a comment counts as "no pets" only when it matches none of these patterns.
no_patterns = ['%my%', '%have%', '%my dog%', '%my cat%', '%my dogs%', '%my cats%']

comment_col = df_clean['comment']
no = comment_col.like(no_patterns[0])
for pattern in no_patterns[1:]:
  no = no | comment_col.like(pattern)

df_clean = df_clean.withColumn('no_pets', ~no)

In [11]:
df_clean.show()

#### 1. Build the classifier
In order to train a model against the comments, you can use RegexTokenizer to split each comment into a list of words and then use Word2Vec or other model to convert the list to a word vector. What Word2Vec does is to map each word to a unique fixed-size vector and then transform each document into a vector using the average of all words in the document.

In [13]:
from pyspark.ml.feature import RegexTokenizer

# Split each comment on non-word characters; the tokens land in the 'text' column.
regexTokenizer = RegexTokenizer(
    inputCol='comment',
    outputCol='text',
    pattern='\\W',
)
df_clean = regexTokenizer.transform(df_clean)
df_clean.show()

#### Optional: only select 1000000 rows for testing. (In final version, try to use all dataset)

In [15]:
from pyspark.sql.functions import rand

# Shuffle with a fixed seed, expose the result as a temp view, then keep the first 1000000 rows.
shuffled = df_clean.orderBy(rand(seed=0))
shuffled.createOrReplaceTempView('table')
df_clean = spark.sql("select * from table limit 1000000")

In [16]:
# use word2vec get text vector feature.
from pyspark.ml.feature import Word2Vec

# Learn a mapping from words to Vectors. (choose higher vectorSize here)
word2vec = Word2Vec(
    vectorSize=20,
    minCount=1,
    inputCol='text',
    outputCol='wordvector',
)
model = word2vec.fit(df_clean)

# Add the 'wordvector' column produced by the fitted model.
df_model = model.transform(df_clean)
df_model.show()

#### Get training dataset
Note that the training dataset used here is "has a cat or dog" + "does not have pets", i.e. rows where `dog_cat` is True plus rows where `no_pets` is True.

The rest of dataset will be served as dataset for prediction.

In [18]:
# Rows flagged by the keyword rules: owners (dog_cat) and non-owners (no_pets).
is_owner = F.col('dog_cat') == True
is_non_owner = F.col('no_pets') == True
df_pets = df_model.filter(is_owner)
df_no_pets = df_model.filter(is_non_owner)

n_pets = df_pets.count()
n_no_pets = df_no_pets.count()
print('Number of confirmed user who has pets: ', n_pets)
print('Number of confirmed user who does not has pets: ', n_no_pets)


In [19]:
df_pets.show()

In [20]:
df_no_pets.show()

Note that there are roughly 100 times more negative labels than positive labels, so we need to downsample the negative class. As a rule of thumb, the imbalance should be no more than 10:1; here I balance the classes to a ratio of about 1:2 (1 positive : 2 negative).

In [22]:
# Shuffle the negative rows with a FIXED seed. The resulting DataFrame is lazy,
# so an unseeded rand() would be re-drawn on every action, and show(), count()
# and model training could each see a different down-sample.
df_no_pets.orderBy(rand(seed=0)).createOrReplaceTempView('table')

Num_pos_label = df_model.filter(F.col('dog_cat') == True).count()
Num_neg_label = df_model.filter(F.col('no_pets') == True).count()

# Keep two negative rows for every positive row.
df_no_pets_down = spark.sql('Select * from table limit {}'.format(Num_pos_label * 2))

In [23]:
df_no_pets_down.show()
print('Number of rows in dataset: ', df_no_pets_down.count())

In [24]:
print('Now after balancing the lables, we have ')   
print('Positive label: ', Num_pos_label)
print('Negtive label: ', df_no_pets_down.count())

In [25]:
def get_label(df_pets, df_no_pets_down):
  """Stack the owner rows on top of the down-sampled non-owner rows.

  Only the 'dog_cat' and 'wordvector' columns are kept from each input.
  """
  columns = ('dog_cat', 'wordvector')
  positives = df_pets.select(*columns)
  negatives = df_no_pets_down.select(*columns)
  return positives.union(negatives)

df_labeled = get_label(df_pets, df_no_pets_down)
df_labeled.show()

In [26]:
#convert Boolean value to 1 and 0's
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

def multiple(x):
  """Multiply x by 1 and return the result converted with int() (True -> 1, False -> 0)."""
  scaled = x * 1
  return int(scaled)


# The UDF is still defined so any cell that references it keeps working.
udf_boolToInt = udf(lambda z: multiple(z), IntegerType())

# Build the label with a native cast (true -> 1, false -> 0) instead of the
# Python UDF: no per-row Python round-trip, and the same approach the notebook
# already uses for the unlabeled rows (dog_cat.cast('integer')).
df_labeled = df_labeled.withColumn('label', df_labeled['dog_cat'].cast('integer'))

df_labeled.show()

In [27]:
# Same 0/1 label, derived with when/otherwise instead of a UDF.
label_expr = F.when(df_labeled['dog_cat'] == 'true', 1).otherwise(0)
df_labeled_1 = df_labeled.withColumn('label', label_expr)
df_labeled_1.show()

#### Logistic Regression Model

In [29]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# 80/20 train/test split with a fixed seed.
splits = df_labeled.randomSplit([0.8, 0.2], seed=12345)
train, test = splits

lr = LogisticRegression(
    featuresCol='wordvector',
    labelCol='label',
    maxIter=10,
    regParam=0.1,
    elasticNetParam=0.8,
)

# Single fit with the fixed hyper-parameters above (no parameter search is run here).
lrmodel = lr.fit(train)



In [30]:
# Score the held-out test split with the fitted logistic regression model.
predictions = lrmodel.transform(test)
predictions.show()

In [31]:
# Pull the summary off the fitted LogisticRegressionModel
# and display its ROC curve.
trainingsummery = lrmodel.summary
roc_curve = trainingsummery.roc
roc_curve.show()

In [32]:
print('ROC:' + str(trainingsummery.areaUnderROC))

In [33]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

def get_evaluation_result(predictions):
  """Print a binary-classification report for a scored DataFrame.

  Args:
    predictions: DataFrame with 'label', 'prediction' and 'rawPrediction'
      columns. Rows whose label/prediction pair is not one of the four 0/1
      combinations do not contribute to the confusion-matrix counts.

  Returns:
    None. The confusion-matrix counts, accuracy, precision, recall and area
    under the ROC curve are printed. A ratio whose denominator is zero is
    reported as nan.
  """
  def _ratio(numerator, denominator):
    # A zero denominator (e.g. the model predicted no positives) makes the
    # metric undefined; report nan rather than raising ZeroDivisionError.
    return numerator / denominator if denominator else float('nan')

  evaluator = BinaryClassificationEvaluator(
                labelCol = 'label', rawPredictionCol = 'rawPrediction', metricName = 'areaUnderROC')
  AUC = evaluator.evaluate(predictions)

  # Build the confusion matrix with a single aggregation instead of four
  # separate filter-and-count passes over the data.
  cells = {}
  for row in predictions.groupBy('label', 'prediction').count().collect():
    key = (row['label'], row['prediction'])
    cells[key] = cells.get(key, 0) + row['count']

  # Lookups use ints; a stored key such as (1, 1.0) still matches because
  # equal numbers compare and hash alike.
  TP = cells.get((1, 1), 0)
  FP = cells.get((0, 1), 0)
  TN = cells.get((0, 0), 0)
  FN = cells.get((1, 0), 0)

  accuracy = _ratio(TP + TN, TP + TN + FP + FN)
  precision = _ratio(TP, TP + FP)
  recall = _ratio(TP, TP + FN)

  print ("True Positives:", TP)
  print ("False Positives:", FP)
  print ("True Negatives:", TN)
  print ("False Negatives:", FN)
  print ("Test Accuracy:", accuracy)
  print ("Test Precision:", precision)
  print ("Test Recall:", recall)
  print ("Test AUC of ROC:", AUC)

  
print("Prediction result summary for Logistic Regression Model:  ")
get_evaluation_result(predictions)

#### Random Forest Model

In [35]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

# Train a RF model. The seed is fixed so the forest, and every number derived
# from its predictions further down, is reproducible across runs.
rf = RandomForestClassifier(labelCol = 'label', featuresCol = 'wordvector', numTrees = 15, seed = 12345)

rfmodel = rf.fit(train)
# Score the held-out test split with the fitted forest.
predictions = rfmodel.transform(test)
predictions.show()

In [36]:
print('Prediction result summary for Random Forest Model:')
get_evaluation_result(predictions)

#### 2. Classify All The Users

We can now apply the cat/dog classifier to all the other users in the dataset, i.e. those whose comments matched neither labeling rule.

In [38]:
# Rows that matched neither labeling rule.
is_unlabeled = (F.col('dog_cat') == False) & (F.col('no_pets') == False)
df_unknow = df_model.filter(is_unlabeled)
df_unknow = df_unknow.withColumn('label', df_unknow.dog_cat.cast('integer'))
n_unknown = df_unknow.count()
print("There are {} users whose attribute is unclear.".format(n_unknown))

# Score the unlabeled rows with the fitted random forest.
pred_all = rfmodel.transform(df_unknow)
pred_all.show()

Fraction of the users who are cat/dog owners (ML estimate):
* Idea is using $$ \frac{\text{Num of owner labeled} + \text{Num of owner predicted}}{ \text{Total users in our used dataset}} $$

In [40]:
# number of total user
total_user = df_model.select('userid').distinct().count()
print('Total user: ',total_user)
#number of labeled owner
owner_labeled = df_pets.select('userid').distinct().count()
print('Total labeled owner: ',owner_labeled)
#number of owner predicted
# Count distinct users, not comment rows: one user can have many comments
# predicted as "owner", and the other two figures are distinct-user counts.
pred_owner_ids = pred_all.filter(F.col('prediction') == 1).select('userid').distinct()
owner_pred = pred_owner_ids.count()
print('Total predicted owner: ',owner_pred)

# A user can be both keyword-labeled (one comment) and predicted (another
# comment). Take the distinct union so nobody is counted twice.
owner_total = df_pets.select('userid').union(pred_owner_ids).distinct().count()

# Guard against an empty dataset instead of raising ZeroDivisionError.
fraction = owner_total / total_user if total_user else float('nan')
print('Fraction of the users who are cat/dog owners (ML estimate): ', round(fraction,3))


#### 3.Get insights of Users

In [42]:
from pyspark.ml.feature import StopWordsRemover

# Token lists of every owner comment: keyword-labeled plus model-predicted.
predicted_owner_text = pred_all.filter(F.col('prediction') == 1).select('text')
df_all_owner = df_pets.select('text').union(predicted_owner_text)

# Extra words to drop in addition to the remover's own stop-word list.
stopwords_custom = [
    'im', 'get', 'gets', 'got', 'one', 'hes', 'shes', 'he', 'she',
    'dog', 'dogs', 'cat', 'cats', 'kitty', 'much', 'really', 'like',
    'dont', 'want', 'thin', 'know', 'see', 'also', 'never', 'go', 'ive',
]

# remover1 is used only to read the stop-word list it is constructed with.
remover1 = StopWordsRemover(inputCol ='raw', outputCol='filtered')
core = remover1.getStopWords()
core = core + stopwords_custom
remover = StopWordsRemover(inputCol='text', outputCol='filtered', stopWords = core)

df_all_owner = remover.transform(df_all_owner)

In [43]:
df_all_owner.show()

In [44]:
# Flatten the filtered token lists and count how often each word occurs.
filtered_rows = df_all_owner.select('filtered').rdd
wc = filtered_rows.flatMap(lambda row: row.filtered).countByValue()

# Most frequent words first.
wcsorted = sorted(wc.items(), key=lambda item: item[1], reverse=True)
wcsorted

#### 4.Identify Creators With Cat And Dog Owners In The Audience
Simply select all dog and cat owners (keyword-labeled and model-predicted) and aggregate them by creator.

In [46]:
# Owner comments (keyword-labeled plus model-predicted) with the commenting
# user, so owners can be counted per creator rather than comments.
df_create = df_pets.select('creator_name', 'userid').union(
    pred_all.filter(F.col('prediction') == 1).select('creator_name', 'userid'))

df_create.createOrReplaceTempView('create_table')

#get counts
# count(distinct userid) gives the number of distinct owners per creator;
# count(*) would count comments, so one prolific commenter would inflate a
# creator's audience. The old "select distinct" was redundant with group by.
create_count = spark.sql("select creator_name, count(distinct userid) as Number \
                          from create_table \
                          group by creator_name \
                          order by Number Desc")


In [47]:
create_count.show()

'Brave Wilderness', 'The Dodo' and 'Taylor Nicole Dean' are the top three creators with the largest distinct cat or dog owner audience.