In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
from pyspark.sql import SQLContext
import json
from classes import VideoTask1,TrendingDayTask1
from channel import ChannelTask4,VideoStatsTask4
from task5 import ChannelTask5,VideoDayTask5
from task6 import *

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("spark") \
    .getOrCreate()

In [16]:
glob_1 = []
glob_4 = []
glob_5 = []
categories_list = []

In [None]:
with open('CA_category_id.json', 'r') as f:
    categories = json.load(f)
for c in categories["items"]:
    categories_list.append(CategoryDefault(c["id"],c["title"]))

In [4]:
data = spark.read.csv('CAvideos.csv',inferSchema=True, header=True, multiLine=True)

In [5]:
data.describe()
data = data.withColumnRenamed("description\r", "description")

Find Top 10 videos that were amongst the trending videos for the highest number of days (it
doesn't need to be a consecutive period of time). You should also include information about
different metrics for each day the video was trending.


In [6]:
selected_videos = data.filter(data.video_error_or_removed == False).groupBy('title').count().orderBy("count",ascending=False).collect()
for row in selected_videos[:10]:
    r = data.filter(data.title == row.title).orderBy("trending_date").collect()
    t_d = []
    for _ in r:
        t_d.append(TrendingDayTask1(_.trending_date,_.views,_.likes,_.dislikes))
    video = VideoTask1(r[0].video_id,r[0].title,r[0].description,r[0].views,r[0].likes,r[0].dislikes,t_d)
    glob_1.append(video)

In [7]:
for v in glob_1:
    print(v.toJSON())

{
    "description": "Most Popular Violin Covers of Popular Songs 2018 || Best Instrumental Violin Covers 2018",
    "id": "spOzogi8Orw",
    "latest_dislikes": 118,
    "latest_likes": 499,
    "latest_view": 310449,
    "title": "Most Popular Violin Covers of Popular Songs 2018 || Best Instrumental Violin Covers 2018",
    "trending_days": [
        {
            "date": "17.01.12",
            "dislikes": 118,
            "likes": 499,
            "views": 310449
        },
        {
            "date": "17.24.12",
            "dislikes": 262,
            "likes": 1182,
            "views": 370440
        },
        {
            "date": "17.25.12",
            "dislikes": 301,
            "likes": 1387,
            "views": 459491
        },
        {
            "date": "17.28.12",
            "dislikes": 705,
            "likes": 4007,
            "views": 746602
        },
        {
            "date": "17.29.12",
            "dislikes": 1175,
            "likes": 7032,
        

Find what was the most popular category for each week (7 days slices). Popularity is
decided based on the total number of views for videos of this category. Note, to calculate it
you can’t just sum up the number of views. If a particular video appeared only once during
the given period, it shouldn’t be counted. Only if it appeared more than once you should
count the number of new views. For example, if video A appeared on day 1 with 100 views,
then on day 4 with 250 views and again on day 6 with 400 views, you should count it as 400 - 100 = 300. For our purpose, it will mean that this particular video was watched 300 times in
the given time period.

What were the 10 most used tags amongst trending videos for each 30days time period?
Note, if during the specified period the same video appears multiple times, you should count
tags related to that video only once. The result should have the following schema:

Show the top 20 channels by the number of views for the whole period. Note, if there are
multiple appearances of the same video for some channel, you should take into account
only the last appearance (with the highest number of views). The result should have the
following schema:

In [8]:
edited_data = data.orderBy("trending_date",ascending=False).dropDuplicates(subset = ['video_id'])
selected_videos = edited_data.groupBy('channel_title').sum("views").orderBy("sum(views)",ascending=False)
selected_videos = selected_videos.withColumnRenamed("sum(views)", "total_views").collect()
for video in selected_videos[:20]:
    r = edited_data.filter(edited_data.channel_title == video.channel_title).orderBy("trending_date").collect()
    start_date = r[0].trending_date
    end_date = r[-1].trending_date
    v_s = []
    for tmp in r:
        v_s.append(VideoStatsTask4(tmp.video_id,tmp.views))
    glob_4.append(ChannelTask4(video.channel_title,start_date,end_date,video.total_views,v_s))

In [9]:
for v in glob_4:
    print(v.toJSON())

{
    "channel_name": "T-Series",
    "end_date": "18.31.03",
    "start_date": "17.15.11",
    "total_views": 338603808,
    "videos_views": [
        {
            "video_id": "c64I9HNpiOY",
            "views": 11810521
        },
        {
            "video_id": "dZ0fwJojhrs",
            "views": 20956090
        },
        {
            "video_id": "M2q64UowX9g",
            "views": 6596028
        },
        {
            "video_id": "P8ZWk3S14ec",
            "views": 3846957
        },
        {
            "video_id": "xWi8nDUjHGA",
            "views": 27070757
        },
        {
            "video_id": "-NIlDHUYiRw",
            "views": 2437182
        },
        {
            "video_id": "ytsHyHD_vWs",
            "views": 1196094
        },
        {
            "video_id": "i8J0cR-PY9c",
            "views": 13281301
        },
        {
            "video_id": "OX-h7MtkeOI",
            "views": 1655544
        },
        {
            "video_id": "kZUxD_pEqgg",
  

Show the top 10 channels with videos trending for the highest number of days (it doesn't
need to be a consecutive period of time) for the whole period. In order to calculate it, you
may use the results from the question No1. The total_trending_days count will be a sum of
the numbers of trending days for videos from this channel. The result should have the
following schema:

In [11]:
video_to_trending_days_list = data.filter(data.video_error_or_removed == False).groupBy('title').count()
video_to_trending_days_list = video_to_trending_days_list.withColumnRenamed("count", "trending_days")
channel_to_trending_days = data.groupBy("channel_title").count()
channel_to_trending_days = channel_to_trending_days.withColumnRenamed("count", "trending_days").orderBy("trending_days",ascending=False).collect()
for channel in channel_to_trending_days[:10]:
    total_tr_days = 0
    videos = data.filter(data.video_error_or_removed == False).filter(data.channel_title == channel.channel_title).select("video_id","title").distinct().collect()
    selected_v = []
    for video in videos:
        t = video_to_trending_days_list.filter(video_to_trending_days_list.title == video.title).collect()[0].trending_days
        v = VideoDayTask5(video.video_id,video.title,t)
        total_tr_days += t
        selected_v.append(v)
    glob_5.append(ChannelTask5(channel.channel_title,total_tr_days,selected_v))

In [12]:
for v in glob_5[:10]:
    print(v.toJSON())

{
    "channel_name": "SET India",
    "total_trending_days": 193,
    "video_days": [
        {
            "trending_days": 1,
            "video_id": "IGU07H1yih0",
            "video_title": "Crime Patrol Dial 100 - \u0915\u094d\u0930\u093e\u0907\u092e \u092a\u0947\u091f\u094d\u0930\u094b\u0932 - Ep 659 - Full Episode - 23rd November, 2017"
        },
        {
            "trending_days": 1,
            "video_id": "RwokCE74vSc",
            "video_title": "Crime Patrol Dial 100 - Ep 689 - Full Episode - 11th January, 2018"
        },
        {
            "trending_days": 1,
            "video_id": "WbqfRsYfSjU",
            "video_title": "Crime Patrol Dial 100 - Ep 663 - Full Episode - 6th December, 2017"
        },
        {
            "trending_days": 3,
            "video_id": "WH6dQrQn-uc",
            "video_title": "Crime Patrol Dial 100 - Ep 730 - Full Episode - 9th March, 2018"
        },
        {
            "trending_days": 1,
            "video_id": "UdpRqQgbd-w",


Show the top 10 videos by the ratio of likes/dislikes for each category for the whole period.
You should consider only videos with more than 100K views. If the same video occurs
multiple times you should take the record when the ratio was the highest. The result should
have the following schema: