<a href="https://colab.research.google.com/github/ExxLiang193/DataAnalysisPractice/blob/master/youtube_trending_pred.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Developed through Google Colab.

Please download the dataset files `CAvideos.csv` and `CA_category_id.json` from https://www.kaggle.com/datasnaek/youtube-new/data#CAvideos.csv and place them in the working directory before proceeding.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
# Tell the Spark tooling where Java 8 and the Spark 2.4.5 distribution live.
# Assumes the Spark tarball was extracted in /content (the Colab working
# directory) — adjust SPARK_HOME when running elsewhere.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
import findspark
# Must run before the pyspark import below: findspark.init() makes the
# pyspark package of the Spark install (see SPARK_HOME) importable.
findspark.init()
from pyspark.sql import SparkSession
# Start (or reuse) a Spark session running in local mode.
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
import json
from datetime import datetime, timedelta
from collections import Counter

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
from time import perf_counter
from functools import wraps, partial
import  pyspark.sql.functions as F

import warnings
# NOTE(review): this silences every warning for the rest of the session,
# including deprecation and data-conversion warnings from pandas/sklearn.
warnings.filterwarnings('ignore')
# warnings.filterwarnings(action='once')
# Raise pandas' display limits so long/wide frames are shown in full.
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

In [0]:
def parse_category_data(json_path='CA_category_id.json',
                        csv_path='CA_category_id.csv'):
    """
    Convert the raw video-category JSON into a flat, header-less CSV.

    Every element of the JSON's ``items`` list becomes one CSV row holding
    ``id``, ``snippet.title`` and ``snippet.assignable`` (as 'True'/'False').

    Parameters
    ----------
    json_path : str
        Path of the category JSON file to read.
    csv_path : str
        Path of the CSV file to write; an existing file is overwritten.
    """
    import csv  # local import so this cell does not depend on the import cell

    # Load raw category data
    with open(json_path, 'r') as data:
        ca_cat_info = json.load(data)

    # Parse raw category data into ids, title, and assignable status.
    # csv.writer quotes any field containing a comma or quote, which a bare
    # ','.join would silently turn into extra columns.
    with open(csv_path, 'w', newline='') as outf:
        writer = csv.writer(outf, lineterminator='\n')
        for cat in ca_cat_info['items']:
            snippet = cat['snippet']
            writer.writerow([cat['id'], snippet['title'],
                             str(snippet['assignable'])])
# Runs as soon as the cell executes: reads CA_category_id.json from the
# working directory and writes CA_category_id.csv next to it.
parse_category_data()

In [0]:
def measure_time(f):
    """
    Decorator that reports how long each call to ``f`` takes.

    After ``f`` returns, a line of the form
    ``--<function name>-- process took: <seconds> sec`` is printed, with the
    elapsed ``perf_counter`` time rounded to 5 decimal places. The wrapped
    function's return value is passed through unchanged. Nothing is printed
    when ``f`` raises.
    """
    report = "--{}-- process took: {} sec"

    @wraps(f)
    def wrapper(*args, **kwargs):
        started = perf_counter()
        outcome = f(*args, **kwargs)
        elapsed = round(perf_counter() - started, 5)
        print(report.format(f.__name__, elapsed))
        return outcome

    return wrapper

In [0]:
# Names of the columns that time_features() adds to each row.
TIME_FEATURE_COLS = [
    'publish_hour', 'publish_weekday', 'days_before_trending', 'trending_day_of_year'
]

def time_features(row):
    """
    Add the time-derived features listed in TIME_FEATURE_COLS to ``row``.

    Parameters
    ----------
    row : dict
        Must contain 'publish_time' (anything ``pd.to_datetime`` accepts,
        e.g. a datetime or an ISO-8601 string) and 'trending_date' (a string
        in '%y.%d.%m' format).

    Returns
    -------
    dict
        The same ``row`` object, updated in place with 'publish_hour',
        'publish_weekday' (Monday == 0), 'days_before_trending' and
        'trending_day_of_year'.
    """
    publish_datetime = pd.to_datetime(row['publish_time'])
    trending_date = pd.to_datetime(row['trending_date'], format='%y.%d.%m')

    # Read hour/weekday from the parsed timestamp rather than the raw field,
    # so string input works the same as datetime input.
    publish_tuple = publish_datetime.timetuple()

    # Publish hour of day and day of week.
    row['publish_hour'] = publish_tuple.tm_hour
    row['publish_weekday'] = publish_tuple.tm_wday

    # Number of whole calendar days between publication and this trending date.
    row['days_before_trending'] = (trending_date.date() - publish_datetime.date()).days

    # Trending date expressed as day of year.
    row['trending_day_of_year'] = trending_date.timetuple().tm_yday

    return row

