In [1]:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
spark = SparkSession \
    .builder \
    .appName("BIG DATA PROJECT") \
    .getOrCreate()

In [3]:
youtubeSchema = StructType([
  StructField("video_id", StringType(), False),
  StructField("trending_date", StringType(), False),
  StructField("title", StringType(), False),
  StructField("channel_title", StringType(), False),
  StructField("category_id", IntegerType(), False),
  StructField("publish_time", TimestampType(), False),
  StructField("tags", StringType(), False),
  StructField("views", LongType(), False),
  StructField("likes", LongType(), False),
  StructField("dislikes", LongType(), False),
  StructField("comment_count", LongType(), False),
])
print(youtubeSchema)

StructType(List(StructField(video_id,StringType,false),StructField(trending_date,StringType,false),StructField(title,StringType,false),StructField(channel_title,StringType,false),StructField(category_id,IntegerType,false),StructField(publish_time,TimestampType,false),StructField(tags,StringType,false),StructField(views,LongType,false),StructField(likes,LongType,false),StructField(dislikes,LongType,false),StructField(comment_count,LongType,false)))


In [4]:
YtData = spark.read.csv('INvideos.csv', 
                         schema=youtubeSchema, header=True)

In [5]:
YtData.show(5)

+-----------+-------------+--------------------+---------------+-----------+--------------------+--------------------+-------+-----+--------+-------------+
|   video_id|trending_date|               title|  channel_title|category_id|        publish_time|                tags|  views|likes|dislikes|comment_count|
+-----------+-------------+--------------------+---------------+-----------+--------------------+--------------------+-------+-----+--------+-------------+
|kzwfHumJyYc|     17.14.11|Sharry Mann: Cute...|Lokdhun Punjabi|          1|2017-11-12 12:20:...|"sharry mann|""sh...|1096327|33966|     798|          882|
|zUZ1z7FwLc8|     17.14.11|पीरियड्स के समय, ...|        HJ NEWS|         25|2017-11-13 05:43:...|"पीरियड्स के समय|...| 590101|  735|     904|            0|
|10L1hZ9qa58|     17.14.11|Stylish Star Allu...|           TFPC|         24|2017-11-12 15:48:...|"Stylish Star All...| 473988| 2011|     243|          149|
|N1vE8iiEg64|     17.14.11|Eruma Saani | Tam...|    Eruma Saani|

In [6]:
def count_missings(YtData,sort=True):
    df = YtData.select([count(when(isnan(c) | isnull(c), c)).alias(c) for (c,c_type) in YtData.dtypes if c_type not in ('timestamp', 'date')]).toPandas()

    if len(df) == 0:
        print("There are no any missing values!")
        return None

    if sort:
        return df.rename(index={0: 'count'}).T.sort_values("count",ascending=False)

    return df

In [7]:
count_missings(YtData)

Unnamed: 0,count
video_id,0
trending_date,0
title,0
channel_title,0
category_id,0
tags,0
views,0
likes,0
dislikes,0
comment_count,0


In [8]:
MostLiked = YtData.agg({"likes": "max"}).collect()[0]
print(MostLiked)

Row(max(likes)=2912710)


In [9]:
MostLikedRow = YtData.where(YtData['likes'] == MostLiked[0])

In [10]:
a = MostLikedRow.select("title").take(1)
print(str(a[0]).split('\'')[1] + ' WITH ' + str(MostLiked[0]) + ' LIKES')

YouTube Rewind: The Shape of 2017 | #YouTubeRewind WITH 2912710 LIKES


In [11]:
MostDisliked = YtData.agg({"dislikes": "max"}).collect()[0]
MostDislikedRow = YtData.where(YtData['dislikes'] == MostDisliked[0])
a = MostDislikedRow.select("title").take(1)
print(str(a[0]).split('\'')[1] + ' WITH ' + str(MostDisliked[0]) + ' DISLIKES')

YouTube Rewind: The Shape of 2017 | #YouTubeRewind WITH 1545017 DISLIKES


In [12]:
MostViewed = YtData.agg({"views": "max"}).collect()[0]
MostViewedRow = YtData.where(YtData['views'] == MostViewed[0])
a = MostViewedRow.select("title").take(1)
print(str(a[0]).split('\'')[1] + ' WITH ' + str(MostViewed[0]) + ' VIEWS')

