### Setup

Let's set up Spark in the Colab environment. Run the cell below!

In [None]:
# Environment setup for a fresh Colab VM: install the Python packages with pip,
# refresh the apt package index, then install a headless OpenJDK 8.
!pip install pyspark
!apt update
# NOTE(review): PyDrive is not imported by any cell visible in this notebook — confirm it is still needed.
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
# Point JAVA_HOME at the OpenJDK 8 directory (presumably where the apt package above installs it).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:5 http://security.ubuntu.com/ubuntu bionic-security InRelease
Hit:6 http://archive.ubuntu.com/ubuntu bionic-updates InRelease
Ign:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:9 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:10 http://archive.ubuntu.com/ubuntu bionic-backports InRelease
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/graphics-drivers/ppa/ubun

Now we import some of the libraries usually needed by our workload.





In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf


Mount Google Drive to access the data.

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the next cell changes into a folder under this mount point.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
# Project folder on the mounted Drive. This path is specific to one Drive layout
# and must be adjusted when the notebook is run from another account.
cur_path = "/content/drive/My Drive/Colab Notebooks/big data project/"
# Make the project folder the working directory so later relative paths resolve against it.
os.chdir(cur_path)
# List the folder contents as a quick check that the data files are visible.
!ls

BR_videos_data.csv  FR_videos_data.csv	JP_videos_data.csv  RU_videos_data.csv
CA_videos_data.csv  IN_videos_data.csv	MX_videos_data.csv  US_videos_data.csv


Let's initialize the Spark context.

In [None]:
# Spark configuration: pin the web UI to port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Reuse the running SparkContext if this cell has already been executed in the
# current kernel. Constructing a second SparkContext raises
# "Cannot run multiple SparkContexts at once", which made the cell fail on re-run.
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# The session attaches to the context obtained above.
spark = SparkSession.builder.getOrCreate()

You can easily check the current version and get the link of the web interface. In the Spark UI, you can monitor the progress of your job and debug the performance bottlenecks (if your Colab is running with a **local runtime**).

In [None]:
# Display the session object as the cell output.
spark

### Data Preprocessing

Because the dataset is small, we load the CSV file directly into a single Spark DataFrame.

In [None]:
# Earlier pandas-based load, kept for reference only:
#video_data=pd.read_csv('US_videos_data.csv/US_videos_data.csv')
# Read the US file with its header row. No schema is supplied and inferSchema is
# not enabled, so a later cell casts the numeric columns explicitly.
# NOTE(review): the path treats 'US_videos_data.csv' as a directory containing a
# file of the same name — confirm this matches the Drive layout.
df = spark.read.csv('US_videos_data.csv/US_videos_data.csv',header=True)



Fixing Column types

In [None]:
# Cast the numeric columns to int and keep only the columns used downstream.
#
# The casts are done inside a single select so the cell can be re-run safely:
# every column it references is also in its output.
#
# The previous version also derived `trending_date` / `time_published`
# timestamps, but neither was in the final select, so they were discarded
# immediately, and referencing `trending_date` / `publishedAt` made the cell
# fail on a second run because `df` no longer had those columns.
df = df.select(
    'video_id',
    df["view_count"].cast("int").alias("view_count"),
    'title',
    'tags',
    'channelTitle',
    df["categoryId"].cast("int").alias("categoryId"),
    df["likes"].cast("int").alias("likes"),
    df["dislikes"].cast("int").alias("dislikes"),
    df["comment_count"].cast("int").alias("comment_count"),
)


In [None]:
# Keep a reference to the typed frame for the regression models below.
# `df` itself is reassigned by the tokenisation cells that follow.
Regression_data = df

Tokenizing the column 'title'

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

# Tokenise `title` into a new `Tokens_title` column.
tokenizer = Tokenizer(
    inputCol="title",
    outputCol="Tokens_title",
)
df = tokenizer.transform(df)

In [None]:
# Print the first three rows to inspect the new token column.
df.limit(3).show()

Removing Stop Words from Title

In [None]:
from pyspark.ml.feature import StopWordsRemover

# Filter stop words out of `Tokens_title`, writing the result to `filtered_tokens_title`.
remover = StopWordsRemover(
    inputCol="Tokens_title",
    outputCol="filtered_tokens_title",
)
df = remover.transform(df)

In [None]:
# Print the first three rows to inspect the stop-word-filtered tokens.
df.limit(3).show()

