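# YouTube-viz: Data Processing

Builds weekly, per-category aggregates (weighted view score, video count, view/like/dislike histograms and the most-viewed videos) from the YouTube helper table and writes them out as JSON.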

In [1]:
import os

from functools import partial

In [2]:
# NOTE(review): presumably needed because an older Spark (2.3/2.4) is paired with
# pyarrow >= 0.15, whose default IPC stream format those Spark versions cannot read.
# It must be set before the SparkSession is created (below) — confirm versions.
os.environ.update(ARROW_PRE_0_15_IPC_FORMAT="1")

In [3]:
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
# Apply the base style FIRST: plt.style.use overwrites every rcParam the style sheet
# defines, and the bundled 'fivethirtyeight' sheet sets its own font.size, which would
# otherwise silently replace the explicit 18 below.
plt.style.use('fivethirtyeight')
plt.rcParams['figure.figsize'] = (10, 6)
plt.rcParams['font.size'] = 18

In [4]:
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType, StructType, StructField, StringType, IntegerType, ArrayType, LongType

In [5]:
# Local-mode Spark session using 24 worker threads on this machine.
# NOTE(review): in local mode there are no separate executor processes, so
# spark.executor.memory presumably has no effect and spark.driver.memory is the
# setting that bounds memory — confirm.
spark = (
    SparkSession.builder
    .appName("YouTubeVizLocal")
    .master("local[24]")
    .config("spark.executor.memory", "32g")
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)

In [6]:
# Raw crawl metadata (JSON). In this notebook it is only inspected (schema / a sample row);
# the processing below works on the parquet helper table instead.
big_df = spark.read.format('json').load('/dlabdata1/manoel/useful_stuff_all/')

In [7]:
# Inspect the schema Spark inferred for the raw JSON crawl data.
big_df.printSchema()

root
 |-- categories: string (nullable = true)
 |-- channel_id: string (nullable = true)
 |-- crawl_date: string (nullable = true)
 |-- description: string (nullable = true)
 |-- dislike_count: long (nullable = true)
 |-- display_id: string (nullable = true)
 |-- duration: long (nullable = true)
 |-- like_count: long (nullable = true)
 |-- tags: string (nullable = true)
 |-- title: string (nullable = true)
 |-- upload_date: string (nullable = true)
 |-- view_count: long (nullable = true)



In [9]:
# Peek at one raw record. Vertical, untruncated display keeps long string fields
# (description, tags) readable; limiting to a single row avoids dumping 20 full records.
big_df.show(n=1, vertical=True, truncate=False)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 categories    | Howto & Style    

## Preprocessing
Rescale the per-video weights and restrict the data to the upload-date window of interest (2008-01-07 to 2019-06-30).

In [8]:
# Per-video helper table (categories, counts, duration, weight, upload_date, ...)
# that all processing below is based on.
df = spark.read.format('parquet').load('/scratch/tvaucher/data/helper.parquet.gzip')

In [10]:
# Distinct category labels present in the helper table, collected to the driver.
cats = df.select('categories').dropDuplicates().collect()

[Row(categories='Gaming'),
 Row(categories='Entertainment'),
 Row(categories='Education, Science & Tech'),
 Row(categories='Howto & Style'),
 Row(categories='News & Politics'),
 Row(categories='Music'),
 Row(categories='Others')]

In [11]:
# Sorted category names; used as explicit pivot values so the output columns have a fixed order.
all_cat = sorted(row['categories'] for row in cats)

In [13]:
# Inclusive upload-date window. 2008-01-07 is a Monday and 2019-06-30 a Sunday, so the
# window covers whole weeks as produced by date_trunc('week', ...) further down.
start_date, end_date = '2008-01-07', '2019-06-30'

In [14]:
# Smallest weight in the table; scale() expresses weights in units of this value.
min_weight = df.select(F.min('weight').alias('min_weight')).first()['min_weight']

In [15]:
# Approximate 75th percentile of the weights, used as the upper clipping bound in scale().
# The third argument of approxQuantile is the relative error: the returned value's rank is
# only guaranteed to lie within (p - err) * N .. (p + err) * N. A relative error of 0.5 would
# allow anything between the 25th and 100th percentile, making the cap essentially arbitrary.
# 0.01 keeps the cheap single-pass approximation while pinning the quantile down.
WEIGHT_QUANTILE_REL_ERROR = 0.01
upper_quantile = df.stat.approxQuantile('weight', [0.75], WEIGHT_QUANTILE_REL_ERROR)[0]

In [29]:
@F.pandas_udf("integer", F.PandasUDFType.SCALAR)
def scale(x):
    """Turn raw weights into small integer multiplicities.

    Each weight is capped to the range [0, upper_quantile], expressed in units of
    min_weight, and rounded to the nearest integer. Both bounds are module-level
    values computed from the helper table before this UDF is defined.
    """
    # NOTE(review): assumes min_weight > 0; a zero minimum would divide by zero and
    # astype(int) would then likely fail on the non-finite results — confirm.
    capped = x.clip(lower=0, upper=upper_quantile)
    return capped.div(min_weight).round().astype(int)

In [30]:
# Restrict to the upload-date window, then attach the integer-scaled weight.
# Cached because both the top-videos and the weekly-score pipelines below read it.
# NOTE(review): `between` compares upload_date against plain 'YYYY-MM-DD' strings; if
# upload_date carries a time suffix, rows on end_date itself would be excluded — confirm format.
scaled = (
    df.filter(F.col('upload_date').between(start_date, end_date))
      .select('categories', 'view_count', 'display_id', 'duration',
              'like_count', 'dislike_count',
              scale('weight').alias('weight'),
              'upload_date')
      .cache()
)

In [53]:
import numpy as np

In [130]:
@F.pandas_udf("integer", F.PandasUDFType.GROUPED_AGG)
def _sum(x):
    """Group total expressed in whole millions (floored), returned as an int.

    Exact integer arithmetic is used. Floor-dividing by the float 1e6 would promote
    the total to float64, losing precision above 2**53, and would return a float from
    a UDF declared as "integer".
    """
    # NOTE(review): the declared return type is a 32-bit int, so a group total must stay
    # below ~2.1e15 (2**31 millions); switch to "long" if that can be exceeded.
    total = x.sum()  # pandas skips nulls; an all-null group sums to 0
    return int(total) // 10**6

@F.pandas_udf("integer", F.PandasUDFType.GROUPED_AGG)
def _count(x):
    """Number of rows in the group (nulls included) as a grouped-aggregate pandas UDF.

    Presumably written as a pandas UDF, rather than using F.count, so it can sit in the
    same agg() call as the other pandas aggregates.
    """
    return x.size

In [131]:
def histogram(values, bins=20):
    """Bin numeric values with numpy and return ``[counts, bin_edges]`` as plain lists.

    Parameters
    ----------
    values : array-like
        Numeric values, e.g. the pandas Series a grouped-aggregate pandas UDF receives.
        Missing values (NaN/None) are ignored.
    bins : int or sequence of scalars
        Passed straight to ``np.histogram``: either a bin count or explicit bin edges.

    Returns
    -------
    list
        ``[counts, edges]`` with ``len(edges) == len(counts) + 1``. Values outside the
        outermost edges are not counted in any bin.
    """
    arr = np.asarray(values, dtype=float)
    # np.histogram raises on NaN when it has to infer the range (integer `bins`),
    # so drop missing values up front.
    arr = arr[~np.isnan(arr)]
    counts, edges = np.histogram(arr, bins=bins)
    return [counts.tolist(), edges.tolist()]

In [141]:
def _grouped_hist_udf(bin_edges, element_type):
    """Wrap `histogram` with fixed bin edges as a grouped-aggregate pandas UDF
    returning [counts, edges] as an array of arrays of `element_type`."""
    return F.pandas_udf(partial(histogram, bins=bin_edges),
                        ArrayType(ArrayType(element_type)),
                        F.PandasUDFType.GROUPED_AGG)

# 30 equal-width bins over 0..1800 (presumably seconds, i.e. 1-minute bins — confirm unit).
_DURATION_BINS = np.linspace(0, 1800, 31)
# Powers-of-two edges 1 .. 2**32 for views and 1 .. 2**25 for likes/dislikes.
# NOTE(review): the lowest edge is 1, so zero counts (and durations above 1800) fall
# outside every bin and are not counted — confirm that is intended.
_VIEW_BINS = np.logspace(0, 32, 33, base=2)
_REACTION_BINS = np.logspace(0, 25, 26, base=2)

hist_duration = _grouped_hist_udf(_DURATION_BINS, IntegerType())
hist_v_count = _grouped_hist_udf(_VIEW_BINS, LongType())
hist_l_count = _grouped_hist_udf(_REACTION_BINS, LongType())
hist_d_count = _grouped_hist_udf(_REACTION_BINS, LongType())

In [145]:
# Order videos by view count, highest first, within each (date, category) partition.
w = Window.partitionBy('date', 'categories').orderBy(F.col('view_count').desc())

In [161]:
# Most-viewed videos for every (week, category): keep ranks 1-5 (tied view counts share a
# rank, so more than five videos can qualify) and store them as `best_videos`, ordered from
# most to least viewed. collect_list alone gives no ordering guarantee after a shuffle, so the
# rank is collected as the first struct field, the array is sorted on it, and the rank is then
# projected away so `best_videos` keeps its original element fields.
top_v = (
    scaled.withColumn('date', F.date_trunc('week', 'upload_date'))
    .select('date', 'categories', 'display_id', 'view_count', 'like_count', 'dislike_count', 'duration',
            F.rank().over(w).alias('rank'))
    .filter("rank <= 5")
    .groupBy("date", "categories")
    .agg(F.sort_array(
        F.collect_list(F.struct('rank', 'display_id', 'view_count', 'like_count', 'dislike_count', 'duration'))
    ).alias('ranked_videos'))
    .select(
        'date', 'categories',
        F.expr(
            "transform(ranked_videos, v -> named_struct("
            "'display_id', v.display_id, 'view_count', v.view_count, "
            "'like_count', v.like_count, 'dislike_count', v.dislike_count, "
            "'duration', v.duration))"
        ).alias('best_videos'),
    )
    .orderBy(F.desc('date'), 'categories')
)

In [163]:
# One row per week and one column per category, in the fixed order given by all_cat.
# Each cell is a struct holding the weekly aggregates (`data`) and that week's top
# videos (`best_videos`, joined in from top_v).
# NOTE(review): hist_duration is defined earlier but not aggregated here — confirm
# whether a duration histogram was meant to be part of `data`.
week_score = (
    scaled
    .withColumn('date', F.date_trunc('week', 'upload_date'))
    .withColumn('score', F.col('view_count') * F.col('weight'))
    .groupBy('date', 'categories')
    .agg(
        F.struct(
            _sum('score').alias('score'),
            _count('display_id').alias('count'),
            hist_v_count('view_count').alias('view_count'),
            hist_l_count('like_count').alias('like_count'),
            hist_d_count('dislike_count').alias('dislike_count'),
        ).alias('data')
    )
    .join(top_v, on=['date', 'categories'])
    .groupBy('date')
    .pivot('categories', all_cat)
    .agg(F.struct(F.first('data').alias('data'),
                  F.first('best_videos').alias('best_videos')))
    .orderBy('date')
)

DataFrame[date: timestamp, Education, Science & Tech: struct<data:struct<score:int,count:int,view_count:array<array<bigint>>,like_count:array<array<bigint>>,dislike_count:array<array<bigint>>>,best_videos:array<struct<display_id:string,view_count:bigint,like_count:bigint,dislike_count:bigint,duration:bigint>>>, Entertainment: struct<data:struct<score:int,count:int,view_count:array<array<bigint>>,like_count:array<array<bigint>>,dislike_count:array<array<bigint>>>,best_videos:array<struct<display_id:string,view_count:bigint,like_count:bigint,dislike_count:bigint,duration:bigint>>>, Gaming: struct<data:struct<score:int,count:int,view_count:array<array<bigint>>,like_count:array<array<bigint>>,dislike_count:array<array<bigint>>>,best_videos:array<struct<display_id:string,view_count:bigint,like_count:bigint,dislike_count:bigint,duration:bigint>>>, Howto & Style: struct<data:struct<score:int,count:int,view_count:array<array<bigint>>,like_count:array<array<bigint>>,dislike_count:array<array<bi

In [173]:
# Write a single JSON part file with one record per line. Each line ends with ',\n' so the
# records can later be wrapped into a JSON array; timestamps are written as plain dates.
(week_score
    .coalesce(1)
    .write
    .mode('overwrite')
    .json('output', timestampFormat='yyyy-MM-dd', lineSep=',\n'))

In [174]:
# Post-processing step (shell, currently disabled): wrap the part file written to `output/`
# into a single JSON array. `head -c-2` (GNU head) drops the last two bytes, i.e. the trailing
# ',\n' separator after the final record, and the echoes add the surrounding brackets.
# !(echo "[" && head -c-2 output/*.json && echo "]") > weekly_data.json