In [6]:
import codecs
import glob
import gzip
import json
import os

import langdetect
import matplotlib as mpl
import matplotlib.font_manager as font_manager
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import pandas as pd
import scipy.stats as stats
import seaborn as sns
import swifter
import zstandard as zstd
from matplotlib.lines import Line2D

class zreader:
    """Iterate over the text lines of a zstandard-compressed file.

    The file is decompressed as a stream, ``chunk_size`` bytes at a time, so
    large files are never fully loaded into memory. Bytes that are not valid
    UTF-8 are dropped.

    Can be used as a context manager; the file is also closed automatically
    once ``readlines()`` has been fully consumed or its generator discarded.
    """

    def __init__(self, file, chunk_size=16384):
        self.fh = open(file, 'rb')
        self.chunk_size = chunk_size
        self.dctx = zstd.ZstdDecompressor()
        self.reader = self.dctx.stream_reader(self.fh)
        self.buffer = ''  # text after the last newline seen so far

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def close(self):
        """Close the decompression stream and the underlying file handle."""
        self.reader.close()
        self.fh.close()

    def readlines(self):
        """Yield every line of the file without its trailing newline.

        A final line that is not newline-terminated is yielded as well. The
        reader is single-use: the file is closed when the generator finishes,
        and calling ``readlines()`` again on a closed reader yields nothing.
        """
        if self.fh.closed:
            return
        # An incremental decoder keeps the leading bytes of a multi-byte UTF-8
        # character that straddles two chunks instead of discarding them.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
        try:
            while True:
                raw = self.reader.read(self.chunk_size)
                if not raw:
                    break
                lines = (self.buffer + decoder.decode(raw)).split("\n")
                yield from lines[:-1]
                self.buffer = lines[-1]
            # Flush the decoder and emit the last line when the stream does not
            # end with a newline.
            tail = self.buffer + decoder.decode(b"", final=True)
            self.buffer = ''
            if tail:
                yield tail
        finally:
            self.close()

## Paths (change these to match your setup)

In [3]:
# Root data directory. Set the YT_DATA_DIR environment variable to point the
# notebook elsewhere instead of editing this cell; os.path.join also copes with
# a DIR given without a trailing slash.
DIR = os.environ.get("YT_DATA_DIR", "/dlabdata1/youtube_large/")

# Raw inputs (*_SRC) and English-only outputs (*_DST / intermediates).
PATH_TIME_SERIES_SRC = os.path.join(DIR, "_raw_df_timeseries.tsv.gz")
PATH_TIME_SERIES_DST = os.path.join(DIR, "df_timeseries_en.tsv.gz")
PATH_CHANNELS_SRC = os.path.join(DIR, "_raw_df_channels.tsv.gz")
PATH_CHANNELS_DST = os.path.join(DIR, "df_channels_en.tsv.gz")
PATH_INVALID = os.path.join(DIR, "invalid.csv")
PATH_METADATA_SRC = os.path.join(DIR, "_raw_yt_metadata.jsonl.zst")
# NOTE: despite its name, this file receives the metadata of the channels that
# pass the English filter.
PATH_METADATA_NON_ENGLISH = os.path.join(DIR, "yt_metadata_fil.jsonl.gz")
PATH_METADATA_DEDUP = os.path.join(DIR, "yt_metadata_en_dd", "{}.jsonl.gz")  # "{}" = chunk id
PATH_METADATA_DST = os.path.join(DIR, "yt_metadata_en.jsonl.gz")
PATH_METADATA_HELPER = os.path.join(DIR, "yt_metadata_helper.feather")

## Flag non-English channels (their IDs are saved to `invalid.csv`)

In [28]:
# Detect each channel's language from the title + description of (up to) its
# first N_LANG_SAMPLES metadata lines, then flag every channel for which "en"
# is not detected with probability >= th. Flagged IDs go to PATH_INVALID.

# langdetect is non-deterministic unless its factory is seeded; fix the seed so
# re-running this cell yields the same set of invalid channels.
langdetect.DetectorFactory.seed = 0

N_LANG_SAMPLES = 10  # texts per channel passed to langdetect
th = 0.6             # minimum "en" probability for a channel to be kept


def _detect_langs(texts):
    """Return langdetect's language guesses for the space-joined texts, or
    None when langdetect cannot detect anything (LangDetectException)."""
    try:
        return langdetect.detect_langs(" ".join(texts))
    except langdetect.LangDetectException:
        return None


reader = zreader(PATH_METADATA_SRC, chunk_size=16384)
dict_channels = {}  # channel_id -> list of sampled texts, then -> detection result
done_stack = set()  # channels whose detection has already run
idx = 0

for line in reader.readlines():
    line_dict = json.loads(line)
    channel = line_dict["channel_id"]
    if channel in done_stack:
        continue
    tmp = dict_channels.get(channel, [])

    if len(tmp) == N_LANG_SAMPLES:
        dict_channels[channel] = _detect_langs(tmp)
        done_stack.add(channel)
    else:
        tmp.append(line_dict["title"] + line_dict["description"])
        dict_channels[channel] = tmp
    idx += 1

    if idx % 100000 == 0:
        print(idx)

print("Done:", len(done_stack), "Remaining:", len(dict_channels) - len(done_stack))

# Channels with fewer than N_LANG_SAMPLES lines never triggered detection above.
for key_tmp in set(dict_channels) - done_stack:
    dict_channels[key_tmp] = _detect_langs(dict_channels[key_tmp])
    done_stack.add(key_tmp)

print("Done:", len(done_stack), "Remaining:", len(dict_channels) - len(done_stack))

to_remove = set()
for key, item in dict_channels.items():
    # Keep a channel only if exactly one "en" guess exists and it is >= th.
    en_guesses = [] if item is None else [v for v in item if v.lang == "en"]
    if len(en_guesses) != 1 or en_guesses[0].prob < th:
        to_remove.add(key)

print("% to remove", len(to_remove) / max(len(dict_channels), 1))
df = pd.DataFrame({"invalid": list(to_remove)})
df.to_csv(PATH_INVALID, index=False)

## Filter the time-series and channel files down to English channels

In [29]:
# Load the raw time-series and channel tables, plus the channel IDs flagged as
# non-English by the language-detection step.
df_sb_f = pd.read_csv(PATH_TIME_SERIES_SRC, sep="\t", compression="infer")
df_ch = pd.read_csv(PATH_CHANNELS_SRC, sep="\t", compression="infer")
df_invalid = pd.read_csv(PATH_INVALID)

# Drop every row whose channel is flagged. A set + vectorised isin replaces the
# per-row lambda, and a dedicated name avoids clobbering `dict_channels`.
invalid_channels = set(df_invalid["invalid"])
df_ch_en = df_ch.loc[~df_ch["channel"].isin(invalid_channels)]
df_sb_f_en = df_sb_f.loc[~df_sb_f["channel"].isin(invalid_channels)]

# Save the English-only tables.
df_ch_en.to_csv(PATH_CHANNELS_DST, sep="\t", index=False, compression="infer")
df_sb_f_en.to_csv(PATH_TIME_SERIES_DST, sep="\t", index=False, compression="infer")

## Keep only the metadata lines of English channels

In [30]:
# Stream the raw metadata again and keep only lines whose channel survived the
# English filter (PATH_METADATA_NON_ENGLISH is the *output* of that filter).
# Also track the crawl/upload date ranges of the kept lines.
reader = zreader(PATH_METADATA_SRC, chunk_size=16384)
idx = 0

valid_channels = set(df_ch_en.channel.values)
min_crawl_date, max_crawl_date = pd.to_datetime("01-01-2100"), pd.to_datetime("01-01-1990")
min_upload_date, max_upload_date = pd.to_datetime("01-01-2100"), pd.to_datetime("01-01-1990")

# `with` guarantees the gzip stream is flushed/closed even if a line fails.
with gzip.open(PATH_METADATA_NON_ENGLISH, mode="wb") as writer:
    for line in reader.readlines():
        line_dict = json.loads(line)
        if line_dict["channel_id"] not in valid_channels:
            continue
        writer.write((json.dumps(line_dict) + "\n").encode())
        idx += 1

        # Parse each date once. min/max replace only on strict < / >, exactly
        # like explicit comparisons (a NaT never replaces the running value).
        crawl_date = pd.to_datetime(line_dict["crawl_date"])
        upload_date = pd.to_datetime(line_dict["upload_date"])
        min_crawl_date = min(min_crawl_date, crawl_date)
        max_crawl_date = max(max_crawl_date, crawl_date)
        min_upload_date = min(min_upload_date, upload_date)
        max_upload_date = max(max_upload_date, upload_date)

        # Inside the branch so each milestone is printed once, not once per
        # skipped line.
        if idx % 1000000 == 0:
            print(idx)

print(idx)
print(min_crawl_date, max_crawl_date)
print(min_upload_date, max_upload_date)

## De-duplicate metadata rows by `display_id`

In [31]:
# Drop rows with an already-seen display_id, chunk by chunk, writing each
# de-duplicated chunk to its own file (named after the cumulative chunk bound).
DEDUP_CHUNK_SIZE = 500000
os.makedirs(os.path.dirname(PATH_METADATA_DEDUP), exist_ok=True)

seen = set()  # every display_id written so far, across ALL chunks
expected = 0
for df_json in pd.read_json(PATH_METADATA_NON_ENGLISH, compression="infer",
                            chunksize=DEDUP_CHUNK_SIZE, lines=True):
    expected += DEDUP_CHUNK_SIZE
    print("---", expected)
    print(len(df_json), len(seen) / expected)
    df_json = df_json.drop_duplicates("display_id")  # dups within this chunk
    df_json = df_json.loc[df_json.display_id.apply(lambda x: x not in seen)]  # dups from earlier chunks
    # Accumulate rather than replace, so duplicates several chunks apart are
    # also removed.
    seen.update(df_json.display_id)
    df_json.to_json(PATH_METADATA_DEDUP.format(str(expected)), lines=True, orient="records", compression="infer")

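## Concatenate the de-duplicated chunks with zcat

Run this from inside `yt_metadata_en_dd/` (so the glob only matches the chunk files), then move the resulting `yt_metadata_en.jsonl.gz` into `DIR` so it matches `PATH_METADATA_DST`: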

    for f in *.jsonl.gz; do (zcat "${f}"; echo) | gzip >> yt_metadata_en.jsonl.gz; done

## Creates .feather helper without "description", "tags", "title", and "crawl_date" fields

In [32]:
# Build a lightweight helper table: the de-duplicated metadata without its
# bulky text columns (and crawl_date), with upload_date parsed to datetime.
HELPER_DROP_COLS = ["description", "tags", "title", "crawl_date"]

dfs = []
for df_json in pd.read_json(PATH_METADATA_DST, compression="infer", chunksize=5000000, lines=True):
    dfs.append(
        df_json.drop(columns=HELPER_DROP_COLS)
               .assign(upload_date=lambda d: pd.to_datetime(d["upload_date"]))
    )

# Feather stores no index (and some pandas versions reject a non-default one),
# so give the concatenation a clean RangeIndex.
df_final = pd.concat(dfs, ignore_index=True)
df_final.to_feather(PATH_METADATA_HELPER)


## Create weights derived from rankings

In [33]:
import pandas as pd

# English-only channel table, as last written to PATH_CHANNELS_DST.
df_ch_f = pd.read_csv(PATH_CHANNELS_DST, compression="infer", sep="\t")

def get_dist_rank(df_ch_f, wsize=50, rank_name="subscriber_rank"):
    """Add a rank-density weight column to a channel table.

    Rows are sorted by ``rank_name`` (missing ranks last). For each ranked row,
    a window spanning ``wsize`` consecutive positions of that sorted order is
    taken, centred on the row and shifted inwards at both ends so it stays
    inside the data. The row's weight is ``1 / (wsize / (rank[hi] - rank[lo]))``,
    i.e. the mean gap between consecutive observed ranks around the row.
    Unranked rows get NaN.

    Parameters
    ----------
    df_ch_f : pandas.DataFrame
        Channel table; the caller's frame is not modified.
    wsize : int
        Window size in rows; must be smaller than the number of ranked rows.
    rank_name : str
        Name of the numeric rank column.

    Returns
    -------
    pandas.DataFrame
        A copy sorted by ``rank_name`` with a fresh RangeIndex and an extra
        ``"weights{rank_name}_{wsize}"`` column.

    Raises
    ------
    ValueError
        If there are ranked rows but no more than ``wsize`` of them.
    """
    df_ch_f = df_ch_f.sort_values(rank_name).reset_index(drop=True)
    ranked = df_ch_f[rank_name].notna()
    values = df_ch_f.loc[ranked, rank_name].to_numpy()
    n = len(values)
    if 0 < n <= wsize:
        raise ValueError(
            "need more than wsize={} ranked rows in '{}', got {}".format(wsize, rank_name, n))

    half = wsize // 2
    # Window [lo, hi] with hi - lo == wsize everywhere, including the tail
    # (lo = n - 1 - wsize there), clamped to the ranked rows.
    lo = np.clip(np.arange(n) - half, 0, max(n - 1 - wsize, 0))
    hi = lo + wsize
    # density p = observations / rank span; weight = 1 / p. A zero span (tied
    # ranks) gives p = inf and weight 0.
    p = wsize / (values[hi] - values[lo])
    weights = 1 / p

    col = "weights{}_{}".format(rank_name, wsize)
    df_ch_f[col] = np.nan
    # NaN ranks sort last, so the ranked rows line up with `weights` in order.
    df_ch_f.loc[ranked, col] = weights
    return df_ch_f

# One weight column per window size k; the k=2000 column is the one kept when
# the table is saved below.
for wsize in [100, 2000, 32000]:
    print(wsize)
    df_ch_f = get_dist_rank(df_ch_f, wsize=wsize, rank_name="subscriber_rank_sb")

## Ranking visualization

In [36]:
# Plot styling: LaTeX text rendering with the LinLibertine TTF from the user's
# font directory, 14 pt titles and 12 pt for every other text element.
fontpath = os.path.expanduser('~/.local/share/fonts/LinLibertine_DRah.ttf')
prop = font_manager.FontProperties(fname=fontpath)

base_font_size = 12
params = dict.fromkeys(
    ['axes.labelsize', 'font.size', 'legend.fontsize', 'xtick.labelsize', 'ytick.labelsize'],
    base_font_size,
)
params["axes.titlesize"] = 14
params.update({'font.family': prop.get_name(), 'text.usetex': True})

mpl.rcParams.update(params)

import sys
# Local Modules
sys.path.insert(0, os.path.abspath('/scratch/horta/youtube_dataset/'))
from helpers.plot import set_size

WINDOW_SIZES = [100, 2000, 32000]
vs = ["#1b9e77", "#7570b3", "#e6ab02"]  # one colour per window size


def _light_grid(ax):
    """Draw the light dotted grid used on both panels."""
    ax.xaxis.grid(color="#CCCCCC", ls=":")
    ax.yaxis.grid(color="#CCCCCC", ls=":")


fig, axs = plt.subplots(2, 1, figsize=(7, 6), sharex=False, gridspec_kw={"hspace": 0.4})

# (a) Weight curves for every window size k.
ax = axs[0]
for color, wsize in zip(vs, WINDOW_SIZES):
    ax.plot(df_ch_f["subscriber_rank_sb"].values,
            df_ch_f["weightssubscriber_rank_sb_{}".format(wsize)],
            color=color,
            label="$k={}$".format(wsize))
_light_grid(ax)
ax.set_title("(a) Weights for different window sizes")
ax.set_yscale("log")
ax.legend()
ax.set_ylabel("Weights")
ax.set_xlabel("Subscriber Ranking")

# (b) Effect of the k=2000 weights on the mean number of videos. The means are
# computed rather than hard-coded so the legend stays correct when the data
# changes; the weighted mean skips rows lacking a weight or a video count
# (otherwise np.average returns NaN).
weights_2000 = df_ch_f["weightssubscriber_rank_sb_2000"]
unadjusted_mean = np.mean(df_ch_f.videos_cc)
has_both = weights_2000.notna() & df_ch_f.videos_cc.notna()
adjusted_mean = np.average(df_ch_f.videos_cc[has_both], weights=weights_2000[has_both])

ax = axs[1]
ax.set_title("(b) Adjustment example")
ax.plot(df_ch_f.subscriber_rank_sb,
        df_ch_f.videos_cc.rolling(1000).mean(), color="gray", label="Moving average over number of videos")
ax.set_xlabel("Subscriber Ranking")
# Raw strings: "\m" in a plain literal is an invalid escape sequence.
ax.axhline(unadjusted_mean, ls=":", color="black",
           label=rf"Unadjusted mean $\mu = {unadjusted_mean:.2f}$")
ax.axhline(adjusted_mean, ls="-", color="#7570b3",
           label=rf"Mean adjusted with weights $\mu = {adjusted_mean:.2f}$")
ax.legend()
_light_grid(ax)
ax.set_ylabel("Number of videos")
set_size(fig, size=(7, 6))
os.makedirs("./images", exist_ok=True)
fig.savefig("./images/weights_window.pdf", bbox_inches="tight")

## Save the channel dataframe with the rank-derived weights

In [37]:
# Keep the k=2000 weights under the name "weights", drop the other window
# sizes, and overwrite the channel file (the weights cell reads this same file,
# so re-running it afterwards starts from the weighted table). index=False
# matches the other to_csv calls, so no unnamed index column is written.
(
    df_ch_f.rename({"weightssubscriber_rank_sb_2000": "weights"}, axis=1)
           .drop(['weightssubscriber_rank_sb_100', 'weightssubscriber_rank_sb_32000'], axis=1)
           .to_csv(PATH_CHANNELS_DST, sep="\t", index=False, compression="infer")
)
