# Characterizing Patronage on YouTube - Preprocessing

#### Library imports

In [None]:
import ast
import gzip
import json
import os
import re
import timeit

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm.notebook import tqdm

#### Paths to data files

Source files used in this notebook:

**YouNiverse dataset:**

- `df_channels_en.tsv.gz`: channel metadata.
- `df_timeseries_en.tsv.gz`: channel-level time-series.
- `yt_metadata_en.jsonl.gz`: raw video metadata.

**Graphtreon dataset:**
- `final_processed_file.jsonl.gz`: all Graphtreon time-series.

In [None]:
# Root folders: shared raw data, and local scratch space for the intermediate outputs
DATA_FOLDER = "/dlabdata1/youtube_large/"
LOCAL_DATA_FOLDER = "local_data/"

# YouTube metadata: raw source, then progressively filtered outputs
PATH_YT_METADATA_SRC = f"{DATA_FOLDER}yt_metadata_en.jsonl.gz"
PATH_YT_METADATA_DST = f"{LOCAL_DATA_FOLDER}yt_metadata_en_pt.tsv.gz"
PATH_YT_METADATA_UNIQUE_PT_DST = f"{LOCAL_DATA_FOLDER}yt_metadata_en_unique_pt_per_chan.tsv.gz"
PATH_YT_METADATA_UNIQUE_YT_PT_DST = f"{LOCAL_DATA_FOLDER}yt_metadata_en_unique_pt_yt.tsv.gz"
PATH_YT_METADATA_RESTRICTED = f"{LOCAL_DATA_FOLDER}yt_metadata_en_restricted.tsv.gz"

# "Link" table between YouTube channels and patreon accounts
PATH_LINKED_CHANNELS_PATRONS_DST = f"{LOCAL_DATA_FOLDER}df_linked_channels_patreons.tsv.gz"

# YouTube channel-level timeseries
PATH_YT_TIMESERIES_SRC = f"{DATA_FOLDER}df_timeseries_en.tsv.gz"
PATH_YT_TIMESERIES_RESTRICTED_DST = f"{LOCAL_DATA_FOLDER}df_yt_timeseries_restricted.tsv.gz"

# Patreon (Graphtreon) timeseries
PATH_GT_TIMESERIES_SRC = f"{DATA_FOLDER}final_processed_file.jsonl.gz"
PATH_GT_TIMESERIES_DST = f"{LOCAL_DATA_FOLDER}df_gt_timeseries_filtered.tsv.gz"
PATH_GT_TIMESERIES_RESTRICTED_DST = f"{LOCAL_DATA_FOLDER}df_gt_timeseries_restricted.tsv.gz"
PATH_GT_TIMESERIES_EARNING_DST = f"{LOCAL_DATA_FOLDER}dailyGraph_earningsSeries.tsv.gz"
PATH_GT_TIMESERIES_PATRONS_DST = f"{LOCAL_DATA_FOLDER}dailyGraph_patronsSeries.tsv.gz"
PATH_GT_TIMESERIES_PATRONS_AND_EARNINGS_DST = f"{LOCAL_DATA_FOLDER}dailyGraph_patrons_and_earnings_Series.tsv.gz"

In [None]:
# convention for variable names
# use 'df_' prefix when dataframe in the original form (not group by etc..)

In [None]:
# list all files in DATA_FOLDER
# !ls -lh {DATA_FOLDER}

In [None]:
# list all files in LOCAL_DATA_FOLDER
# !ls -lh {LOCAL_DATA_FOLDER}

## 1. Preprocess data

### 1.1. Preprocess YouTube metadata

#### Filter YouTube metadata containing patreon id
_Extract Patreon urls from YouTube metadata description (if they exist) and keep only those rows_

In [None]:
def json_escape(str):
    """
    Replace JSON newline escapes (the two characters backslash + "n") by a space.

    Only genuine escape sequences are replaced. When the "n" follows an escaped
    backslash (an even number of backslashes before the "n", e.g. a Windows
    path in a description), the text is left untouched. A plain str.replace
    would turn such text into an invalid JSON escape and make json.loads fail
    on an otherwise valid line.

    Parameters
    ----------
    str : str
        One raw line of JSON text. (The name shadows the builtin; it is kept
        for compatibility with existing callers.)

    Returns
    -------
    str
        The line with every JSON newline escape replaced by a single space.
    """
    # (?<!\\)      start at the beginning of a run of backslashes
    # ((?:\\\\)*)  consume pairs of backslashes (escaped backslashes) and keep them
    # \\n          the remaining backslash + "n" is a real newline escape -> space
    return re.sub(r'(?<!\\)((?:\\\\)*)\\n', r'\1 ', str)

In [None]:
# # extract patreon accounts from youtube channel descriptions and
# # filter the metadata to retain only the rows which description contains a patreon url

# TEST_RUN = False
# # MAX_ITER = 10_000

# nb_rows_read = 0
# JSONDecodeErrors_cnt = 0 
# lines_json = []    

# # match patterns starting with patreon.com/ and matching at least 1 character after
# # until it reaches anything thats not a word character
# pattern = re.compile(r'patreon.com/[^\W]+')

# compressed_file_size = os.stat(PATH_YT_METADATA_SRC).st_size
# print("Compressed file size is :                 {:>3,.2f} GB".format(compressed_file_size / 2**30))

# uncompressed_file_size = 97_600_000_000
# print("Estimated Uncompressed file size is :     {:>3,.2f} GB".format(uncompressed_file_size / 2**30))

# start = timeit.default_timer()

# # Load tqdm with size counter instead of file counter
# with tqdm(total=uncompressed_file_size, unit='B', unit_scale=True, unit_divisor=1024) as pbar:
#     with gzip.open(PATH_YT_METADATA_SRC, "r") as f:
#         for i, line_byte in enumerate(f): 

#             read_bytes = len(line_byte)
#             if read_bytes:
#                 pbar.set_postfix(file=PATH_YT_METADATA_SRC[len(DATA_FOLDER)+1:], refresh=False)
#                 pbar.update(read_bytes)

#             nb_rows_read += 1
            
#             # set a maximum iteration for tests
#             if TEST_RUN == True:
#                 if nb_rows_read >= MAX_ITER:
#                     break

#             # convert bytes into string
#             line_str = line_byte.decode("utf-8")

#             # convert string into json after escaping new line characters
#             line_str_esc = json_escape(line_str)
#             try:
#                 line_json = json.loads(line_str_esc)
#             except Exception as e:
#                 JSONDecodeErrors_cnt += 1
#                 pass

#             # print(line_json)
#             # print(line_json['categories'])
            
#             # add line if description contains a patreon.com id
#             if re.search(pattern, line_json['description']):
#                 patreon_id = re.findall(pattern, line_json['description'])[0]
#                 line_json['patreon_id'] = patreon_id
#                 lines_json.append(line_json)

# stop = timeit.default_timer()
# time_diff = stop - start

# print()
# print("==> total time to read and filter youtube metadata:                {:>10.0f} min. ({:.0f}s.)".format(time_diff/60, time_diff)) 
# print("==> number of rows (= videos) read:                                {:>10,}".format(nb_rows_read))
# print("==> number of videos containing a patreon link in the description: {:>10,} ({:.3%})".format(len(lines_json), len(lines_json)/nb_rows_read ))
# print("==> number of skipped rows (JSONDecodeErrors):                     {:>10,} ({:.3%})".format(JSONDecodeErrors_cnt, JSONDecodeErrors_cnt/nb_rows_read))

# # create new dataframe with the filtered lines
# df_yt_metadata_pt = pd.DataFrame(data=lines_json, index=None)


In [None]:
# # remove rows where patreon_ids = patreon.com/posts or patreon.com/user (in the future fix in regex)
# df_yt_metadata_pt = df_yt_metadata_pt[df_yt_metadata_pt['patreon_id'] != 'patreon.com/posts']
# df_yt_metadata_pt = df_yt_metadata_pt[df_yt_metadata_pt['patreon_id'] != 'patreon.com/user']
# df_yt_metadata_pt = df_yt_metadata_pt[df_yt_metadata_pt['patreon_id'] != 'patreon.com/join']

# # lowercase all patreon ids to avoid duplicates
# df_yt_metadata_pt['patreon_id'] = df_yt_metadata_pt['patreon_id'].str.lower()

In [None]:
# save youtube metadata df containing patreon accounts
# (the export is commented out; uncomment it to regenerate the file)
# df_yt_metadata_pt.to_csv(PATH_YT_METADATA_DST, index=False, sep='\t', compression='gzip')
# show the size of the exported file
!ls -lh {PATH_YT_METADATA_DST}

#### Restrict to 1 patreon id per youtube channel
Some YouTube channels use multiple patreon accounts. We'll only keep the most used patreon account for each YouTube channel.

In [None]:
# Load the YouTube metadata subset whose descriptions contain a patreon url (takes about 2 mins)
df_yt_metadata_pt = pd.read_csv(
    PATH_YT_METADATA_DST,
    compression='gzip',
    lineterminator='\n',
    sep="\t",
)
df_yt_metadata_pt.head()

In [None]:
# Number of rows (videos) in the original, unfiltered YouTube metadata.
# Hard-coded so the expensive extraction cell does not have to be re-run.
# If that cell IS re-run, use its counter instead:  DF_YT_METADATA_ROWS = nb_rows_read
DF_YT_METADATA_ROWS = int("72924794")

In [None]:
# Unique patreon ids / channels among the videos whose description has a patreon link
yt_patreon_list = df_yt_metadata_pt['patreon_id'].unique()
yt_pt_channel_list = df_yt_metadata_pt['channel_id'].unique()

# Summary of the patreon-link filter
print("[YouTube metadata] Total number of videos:                                                {:>10,}".format(DF_YT_METADATA_ROWS))
print("[Filtered YouTube metadata] number of videos that contain a patreon link in description:  {:>10,} ({:.1%} of total dataset)".format(len(df_yt_metadata_pt), len(df_yt_metadata_pt)/DF_YT_METADATA_ROWS))
print("[Filtered YouTube metadata] number of unique channels that contain a patreon account:     {:>9,}".format(len(yt_pt_channel_list)))
print("[Filtered YouTube metadata] number of unique patreon ids:                                 {:>9,}".format(len(yt_patreon_list)))


**Observation:** \
We can see that we have _**more patreon ids than channels**_. Let's investigate further:

In [None]:
# Number of distinct videos (display_ids) for every (channel_id, patreon_id) pair
yt_metadata_pt_grp_chan = (
    df_yt_metadata_pt
    .groupby(['channel_id', 'patreon_id'])
    .agg(display_id_cnt=("display_id", "nunique"))
)
yt_metadata_pt_grp_chan.head()

In [None]:
# Turn the (channel_id, patreon_id) MultiIndex into regular columns.
# Guarded so that re-running the cell is a no-op, instead of adding a spurious 'index' column.
if isinstance(yt_metadata_pt_grp_chan.index, pd.MultiIndex):
    yt_metadata_pt_grp_chan = yt_metadata_pt_grp_chan.reset_index()
yt_metadata_pt_grp_chan

In [None]:
# Number of distinct patreon ids used by each channel, largest first
pt_id_cnt_pr_chan = (
    yt_metadata_pt_grp_chan
    .groupby('channel_id')['patreon_id']
    .count()
    .sort_values(ascending=False)
    .to_frame(name='patreon_id_cnt')
)
pt_id_cnt_pr_chan.head()

In [None]:
# Channels whose videos mention more than one patreon id
chan_gt_1_pt = pt_id_cnt_pr_chan.query('patreon_id_cnt > 1')
print(f"Total number of channels:                            {len(pt_id_cnt_pr_chan):>6,}")
print(f"Number of channels with more than 1 patreon account: {len(chan_gt_1_pt):>6,} ({len(chan_gt_1_pt) / len(pt_id_cnt_pr_chan):.1%})")

# Histogram of the number of patreon ids per channel.
# Only the y axis (number of channels) is log-scaled; the x axis stays linear.
fig, axs = plt.subplots(figsize=(6, 4))
sns.histplot(data=pt_id_cnt_pr_chan, ax=axs, bins=50, kde=False, legend=False, color='C0')
axs.set(title='Distribution of patreon ids per channel (log scale)',
        xlabel="Number of patreon ids",
        ylabel="Count of channels (log scale)",
        yscale="log")
plt.show()

# descriptive statistics table
pt_id_cnt_pr_chan.describe().T

**Discussion:** \
As we observed earlier, some channels use more than 1 patreon id, and use different patreon urls for different videos. For example:
- [Patreon_Gaming](https://www.youtube.com/channel/UCAsLyFlWkbdhvri02tO6veA) uses 73 different patreon ids.
- [Artistic Maniacs](https://www.youtube.com/channel/UC3pcSD6_RRisNLaHGznemJA) uses 69 different patreon ids.

In [None]:
# example for Artistic Maniacs: its first (channel_id, patreon_id) pairs with their video counts
yt_metadata_pt_grp_chan[yt_metadata_pt_grp_chan['channel_id'] == 'UC3pcSD6_RRisNLaHGznemJA'].head()

**Keep only most used patreon_id per channel (patreon_id with most videos for each channel)**

In [None]:
# Within each channel, put the patreon id appearing in the most videos first
# (multi-column sort keeps the previous order for ties)
yt_metadata_pt_grp_chan = yt_metadata_pt_grp_chan.sort_values(
    by=['channel_id', 'display_id_cnt'],
    ascending=[True, False],
)
yt_metadata_pt_grp_chan.head()

In [None]:
# Rows whose channel_id already appeared above = the channel's extra (non-top) patreon ids
dup_chan_id = yt_metadata_pt_grp_chan.loc[yt_metadata_pt_grp_chan.duplicated(subset='channel_id', keep='first')]
print(f"Number of duplicate rows (same channel id with multiple patreon_ids): {len(dup_chan_id):,}")

In [None]:
# look at duplicate rows
# yt_metadata_pt_grp_chan[yt_metadata_pt_grp_chan.duplicated('channel_id')].sort_values('channel_id')

In [None]:
# display the (channel_id, patreon_id, display_id_cnt) table, sorted by video count within each channel
yt_metadata_pt_grp_chan

In [None]:
# The table is sorted by video count inside each channel, so keeping the first row
# of every channel keeps its most used patreon id
yt_metadata_unique_pt = yt_metadata_pt_grp_chan.drop_duplicates(subset=['channel_id'], keep='first')
print("Removed {:,} rows".format(len(yt_metadata_pt_grp_chan) - len(yt_metadata_unique_pt)))
yt_metadata_unique_pt.head()

In [None]:
# Keep the videos whose patreon id is the most used patreon id of *some* channel.
# NOTE(review): the mask is on patreon_id alone, not on the (channel_id, patreon_id) pair,
# so a channel's video mentioning ANOTHER channel's top patreon id is also kept; this frame
# is therefore not strictly "one patreon id per channel". Switching to pair-wise filtering
# (e.g. pd.MultiIndex.from_frame(...).isin(...)) would also change which patreon ids survive
# the "1 channel per patreon id" step below, so review both together.
df_yt_metadata_unique_pt_per_chan = df_yt_metadata_pt.loc[
    df_yt_metadata_pt['patreon_id'].isin(yt_metadata_unique_pt['patreon_id'])
]
print("removed {:,} videos from dataframe".format(len(df_yt_metadata_pt) - len(df_yt_metadata_unique_pt_per_chan)))

In [None]:
# Summary of the per-channel filter
print("After keeping only patreon appearing on the most videos for each channel:")
print("Number of videos:    {:>10,}".format(len(df_yt_metadata_unique_pt_per_chan)))
print("Unique channel ids:  {:>10,}".format(df_yt_metadata_unique_pt_per_chan['channel_id'].nunique()))
print("Unique patreon ids:  {:>10,}".format(df_yt_metadata_unique_pt_per_chan['patreon_id'].nunique()))

In [None]:
# save "YouTube Metadata unique patreon account per channel" dataframe to LOCAL SCRATCH FOLDER as a compressed tsv
# (the export is commented out; uncomment it to regenerate the file)
# df_yt_metadata_unique_pt_per_chan.to_csv(PATH_YT_METADATA_UNIQUE_PT_DST, index=False, sep='\t', compression='gzip')
# show the size of the exported file
!ls -lh {PATH_YT_METADATA_UNIQUE_PT_DST}

#### Restrict to 1 youtube channel per patreon id 
When grouping YouTube metadata by `channel_id` and `patreon_id`, we also notice that we have more rows than the total number of unique patreon ids. \
This is because some `patreon_id`s are used on multiple YT channels. We'll remove those accounts.

In [None]:
# yt_metadata_unique_pt has one row per channel; compare it with its number of distinct patreon ids
print("total rows (grouped by channel_id and patreon_id):   {:,}".format(len(yt_metadata_unique_pt)))
print("total number of unique patreon ids:                  {:,}".format(yt_metadata_unique_pt['patreon_id'].nunique()))

In [None]:
# # show patreon_id that are used on multiple channels.
# keep=False flags every occurrence, so all channels sharing the same top patreon id are listed together
yt_metadata_unique_pt[yt_metadata_unique_pt.duplicated(subset=['patreon_id'], keep=False)].sort_values(by='patreon_id')

In [None]:
# print("[Filtered YouTube metadata] number of channels per patreon id:")

# chan_cnt_per_patreon_id = df_yt_metadata_pt.groupby('patreon_id')\
#                                             .agg(channel_id_count=('channel_id', 'count'))\
#                                             .sort_values(by=['channel_id_count'], ascending=False)
# chan_cnt_per_patreon_id
# # chan_cnt_per_patreon_id.reset_index()

In [None]:
# one row per channel: its most used patreon id and that id's video count
yt_metadata_unique_pt

In [None]:
# peek at the video-level frame filtered on the channels' most used patreon ids
df_yt_metadata_unique_pt_per_chan.head(2)

In [None]:
# remove patreon accounts that have more than 1 youtube channel:
# first count the distinct YouTube channels of every patreon id, largest first
# (rows with a missing id are dropped, as grouping on both keys did)
yt_metadata_pt_chan_id_cnt = (
    df_yt_metadata_unique_pt_per_chan[['patreon_id', 'channel_id']]
    .dropna()
    .groupby('patreon_id')
    .agg(channel_id_cnt=('channel_id', 'nunique'))
    .sort_values('channel_id_cnt', ascending=False)
)
yt_metadata_pt_chan_id_cnt

In [None]:
# Patreon ids linked to exactly one YouTube channel
yt_metadata_pt_chan_id_cnt_unique_chan = yt_metadata_pt_chan_id_cnt.query('channel_id_cnt == 1')

print("removed {} accounts".format(len(yt_metadata_pt_chan_id_cnt) - len(yt_metadata_pt_chan_id_cnt_unique_chan)))

patreons_with_unique_chan = yt_metadata_pt_chan_id_cnt_unique_chan.index

print("Number of patreon accounts with only 1 YT channel: {:,}".format(len(patreons_with_unique_chan)))

In [None]:
# Keep the videos whose patreon id is linked to exactly one YouTube channel.
# Start from the per-channel-restricted frame (not from df_yt_metadata_pt) so that only rows
# already kept by the per-channel filter can appear here; this way a channel's secondary
# patreon ids can never re-enter through a patreon id that is unique elsewhere.
df_yt_metadata_unique_pt_yt = df_yt_metadata_unique_pt_per_chan[
    df_yt_metadata_unique_pt_per_chan['patreon_id'].isin(patreons_with_unique_chan)
]
# the removed count is still reported relative to all videos with a patreon link
print(f"removed {len(df_yt_metadata_pt) - len(df_yt_metadata_unique_pt_yt)} videos from dataframe")

In [None]:
# number of videos in the "1 patreon id <-> 1 YouTube channel" table
print("{:,}".format(len(df_yt_metadata_unique_pt_yt)))

In [None]:
# Summary of the "1 patreon id <-> 1 YouTube channel" video table
print("Number of videos:    {:>10,}".format(len(df_yt_metadata_unique_pt_yt)))
print("Unique channel ids:  {:>10,}".format(df_yt_metadata_unique_pt_yt['channel_id'].nunique()))
print("Unique patreon ids:  {:>10,}".format(df_yt_metadata_unique_pt_yt['patreon_id'].nunique()))

In [None]:
# save "YouTube Metadata, 1 patreon id <-> 1 YouTube channel" dataframe to LOCAL SCRATCH FOLDER as a compressed tsv
# (the export is commented out; uncomment it to regenerate the file)
# df_yt_metadata_unique_pt_yt.to_csv(PATH_YT_METADATA_UNIQUE_YT_PT_DST, index=False, sep='\t', compression='gzip')
# show the size of the exported file
!ls -lh {PATH_YT_METADATA_UNIQUE_YT_PT_DST}

#### _Restrict YouTube metadata further according to YouTube Timeseries restricted dataset_ (see 1.3)
_This is done at the end of section 1.3 below, after the YouTube Timeseries 4 filters have been applied_

#### [_Ignore for now_] Number of videos per patreon id

In [None]:
# Number of distinct videos mentioning each patreon id, most used first
vids_cnt_per_patreon_id = (
    df_yt_metadata_pt
    .groupby('patreon_id')
    .agg(display_id_cnt=('display_id', 'nunique'))
    .sort_values(by='display_id_cnt', ascending=False)
)

print("[Filtered YouTube metadata] number of videos per patreon id:")
vids_cnt_per_patreon_id

In [None]:
# Histogram of the number of videos per patreon id.
# Only the y axis (number of patreon ids) is log-scaled; the x axis stays linear.
fig, axs = plt.subplots(figsize=(6, 4))
sns.histplot(data=vids_cnt_per_patreon_id, ax=axs, bins=50, kde=False, color='C0')
axs.set(title='Distribution of videos per patreon id (log scale)',
        xlabel="Number of videos",
        ylabel="# patreon ids (log scale)",
        yscale="log")

plt.tight_layout()
plt.show()

# descriptive statistics table
vids_cnt_per_patreon_id.describe().T

**Discussion:** \
From the above graph and table, we can see that the distribution of _videos_ among patreon ids is **heavy-tailed** (roughly power-law-like): most patreon accounts appear in only a few videos, while a few of them appear in a lot of videos.

More specifically:
- 25% of the Patreon accounts have 1 video
- 50% of the Patreon accounts have fewer than 4 videos

### 1.2 Link YT channels and Patrons

#### "Link" dataframe

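A YouTube channel and a Patreon id are considered linked only if:
- the Patreon id is the one appearing in the most videos of that channel, and
- that Patreon id is not used by any other YouTube channel (see "Restrict to 1 youtube channel per patreon id" above).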

In [None]:
# Keep each channel's most used patreon id only if that id is linked to a single channel
yt_metadata_unique_all = yt_metadata_unique_pt.loc[yt_metadata_unique_pt['patreon_id'].isin(patreons_with_unique_chan)]
print("Removed {} accounts".format(len(yt_metadata_unique_pt) - len(yt_metadata_unique_all)))

In [None]:
# "Link" table: one (channel_id, patreon_id) row per linked channel, with a fresh 0..n-1 index
df_linked_channels_patreons = (
    yt_metadata_unique_all[['channel_id', 'patreon_id']]
    .reset_index(drop=True)
)
df_linked_channels_patreons

In [None]:
# save "linked" dataframe to LOCAL SCRATCH FOLDER as a compressed tsv
# df_linked_channels_patreons.to_csv(PATH_LINKED_CHANNELS_PATRONS_DST, index=False, sep='\t', compression='gzip')

In [None]:
# show the size of the saved "link" file
!ls -lh {PATH_LINKED_CHANNELS_PATRONS_DST}

### 1.3 Filter YouTube timeseries - Restrict YouTube channels (4 filters)
Restrict YouTube channels according to the following criteria (filters are applied sequentially):
- Filter 1: Keep only YouTube channels that are in YouTube Timeseries dataset AND linked to a patreon account 
- Filter 2: At least 2 years between first and last video
- Filter 3: At least 20 videos with patreon ids
- Filter 4: At least 250k subscribers at data crawling time

In [None]:
# show the size of the source YouTube timeseries file
!ls -lh {PATH_YT_TIMESERIES_SRC}

In [None]:
# Channel-level time-series, with 'datetime' parsed as timestamps (takes about 50 secs)
df_yt_timeseries = pd.read_csv(
    PATH_YT_TIMESERIES_SRC,
    compression='gzip',
    sep="\t",
    parse_dates=['datetime'],
)

In [None]:
# quick look at the first records of the channel-level time-series
df_yt_timeseries.head(3)

In [None]:
# Thresholds of the channel filters in section 1.3 (each one is an "at least" bound)
MIN_DAYS_DELTA = "730 day"   # filter 2: minimal span between first and last record (~2 years); parsed with pd.Timedelta
NB_PATREON_VIDS = 20         # filter 3: minimal number of videos mentioning the patreon id
NB_SUBS = 250_000            # filter 4: minimal number of subscribers

In [None]:
# Size of the original YouTube timeseries dataset (df_yt_timeseries loaded above).
# Rows are channel-level time-series records, not videos, so they are reported as "rows"
# (consistent with the filter summary below).
yt_ts_uniq_chan_cnt = df_yt_timeseries['channel'].nunique()
print("[YouTube Timeseries] Nb of rows of original dataset:                  {:>10,}".format(len(df_yt_timeseries)))
print("[YouTube Timeseries] Nb of channels of original dataset:                {:>10,}".format(yt_ts_uniq_chan_cnt))

#### **Filter 1:** Keep only YouTube channels that are in YouTube Timeseries dataset AND linked to a patreon account (using "Link" df)

In [None]:
# Filter 1: keep only the timeseries rows of channels present in the "link" dataframe
# (df_linked_channels_patreons, built in section 1.2)
df_yt_timeseries_filt1 = df_yt_timeseries.loc[
    df_yt_timeseries['channel'].isin(df_linked_channels_patreons['channel_id'])
]
chan_list_filt1 = df_yt_timeseries_filt1['channel'].unique()
chan_list_filt1_cnt = len(chan_list_filt1)

print("[YouTube Timeseries] Nb of rows of after applying filter 1:           {:>10,} ({:5.1%} of original dataset)".format(len(df_yt_timeseries_filt1), len(df_yt_timeseries_filt1)/len(df_yt_timeseries)))
print("[YouTube Timeseries] Nb of channels after applying filter 1:          {:>10,} ({:5.1%} of original dataset)".format(chan_list_filt1_cnt, chan_list_filt1_cnt/yt_ts_uniq_chan_cnt))

---
#### **Filter 2:** At least 2 years between first and last video

In [None]:
# Filter 2: among filter-1 channels, compute the time span between each channel's first and
# last timeseries record
datetime_data = df_yt_timeseries_filt1.groupby('channel').agg(datetime_min=('datetime', 'min'),
                                                              datetime_max=('datetime', 'max'))
datetime_data['delta_datetime'] = datetime_data['datetime_max'] - datetime_data['datetime_min']

# keep channels whose data spans AT LEAST MIN_DAYS_DELTA ('>=' = inclusive, as the filter is defined)
datetime_data_filt2 = datetime_data[datetime_data['delta_datetime'] >= pd.Timedelta(MIN_DAYS_DELTA)]

# Apply filter on YT Timeseries dataset: retain only those channels that have data for at least MIN_DAYS_DELTA
df_yt_timeseries_filt2 = df_yt_timeseries_filt1[df_yt_timeseries_filt1['channel'].isin(datetime_data_filt2.index)]

chan_list_filt2 = df_yt_timeseries_filt2['channel'].unique()
chan_list_filt2_cnt = len(chan_list_filt2)

print("[YouTube Timeseries] Nb of rows of after applying filter 1+2:         {:>10,} ({:5.1%} of original dataset, {:5.1%} of filter 1 dataset)".format(len(df_yt_timeseries_filt2), len(df_yt_timeseries_filt2)/len(df_yt_timeseries), len(df_yt_timeseries_filt2)/len(df_yt_timeseries_filt1)))
print("[YouTube Timeseries] Nb of channels after applying filter 1+2:        {:>10,} ({:5.1%} of original dataset, {:5.1%} of filter 1 channels)".format(chan_list_filt2_cnt, chan_list_filt2_cnt/yt_ts_uniq_chan_cnt, chan_list_filt2_cnt/chan_list_filt1_cnt))

___

#### **Filter 3:** At least 20 videos with patreon ids per channel 

In [None]:
# Filter 3: keep channels having at least NB_PATREON_VIDS distinct videos for one of their
# (channel_id, patreon_id) pairs (yt_metadata_pt_grp_chan, computed in section 1.1).
# '>=' makes the threshold inclusive, matching "at least" in the filter definition.
yt_metadata_pt_grp_chan_filt3 = yt_metadata_pt_grp_chan[yt_metadata_pt_grp_chan['display_id_cnt'] >= NB_PATREON_VIDS]

# get list of unique channels satisfying filter 3
chan_list_filt_3 = yt_metadata_pt_grp_chan_filt3['channel_id'].unique()

# Apply filter on YT Timeseries dataset: retain only those channels from filt 2 that are in the chan_list_filt_3
df_yt_timeseries_filt3 = df_yt_timeseries_filt2[df_yt_timeseries_filt2['channel'].isin(chan_list_filt_3)]

chan_list_filt3 = df_yt_timeseries_filt3['channel'].unique()
chan_list_filt3_cnt = len(chan_list_filt3)

print("[YouTube Timeseries] Nb of rows of after applying filter 1+2+3:       {:>10,} ({:5.1%} of original dataset, {:5.1%} of filter 2 dataset)".format(len(df_yt_timeseries_filt3), len(df_yt_timeseries_filt3)/len(df_yt_timeseries), len(df_yt_timeseries_filt3)/len(df_yt_timeseries_filt2)))
print("[YouTube Timeseries] Nb of channels after applying filter 1+2+3:      {:>10,} ({:5.1%} of original dataset, {:5.1%} of filter 2 channels)".format(chan_list_filt3_cnt, chan_list_filt3_cnt/yt_ts_uniq_chan_cnt, chan_list_filt3_cnt/chan_list_filt2_cnt))

---
#### **Filter 4:** At least 250k subscribers at data crawling time

In [None]:
# Min / max subscriber count of every filter-3 channel, largest audience first
subs_aggr_per_channel = (
    df_yt_timeseries_filt3
    .groupby('channel')
    .agg(min_subs=('subs', 'min'), max_subs=('subs', 'max'))
    .sort_values(by='max_subs', ascending=False)
    .reset_index()
)

In [None]:
# Filter 4: keep channels with at least NB_SUBS subscribers ('>=' = inclusive, as defined).
# Uses subs_aggr_per_channel (min/max subscribers per filter-3 channel, computed just above).
# NOTE(review): the threshold is applied to the MAXIMUM subscriber count over the timeseries,
# which equals the count "at data crawling time" only if subscribers never decrease - confirm.
subs_per_channel_filt4 = subs_aggr_per_channel[subs_aggr_per_channel['max_subs'] >= NB_SUBS]

# get list of unique channels satisfying filter 4
chan_list_filt_4 = subs_per_channel_filt4['channel'].unique()

# Apply filter on YT Timeseries dataset: retain only those channels from filt_3 that are in the chan_list_filt_4
df_yt_timeseries_filt4 = df_yt_timeseries_filt3[df_yt_timeseries_filt3['channel'].isin(chan_list_filt_4)]

chan_list_filt4 = df_yt_timeseries_filt4['channel'].unique()
chan_list_filt4_cnt = len(chan_list_filt4)

print("[YouTube Timeseries] Nb of rows of after applying filter 1+2+3+4:     {:>10,} ({:5.1%} of original dataset, {:5.1%} of filter 3 dataset)".format(len(df_yt_timeseries_filt4), len(df_yt_timeseries_filt4)/len(df_yt_timeseries), len(df_yt_timeseries_filt4)/len(df_yt_timeseries_filt3)))
print("[YouTube Timeseries] Nb of channels after applying filter 1+2+3+4:    {:>10,} ({:5.1%} of original dataset, {:5.1%} of filter 3 channels)".format(chan_list_filt4_cnt, chan_list_filt4_cnt/yt_ts_uniq_chan_cnt, chan_list_filt4_cnt/chan_list_filt3_cnt))

___
___
**• Filters summary**

In [None]:
# Summary of the 4 sequential YouTube Timeseries filters: definitions, then row counts,
# channel counts and time ranges before/after filtering.
print("[YouTube Timeseries] Stats before and after filters:")
print()

# filter definitions (thresholds come from the filter constants defined above)
print("Filter 1 = \"keep only YouTube channels that are in YouTube Timeseries dataset AND linked to a patreon account\"")
print("Filter 2 = \"at least {:.1f} years ({} days) between first and last video\"".format(pd.Timedelta(MIN_DAYS_DELTA).days/365, pd.Timedelta(MIN_DAYS_DELTA).days))
print("Filter 3 = \"at least {:,} videos with patreon ids per channel\"".format(NB_PATREON_VIDS))
print("Filter 4 = \"at least {:,} subscribers at data crawling time\"".format(NB_SUBS))
print()
# row counts after each cumulative filter
print("[YouTube Timeseries] Nb of rows of original dataset:                  {:>10,}".format(len(df_yt_timeseries)))
print("[YouTube Timeseries] Nb of rows of after applying filter 1:           {:>10,} ({:5.1%} of original dataset)".format(len(df_yt_timeseries_filt1), len(df_yt_timeseries_filt1)/len(df_yt_timeseries)))
print("[YouTube Timeseries] Nb of rows of after applying filter 1+2:         {:>10,} ({:5.1%} of original dataset, {:5.1%} of filter 1 dataset)".format(len(df_yt_timeseries_filt2), len(df_yt_timeseries_filt2)/len(df_yt_timeseries), len(df_yt_timeseries_filt2)/len(df_yt_timeseries_filt1)))
print("[YouTube Timeseries] Nb of rows of after applying filter 1+2+3:       {:>10,} ({:5.1%} of original dataset, {:5.1%} of filter 2 dataset)".format(len(df_yt_timeseries_filt3), len(df_yt_timeseries_filt3)/len(df_yt_timeseries), len(df_yt_timeseries_filt3)/len(df_yt_timeseries_filt2)))
print("[YouTube Timeseries] Nb of rows of after applying filter 1+2+3+4:     {:>10,} ({:5.1%} of original dataset, {:5.1%} of filter 3 dataset)".format(len(df_yt_timeseries_filt4), len(df_yt_timeseries_filt4)/len(df_yt_timeseries), len(df_yt_timeseries_filt4)/len(df_yt_timeseries_filt3)))
print()
# channel counts after each cumulative filter
print("[YouTube Timeseries] Nb of channels of original dataset:              {:>10,}".format(yt_ts_uniq_chan_cnt))
print("[YouTube Timeseries] Nb of channels after applying filter 1:          {:>10,} ({:5.1%} of original dataset)".format(chan_list_filt1_cnt, chan_list_filt1_cnt/yt_ts_uniq_chan_cnt))
print("[YouTube Timeseries] Nb of channels after applying filter 1+2:        {:>10,} ({:5.1%} of original dataset, {:5.1%} of filter 1 channels)".format(chan_list_filt2_cnt, chan_list_filt2_cnt/yt_ts_uniq_chan_cnt, chan_list_filt2_cnt/chan_list_filt1_cnt))
print("[YouTube Timeseries] Nb of channels after applying filter 1+2+3:      {:>10,} ({:5.1%} of original dataset, {:5.1%} of filter 2 channels)".format(chan_list_filt3_cnt, chan_list_filt3_cnt/yt_ts_uniq_chan_cnt, chan_list_filt3_cnt/chan_list_filt2_cnt))
print("[YouTube Timeseries] Nb of channels after applying filter 1+2+3+4:    {:>10,} ({:5.1%} of original dataset, {:5.1%} of filter 3 channels)".format(chan_list_filt4_cnt, chan_list_filt4_cnt/yt_ts_uniq_chan_cnt, chan_list_filt4_cnt/chan_list_filt3_cnt))
print()
# earliest / latest datetime before and after filtering
print('[YouTube Timeseries] Time range of original dataset                   {} and {}'.format(df_yt_timeseries['datetime'].min().strftime('%B %d, %Y'),
                                                              df_yt_timeseries['datetime'].max().strftime('%B %d, %Y')))

print('[YouTube Timeseries] Time range after applying filter 1+2+3+4        {} and {}'.format(df_yt_timeseries_filt4['datetime'].min().strftime('%B %d, %Y'),
                                                              df_yt_timeseries_filt4['datetime'].max().strftime('%B %d, %Y')))

# preview of the restricted timeseries and the final list of channel ids
display(df_yt_timeseries_filt4.head())
print("Restricted list of channels after 4 filters (count = {:,}):".format(chan_list_filt4_cnt))
print(chan_list_filt4)

In [None]:
# Final restricted YouTube timeseries = output of filters 1-4 (an independent copy of df_yt_timeseries_filt4)
df_yt_timeseries_restricted = df_yt_timeseries_filt4.copy()

# save youtube restricted timeseries df
# (the export is commented out; uncomment it to regenerate the file)
# df_yt_timeseries_restricted.to_csv(PATH_YT_TIMESERIES_RESTRICTED_DST, index=False, sep='\t', compression='gzip')
# !ls -lh {PATH_YT_TIMESERIES_RESTRICTED_DST}

#### Restrict YouTube Metadata accordingly

In [None]:
# Restrict the "1 patreon id <-> 1 YouTube channel" metadata to the channels kept by the
# 4 YouTube Timeseries filters (chan_list_filt4)
df_yt_metadata_pt_restr = df_yt_metadata_unique_pt_yt.loc[
    df_yt_metadata_unique_pt_yt['channel_id'].isin(chan_list_filt4)
]

# unique channels / patreon ids: pre-filtered metadata (any patreon link) vs restricted metadata
yt_metadata_uniq_chan, yt_metadata_uniq_pat = (
    df_yt_metadata_pt[col].unique() for col in ('channel_id', 'patreon_id')
)
yt_metadata_uniq_chan_restr, yt_metadata_uniq_pat_restr = (
    df_yt_metadata_pt_restr[col].unique() for col in ('channel_id', 'patreon_id')
)

print("[YouTube Metadata]:")
print()
print("Restriction = \"keep only YouTube channels that are in YouTube Timeseries filtered (filters 1-4) dataset\"")
print()
print("[Filtered YouTube Metadata]   Nb of channels in pre-filtered (containing patreon id) dataset:     {:>10,}".format(len(yt_metadata_uniq_chan)))
print("[Restricted YouTube Metadata] Nb of channels after filtering by restricted channels:              {:>10,} ({:5.1%} of pre-filtered dataset dataset)".format(len(yt_metadata_uniq_chan_restr), len(yt_metadata_uniq_chan_restr)/len(yt_metadata_uniq_chan)))
print()
print("[Filtered YouTube Metadata]   Nb of patreon ids in pre-filtered (containing patreon id) dataset:  {:>10,}".format(len(yt_metadata_uniq_pat)))
print("[Restricted YouTube Metadata] Nb of patreon ids after filtering by restricted channels:           {:>10,} ({:5.1%} of pre-filtered dataset dataset)".format(len(yt_metadata_uniq_pat_restr), len(yt_metadata_uniq_pat_restr)/len(yt_metadata_uniq_pat)))


In [None]:
# save youtube Metadata restricted according to YouTube Timeseries 4 filters
# (the export is commented out; uncomment it to regenerate the file)
# df_yt_metadata_pt_restr.to_csv(PATH_YT_METADATA_RESTRICTED, index=False, sep='\t', compression='gzip')
# show the size of the exported file
!ls -lh {PATH_YT_METADATA_RESTRICTED}

### 1.4 Preprocess Graphtreon timeseries

#### Filter Graphtreon to keep only records whose patreon id exists in the YouTube metadata

Read restricted youtube metadata file from disk... 

In [None]:
# sizes of the filtered (patreon link) and restricted YouTube metadata files
!ls -lh {PATH_YT_METADATA_DST}
!ls -lh {PATH_YT_METADATA_RESTRICTED}

In [None]:
# The filtered YouTube metadata (df_yt_metadata_pt) is already in memory from section 1.1.
# After a kernel restart, reload it first (takes about 2 mins):
# df_yt_metadata_pt = pd.read_csv(PATH_YT_METADATA_DST, sep="\t", lineterminator='\n', compression='gzip')

# Restricted YouTube metadata (takes about 10 seconds)
yt_metadata_en_restricted = pd.read_csv(
    PATH_YT_METADATA_RESTRICTED,
    compression='gzip',
    lineterminator='\n',
    sep="\t",
)
yt_metadata_en_restricted.head(3)

In [None]:
# Unique patreon ids of the filtered (df_yt_metadata_pt) and restricted YouTube metadata.
# The Graphtreon filter below keeps records whose patreon id is in the FILTERED list (yt_patreon_list).
yt_patreon_list = df_yt_metadata_pt.patreon_id.unique()
yt_patreon_list_restricted = yt_metadata_en_restricted.patreon_id.unique()

print("[Restricted YouTube metadata] number of videos in restricted YouTube metadata:                 {:>10,}".format(len(yt_metadata_en_restricted)))
# each count is printed under the label of the dataset it was computed from
print("[Filtered YouTube metadata]   total number of unique patreon ids:                              {:>10,}".format(len(yt_patreon_list)))
print("[Restricted YouTube metadata] total number of unique patreon ids:                              {:>10,}".format(len(yt_patreon_list_restricted)))

In [None]:
# size of the compressed Graphtreon source file
!ls -lh {PATH_GT_TIMESERIES_SRC}

In [None]:
def json_escape(str):
    r"""
    Replace every JSON ``\n`` escape sequence (a backslash followed by ``n``) by a space.

    Escape sequences are consumed left to right, two characters at a time, so an escaped
    backslash that happens to be followed by the letter n (``\\n`` in the JSON text) is left
    untouched. A plain ``str.replace("\\n", " ")`` would turn it into ``\ `` and make the
    line invalid JSON.

    Parameters
    ----------
    str : raw JSON text of one line. The parameter name shadows the builtin ``str`` but is
          kept so existing callers are unaffected.

    Returns
    -------
    The text with each ``\n`` escape replaced by a space; every other character and escape
    sequence is unchanged.
    """
    return re.sub(
        r"\\(.)",
        lambda match: " " if match.group(1) == "n" else match.group(0),
        str,
        flags=re.DOTALL,
    )

In [None]:
# Filter the Graphtreon dataset to keep only records whose patreon id exists in the YouTube metadata (yt_patreon_list) (takes about 2 mins)
input_file_path = PATH_GT_TIMESERIES_SRC

TEST_RUN = False
MAX_ITER = 1_000  # only used when TEST_RUN is True

nb_rows_read = 0
JSONDecodeErrors_cnt = 0
lines_json = []

# `x in ndarray` compares x against every id for each line of the file;
# a set lookup gives the same answer for the patreon ids in O(1)
yt_patreon_set = set(yt_patreon_list)

compressed_file_size = os.stat(input_file_path).st_size
print("Compressed file size is :                 {:>4,.2f} GB".format(compressed_file_size / 2**30))

# estimate of the uncompressed size, only used as the progress-bar total
uncompressed_file_size = 13_310_000_000 # (=12.4 GB)
print("Estimated Uncompressed file size is :     {:>4,.2f} GB".format(uncompressed_file_size / 2**30))

start = timeit.default_timer()

# progress is counted in (uncompressed) bytes read instead of lines
with tqdm(total=uncompressed_file_size, unit='B', unit_scale=True, unit_divisor=1024) as pbar:
    pbar.set_postfix(file=os.path.basename(input_file_path), refresh=False)
    with gzip.open(input_file_path, "rb") as f:
        for line_byte in f:
            pbar.update(len(line_byte))
            nb_rows_read += 1

            # set a maximum iteration for tests
            if TEST_RUN and nb_rows_read >= MAX_ITER:
                break

            try:
                # bytes -> str -> json, after escaping new line sequences
                line_json = json.loads(json_escape(line_byte.decode("utf-8")))
            except ValueError:
                # JSONDecodeError and UnicodeDecodeError are both ValueErrors. Skip the line:
                # falling through would re-use the previous line's `line_json` and append it twice.
                JSONDecodeErrors_cnt += 1
                continue

            # keep the line if its patreon id exists in df_yt_metadata_pt
            if line_json.get('patreon') in yt_patreon_set:
                lines_json.append(line_json)

stop = timeit.default_timer()
time_diff = stop - start

print()
print("==> total time to read and filter graphtreon time series:                     {:>10.0f} min. ({:.0f}s.)".format(time_diff/60, time_diff)) 
print("==> number of rows read:                                                      {:>10,}".format(nb_rows_read))
print("==> number of patreon ids that exist in both GTts and YT metadata  :          {:>10,} ({:.2%})".format(len(lines_json), len(lines_json) / max(nb_rows_read, 1)))
print("==> number of skipped rows (JSONDecodeErrors):                                {:>10,}".format(JSONDecodeErrors_cnt))

# create new dataframe with the filtered lines
df_gt_timeseries_filtered = pd.DataFrame(data=lines_json)

# calculate memory usage of the new dataframe
mem_cons = df_gt_timeseries_filtered.memory_usage(index=True).sum()
print("==> memory usage of new (filtered) dataframe:                                  {:12,.2f} MB ({:,} bytes)".format(mem_cons / 2**20, mem_cons))

In [None]:
# save filtered data to LOCAL SCRATCH FOLDER as a compressed tsv
# (the write is commented out — presumably the file exists from an earlier run; re-enable it to
# regenerate PATH_GT_TIMESERIES_DST from df_gt_timeseries_filtered)
# df_gt_timeseries_filtered.to_csv(PATH_GT_TIMESERIES_DST, index=False, sep='\t', compression='gzip')
!ls -lh {PATH_GT_TIMESERIES_DST}

In [None]:
# save restricted data to LOCAL SCRATCH FOLDER as a compressed tsv
# NOTE(review): df_gt_timeseries_restricted is not created in this section of the notebook
# (only df_gt_timeseries_filtered is) — TODO confirm where it comes from before re-enabling these lines.
# df_gt_timeseries_restricted.to_csv(PATH_GT_TIMESERIES_RESTRICTED_DST, index=False, sep='\t', compression='gzip')
# !ls -lh {PATH_GT_TIMESERIES_RESTRICTED_DST}

-------

In [None]:
# Number of rows in the original Graphtreon source file (PATH_GT_TIMESERIES_SRC).
# Hard-coded — presumably the nb_rows_read reported by the filtering cell; TODO confirm if the source changes.
GT_final_processed_file_ROWS = 232269

In [None]:
# check the saved filtered Graphtreon file before reloading it
!ls -lh {PATH_GT_TIMESERIES_DST}

In [None]:
# reload the filtered Graphtreon time series from disk and preview it
df_gt_timeseries_filtered = pd.read_csv(
    PATH_GT_TIMESERIES_DST,
    sep="\t",
    compression='gzip',
)
df_gt_timeseries_filtered.head(n=3)

In [None]:
# NOTE(review): the filtering cell matched Graphtreon against yt_patreon_list, which comes from the
# pre-filtered df_yt_metadata_pt, not the restricted metadata. If the saved file was produced by that
# cell, the "YT metadata restricted" wording below overstates the filter — TODO confirm.
print("Statistics of loaded pre-filtered Graphtreon Timeseries file:")
print(f"[Graphtreon Timeseries] Total number of patreon ids:                                                   {GT_final_processed_file_ROWS:>9,}")
print(f"[Graphtreon Timeseries] Nb of patreon ids that exist in both GT Timeseries and YT metadata restricted: {len(df_gt_timeseries_filtered):>9,} ({len(df_gt_timeseries_filtered) / GT_final_processed_file_ROWS:.1%} of GT timeseries dataset)")

#### Join GT timeseries with matched channel_id
Add the corresponding YT channel id to the dataframe \
(inner join with the restricted list of channels in the matched dataframe `df_linked_channels_patreons`)

In [None]:
# preview the linked channels / patreon accounts table (defined earlier in the notebook, outside this section)
df_linked_channels_patreons.head(n=3)

In [None]:
# a single row is enough to see the columns available for the join
df_gt_timeseries_filtered.head(n=1)

In [None]:
# inner join (the pandas default) of the GT time series with the matched channels:
# the GT `patreon` column is paired with the linked table's `patreon_id`
df_gt_timeseries_merged = pd.merge(
    df_gt_timeseries_filtered,
    df_linked_channels_patreons,
    how='inner',
    left_on='patreon',
    right_on='patreon_id',
)
df_gt_timeseries_merged.head(1)

#### Extract daily earnings per patreon account

In [None]:
# unique patreon ids kept in the filtered Graphtreon time series; used as the
# membership filter by the daily-series extraction cells
yt_gt_patreon_list_filtered = df_gt_timeseries_filtered['patreon'].unique()
print("number of filtered patreon ids", len(yt_gt_patreon_list_filtered))

In [None]:
# preview the filtered Graphtreon rows before extracting the daily series
df_gt_timeseries_filtered.head(n=3)

In [None]:
# From the Graphtreon source dataset, for each channel, extract the date and earnings from “dailyGraph_earningsSeriesData” (takes about 3 mins)
input_file_path = PATH_GT_TIMESERIES_SRC

TEST_RUN = False
MAX_ITER = 1_000  # only used when TEST_RUN is True; set here so the cell does not rely on an earlier cell

nb_rows_read = 0
valid_predicate_count = 0
JSONDecodeErrors_cnt = 0
dailyEarningsError_cnt = 0
lines_json = []

# `x in ndarray` compares x against every id for each line of the file; a set lookup is O(1)
patreon_ids_filtered = set(yt_gt_patreon_list_filtered)

compressed_file_size = os.stat(input_file_path).st_size
print("Compressed file size is :                 {:>8,.2f} GB".format(compressed_file_size / 2**30))

# estimate of the uncompressed size, only used as the progress-bar total
uncompressed_file_size = 13_310_000_000
print("Estimated Uncompressed file size is :     {:>8,.2f} GB".format(uncompressed_file_size / 2**30))

start = timeit.default_timer()

# progress is counted in (uncompressed) bytes read instead of lines
with tqdm(total=uncompressed_file_size, unit='B', unit_scale=True, unit_divisor=1024) as pbar:
    # bare file name; DATA_FOLDER ends with "/", so slicing at len(DATA_FOLDER)+1 cut off its first letter
    pbar.set_postfix(file=os.path.basename(input_file_path), refresh=False)
    with gzip.open(input_file_path, "rb") as f:
        for line in f:
            pbar.update(len(line))
            nb_rows_read += 1

            # set a maximum iteration for tests
            if TEST_RUN and nb_rows_read >= MAX_ITER:
                break

            try:
                line_json = json.loads(line)
            except ValueError:  # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors
                JSONDecodeErrors_cnt += 1
                continue

            # keep the line only if its patreon id exists in the filtered GT time series
            if line_json.get('patreon') not in patreon_ids_filtered:
                continue
            valid_predicate_count += 1

            # The series is stored as a string representation of a list of lists;
            # ast.literal_eval converts it back without evaluating arbitrary code.
            dailyGraph_earningsSeriesData = line_json.get('dailyGraph_earningsSeriesData')
            if dailyGraph_earningsSeriesData:
                try:
                    daily_earnings = ast.literal_eval(dailyGraph_earningsSeriesData)
                except (ValueError, TypeError, SyntaxError, RecursionError):
                    daily_earnings = None
            else:
                # no series: emit a single NaN row so the account still appears in the output
                daily_earnings = [[np.nan, np.nan]]

            if not isinstance(daily_earnings, (list, tuple)):
                # unparsable series: report and skip this account instead of aborting the whole scan
                dailyEarningsError_cnt += 1
                print(">>>> dailyEarningsError - skipped line value: ")
                print(line_json.get('creatorName'), line_json.get('creatorRange'), line_json.get('startDate'), line_json.get('categoryTitle'), line_json.get('patreon'), dailyGraph_earningsSeriesData)
                continue

            for daily_earning in daily_earnings:
                # each entry is expected to be a [date, earning] list
                if isinstance(daily_earning, list) and len(daily_earning) >= 2:
                    date = daily_earning[0]
                    earning = daily_earning[1]
                    lines_json.append({
                        'creatorName':   line_json.get('creatorName'),
                        'creatorRange':  line_json.get('creatorRange'),
                        'startDate':     line_json.get('startDate'),
                        'categoryTitle': line_json.get('categoryTitle'),
                        'patreon':       line_json.get('patreon'),
                        'date':          date,
                        'earning':       earning
                    })
                else:
                    dailyEarningsError_cnt += 1
                    print(">>>> dailyEarningsError - skipped line value: ")
                    print(line_json.get('creatorName'), line_json.get('creatorRange'), line_json.get('startDate'), line_json.get('categoryTitle'), line_json.get('patreon'), daily_earnings)

stop = timeit.default_timer()
time_diff = stop - start

print()
print("==> total time to read and filter graphtreon time series:                      {:>10.0f} min. ({:.0f}s.)".format(time_diff/60, time_diff)) 
print("==> number of rows read:                                                       {:>10,}".format(nb_rows_read))
print("==> number of patreon ids that exist in both GTts and restricted YT metadata:  {:>10,} ({:.2%})".format(valid_predicate_count, valid_predicate_count / max(nb_rows_read, 1)))
print("==> number of skipped rows (JSONDecodeErrors):                                 {:>10,}".format(JSONDecodeErrors_cnt))
print("==> number of skipped rows (dailyEarningsError):                               {:>10,}".format(dailyEarningsError_cnt))

# create new dataframe with the filtered lines
df_dailyGraph_earningsSeries = pd.DataFrame(data=lines_json)

In [None]:
# The previous cell already scans PATH_GT_TIMESERIES_SRC and builds df_dailyGraph_earningsSeries.
# Repeating that ~3 min pass over the ~12 GB source here (this cell was a verbatim copy of it)
# would only rebuild an identical dataframe, so the existing result is inspected instead.
print("==> number of extracted daily earnings records: {:>10,}".format(len(df_dailyGraph_earningsSeries)))
df_dailyGraph_earningsSeries.head(3)

In [None]:
# rows with at least one missing value (e.g. the NaN placeholder rows of accounts without an earnings series)
df_dailyGraph_earningsSeries.loc[df_dailyGraph_earningsSeries.isna().any(axis='columns')]

In [None]:
# save filtered data to LOCAL SCRATCH FOLDER as a compressed tsv (5.3Mb)
# (the write is commented out — presumably the file exists from an earlier run; the listing checks it)
# df_dailyGraph_earningsSeries.to_csv(PATH_GT_TIMESERIES_EARNING_DST, index=False, sep='\t', compression='gzip')
!ls -lh {PATH_GT_TIMESERIES_EARNING_DST}

#### Extract daily patrons per patreon account

In [None]:
# From the Graphtreon dataset, for each channel, extract the date and patrons from “dailyGraph_patronSeriesData” (takes about 3 mins)
input_file_path = PATH_GT_TIMESERIES_SRC

TEST_RUN = False
MAX_ITER = 1000  # only used when TEST_RUN is True

nb_rows_read = 0
valid_predicate_count = 0
JSONDecodeErrors_cnt = 0
dailyPatronsError_cnt = 0
lines_json = []

# `x in ndarray` compares x against every id for each line of the file; a set lookup is O(1)
patreon_ids_filtered = set(yt_gt_patreon_list_filtered)

compressed_file_size = os.stat(input_file_path).st_size
print("Compressed file size is :                 {:>8,.2f} GB".format(compressed_file_size / 2**30))

# estimate of the uncompressed size, only used as the progress-bar total
uncompressed_file_size = 13_310_000_000
print("Estimated Uncompressed file size is :     {:>8,.2f} GB".format(uncompressed_file_size / 2**30))

start = timeit.default_timer()

# progress is counted in (uncompressed) bytes read instead of lines
with tqdm(total=uncompressed_file_size, unit='B', unit_scale=True, unit_divisor=1024) as pbar:
    # bare file name; DATA_FOLDER ends with "/", so slicing at len(DATA_FOLDER)+1 cut off its first letter
    pbar.set_postfix(file=os.path.basename(input_file_path), refresh=False)
    with gzip.open(input_file_path, "rb") as f:
        for line in f:
            pbar.update(len(line))
            nb_rows_read += 1

            # set a maximum iteration for tests
            if TEST_RUN and nb_rows_read >= MAX_ITER:
                break

            try:
                line_json = json.loads(line)
            except ValueError:  # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors
                JSONDecodeErrors_cnt += 1
                continue

            # keep the line only if its patreon id exists in the filtered GT time series
            if line_json.get('patreon') not in patreon_ids_filtered:
                continue
            valid_predicate_count += 1

            # The series is stored as a string representation of a list of lists;
            # ast.literal_eval converts it back without evaluating arbitrary code.
            dailyGraph_patronSeriesData = line_json.get('dailyGraph_patronSeriesData')
            if dailyGraph_patronSeriesData:
                try:
                    daily_patrons = ast.literal_eval(dailyGraph_patronSeriesData)
                except (ValueError, TypeError, SyntaxError, RecursionError):
                    daily_patrons = None
            else:
                # no series: emit a single NaN row so the account still appears in the output
                daily_patrons = [[np.nan, np.nan]]

            if not isinstance(daily_patrons, (list, tuple)):
                # unparsable series: report and skip this account instead of aborting the whole scan
                dailyPatronsError_cnt += 1
                print(">>>> dailyPatronsError - skipped line value: ")
                print(line_json.get('creatorName'), line_json.get('creatorRange'), line_json.get('startDate'), line_json.get('categoryTitle'), line_json.get('patreon'), dailyGraph_patronSeriesData)
                continue

            for daily_patron in daily_patrons:
                # each entry is expected to be a [date, patrons] list
                if isinstance(daily_patron, list) and len(daily_patron) >= 2:
                    date = daily_patron[0]
                    patrons = daily_patron[1]
                    lines_json.append({
                        'creatorName':   line_json.get('creatorName'),
                        'creatorRange':  line_json.get('creatorRange'),
                        'startDate':     line_json.get('startDate'),
                        'categoryTitle': line_json.get('categoryTitle'),
                        'patreon':       line_json.get('patreon'),
                        'date':          date,
                        'patrons':       patrons
                    })
                else:
                    dailyPatronsError_cnt += 1
                    print(">>>> dailyPatronsError - skipped line value: ")
                    print(line_json.get('creatorName'), line_json.get('creatorRange'), line_json.get('startDate'), line_json.get('categoryTitle'), line_json.get('patreon'), daily_patrons)

stop = timeit.default_timer()
time_diff = stop - start

print()
print("==> total time to read and filter graphtreon time series:                      {:>10.0f} min. ({:.0f}s.)".format(time_diff/60, time_diff)) 
print("==> number of rows read:                                                       {:>10,}".format(nb_rows_read))
print("==> number of patreon ids that exist in both GTts and restricted YT metadata:  {:>10,} ({:.2%})".format(valid_predicate_count, valid_predicate_count / max(nb_rows_read, 1)))
print("==> number of skipped rows (JSONDecodeErrors):                                 {:>10,}".format(JSONDecodeErrors_cnt))
print("==> number of skipped rows (dailyPatronsError):                               {:>10,}".format(dailyPatronsError_cnt))

# create new dataframe with the filtered lines
df_dailyGraph_patronsSeries = pd.DataFrame(data=lines_json)

In [None]:
# check for NaN values (same check as for the daily earnings series); these rows include the
# NaN placeholder rows emitted for accounts without a patrons series
df_dailyGraph_patronsSeries[df_dailyGraph_patronsSeries.isna().any(axis=1)]

In [None]:
# save filtered data to LOCAL SCRATCH FOLDER as a compressed tsv (7.1Mb)
# (the write is commented out — presumably the file exists from an earlier run; the listing checks it)
# df_dailyGraph_patronsSeries.to_csv(PATH_GT_TIMESERIES_PATRONS_DST, index=False, sep='\t', compression='gzip')
!ls -lh {PATH_GT_TIMESERIES_PATRONS_DST}

#### Merge the extracted time series of daily earnings and daily patrons

In [None]:
# check the saved daily earnings file before reading it back
!ls -lh {PATH_GT_TIMESERIES_EARNING_DST}

In [None]:
# Read dailyGraph_earningsSeries back from disk. The date column is kept as stored;
# the commented line would parse it as epoch milliseconds.
df_dailyGraph_earningsSeries = pd.read_csv(
    PATH_GT_TIMESERIES_EARNING_DST,
    compression='gzip',
    sep="\t",
)
# df_dailyGraph_earningsSeries.date = pd.to_datetime(df_dailyGraph_earningsSeries.date, unit='ms')
df_dailyGraph_earningsSeries.head(n=3)

In [None]:
# check the saved daily patrons file before reading it back
!ls -lh {PATH_GT_TIMESERIES_PATRONS_DST}

In [None]:
# Read dailyGraph_patronsSeries back from disk. The date column is kept as stored;
# the commented line would parse it as epoch milliseconds.
df_dailyGraph_patronsSeries = pd.read_csv(
    PATH_GT_TIMESERIES_PATRONS_DST,
    compression='gzip',
    sep="\t",
)
# df_dailyGraph_patronsSeries.date = pd.to_datetime(df_dailyGraph_patronsSeries.date, unit='ms')
df_dailyGraph_patronsSeries.head(n=3)

In [None]:
# Outer join of the daily earnings and daily patrons series. With no `on=` given, pandas joins
# on every column the two frames share, so rows present in only one series are kept.
df_dailyGraph_patrons_and_earnings_Series = (
    pd.merge(df_dailyGraph_earningsSeries, df_dailyGraph_patronsSeries, how='outer')
    # patrons may become float once the outer join introduces missing values;
    # the nullable Int64 dtype keeps integer counts and can hold the missing entries
    .astype({'patrons': 'Int64'})
)
df_dailyGraph_patrons_and_earnings_Series.head()

In [None]:
# save filtered data to LOCAL SCRATCH FOLDER as a compressed tsv (6.2Mb)
# (the write is commented out — presumably the file exists from an earlier run; the listing checks it)
# df_dailyGraph_patrons_and_earnings_Series.to_csv(PATH_GT_TIMESERIES_PATRONS_AND_EARNINGS_DST, index=False, sep='\t', compression='gzip')
!ls -lh {PATH_GT_TIMESERIES_PATRONS_AND_EARNINGS_DST}