In [0]:
# Names of the columns that text_features() adds to each row.
TEXT_FEATURE_COLS = [
    'title_length', 'desc_length', 'title_cap_ratio',
    'title_sep_count', 'title_special_char_count',
    'desc_url_count', 'sub_request', 'desc_cap_ratio',
    'mentions_music', 'mentions_game', 'tag_count'
]

# Title characters counted as separators, and the cap applied to that count.
SEPARATORS = ['|', '/', '-']
SEP_CAP = 8
# Title characters counted as "special", and the cap applied to that count.
SPECIAL_CHARS = ['?', '!']
SPECIAL_CHARS_CAP = 10
# NOTE(review): not referenced by any cell visible here — confirm it is still needed.
MAX_COMMON = 200

def char_counter(phrase, chars, limiter):
    """
    Count how many characters of ``phrase`` belong to ``chars``.

    When ``limiter`` is truthy the count is capped at ``limiter``; a falsy
    ``limiter`` (``None`` or ``0``) leaves the count uncapped.
    """
    hits = sum(1 for symbol in phrase if symbol in chars)
    if not limiter:
        return hits
    return min(hits, limiter)


def text_features(row):
    """
    Add the text-derived features listed in TEXT_FEATURE_COLS to ``row``.

    Parameters
    ----------
    row : dict
        Must contain 'title', 'description' and 'tags'. A value of ``None``
        or '' is treated as empty text.

    Returns
    -------
    dict
        The same ``row`` object, updated in place with the text features.
        Ratios are 0.0 and 'tag_count' is 0 when the underlying text is empty.
    """
    # Normalise missing text once so every feature below is defined instead
    # of raising on None or dividing by zero.
    title = row['title'] or ''
    description = row['description'] or ''
    tags = row['tags'] or ''
    title_lower = title.lower()
    description_lower = description.lower()

    # Title and description lengths.
    row['title_length'] = len(title)
    row['desc_length'] = len(description)

    # Proportion of uppercase letters in title.
    if title:
        row['title_cap_ratio'] = \
            sum(1 for c in title if c.isupper()) / len(title)
    else:
        row['title_cap_ratio'] = 0.0

    # Number of separators (|, /, -). Generally, separators are used to
    # separate important contributors, thus affecting click rate.
    row['title_sep_count'] = \
        char_counter(title, chars=SEPARATORS, limiter=SEP_CAP)

    # Number of special characters (!, ?).
    row['title_special_char_count'] = \
        char_counter(title, chars=SPECIAL_CHARS,
                     limiter=SPECIAL_CHARS_CAP)

    # Number of urls in the description; each 'http' is taken as one url.
    row['desc_url_count'] = description.count('http')

    # Whether the description asks viewers to subscribe.
    row['sub_request'] = int('subscribe' in description_lower)

    # Proportion of uppercase letters in description.
    if description:
        row['desc_cap_ratio'] = \
            sum(1 for c in description if c.isupper()) / len(description)
    else:
        row['desc_cap_ratio'] = 0.0

    # Whether the title or description mentions music / gaming.
    row['mentions_music'] = int('music' in title_lower or
                                'music' in description_lower)
    row['mentions_game'] = int('game' in title_lower or
                               'game' in description_lower)

    # Number of '|'-separated tags; an empty tag string holds no tags.
    row['tag_count'] = tags.count('|') + 1 if tags else 0

    return row

In [0]:
# Flag columns that bool_features() converts to 0/1 integers.
BOOL_COLS = ['comments_disabled', 'ratings_disabled', 'video_error_or_removed']

def bool_features(row):
    """
    Transform the BOOL_COLS entries of ``row`` into binary (0/1) integers.

    Real booleans (what a BooleanType schema column yields) and their text
    forms are both accepted: strings are compared case-insensitively with
    'true' after stripping whitespace, and any other value goes through
    ``bool()`` (so ``None`` becomes 0). The previous
    ``row[col] == 'True'`` test was False for a real ``True`` and turned
    every flag into 0.

    Returns the same ``row`` object, updated in place.
    """
    for col in BOOL_COLS:
        value = row[col]
        if isinstance(value, str):
            flag = value.strip().lower() == 'true'
        else:
            flag = bool(value)
        row[col] = int(flag)
    return row

