In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
# Write (truncating any previous content) the Spark defaults file so the driver
# gets 5g of memory; this has to happen before the SparkContext is created.
with open("/content/spark-2.4.7-bin-hadoop2.7/conf/spark-defaults.conf", "w") as spark_defaults:
    spark_defaults.write("spark.driver.memory 5g\n")


In [3]:
import os

# Point findspark/pyspark at the JDK and the Spark distribution unpacked by the setup cell.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

import findspark
findspark.init()  # makes $SPARK_HOME's pyspark importable

from pyspark import SparkContext
from pyspark.sql import SQLContext

# getOrCreate() keeps this cell idempotent: a bare SparkContext() raises
# "Cannot run multiple SparkContexts at once" when the cell is re-run.
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)



In [50]:
from pyspark.sql.functions import col, concat_ws , monotonically_increasing_id

from pyspark.ml.feature import CountVectorizer ,Tokenizer ,StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from nltk.tokenize import WordPunctTokenizer
from bs4 import BeautifulSoup
import re
import numpy as np
from pyspark.sql.functions import *


### Data cleaning and preprocessing

In [5]:
toknizer = WordPunctTokenizer()
pat1 = r'@[A-Za-z0-9]+'            # @mentions
pat2 = r'https?://[A-Za-z0-9./]+'  # http(s) URLs
combined_pat = r'|'.join((pat1, pat2))
# Compiled once; data_cleaner runs once per comment.
_mention_or_url_re = re.compile(combined_pat)
_non_letter_re = re.compile(r'[^a-zA-Z]')


def data_cleaner(text):
    """Normalise one comment to lowercase ASCII words separated by single spaces.

    Strips HTML markup, removes @mentions and http(s) URLs, turns every
    character that is not an ASCII letter into whitespace, lowercases, and
    re-joins the resulting tokens with single spaces.

    Args:
        text: the raw comment string. ``None`` or ``""`` yields ``""``.

    Returns:
        The cleaned comment (possibly an empty string).
    """
    if not text:
        return ""
    souped_text = BeautifulSoup(text, 'lxml').get_text()
    stripped = _mention_or_url_re.sub('', souped_text)
    # Every non-letter (digits, punctuation, U+FFFD, BOM, non-ASCII) becomes a
    # space. No Python-2 style `.decode` step is needed: Python 3 `str` has no
    # `.decode`, and such characters are blanked here anyway.
    letters = _non_letter_re.sub(' ', stripped)
    words = toknizer.tokenize(letters.lower())
    # Tokens contain no whitespace, so the join has no leading/trailing spaces.
    return ' '.join(words)

In [6]:
# Categorised YouTube comments export, first row is the header.
# NOTE(review): this path is on a Google shared drive; no cell in this notebook
# mounts Drive, so it presumably has to be mounted at /content/drive beforehand.
comments_csv_path = '/content/drive/Shareddrives/YT-data/catagorical.csv'
API_comment_cat = sqlContext.read.csv(comments_csv_path, header=True, inferSchema=True)


In [7]:
# Keep every column except the engagement counters.
drop_list = ["likes", "replies"]
kept_columns = list(filter(lambda name: name not in drop_list, API_comment_cat.columns))
# concat_ws over a single column: concat_ws skips null inputs, so a null comment
# presumably comes out as an empty string rather than None.
comment_text_as_string = concat_ws(",", col("comment_text"))
API_comment_cat = API_comment_cat.select(kept_columns).withColumn("comment_text", comment_text_as_string)

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Clean only the text column with a Python UDF. Round-tripping the whole frame
# through an RDD and toDF() marshals every column to Python and runs an extra
# job just to re-infer the schema.
clean_text_udf = udf(data_cleaner, StringType())

# Columns are taken by position, as the original tuple indexing did: the first
# three remaining columns are treated as (video id, category id, comment text)
# and any further columns are dropped.
# NOTE(review): confirm that order against the CSV header.
video_col, category_col, text_col = API_comment_cat.columns[:3]
API_comment_cat = API_comment_cat.select(
    API_comment_cat[video_col].alias("video_id"),
    API_comment_cat[category_col].alias("category_id"),
    clean_text_udf(API_comment_cat[text_col]).alias("comment_text"),
)


In [9]:
from pyspark.sql.types import LongType, StructField, StructType

# Assign consecutive 0-based row ids in the frame's current order.
# monotonically_increasing_id() is unique but NOT consecutive: it stores the
# partition index in the upper bits, so ids jump between partitions. The test
# cell's `_id` range filter would then only select rows from the first partition.
_id_schema = StructType(API_comment_cat.schema.fields + [StructField("_id", LongType(), False)])
API_comment_cat = (
    API_comment_cat.rdd
    .zipWithIndex()
    .map(lambda row_and_index: tuple(row_and_index[0]) + (row_and_index[1],))
    .toDF(_id_schema)
    # Every later cell reuses this frame. Caching avoids re-running the Python
    # cleaning step, and keeps the ids fixed, on each pass.
    .cache()
)

