In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
import numpy as np
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Youtube Predictive Title').master("local").getOrCreate()

In [3]:
#spark.conf.set("spark.executor.memory", "4g")

In [3]:
spark

#### USING SPARK TO LOAD THE DATA FROM LOCAL SYSTEM

In [4]:
sqlContext = SQLContext(spark)

In [5]:
data = sqlContext.read.format('csv').options(header='true', inferschema='true').load('/Users/anggapradiktas/Documents/JupyterNotebook/YT/titleYoutube_clean.csv')


In [6]:
data_select = data.select('videoTitle','viewGroup')

In [7]:
data_select.show(2, False)

+------------------------------------------------------------------+----------+
|videoTitle                                                        |viewGroup |
+------------------------------------------------------------------+----------+
|Di Balik Layar Hiduplah Hari Ini - Dialog Dini Hari | Kejar Tayang|0 - 100000|
|Interview with SISITIPSI                                          |0 - 100000|
+------------------------------------------------------------------+----------+
only showing top 2 rows



In [8]:
data_select.show()

+--------------------+---------------+
|          videoTitle|      viewGroup|
+--------------------+---------------+
|Di Balik Layar Hi...|     0 - 100000|
|Interview with SI...|     0 - 100000|
|SISITIPSI - Joni ...|     0 - 100000|
|Realitas - Langka...|     0 - 100000|
|Target Operasi - ...|100000 - 200000|
|NSI - Petaka Pandawa|     0 - 100000|
|Primetime News - ...|     0 - 100000|
|Primetime News: J...|     0 - 100000|
|360 - Pemuda Berg...|     0 - 100000|
|Primetime News - ...|     0 - 100000|
|Primetime News - ...|     0 - 100000|
|1000 Meter - Mena...|     0 - 100000|
|Economic Challeng...|     0 - 100000|
|Primetime News - ...|     0 - 100000|
|Realitas - Gaduh ...|     0 - 100000|
|Target Operasi - ...|     0 - 100000|
|NSI - Teka - Teki...|     0 - 100000|
|Primetime News - ...|     0 - 100000|
|I'm Possible - Ba...|     0 - 100000|
|Primetime News - ...|     0 - 100000|
+--------------------+---------------+
only showing top 20 rows



In [9]:
data_select.printSchema()

root
 |-- videoTitle: string (nullable = true)
 |-- viewGroup: string (nullable = true)



In [10]:
from pyspark.sql.functions import col

In [11]:
data_select.groupby('viewGroup') \
    .count() \
    .orderBy(col('count').desc()) \
    .show()

+-----------------+------+
|        viewGroup| count|
+-----------------+------+
|       0 - 100000|345040|
|  100000 - 200000| 18476|
|  200000 - 300000|  8614|
|  300000 - 400000|  5050|
|  400000 - 500000|  3508|
|  500000 - 600000|  2473|
|  600000 - 700000|  1898|
|  700000 - 800000|  1443|
|  800000 - 900000|  1188|
| 900000 - 1000000|   970|
|1000000 - 1100000|   758|
|1100000 - 1200000|   691|
|1200000 - 1300000|   549|
|1300000 - 1400000|   512|
|1400000 - 1500000|   422|
|1500000 - 1600000|   352|
|1600000 - 1700000|   337|
|1700000 - 1800000|   311|
|1800000 - 1900000|   295|
|1900000 - 2000000|   246|
+-----------------+------+
only showing top 20 rows



In [12]:
data_select_noNA = data_select.na.drop()