In [0]:
# Raw columns removed by preprocess().
DROP_COLS = ['video_id', 'thumbnail_link']

@measure_time
def preprocess(data):
    """
    Clean the raw Spark DataFrame before feature mapping.

    Renames a 'description\\r' column to 'description' (presumably a header
    that kept a trailing carriage return — confirm against the CSV), replaces
    nulls in 'tags' and 'description' with '', and drops the DROP_COLS
    columns. Returns the resulting DataFrame.
    """
    return (
        data
        .withColumnRenamed('description\r', 'description')
        .fillna('', subset=['tags', 'description'])
        .drop(*DROP_COLS)
    )


# Row-level feature functions, applied in this order by row_mapper().
FEATURE_MAPPERS = [
    time_features, text_features, bool_features
]

def row_mapper(row):
    """
    Run one row through every mapper in FEATURE_MAPPERS.

    ``row`` is first converted with ``asDict()``; each mapper receives the
    dict returned by the previous one. The result is a tuple of the final
    dict's values in FEATURE_MAPPED_COLS order.
    """
    features = row.asDict()
    for apply_mapper in FEATURE_MAPPERS:
        features = apply_mapper(features)
    return tuple(map(features.__getitem__, FEATURE_MAPPED_COLS))

@measure_time
def map_features(data):
    """
    Apply row_mapper to every row of ``data`` and return a new DataFrame
    whose columns are named after FEATURE_MAPPED_COLS.
    """
    mapped_rows = data.rdd.map(row_mapper)
    return mapped_rows.toDF(FEATURE_MAPPED_COLS)

In [0]:
# Only the MAX_TAG_RANK most common tags receive a rank; the same value is
# used as the tag rank of videos that carry none of those tags.
MAX_TAG_RANK = 20
# Columns that together identify one video in aggregations and merges.
# NOTE(review): INDEX is assigned again, with the same value, in a later cell.
INDEX = ['channel_title', 'title']


def reduce_multilevel_cols(cols):
    """
    Flatten multi-level column labels into single strings.

    The levels of each label are joined with '_' and any leading or
    trailing '_' (left behind by an empty level) is stripped.
    """
    flattened = cols.map(lambda levels: '_'.join(levels))
    return flattened.str.strip('_')


@measure_time
def aggregate_features(pdata):
    """
    Collapse the per-day trending rows into per-video aggregate features.

    Two tables are built and inner-joined on INDEX:

    * tag rank - tags are counted once per distinct (INDEX, tags) row, the
      MAX_TAG_RANK most common tags are ranked 0..MAX_TAG_RANK-1, and each
      row gets the mean rank of its ranked tags (MAX_TAG_RANK if it has none).
    * pivot - number of trending rows per video, min/max trending day of
      year, and the first value ('init_*') and growth ('*_growth') of
      views, likes, dislikes and comments, plus 'days_on_trending'.

    Side effect: a constant 'creator_videos_trending' column is added to
    ``pdata`` itself; post_processing() drops that column from the frame
    it is given.

    Parameters
    ----------
    pdata : pandas.DataFrame
        Feature-mapped rows containing the INDEX columns, 'tags',
        'trending_day_of_year', 'days_before_trending', 'views', 'likes',
        'dislikes' and 'comment_count'.

    Returns
    -------
    pandas.DataFrame
        One row per match of the two tables on INDEX.
    """
    # Average tag rank based on aggregation of tags across the dataset.
    tag_only_df = pdata[INDEX + ['tags']].drop_duplicates()
    tag_only_df['tags'] = tag_only_df['tags'].map(
        lambda tags: [tag.strip('"') for tag in tags.split('|')]
    )
    tag_counter = Counter()
    for tag_list in tag_only_df['tags']:
        tag_counter.update(tag_list)
    tag_rank = {tag: rank for rank, (tag, _)
                in enumerate(tag_counter.most_common(MAX_TAG_RANK))}

    def mean_tag_rank(tags):
        # Explicit fallback instead of np.mean([]) -> NaN followed by a
        # chained in-place fillna, which is a no-op under pandas copy-on-write.
        ranks = [tag_rank[tag] for tag in tags if tag in tag_rank]
        return float(np.mean(ranks)) if ranks else float(MAX_TAG_RANK)

    tag_only_df['tag_rank'] = tag_only_df['tags'].map(mean_tag_rank)
    tag_only_df = tag_only_df.drop(columns=['tags'])

    # Helper column so that `len` counts the trending rows of each video.
    pdata['creator_videos_trending'] = 1
    agg_features = pd.pivot_table(pdata, index=INDEX,
                                  aggfunc={'creator_videos_trending': len,
                                           'trending_day_of_year': [min, max],
                                           'days_before_trending': [min, max],
                                           'views': [min, max],
                                           'likes': [min, max],
                                           'dislikes': [min, max],
                                           'comment_count': [min, max]})
    agg_features.columns = reduce_multilevel_cols(agg_features.columns)

    agg_features['days_on_trending'] = \
        agg_features['days_before_trending_max'] - agg_features['days_before_trending_min'] + 1

    agg_features['views_growth'] = agg_features['views_max'] - agg_features['views_min']
    agg_features['likes_growth'] = agg_features['likes_max'] - agg_features['likes_min']
    agg_features['dislikes_growth'] = agg_features['dislikes_max'] - agg_features['dislikes_min']
    agg_features['comments_growth'] = \
        agg_features['comment_count_max'] - agg_features['comment_count_min']

    agg_features = agg_features.drop(
        columns=['days_before_trending_max', 'views_max',
                 'likes_max', 'dislikes_max', 'comment_count_max'])
    # The flattened count column is 'creator_videos_trending_len'; the former
    # key 'creator_videos_on_trending_len' matched nothing, so the rename was
    # silently skipped.
    agg_features = agg_features.rename(
        columns={'days_before_trending_min': 'days_before_trending',
                 'creator_videos_trending_len': 'creator_videos_trending',
                 'views_min': 'init_views',
                 'likes_min': 'init_likes',
                 'dislikes_min': 'init_dislikes',
                 'comment_count_min': 'init_comments'})
    agg_features = agg_features.reset_index()

    return pd.merge(tag_only_df, agg_features, on=INDEX, how='inner')
    