#### Gradient-boosted trees model (Spark ML `GBTRegressor`) to predict the view count of a video

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# Drop rows with a null in the label or in any feature column.
# The int casts upstream turn unparsable values into null, and the VectorAssembler
# used below (default handleInvalid="error") fails on a null in any of its inputs,
# so checking `likes` alone was not sufficient.
Regression_data = Regression_data.dropna(
    subset=['view_count', 'categoryId', 'likes', 'dislikes', 'comment_count']
)

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

# Pack the four numeric columns into a single `features` vector column.
vec_assembler = VectorAssembler(
    inputCols=['categoryId', 'likes', 'dislikes', 'comment_count'],
    outputCol='features',
)

final_data = vec_assembler.transform(Regression_data)
final_data = final_data.select('view_count', 'features')

# 80/20 train/test split. The seed is fixed so the RMSE values are reproducible;
# this same split is reused by the generalized linear regression cell below.
(trainingData, testData) = final_data.randomSplit([0.8, 0.2], seed=42)

# Train a GBT model.
gbt = GBTRegressor(labelCol='view_count', featuresCol="features", maxIter=8)
gbt_model = gbt.fit(trainingData)

# Make predictions on the held-out split.
predictions = gbt_model.transform(testData)

# Compare predictions with the true label and compute the test error.
evaluator = RegressionEvaluator(
    labelCol="view_count", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)



In [None]:
# Print five rows of predicted and actual view counts alongside the feature vector.
predictions.select("prediction", "view_count", "features").show(5)

#### Using a generalized linear model for regression

In [None]:
from pyspark.ml.regression import GeneralizedLinearRegression

In [None]:
# Gaussian family with identity link, fitted on the same split as the GBT model above.
glr = GeneralizedLinearRegression(
    labelCol='view_count',
    featuresCol='features',
    family="gaussian",
    link="identity",
    maxIter=10,
    regParam=0.3,
)
model = glr.fit(trainingData)

# Score the held-out split.
predictions = model.transform(testData)

# RMSE between the predicted and actual view counts on the test split.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="view_count",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
# Install Spark NLP pinned to 3.0.2; the next cell imports `sparknlp`.
! pip install spark-nlp==3.0.2

#### Trying out models with Video Titles embedded using Word2Vec

In [None]:
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.embeddings import *
from pyspark.ml.feature import Word2Vec

In [None]:
# List the columns currently present on Regression_data.
Regression_data.columns

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StopWordsRemover

# Make the cell safe to re-run: remove token columns left by a previous execution.
# Tokenizer/StopWordsRemover refuse to write to an output column that already
# exists, and DataFrame.drop is a no-op for names that are not in the schema.
Regression_data = Regression_data.drop("Tokens_title", "filtered_tokens_title")

# Tokenise the title.
tokenizer = Tokenizer(inputCol="title", outputCol="Tokens_title")
Regression_data = tokenizer.transform(Regression_data)

# Remove stop words from the tokens.
remover = StopWordsRemover(inputCol="Tokens_title", outputCol="filtered_tokens_title")
Regression_data = remover.transform(Regression_data)

# Fit a 3-dimensional Word2Vec embedding on the filtered title tokens.
# minCount=0 keeps every token; the seed is fixed so the embedding can be reproduced.
word2Vec = Word2Vec(
    vectorSize=3,
    minCount=0,
    seed=42,
    inputCol="filtered_tokens_title",
    outputCol="title_embedding",
)
model = word2Vec.fit(Regression_data)

# Add the `title_embedding` column to every row.
embedded_data = model.transform(Regression_data)

In [None]:
# Print the first rows of the frame, including the new `title_embedding` column.
embedded_data.show()

Creating the new feature vector with title embeddings

In [None]:
# Rebuild the feature vector, this time appending the title embedding to the numeric columns.
vec_assembler = VectorAssembler(
    inputCols=['categoryId', 'likes', 'dislikes', 'comment_count', 'title_embedding'],
    outputCol='features',
)

final_data = vec_assembler.transform(embedded_data)
final_data = final_data.select('view_count', 'features')

# 80/20 train/test split with a fixed seed so the reported RMSE is reproducible.
(trainingData, testData) = final_data.randomSplit([0.8, 0.2], seed=42)

In [None]:
# Fit the GBT regressor (maxIter=8) on the current training split.
gbt = GBTRegressor(
    featuresCol="features",
    labelCol='view_count',
    maxIter=8,
)
gbt_model = gbt.fit(trainingData)

# Score the held-out split.
predictions = gbt_model.transform(testData)

# RMSE between the predicted and actual view counts on the test split.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="view_count",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

In [None]:
# Display the feature importances of the most recently fitted `gbt_model`.
gbt_model.featureImportances