YouTube Rewind: The Shape of 2017 | #YouTubeRewind WITH 125432237 VIEWS


In [13]:
import pandas as pd

In [14]:
dictionary = pd.read_json('IndiaYTcategories.json')['items']

In [15]:
categories = {}
for category in dictionary:
    snippet = category['snippet']
    categories[int(category['id'])] = snippet['title']

In [16]:
print(categories)

{1: 'Film & Animation', 2: 'Autos & Vehicles', 10: 'Music', 15: 'Pets & Animals', 17: 'Sports', 18: 'Short Movies', 19: 'Travel & Events', 20: 'Gaming', 21: 'Videoblogging', 22: 'People & Blogs', 23: 'Comedy', 24: 'Entertainment', 25: 'News & Politics', 26: 'Howto & Style', 27: 'Education', 28: 'Science & Technology', 30: 'Movies', 31: 'Anime/Animation', 32: 'Action/Adventure', 33: 'Classics', 34: 'Comedy', 35: 'Documentary', 36: 'Drama', 37: 'Family', 38: 'Foreign', 39: 'Horror', 40: 'Sci-Fi/Fantasy', 41: 'Thriller', 42: 'Shorts', 43: 'Shows', 44: 'Trailers'}


In [17]:
CatDF = []
for key,value in categories.items():
    CatDF.append((key,value))
print(CatDF)

[(1, 'Film & Animation'), (2, 'Autos & Vehicles'), (10, 'Music'), (15, 'Pets & Animals'), (17, 'Sports'), (18, 'Short Movies'), (19, 'Travel & Events'), (20, 'Gaming'), (21, 'Videoblogging'), (22, 'People & Blogs'), (23, 'Comedy'), (24, 'Entertainment'), (25, 'News & Politics'), (26, 'Howto & Style'), (27, 'Education'), (28, 'Science & Technology'), (30, 'Movies'), (31, 'Anime/Animation'), (32, 'Action/Adventure'), (33, 'Classics'), (34, 'Comedy'), (35, 'Documentary'), (36, 'Drama'), (37, 'Family'), (38, 'Foreign'), (39, 'Horror'), (40, 'Sci-Fi/Fantasy'), (41, 'Thriller'), (42, 'Shorts'), (43, 'Shows'), (44, 'Trailers')]


In [18]:
CategoryDf = spark.createDataFrame(CatDF, ["category_id", "category_name"])

In [19]:
CategoryDf.show()

+-----------+--------------------+
|category_id|       category_name|
+-----------+--------------------+
|          1|    Film & Animation|
|          2|    Autos & Vehicles|
|         10|               Music|
|         15|      Pets & Animals|
|         17|              Sports|
|         18|        Short Movies|
|         19|     Travel & Events|
|         20|              Gaming|
|         21|       Videoblogging|
|         22|      People & Blogs|
|         23|              Comedy|
|         24|       Entertainment|
|         25|     News & Politics|
|         26|       Howto & Style|
|         27|           Education|
|         28|Science & Technology|
|         30|              Movies|
|         31|     Anime/Animation|
|         32|    Action/Adventure|
|         33|            Classics|
+-----------+--------------------+
only showing top 20 rows



In [20]:
YtData.dtypes

[('video_id', 'string'),
 ('trending_date', 'string'),
 ('title', 'string'),
 ('channel_title', 'string'),
 ('category_id', 'int'),
 ('publish_time', 'timestamp'),
 ('tags', 'string'),
 ('views', 'bigint'),
 ('likes', 'bigint'),
 ('dislikes', 'bigint'),
 ('comment_count', 'bigint')]

In [21]:
MaxViews = YtData.groupBy('category_id').max('views') \
    .sort(col("category_id"))
MaxLiked = YtData.groupBy('category_id').max('likes') \
    .sort(col("category_id"))
MaxDisliked = YtData.groupBy('category_id').max('dislikes') \
    .sort(col("category_id"))
MaxViews.show()
MaxLiked.show()
MaxDisliked.show()

+-----------+----------+
|category_id|max(views)|
+-----------+----------+
|          1|  38664360|
|          2|   1770864|
|         10|  43738208|
|         15|   2490776|
|         17|  20761480|
|         19|    287298|
|         20|  14589314|
|         22|   7301276|
|         23|  13241801|
|         24| 125432237|
|         25|  11510500|
|         26|  12447671|
|         27|   5217644|
|         28|  35276532|
|         29|    461408|
|         30|   7398655|
|         43|   2764124|
+-----------+----------+

