# Machine learning using PySpark

In [1]:
import os

# Point PySpark at the local Java and Spark installs. Raw strings keep the
# Windows backslashes literal: "\J" and "\s" are invalid escape sequences that
# only work by accident and trigger a warning on newer Python versions.
# NOTE(review): these paths are machine-specific — adjust for your environment.
os.environ["JAVA_HOME"] = r"C:\Java\jdk1.8.0_202"
os.environ["SPARK_HOME"] = r"C:\spark"

In [2]:
import findspark
# Presumably uses the SPARK_HOME value set in the first cell to make `pyspark`
# importable — TODO confirm against the findspark documentation.
findspark.init()

In [3]:
# Load packages
import pyspark
from pyspark import SparkContext

In [4]:
# Reuse the running context if one already exists: constructing SparkContext a
# second time in the same kernel raises an error, so the plain constructor made
# this cell fail on re-run. On a fresh kernel this still starts a local master
# with 2 worker threads. (If a context already exists, the conf is ignored.)
sc = SparkContext.getOrCreate(pyspark.SparkConf().setMaster('local[2]'))

In [5]:
# Bare expression: the notebook renders the SparkContext as this cell's output.
sc

In [6]:
# Import SparkSession (the session itself is created in the next cell)
from pyspark.sql import SparkSession

In [7]:
# Build (or fetch the already-running) SparkSession for this project.
spark = (
    SparkSession.builder
    .appName("BDProject")
    .getOrCreate()
)

# Workflow
- Data Prep
- Feature Engineering
- Build Model
- Evaluate

In [8]:
# Load the dataset: the first CSV row is the header and column types are inferred.
# NOTE(review): the Trending_Year counts shown further down include nulls and
# values 1-6, which may mean some rows are split while parsing — TODO confirm
# whether extra CSV options (quoting / multi-line fields) are needed.
df = spark.read.options(header=True, inferSchema=True).csv('YouTube.csv')

In [9]:
#df.show()

In [10]:
#print(df.columns)

In [11]:
#df.printSchema()

In [12]:
#df.describe().show()

In [13]:
#df.groupBy('RatingsDisabled').count().show()

In [14]:
# Peek at the first row to sanity-check the parsed columns.
df.show(n=1)

+--------------------+-------------+----------+--------------------+---------+------+------------+----------------+---------------+----+-----+---+-------------+--------------+------------+-------------+--------+
|               Title| ChannelTitle|CategoryID|                Tags|ViewCount| Likes|CommentCount|CommentsDisabled|RatingsDisabled|Year|Month|Day|Trending_Year|Trending_Month|Trending_Day|Trending_Days|   Label|
+--------------------+-------------+----------+--------------------+---------+------+------------+----------------+---------------+----+-----+---+-------------+--------------+------------+-------------+--------+
|I left youtube fo...|jacksepticeye|        24|jacksepticeye|fun...|  2038853|353790|       40228|               0|              0|2020|    8| 11|         2020|             8|          12|            1|trending|
+--------------------+-------------+----------+--------------------+---------+------+------------+----------------+---------------+----+-----+---+------

In [15]:
import pyspark.ml

In [16]:
#dir(pyspark.ml)

In [17]:
from pyspark.ml.feature import VectorAssembler, StringIndexer

In [18]:
# List the column names available in the loaded DataFrame.
print(df.columns)

['Title', 'ChannelTitle', 'CategoryID', 'Tags', 'ViewCount', 'Likes', 'CommentCount', 'CommentsDisabled', 'RatingsDisabled', 'Year', 'Month', 'Day', 'Trending_Year', 'Trending_Month', 'Trending_Day', 'Trending_Days', 'Label']


In [19]:
# Print each column's name, inferred type and nullability.
df.printSchema()