In [13]:
data_select_noNA.groupBy("videoTitle") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+--------------------+-----+
|          videoTitle|count|
+--------------------+-----+
|"ILC tvOne ""KPK ...|    8|
|"Mario Teguh Supe...|    6|
|"KECIL KECIL HEBA...|    4|
|"Dialog: ""Peneng...|    4|
|"Coffee Break - "...|    3|
|"Indonesia Lawyer...|    3|
|"Cover Story One ...|    3|
|"Cover Story One ...|    3|
|Asmara Selebriti ...|    1|
|Keberhasilan Lest...|    1|
|Palang Pintu Beta...|    1|
|Commuter Line - A...|    1|
|Festival Ramadan ...|    1|
|Opi Safarraz Lata...|    1|
|Sering Dijuliti,M...|    1|
|Lesti Belajar Men...|    1|
|Bentuk Perhatian ...|    1|
|Cerita Haru Soima...|    1|
|Highlight D'Acade...|    1|
|Highlight - Konse...|    1|
+--------------------+-----+
only showing top 20 rows



##### Model Pipeline

In [14]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression

In [15]:
regexTokenizer = RegexTokenizer(inputCol = 'videoTitle', outputCol = 'words')

In [16]:
import nltk
nltk.download("stopwords")

stopwordList = nltk.corpus.stopwords.words('indonesian')

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/anggapradiktas/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [17]:
from pyspark.sql.functions import udf, col, lower, regexp_replace

In [18]:
df_clean = data.select((lower(regexp_replace('videoTitle', "[^a-zA-Z\\s]", "")).alias('videoTitle')), 'viewGroup')

In [19]:
df_clean.show()

+--------------------+---------------+
|          videoTitle|      viewGroup|
+--------------------+---------------+
|di balik layar hi...|     0 - 100000|
|interview with si...|     0 - 100000|
|sisitipsi  joni s...|     0 - 100000|
|realitas  langkah...|     0 - 100000|
|target operasi  c...|100000 - 200000|
| nsi  petaka pandawa|     0 - 100000|
|primetime news  m...|     0 - 100000|
|primetime news je...|     0 - 100000|
|  pemuda bergelar...|     0 - 100000|
|primetime news  d...|     0 - 100000|
|primetime news  m...|     0 - 100000|
| meter  menanti p...|     0 - 100000|
|economic challeng...|     0 - 100000|
|primetime news  m...|     0 - 100000|
|realitas  gaduh d...|     0 - 100000|
|target operasi  k...|     0 - 100000|
|nsi  teka  teki a...|     0 - 100000|
|primetime news  m...|     0 - 100000|
|im possible  bant...|     0 - 100000|
|primetime news  p...|     0 - 100000|
+--------------------+---------------+
only showing top 20 rows



In [20]:
df_words_token = regexTokenizer.transform(df_clean).select('words', 'viewGroup')
df_words_token.show()