+-----------+----------+
|category_id|max(likes)|
+-----------+----------+
|          1|    865914|
|          2|     51014|
|         10|   1804377|
|         15|    186307|
|         17|    871933|
|         19|      9292|
|         20|    321673|
|         22|    223455|
|         23|    793330|
|         24|   2912710|
|         25|    530022|
|         26|     68625|
|         27|    352049|
|         28|    402773|
|         29|     46548|
|         30|     77789|

In [22]:
MaxViews = MaxViews.join(CategoryDf, how = 'inner', on = 'category_id')
MaxLiked = MaxLiked.join(CategoryDf, how = 'inner', on = 'category_id')
MaxDisliked = MaxDisliked.join(CategoryDf, how = 'inner', on = 'category_id')

In [23]:
MaxViews=MaxViews.withColumnRenamed("max(views)","MaximumViews")
MaxLiked=MaxLiked.withColumnRenamed("max(likes)","MaximumLikes")
MaxDisliked=MaxDisliked.withColumnRenamed("max(dislikes)","MaximumDislikes")
#MaxViews1 = spark.createDataFrame(MaxViews,Schema1)
#MaxView1 = MaxView1.withColumn("Category_id", MaxView1("category_id").cast(StringType)).drop("category_id")
MaxViews.show()
MaxLiked.show()
MaxDisliked.show()

+-----------+------------+--------------------+
|category_id|MaximumViews|       category_name|
+-----------+------------+--------------------+
|          1|    38664360|    Film & Animation|
|          2|     1770864|    Autos & Vehicles|
|         10|    43738208|               Music|
|         15|     2490776|      Pets & Animals|
|         17|    20761480|              Sports|
|         19|      287298|     Travel & Events|
|         20|    14589314|              Gaming|
|         22|     7301276|      People & Blogs|
|         23|    13241801|              Comedy|
|         24|   125432237|       Entertainment|
|         25|    11510500|     News & Politics|
|         26|    12447671|       Howto & Style|
|         27|     5217644|           Education|
|         28|    35276532|Science & Technology|
|         30|     7398655|              Movies|
|         43|     2764124|               Shows|
+-----------+------------+--------------------+

+-----------+------------+-------------

In [24]:
VidNamesViews = YtData.select('title','views')
VidNamesLikes = YtData.select('title','likes')
VidNamesDislikes = YtData.select('title','dislikes')
#MaxViews = MaxViews.join(YtData, MaxViews.MaximumViews == YtData.views ,'inner')
VidNamesViews.show()
VidNamesLikes.show()
VidNamesDislikes.show()