root
 |-- Title: string (nullable = true)
 |-- ChannelTitle: string (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- Tags: string (nullable = true)
 |-- ViewCount: integer (nullable = true)
 |-- Likes: integer (nullable = true)
 |-- CommentCount: integer (nullable = true)
 |-- CommentsDisabled: integer (nullable = true)
 |-- RatingsDisabled: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Trending_Year: integer (nullable = true)
 |-- Trending_Month: string (nullable = true)
 |-- Trending_Day: integer (nullable = true)
 |-- Trending_Days: integer (nullable = true)
 |-- Label: string (nullable = true)



In [20]:
# Same type information as a list of (column, type) pairs.
df.dtypes

[('Title', 'string'),
 ('ChannelTitle', 'string'),
 ('CategoryID', 'int'),
 ('Tags', 'string'),
 ('ViewCount', 'int'),
 ('Likes', 'int'),
 ('CommentCount', 'int'),
 ('CommentsDisabled', 'int'),
 ('RatingsDisabled', 'int'),
 ('Year', 'int'),
 ('Month', 'int'),
 ('Day', 'int'),
 ('Trending_Year', 'int'),
 ('Trending_Month', 'string'),
 ('Trending_Day', 'int'),
 ('Trending_Days', 'int'),
 ('Label', 'string')]

In [21]:
import matplotlib as mpl
from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd

In [22]:
from pyspark.sql.functions import count

# Row count per Trending_Year, shown as a pandas table. The aggregation already
# yields exactly the two columns (group key + count), so no extra select is needed.
display(
    df.groupBy("Trending_Year")
    .agg(count("*"))
    .toPandas()
)


Unnamed: 0,Trending_Year,count(1)
0,2023.0,1800
1,2022.0,72998
2,,14
3,1.0,1
4,6.0,1
5,3.0,3
6,5.0,3
7,4.0,3
8,2020.0,27593
9,2.0,3


In [23]:
# Preview the first three rows.
df.show(n=3)

+--------------------+-------------+----------+--------------------+---------+------+------------+----------------+---------------+----+-----+---+-------------+--------------+------------+-------------+--------+
|               Title| ChannelTitle|CategoryID|                Tags|ViewCount| Likes|CommentCount|CommentsDisabled|RatingsDisabled|Year|Month|Day|Trending_Year|Trending_Month|Trending_Day|Trending_Days|   Label|
+--------------------+-------------+----------+--------------------+---------+------+------------+----------------+---------------+----+-----+---+-------------+--------------+------------+-------------+--------+
|I left youtube fo...|jacksepticeye|        24|jacksepticeye|fun...|  2038853|353790|       40228|               0|              0|2020|    8| 11|         2020|             8|          12|            1|trending|
|TAXI CAB SLAYER K...|Eleanor Neale|        27|eleanor|neale|ele...|   236830| 16423|        1642|               0|              0|2020|    8| 11|      

In [24]:
#Label Encoding
LabelEncoding = StringIndexer(inputCol='Label', outputCol='Target').fit(df)
df = LabelEncoding.transform(df)

In [25]:
# Check that the new 'Target' column was appended.
df.show(n=2)

+--------------------+-------------+----------+--------------------+---------+------+------------+----------------+---------------+----+-----+---+-------------+--------------+------------+-------------+--------+------+
|               Title| ChannelTitle|CategoryID|                Tags|ViewCount| Likes|CommentCount|CommentsDisabled|RatingsDisabled|Year|Month|Day|Trending_Year|Trending_Month|Trending_Day|Trending_Days|   Label|Target|
+--------------------+-------------+----------+--------------------+---------+------+------------+----------------+---------------+----+-----+---+-------------+--------------+------------+-------------+--------+------+
|I left youtube fo...|jacksepticeye|        24|jacksepticeye|fun...|  2038853|353790|       40228|               0|              0|2020|    8| 11|         2020|             8|          12|            1|trending|   0.0|
|TAXI CAB SLAYER K...|Eleanor Neale|        27|eleanor|neale|ele...|   236830| 16423|        1642|               0|         

In [26]:
# Labels learned by the fitted indexer, in index order: the label at position i
# is encoded as float(i) in 'Target' (e.g. 'trending' -> 0.0 in the output below).
LabelEncoding.labels

['trending', 'non-trending']

In [27]:
# Column list after label encoding; 'Target' should now be the last entry.
print(df.columns)

['Title', 'ChannelTitle', 'CategoryID', 'Tags', 'ViewCount', 'Likes', 'CommentCount', 'CommentsDisabled', 'RatingsDisabled', 'Year', 'Month', 'Day', 'Trending_Year', 'Trending_Month', 'Trending_Day', 'Trending_Days', 'Label', 'Target']


In [28]:
# Single-column view holding only the raw string label.
df2 = df.select(df['Label'])

In [29]:
# Confirm that df2 contains just the 'Label' column and show its type.
df2.printSchema()

root
 |-- Label: string (nullable = true)



In [30]:
#df2.toPandas().astype(float)

In [31]:
# Numeric columns that are assembled into the model's feature vector.
required_features = [
    'ViewCount',
    'Likes',
    'CommentCount',
]

In [32]:
# Vector assembler: packs the selected numeric columns into one 'features' vector.
# The input columns are nullable, and the default handleInvalid='error' aborts
# the job on the first NULL; 'skip' drops such rows instead.
# NOTE(review): skipped rows disappear silently — compare row counts before and
# after the transform if that matters for the analysis.
vec_assembler = VectorAssembler(
    inputCols=required_features,
    outputCol='features',
    handleInvalid='skip',
)

In [33]:
# Apply the assembler: vec_df is df plus the assembled 'features' column.
vec_df = vec_assembler.transform(df)

In [34]:
# Preview the assembled feature vectors alongside the original columns.
vec_df.show(n=3)

+--------------------+-------------+----------+--------------------+---------+------+------------+----------------+---------------+----+-----+---+-------------+--------------+------------+-------------+--------+------+--------------------+
|               Title| ChannelTitle|CategoryID|                Tags|ViewCount| Likes|CommentCount|CommentsDisabled|RatingsDisabled|Year|Month|Day|Trending_Year|Trending_Month|Trending_Day|Trending_Days|   Label|Target|            features|
+--------------------+-------------+----------+--------------------+---------+------+------------+----------------+---------------+----+-----+---+-------------+--------------+------------+-------------+--------+------+--------------------+
|I left youtube fo...|jacksepticeye|        24|jacksepticeye|fun...|  2038853|353790|       40228|               0|              0|2020|    8| 11|         2020|             8|          12|            1|trending|   0.0|[2038853.0,353790...|
|TAXI CAB SLAYER K...|Eleanor Neale|    

In [35]:
# Keep only what the models consume: the feature vector and the numeric target.
finalased_data = vec_df.select(['features', 'Target'])

In [36]:
# Show the first 20 rows (the default row limit, spelled out).
finalased_data.show(n=20)

+--------------------+------+
|            features|Target|
+--------------------+------+
|[2038853.0,353790...|   0.0|
|[236830.0,16423.0...|   0.0|
|[2381688.0,146739...|   0.0|
|[613785.0,37567.0...|   0.0|
|[940036.0,87113.0...|   0.0|
|[1050143.0,89192....|   0.0|
|[1.1308046E7,6554...|   0.0|
|[1514614.0,156910...|   0.0|
|[277506.0,27420.0...|   0.0|
|[1123889.0,45803....|   0.0|
|[210345.0,12221.0...|   0.0|
|[122755.0,6072.0,...|   0.0|
|[876682.0,153585....|   0.0|
|[1213314.0,64247....|   0.0|
|[165064.0,3921.0,...|   0.0|
|[97824.0,2068.0,6...|   0.0|
|[9140911.0,296505...|   0.0|
|[115871.0,7599.0,...|   0.0|
|[149047.0,47715.0...|   0.0|
|[730082.0,40309.0...|   0.0|
+--------------------+------+
only showing top 20 rows



In [38]:
from pyspark.ml.regression import LinearRegression

# Fixed seed so the 75/25 split — and any model trained on it — is reproducible
# across kernel restarts.
train_data, test_data = finalased_data.randomSplit([0.75, 0.25], seed=42)

# Keep the unfitted estimator separate from the fitted model instead of binding
# one name to both; `regressor` still ends up as the fitted model.
# NOTE(review): 'Target' is a 0/1 class index, so a classifier is probably a
# better fit than linear regression here — confirm intent.
linear_regression = LinearRegression(featuresCol='features', labelCol='Target')
regressor = linear_regression.fit(train_data)

Py4JJavaError: An error occurred while calling o178.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 12.0 failed 1 times, most recent failure: Lost task 1.0 in stage 12.0 (TID 216) (DESKTOP-TG9LCS8 executor driver): org.apache.spark.SparkException: Failed to execute user defined function(StringIndexerModel$$Lambda$3133/24915159: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1230)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1231)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:396)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:391)
	... 31 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2297)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1183)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1177)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1222)
	at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:107)
	at org.apache.spark.ml.regression.LinearRegression.trainWithNormal(LinearRegression.scala:452)
	at org.apache.spark.ml.regression.LinearRegression.$anonfun$train$1(LinearRegression.scala:346)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:328)
	at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:185)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(StringIndexerModel$$Lambda$3133/24915159: (string) => double)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1230)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1231)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: StringIndexer encountered NULL value. To handle or skip NULLS, try setting StringIndexer.handleInvalid.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:396)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:391)
	... 31 more


# Train, Test split

In [None]:
# 70/30 train/test split with a fixed seed so results are reproducible on re-run.
train_df, test_df = vec_df.randomSplit([0.7, 0.3], seed=42)

In [None]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier

In [None]:
# Unfitted logistic-regression estimator reading 'features' and predicting 'Target'.
LR = LogisticRegression(
    featuresCol='features',
    labelCol='Target',
)

In [None]:
# Fit the logistic-regression estimator on the training split.
LR_Model = LR.fit(train_df)