In [0]:
# Columns that together identify one video (merge key in post_processing()).
INDEX = ['channel_title', 'title']

def generate_circular_repr(data, category, cycle_length):
    """
    Replace a cyclic numeric column with its position on the unit circle.

    For a value v, the angle is ``2 * pi * v / cycle_length``; the column
    ``category`` is replaced by ``category + '_x'`` (sine of the angle) and
    ``category + '_y'`` (cosine of the angle), so values at both ends of the
    cycle end up close together.

    Parameters
    ----------
    data : pandas.DataFrame
        Frame containing the numeric column ``category``. Modified in place.
    category : str
        Name of the column to encode.
    cycle_length : number
        Length of one full cycle (e.g. 24 for hours).

    Returns
    -------
    pandas.DataFrame
        The same ``data`` object, with the two new columns appended and the
        original column removed.
    """
    # Compute on a plain array so the result is assigned positionally. The
    # former pd.DataFrame(list) carried a fresh 0..n-1 index and produced
    # NaN whenever ``data`` had any other index.
    angle = np.asarray(data[category], dtype=float) * np.pi / (cycle_length / 2.)
    data[category + '_x'] = np.sin(angle)
    data[category + '_y'] = np.cos(angle)
    data.drop(category, axis=1, inplace=True)

    return data

@measure_time
def post_processing(agg_features, ca_data):
    """
    Combine per-row features with per-video aggregates into the final table.

    Steps:
      1. One-hot encode 'category_id' into 'category_<id>' columns (these
         are added to the ``ca_data`` object that was passed in).
      2. Drop the columns that are replaced by aggregates or no longer
         needed, then de-duplicate the remaining rows.
      3. Inner-join with ``agg_features`` on INDEX.
      4. Replace the cyclic columns (publish hour, publish weekday, min/max
         trending day of year) with their circular x/y representation.
      5. Drop the INDEX columns and return the result.
    """
    category_dummies = pd.get_dummies(ca_data['category_id'])
    for category in category_dummies.columns:
        ca_data['category_' + str(category)] = category_dummies[category]

    superseded_cols = [
        'views', 'likes', 'dislikes', 'trending_date',
        'comment_count', 'tags', 'category_id', 'description',
        'creator_videos_trending', 'trending_day_of_year',
        'days_before_trending', 'publish_time',
    ]
    per_video = ca_data.drop(superseded_cols, axis=1).drop_duplicates()

    merged = pd.merge(per_video, agg_features, how='inner', on=INDEX)

    cyclic_columns = (
        ('publish_hour', 24),
        ('publish_weekday', 7),
        ('trending_day_of_year_min', 365),
        ('trending_day_of_year_max', 365),
    )
    for column, cycle_length in cyclic_columns:
        merged = generate_circular_repr(merged, column, cycle_length)

    return merged.drop(INDEX, axis=1)