+--------------------+--------+
|               title|   views|
+--------------------+--------+
|Sharry Mann: Cute...| 1096327|
|पीरियड्स के समय, ...|  590101|
|Stylish Star Allu...|  473988|
|Eruma Saani | Tam...| 1242680|
|why Samantha beca...|  464015|
|MCA (Middle Class...| 6106669|
|Daang ( Full Vide...| 5718766|
|Padmavati : Ek Di...|10588371|
|Chiranjeevi in Na...|  118223|
|New bike vs Old b...|  969030|
|Mehjabi Reveals H...|  632747|
|Jannat (Full Song...| 2348107|
|Renu Desai Gives ...|  156085|
|Peehu Srivastav P...|  472413|
|Rowi Na | Nadha V...|  836006|
|ஜெயலலிதாவின் உயில...|   89531|
|TYPES OF STUDENTS...|  344545|
|Tiger Zinda Hai |...|35885754|
|Meri Setting Karw...|  209599|
|The Trump Preside...| 2418783|
+--------------------+--------+
only showing top 20 rows

+--------------------+------+
|               title| likes|
+--------------------+------+
|Sharry Mann: Cute...| 33966|
|पीरियड्स के समय, ...|   735|
|Stylish Star Allu...|  2011|
|Eruma Saani | Tam...| 703

In [25]:
MaxViews = MaxViews.join(VidNamesViews, MaxViews.MaximumViews == VidNamesViews.views ,'inner')
MaxLiked = MaxLiked.join(VidNamesLikes, MaxLiked.MaximumLikes == VidNamesLikes.likes ,'inner')
MaxDisliked = MaxDisliked.join(VidNamesDislikes, MaxDisliked.MaximumDislikes == VidNamesDislikes.dislikes ,'inner')

In [26]:
MaxViews = MaxViews.drop('views')
MaxViews = MaxViews.drop('category_id')
MaxLiked = MaxLiked.drop('likes')
MaxLiked = MaxLiked.drop('category_id')
MaxDisliked = MaxDisliked.drop('dislikes')
MaxDisliked = MaxDisliked.drop('category_id')

In [27]:
MaxViews.show(MaxViews.count())
MaxLiked.show(MaxLiked.count())
MaxDisliked.show(MaxDisliked.count())

+------------+--------------------+--------------------+
|MaximumViews|       category_name|               title|
+------------+--------------------+--------------------+
|     7301276|      People & Blogs|$3 Fries Vs. $100...|
|    13241801|              Comedy|Why I'm not in Yo...|
|    11510500|     News & Politics|No VIP treatment ...|
|    11510500|     News & Politics|No VIP treatment ...|
|     1770864|    Autos & Vehicles|శ్రీదేవి కూతుర్ల ...|
|     5217644|           Education|    DO U KNO DA WEI?|
|    43738208|               Music|Taylor Swift - De...|
|      287298|     Travel & Events|Chinese Street Fo...|
|     2764124|               Shows|Vijay Television ...|
|     2764124|               Shows|Vijay Television ...|
|     7398655|              Movies|Golak Bugni Bank ...|
|    38664360|    Film & Animation|Tiger Zinda Hai |...|
|   125432237|       Entertainment|YouTube Rewind: T...|
|    12447671|       Howto & Style|17 Weird Ways To ...|
|    20761480|              Spo

In [28]:
YtData.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
category_id,37352,21.57659563075605,6.556593033620352,1,43
views,37352,1060477.645882416,3184932.0533807403,4024,125432237
likes,37352,27082.71765902763,97145.09513101683,0,2912710
dislikes,37352,1665.0819768687086,16076.174538768348,0,1545017
comment_count,37352,2676.9974298565003,14868.317129651421,0,827755


In [29]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['category_id','views','dislikes','comment_count'], outputCol = 'features')
YtDataAssem = vectorAssembler.transform(YtData)
YtDataAssem = YtDataAssem.select(['features', 'likes'])
YtDataAssem.show(3)

+--------------------+-----+
|            features|likes|
+--------------------+-----+
|[1.0,1096327.0,79...|33966|
|[25.0,590101.0,90...|  735|
|[24.0,473988.0,24...| 2011|
+--------------------+-----+
only showing top 3 rows



In [30]:
splits = YtDataAssem.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]

In [31]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='likes')
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-83.2208034715,0.0183923798593,-1.15586775182,3.30289951329]
Intercept: 2358.094551189264


In [32]:
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 40147.733765
r2: 0.816478


In [33]:
train_df.describe().show()

+-------+------------------+
|summary|             likes|
+-------+------------------+
|  count|             26081|
|   mean|26663.125685364826|
| stddev| 93718.42537058167|
|    min|                 0|
|    max|           2912710|
+-------+------------------+



In [34]:
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","likes","features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="likes",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+------------------+-----+--------------------+
|        prediction|likes|            features|
+------------------+-----+--------------------+
| 2707.054930656344|  859|[1.0,14752.0,38.0...|
| 2682.337295055174|  477|[1.0,20385.0,9.0,...|
|2821.9191012102465|  262|[1.0,24517.0,34.0...|
|3660.9943602797593| 1382|[1.0,29086.0,18.0...|
| 3425.017966621715| 2728|[1.0,30810.0,121....|
+------------------+-----+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.828493


In [35]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 43334.3


In [36]:
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()

numIterations: 1
objectiveHistory: [0.0]
+-------------------+
|          residuals|
+-------------------+
| -2297.153715289941|
|-2128.8701423094612|
| -1820.329293256983|
|  -2750.57215604999|
|-1294.1335232072142|
|-1294.1335232072142|
| -2653.719759224321|
| -2147.853555712477|
| -2139.836979005954|
|-2796.8241756647744|
|-1574.0594916657265|
| -2586.563510667852|
|-2596.5858603400093|
|-2745.1722009465484|
| 2225.1991233312483|
| -2563.709639151063|
| -1700.525857089673|
| -2717.494219197316|
|  -2983.44819113152|
|-2226.3413853021348|
+-------------------+
only showing top 20 rows



In [37]:
predictions = lr_model.transform(test_df)
predictions.select("prediction","likes","features").show()

+------------------+-----+--------------------+
|        prediction|likes|            features|
+------------------+-----+--------------------+
| 2707.054930656344|  859|[1.0,14752.0,38.0...|
| 2682.337295055174|  477|[1.0,20385.0,9.0,...|
|2821.9191012102465|  262|[1.0,24517.0,34.0...|
|3660.9943602797593| 1382|[1.0,29086.0,18.0...|
| 3425.017966621715| 2728|[1.0,30810.0,121....|
|2870.6487380833637|   58|[1.0,32913.0,34.0...|
| 5838.472911693218| 6284|[1.0,38345.0,96.0...|
|3086.0899998161603|  388|[1.0,40712.0,6.0,...|
|3241.4127848134503| 1247|[1.0,43554.0,14.0...|
| 3083.021040785819|   34|[1.0,44379.0,27.0...|
|3281.5810826037678|  426|[1.0,49114.0,22.0...|
|3149.9750285307487|   43|[1.0,49222.0,89.0...|
|3981.7886377023706| 1307|[1.0,50568.0,48.0...|
| 6338.968822016299| 5973|[1.0,52968.0,170....|
| 5741.646136870722| 6578|[1.0,53606.0,154....|
|3350.9718726554734|  997|[1.0,53892.0,78.0...|
|3465.0081692247845| 2018|[1.0,53915.0,34.0...|
|3624.4899679442947| 2247|[1.0,54702.0,1

In [38]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'likes')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_evaluatorrmse = RegressionEvaluator(
    labelCol='likes', predictionCol="prediction", metricName="rmse")
rmse = dt_evaluatorrmse.evaluate(dt_predictions)
dt_evaluatorr2 = RegressionEvaluator(
    labelCol='likes', predictionCol="prediction", metricName="r2")
r2 = dt_evaluatorr2.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
print("r2: %f" % r2)

Root Mean Squared Error (RMSE) on test data = 61799.3
r2: 0.651193


In [39]:
dt_model.featureImportances

SparseVector(4, {0: 0.0242, 1: 0.1066, 2: 0.0206, 3: 0.8486})

In [40]:
YtData.take(1)

[Row(video_id='kzwfHumJyYc', trending_date='17.14.11', title='Sharry Mann: Cute Munda ( Song Teaser) | Parmish Verma | Releasing on 17 November', channel_title='Lokdhun Punjabi', category_id=1, publish_time=datetime.datetime(2017, 11, 12, 12, 20, 39), tags='"sharry mann|""sharry mann new song""|""sharry mann cute munda""|""sharry mann latest song""|""sharry mann punjabi song 2017""|""parmish verma""|""parmish verma new song""|""parmish verma sharry mann""|""parmish verma sharry mann new song""|""parmish verma cute munda""|""new punjabi song 2017""|""punjabi song 2017""|""parmish verma new song 2017""|""parmish verma latest song 2017""|""punjabi songs 2017"""', views=1096327, likes=33966, dislikes=798, comment_count=882)]

In [41]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'likes', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'likes', 'features').show(5)

+-----------------+-----+--------------------+
|       prediction|likes|            features|
+-----------------+-----+--------------------+
|2244.645294745949|  859|[1.0,14752.0,38.0...|
|2244.645294745949|  477|[1.0,20385.0,9.0,...|
|2244.645294745949|  262|[1.0,24517.0,34.0...|
|2611.635981292386| 1382|[1.0,29086.0,18.0...|
|2611.635981292386| 2728|[1.0,30810.0,121....|
+-----------------+-----+--------------------+
only showing top 5 rows



In [42]:
gbt_evaluatorrmse = RegressionEvaluator(
    labelCol="likes", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluatorrmse.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
gbt_evaluatorr2 = RegressionEvaluator(
    labelCol="likes", predictionCol="prediction", metricName="r2")
r2 = gbt_evaluatorr2.evaluate(gbt_predictions)
print("r2: %f" % r2)

Root Mean Squared Error (RMSE) on test data = 60311.1
r2: 0.667790