## Model Training

In [10]:
# Pipeline: comment text -> tokens -> bag-of-words counts -> indexed category
# label -> multinomial logistic regression. One fit() learns the vocabulary,
# the label index and the model weights.
VOCAB_SIZE = 3000    # CountVectorizer vocabulary cap
TRAIN_ROWS = 240000  # rows handed to fit()

tokenizer = Tokenizer(inputCol="comment_text", outputCol="tokens")
c_v = CountVectorizer(inputCol="tokens", outputCol="features", vocabSize=VOCAB_SIZE)
indexer = StringIndexer(inputCol="category_id", outputCol="label")
LR_classifier = LogisticRegression(family="multinomial")

pipeline_stages = [tokenizer, c_v, indexer, LR_classifier]
LR_classifier_pipeline = Pipeline(stages=pipeline_stages)

# NOTE(review): limit() is not tied to `_id`, while the test cell selects rows
# by `_id`. Confirm that the training and test slices cannot overlap.
training_rows = API_comment_cat.limit(TRAIN_ROWS)
model = LR_classifier_pipeline.fit(training_rows)


In [11]:
# Training accuracy: re-score the limit(240000) slice (re-evaluated here, as in
# the fit cell) and compare each predicted index with the StringIndexer label.
lr_model_evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="label",
)
train_predictions = model.transform(API_comment_cat.limit(240000))
train_accuracy = lr_model_evaluator.evaluate(train_predictions)

In [None]:
print("Accuracy = {:g}".format(train_accuracy))

# Testing 

In [39]:
# Held-out rows are those with `_id` in [240000, 300000). The fit cell trains on
# limit(240000), i.e. 240000 rows, so the first id after that slice is 240000.
# The previous `> 240000` silently skipped that row.
TEST_ID_START = 240000
TEST_ID_END = 300000  # exclusive
# Only these category ids are scored (the previous list repeated 17).
# NOTE(review): presumably these are the categories present in the training
# slice. StringIndexer's default handleInvalid="error" fails transform() on
# labels it did not see during fit.
TEST_CATEGORY_IDS = [1, 2, 10, 15, 17, 18, 20, 22, 23, 24, 25, 26, 27, 28]

test_set = API_comment_cat.filter(
    (API_comment_cat["_id"] >= TEST_ID_START) & (API_comment_cat["_id"] < TEST_ID_END)
)
test_set = test_set.where(test_set["category_id"].isin(TEST_CATEGORY_IDS))

In [138]:
# Run the fitted pipeline (tokenize, vectorize, index label, predict) on the held-out rows.
test_predictions = model.transform(dataset=test_set)

In [None]:
# Held-out accuracy: fraction of test rows whose predicted index equals the label.
lr_model_evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="label",
    metricName="accuracy",
)
test_accuracy = lr_model_evaluator.evaluate(dataset=test_predictions)

In [None]:
print("Accuracy = {:g}".format(test_accuracy))

## Video category distribution

In [None]:
# `LR_predictions` was never assigned anywhere in this notebook, so this cell
# raised NameError on a fresh kernel.
# NOTE(review): bound to the held-out test predictions; confirm that is the
# intended prediction set.
LR_predictions = test_predictions

# Per video: the sum of the predicted class-probability vectors and the number of
# comments. Columns are projected before `.rdd` so the token and feature columns
# are not serialised to the Python workers.
vid_prob = (
    LR_predictions.select("video_id", "probability").rdd
    .map(lambda row: (row["video_id"], row["probability"]))
    .reduceByKey(lambda x, y: x + y)
)
vid_count = (
    LR_predictions.select("video_id").rdd
    .map(lambda row: (row["video_id"], 1))
    .reduceByKey(lambda x, y: x + y)
)

In [None]:
def _mean_probability(prob_sum_and_count):
    """Divide a video's summed probability vector by its comment count."""
    prob_sum, n_comments = prob_sum_and_count
    return prob_sum / n_comments


# (video_id, (probability_sum, comment_count)) -> (video_id, mean probability vector)
vid_prob_count = vid_prob.join(vid_count)
vid_cat_dist = vid_prob_count.mapValues(_mean_probability)

## Comment category distribution

In [None]:
# `LR_predictions` is not assigned anywhere in this notebook (NameError on a
# fresh kernel).
# NOTE(review): bound to the held-out test predictions; confirm intent.
LR_predictions = test_predictions

# Pair each comment's cleaned text with its predicted class-probability vector.
# Projecting first avoids shipping the token and feature columns to Python.
comment_cat_dist = (
    LR_predictions.select("comment_text", "probability").rdd
    .map(lambda row: (row["comment_text"], row["probability"]))
)