In [0]:
from pyspark.sql.types import (
    StructType, StructField, StringType, FloatType,
    DoubleType, IntegerType, BooleanType, DateType, TimestampType)

# Column name -> Spark type of the raw CSV, in file order.
RAW_COLS = {
    'video_id': StringType(),
    'trending_date': StringType(),
    'title': StringType(),
    'channel_title': StringType(),
    'category_id': IntegerType(),
    'publish_time': TimestampType(),
    'tags': StringType(),
    'views': DoubleType(),
    'likes': DoubleType(),
    'dislikes': DoubleType(),
    'comment_count': IntegerType(),
    'thumbnail_link': StringType(),
    'comments_disabled': BooleanType(),
    'ratings_disabled': BooleanType(),
    'video_error_or_removed': BooleanType(),
    'description': StringType()
}

# Raw columns that survive preprocess(), kept in schema order. A set
# difference would give an arbitrary order that can change from run to run
# and with it the column order of every downstream frame.
PREPROCESSED_COLS = [col for col in RAW_COLS if col not in DROP_COLS]

# Column order of the tuples produced by row_mapper() / map_features().
FEATURE_MAPPED_COLS = PREPROCESSED_COLS + TIME_FEATURE_COLS + TEXT_FEATURE_COLS

@measure_time
def workflow():
    """
    Run the full feature pipeline and return the final feature table.

    Reads 'CAvideos.csv' with the RAW_COLS schema, then applies
    preprocess(), map_features(), a conversion to pandas,
    aggregate_features() and post_processing(), in that order.
    """
    @measure_time
    def read_raw_data():
        # Load raw video trending data from Youtube
        schema = StructType([StructField(name, dtype, False)
                             for name, dtype in RAW_COLS.items()])
        return spark.read.load('CAvideos.csv',
                               format='com.databricks.spark.csv',
                               header=True, multiLine=True,
                               schema=schema)

    raw_sdf = read_raw_data()
    clean_sdf = preprocess(raw_sdf)
    mapped_sdf = map_features(clean_sdf)
    mapped_pdf = mapped_sdf.toPandas()
    agg_features = aggregate_features(mapped_pdf)
    return post_processing(agg_features, mapped_pdf)


result = workflow()

In [0]:
# Target column(s) predicted by training().
LABELS = ['views_growth']

def training(stuff, random_state=None):
    """
    Fit a random forest on the feature table and print a MAPE-based accuracy.

    Parameters
    ----------
    stuff : pandas.DataFrame
        Feature table containing the LABELS column(s); it is not modified.
    random_state : int or None
        Seed passed to both the train/test split and the forest. The default
        ``None`` keeps the previous unseeded behaviour.

    Prints the shapes of the train/test arrays and the accuracy, defined as
    ``100 - MAPE`` over the test samples whose label is non-zero.
    """
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.model_selection import train_test_split

    features = np.asarray(stuff.drop(LABELS, axis=1))
    labels = np.asarray(stuff[LABELS])
    # A single target must be 1-D: with an (n, 1) label array,
    # `predictions - test_labels` broadcasts to an (n, n) matrix and the
    # reported error is meaningless.
    if labels.ndim == 2 and labels.shape[1] == 1:
        labels = labels.ravel()

    train_features, test_features, train_labels, test_labels = \
        train_test_split(features, labels, test_size=0.25,
                         random_state=random_state)

    print('Training Features Shape:', train_features.shape)
    print('Training Labels Shape:', train_labels.shape)
    print('Testing Features Shape:', test_features.shape)
    print('Testing Labels Shape:', test_labels.shape)

    rf = RandomForestRegressor(n_estimators=100, random_state=random_state)
    rf.fit(train_features, train_labels)

    # Use the forest's predict method on the test data
    predictions = rf.predict(test_features)
    # Calculate the absolute errors
    errors = np.abs(predictions - test_labels)

    # Mean absolute percentage error (MAPE). Labels equal to zero are left
    # out because the percentage error is undefined for them and would turn
    # the mean into inf/nan.
    nonzero = test_labels != 0
    if nonzero.any():
        mape = 100 * (errors[nonzero] / np.abs(test_labels[nonzero]))
        # Calculate and display accuracy
        accuracy = 100 - np.mean(mape)
        print('Accuracy:', round(accuracy, 2), '%.')
    else:
        print('Accuracy: undefined (every test label is zero).')

training(result)