In [1]:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
import scala.util.matching
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.feature.Normalizer

In [2]:
val country = "US"
val usDF = spark.read.format("csv").option("header", "true")
                .load("data/" + country + "videos_new.csv")

country = US
usDF = [video_id: string, trending_date: string ... 14 more fields]


[video_id: string, trending_date: string ... 14 more fields]

In [3]:
usDF.printSchema()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)



In [4]:
val usDF1 = usDF.select($"category_id"
                        ,$"comment_count",$"dislikes",$"views",$"likes")
                        .na.drop()


usDF1.printSchema()

val usDF2 = usDF1.withColumn("category_id",col("category_id").cast(DoubleType))
    .withColumn("comment_count",col("comment_count").cast(IntegerType))
    .withColumn("dislikes",col("dislikes").cast(IntegerType))
    .withColumn("views",col("views").cast(IntegerType))
    .withColumn("likes",col("likes").cast(IntegerType))
usDF2.show(5)

root
 |-- category_id: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)

+-----------+-------------+--------+-------+------+
|category_id|comment_count|dislikes|  views| likes|
+-----------+-------------+--------+-------+------+
|       22.0|        15954|    2966| 748374| 57527|
|       24.0|        12703|    6146|2418783| 97185|
|       23.0|         8181|    5339|3191434|146033|
|       24.0|         2146|     666| 343168| 10172|
|       24.0|        17518|    1989|2095731|132235|
+-----------+-------------+--------+-------+------+
only showing top 5 rows



usDF1 = [category_id: string, comment_count: string ... 3 more fields]
usDF2 = [category_id: double, comment_count: int ... 3 more fields]


[category_id: double, comment_count: int ... 3 more fields]

In [5]:
val numNan = usDF.count - usDF1.count

numNan = 7188


7188

In [6]:
val assembler = new VectorAssembler()
                .setInputCols(Array("comment_count",
                                    "dislikes","views",
                                    "category_id"))
                .setOutputCol("features")
                .transform(usDF2)
usDF2.printSchema()

root
 |-- category_id: double (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- dislikes: integer (nullable = true)
 |-- views: integer (nullable = true)
 |-- likes: integer (nullable = true)



assembler = [category_id: double, comment_count: int ... 4 more fields]


[category_id: double, comment_count: int ... 4 more fields]

In [7]:
assembler.select($"likes",$"features").show(5)

+------+--------------------+
| likes|            features|
+------+--------------------+
| 57527|[15954.0,2966.0,7...|
| 97185|[12703.0,6146.0,2...|
|146033|[8181.0,5339.0,31...|
| 10172|[2146.0,666.0,343...|
|132235|[17518.0,1989.0,2...|
+------+--------------------+
only showing top 5 rows



In [8]:
val normalizer= new Normalizer()
                .setInputCol("features")
                .setOutputCol("normfeatures")
                .setP(2.0)
                .transform(assembler)
normalizer.show(5)
normalizer.printSchema()

+-----------+-------------+--------+-------+------+--------------------+--------------------+
|category_id|comment_count|dislikes|  views| likes|            features|        normfeatures|
+-----------+-------------+--------+-------+------+--------------------+--------------------+
|       22.0|        15954|    2966| 748374| 57527|[15954.0,2966.0,7...|[0.02131320801961...|
|       24.0|        12703|    6146|2418783| 97185|[12703.0,6146.0,2...|[0.00525172527371...|
|       23.0|         8181|    5339|3191434|146033|[8181.0,5339.0,31...|[0.00256341245751...|
|       24.0|         2146|     666| 343168| 10172|[2146.0,666.0,343...|[0.00625336276642...|
|       24.0|        17518|    1989|2095731|132235|[17518.0,1989.0,2...|[0.00835860143342...|
+-----------+-------------+--------+-------+------+--------------------+--------------------+
only showing top 5 rows

root
 |-- category_id: double (nullable = true)
 |-- comment_count: integer (nullable = true)
 |-- dislikes: integer (nullable = 

normalizer = [category_id: double, comment_count: int ... 5 more fields]


[category_id: double, comment_count: int ... 5 more fields]

In [9]:
val Array(trainingData,testData)= normalizer.randomSplit(Array(0.7,0.3))

trainingData = [category_id: double, comment_count: int ... 5 more fields]
testData = [category_id: double, comment_count: int ... 5 more fields]


[category_id: double, comment_count: int ... 5 more fields]

In [10]:
val lr = new LinearRegression()
            .setLabelCol("likes")
            .setFeaturesCol("normfeatures")
            .setMaxIter(100)
            .setRegParam(0.3)
            .setElasticNetParam(0.8)

lr = linReg_df99f1fb5096


linReg_df99f1fb5096

In [11]:
val lrModel = lr.fit(trainingData)

lrModel = linReg_df99f1fb5096


linReg_df99f1fb5096

In [12]:
val resultDF = lrModel.transform(testData).select("likes", "prediction")
resultDF.show(20)

+-----+------------------+
|likes|        prediction|
+-----+------------------+
|    1| 51837.42297407612|
|    0|60323.581137150526|
|    0|60688.804457280785|
|    0| 60840.89080161974|
|  137| 60367.33139920235|
|87892| 61068.33094708249|
| 6697|61788.914836771786|
| 6801| 61800.41177633777|
| 6797|61799.544810611755|
| 6874| 61806.70893822983|
|    0| 58158.96904331818|
|   30| 60655.00321532413|
|    9| 60452.72181766853|
|    9|60480.230733368546|
|  144|   60924.186317496|
|   49| 60932.83336414397|
|   19| 61510.39407872036|
|   38| 60960.37760228664|
|   30|  61495.9846814014|
|   55| 61699.45849228278|
+-----+------------------+
only showing top 20 rows



resultDF = [likes: int, prediction: double]


[likes: int, prediction: double]

In [13]:
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
resultDF.describe().show

numIterations: 12
objectiveHistory: [0.5,0.4969434450076498,0.49618552892904,0.4959383063420207,0.49571328696247896,0.49567620592694717,0.495621326877272,0.49560247841825567,0.4956023176438449,0.495602315636638,0.4956023155696053,0.4956023155659021]
+-------------------+
|          residuals|
+-------------------+
| -51014.02485281974|
|-60671.085639152676|
|-60685.679068569094|
| -60691.23993974924|
|-60692.955199711025|
| -60694.05234403163|
|-60831.681847169995|
|-60835.429747618735|
|-60838.604893028736|
|-60600.101575389504|
| -60600.79913348332|
|-60244.194695126265|
|  -60689.0846905224|
| -60841.47247865051|
| -60835.28746089712|
| -60828.41282691434|
| 30538.694068849087|
| 33936.689735747874|
| -55237.61818137765|
|-55009.166823633015|
+-------------------+
only showing top 20 rows

RMSE: 226901.0115856178
r2: 0.008795770530907299
+-------+-----------------+------------------+
|summary|            likes|        prediction|
+-------+-----------------+------------------+
|  cou

trainingSummary = org.apache.spark.ml.regression.LinearRegressionTrainingSummary@962473e


org.apache.spark.ml.regression.LinearRegressionTrainingSummary@962473e