+--------------------+---------------+
|               words|      viewGroup|
+--------------------+---------------+
|[di, balik, layar...|     0 - 100000|
|[interview, with,...|     0 - 100000|
|[sisitipsi, joni,...|     0 - 100000|
|[realitas, langka...|     0 - 100000|
|[target, operasi,...|100000 - 200000|
|[nsi, petaka, pan...|     0 - 100000|
|[primetime, news,...|     0 - 100000|
|[primetime, news,...|     0 - 100000|
|[pemuda, bergelar...|     0 - 100000|
|[primetime, news,...|     0 - 100000|
|[primetime, news,...|     0 - 100000|
|[meter, menanti, ...|     0 - 100000|
|[economic, challe...|     0 - 100000|
|[primetime, news,...|     0 - 100000|
|[realitas, gaduh,...|     0 - 100000|
|[target, operasi,...|     0 - 100000|
|[nsi, teka, teki,...|     0 - 100000|
|[primetime, news,...|     0 - 100000|
|[im, possible, ba...|     0 - 100000|
|[primetime, news,...|     0 - 100000|
+--------------------+---------------+
only showing top 20 rows



In [21]:
add_stopwords = ["http","https","amp","rt","t","c","the","im","i'm","di","ke","dari","-","kamu","aku","yang"] 

In [22]:
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)

In [23]:
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

In [24]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = 'viewGroup', outputCol = 'label')

In [25]:
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

In [26]:
pipelineFit = pipeline.fit(data_select_noNA)

In [27]:
dataset = pipelineFit.transform(data_select_noNA)

In [28]:
dataset.show()

+--------------------+---------------+--------------------+--------------------+--------------------+-----+
|          videoTitle|      viewGroup|               words|            filtered|            features|label|
+--------------------+---------------+--------------------+--------------------+--------------------+-----+
|Di Balik Layar Hi...|     0 - 100000|[di, balik, layar...|[balik, layar, hi...|(10000,[3,7,81,24...|  0.0|
|Interview with SI...|     0 - 100000|[interview, with,...|[interview, with,...|(10000,[351,5700]...|  0.0|
|SISITIPSI - Joni ...|     0 - 100000|[sisitipsi, -, jo...|[sisitipsi, joni,...|(10000,[3472],[1.0])|  0.0|
|Realitas - Langka...|     0 - 100000|[realitas, -, lan...|[realitas, langka...|(10000,[259,3592,...|  0.0|
|Target Operasi - ...|100000 - 200000|[target, operasi,...|[target, operasi,...|(10000,[506,537,6...|  1.0|
|NSI - Petaka Pandawa|     0 - 100000|[nsi, -, petaka, ...|[nsi, petaka, pan...|(10000,[4540,5864...|  0.0|
|Primetime News - ...|     0

In [29]:
(trainingData, testData) = dataset.randomSplit([0.8,0.2], seed = 100)

print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 316738
Test Dataset Count: 79289


#### Logistic Regression

In [30]:
logreg = LogisticRegression(maxIter=20, regParam = 0.3, elasticNetParam=0)

In [31]:
predictions = logreg.fit(trainingData)

In [32]:
predictions

LogisticRegressionModel: uid = LogisticRegression_b36ba974fed0, numClasses = 172, numFeatures = 10000

In [33]:
prediction_test = predictions.transform(testData)

In [34]:
prediction_test.filter(prediction_test['prediction'] == 0) \
.select('videoTitle', 'viewGroup','probability', 'label', 'prediction') \
.orderBy('probability', ascending = False) \
.show(n = 10, truncate = 30)

+------------------------------+----------+------------------------------+-----+----------+
|                    videoTitle| viewGroup|                   probability|label|prediction|
+------------------------------+----------+------------------------------+-----+----------+
|Sekolah Rusak Parah, Korban...|0 - 100000|[0.9850106757150424,0.00366...|  0.0|       0.0|
|Live report : Arus mudik 20...|0 - 100000|[0.9840827069961158,0.00450...|  0.0|       0.0|
|Live Report : sidang warga ...|0 - 100000|[0.9837374344958764,0.00464...|  0.0|       0.0|
|Live report : Arus mudik 20...|0 - 100000|[0.9832732272084416,0.00458...|  0.0|       0.0|
|Live Report: Kasus DBD meni...|0 - 100000|[0.983259535969943,0.004479...|  0.0|       0.0|
|Firza Husein Penuhi Panggil...|0 - 100000|[0.9831519295791378,0.00461...|  0.0|       0.0|
|Akibat Korsleting Listrik, ...|0 - 100000|[0.9828623222250696,0.00464...|  0.0|       0.0|
|Petugas Bea Cukai Juanda Te...|0 - 100000|[0.9827444237622174,0.00443...|  0.0|

In [35]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol = 'prediction')
evaluator.evaluate(prediction_test)

0.8124697589318511

In [48]:
#predictions.save('model')

#### Logistic Regression using TF-IDF

In [49]:
from pyspark.ml.feature import HashingTF, IDF

In [50]:
hashingTF = HashingTF(inputCol = 'filtered', outputCol = 'rawFeatures', numFeatures=10000)

In [51]:
idf = IDF(inputCol = 'rawFeatures', outputCol = 'features', minDocFreq = 5, )

In [52]:
pipeline_idf = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

In [53]:
pipeline_fit_idf = pipeline_idf.fit(data_select_noNA)

In [54]:
dataset_idf = pipeline_fit_idf.transform(data_select_noNA)

In [55]:
dataset_idf.show(3)

+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-----+
|          videoTitle| viewGroup|               words|            filtered|         rawFeatures|            features|label|
+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-----+
|Di Balik Layar Hi...|0 - 100000|[di, balik, layar...|[balik, layar, hi...|(10000,[1313,1682...|(10000,[1313,1682...|  0.0|
|Interview with SI...|0 - 100000|[interview, with,...|[interview, with,...|(10000,[1274,1637...|(10000,[1274,1637...|  0.0|
|SISITIPSI - Joni ...|0 - 100000|[sisitipsi, -, jo...|[sisitipsi, joni,...|(10000,[1274,1731...|(10000,[1274,1731...|  0.0|
+--------------------+----------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 3 rows



In [56]:
(trainingData1, testData1) = dataset_idf.randomSplit([0.8, 0.2], seed = 100)

In [57]:
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

In [58]:
lrModel = lr.fit(trainingData1)

In [59]:
prediction_idf = lrModel.transform(testData1)

In [60]:
prediction_idf.filter(prediction_idf['prediction'] == 0) \
.select('videoTitle', 'viewGroup', 'probability', 'label', 'prediction') \
.orderBy('probability', ascending=False) \
.show(n=10, truncate=30)

+------------------------------+----------+------------------------------+-----+----------+
|                    videoTitle| viewGroup|                   probability|label|prediction|
+------------------------------+----------+------------------------------+-----+----------+
|KPK Tetapkan Wali kota Mala...|0 - 100000|[0.9835254567376847,0.00438...|  0.0|       0.0|
|Live report : Arus mudik 20...|0 - 100000|[0.9824952518861843,0.00476...|  0.0|       0.0|
|Dahnil Anzar Diperiksa Peny...|0 - 100000|[0.9823587398104608,0.00489...|  0.0|       0.0|
|Warga Resah Aparat Desa Lak...|0 - 100000|[0.9819520451460084,0.00514...|  0.0|       0.0|
|Live report : Arus mudik 20...|0 - 100000|[0.9819414494295952,0.00521...|  0.0|       0.0|
|MNC Peduli, Lotte Mart dan ...|0 - 100000|[0.9819024485891807,0.00512...|  0.0|       0.0|
|Sekolah Rusak Parah, Korban...|0 - 100000|[0.9818045002213764,0.00455...|  0.0|       0.0|
|Live Report : sidang warga ...|0 - 100000|[0.9815199339319132,0.00542...|  0.0|

In [61]:
evaluator_idf = MulticlassClassificationEvaluator(predictionCol = 'prediction')
evaluator_idf.evaluate(prediction_idf)

0.8117706314174596

In [64]:
#lrModel.save('model_idf')

#### Cross Validation

In [65]:
pipeline_CV = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

In [66]:
pipeline_CV_fit = pipeline_CV.fit(data_select_noNA)

In [67]:
dataset_CV = pipeline_CV_fit.transform(data_select_noNA)

In [68]:
(training_CV, test_CV) = dataset_CV.randomSplit([0.8,0.2], seed = 100)

In [69]:
logreg_CV = LogisticRegression(maxIter = 20, regParam = 0.3, elasticNetParam = 0)

In [70]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [71]:
paramGrid = (ParamGridBuilder() \
.addGrid(logreg_CV.regParam, [0.1, 0.3, 0.5]) \
.addGrid(logreg_CV.elasticNetParam, [0.0, 0.1, 0.2]) \
.build())

In [72]:
cv = CrossValidator(estimator = logreg_CV, estimatorParamMaps = paramGrid, evaluator = evaluator, numFolds=5)

In [None]:
cvModel = cv.fit(training_CV)

#### Load the Model

In [16]:
from pyspark.ml.classification import LogisticRegressionModel
model_1 = LogisticRegressionModel.load('model')

In [17]:
from pyspark.ml.classification import LogisticRegressionModel
model_2 = LogisticRegressionModel.load('model_idf')