In [1]:
import numpy as np
import pandas as pd
import sys, os, json, pickle
import random
import re
import functools
import itertools
import concurrent.futures

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

pd.set_option('max_columns', None)

In [2]:
# Set up source paths
channel_src_path = "youtube_data/channels"
video_src_path = "youtube_data/videos"
comment_src_path = "youtube_data/comments"

# Set up destination paths
channel_des_path = "dataframes"
video_des_path = "dataframes"
comment_des_path = "dataframes"

In [3]:
# Remove unwanted directory (if any)
for src_path in (channel_src_path, video_src_path, comment_src_path):
    unwanted_dir = os.path.join(src_path, ".ipynb_checkpoints")
    if os.path.exists(unwanted_dir):
        os.rmdir(unwanted_dir)

In [4]:
# Check number of files in the source paths
print(f"{'Channels': <9}: {len(os.listdir(channel_src_path))} files")
print(f"{'Videos': <9}: {len(os.listdir(video_src_path))} files")
print(f"{'Comments': <9}: {len(os.listdir(comment_src_path))} files")

Channels : 166 files
Videos   : 217 files
Comments : 35 files


# Merge & Save Channel Data (as a Parquet File)

In [5]:
# Read channel files
channels = []
for i, channel_filename in enumerate(os.listdir(channel_src_path), start=1):
    channel_filepath= os.path.join(channel_src_path, channel_filename)
    print(f"Process {i}-th file: {channel_filename}")
    with open(channel_filepath, "r") as json_file:
        channel_data = json.load(json_file)
        channel_data["topic_categories"] = [channel_data["topic_categories"]] # Convert 1D list to 2D list
        channel_data["all_video_ids"] = [channel_data["all_video_ids"]] # Convert 1D list to 2D list
        channel_data = pd.DataFrame(channel_data)
        channels.append(channel_data)

# Merge channel files
channels = functools.reduce(lambda left, right: pd.concat([left, right], axis="index", ignore_index=True), channels)

Process 1-th file: UCi2GvcaxZCN-61a0co8Smnw.json
Process 2-th file: UCuLKTkYGCXako3wMktwrbkQ.json
Process 3-th file: UCqripRcC8scod22F5NKvLcQ.json
Process 4-th file: UCKBBMJ8YNWQTUKQ8PtGDb2Q.json
Process 5-th file: UCgL6J6oL8F69vm-GcPScmwg.json
Process 6-th file: UCHAL5c5PrCSA4ExAlY_gLuQ.json
Process 7-th file: UCiEHmW7zRRZIjBsOKZ4s5AA.json
Process 8-th file: UC6VKHP606ee6ffKwKmBHSig.json
Process 9-th file: UCgDQKFV2rMNzTE8Y3rHMVbQ.json
Process 10-th file: UCRevYqA7N-NrSCGSm5k-KlA.json
Process 11-th file: UCmArFxfDybtCbo0Fa0Z2xxA.json
Process 12-th file: UCampQmnDSpJNJcs_5m57W4w.json
Process 13-th file: UCxUzQ3wu0oJP_8YLWt71WgQ.json
Process 14-th file: UCtcaZ5FUqaNXGX6xhpiGPQA.json
Process 15-th file: UC_XRq7JriAORvDe1lI1RAsA.json
Process 16-th file: UCjqAuT-giO8fjQqnNl1dDUw.json
Process 17-th file: UCmV89JP1Fx5jTAMYjmpJMQg.json
Process 18-th file: UCqxRJpX52Q0nxcHf7hqdJMA.json
Process 19-th file: UCEfetJrzg6OcXWWuX8uhdhw.json
Process 20-th file: UC9sB4o4zIoQttSvvQu03W4g.json
Process 2

In [6]:
# Preprocess channel dataframe
# 1) Convert datetime
channels["published_at"] = pd.to_datetime(channels["published_at"]).dt.tz_convert(None)
channels["crawled_datetime"] = pd.to_datetime(channels["crawled_datetime"])
# 2) Clean keyword column
channels["keywords"] = (channels["keywords"]
         .str.split(r"""((?:[^ "]|"[^"]*")+)""")                # Split on spaces except those in double quotes
         .apply(lambda x: x[1::2] if x is not None else [])      # Get every two elements (to remove elements that are spaces or empty strings)
         .apply(lambda x: [s.replace('"', '') for s in x])       # Remove elements that are double quotes          
)
# 3) Clean topic_categories column
channels["topic_categories"] = [[link.split("/")[-1] for link in lst] 
                                    if lst is not None else None 
                                for lst in channels["topic_categories"].values]
# 3) Cast non-string data types
channels["country"] = channels["country"].astype("category")

channels["view_count"] = channels["view_count"].astype(int)
channels["subscriber_count"] = channels["subscriber_count"].astype(int)
channels["video_count"] = channels["video_count"].astype(int)

channels["ishidden_subscriber_count"] = channels["ishidden_subscriber_count"].astype(bool)

channels = channels.rename(columns={"channelTitle": "channel_title"}) # To be removed

In [7]:
len(os.listdir(channel_src_path)) == channels.shape[0]

True

In [23]:
channels["title"] = channels.title.str.replace("Annie Singing", "魚乾")

In [28]:
# Save as parquet
channel_des_filepath = os.path.join(channel_des_path, "channels.parquet")
channels.to_parquet(channel_des_filepath)

In [29]:
channels.all_video_ids.apply(len).sum()

131080

# Merge & Save Video Data (as a Parquet File)

In [10]:
# Set up paths to read from
new_video_src_path = [video_path for video_path in os.listdir(video_src_path) if video_path.endswith("_new.json")]
duplicate_video_paths = [new_vid_path[:-9] + ".json" for new_vid_path in new_video_src_path]
all_vid_paths_no_duplicates = [vid_path for vid_path in os.listdir(video_src_path) if vid_path not in duplicate_video_paths]

In [11]:
# Check if duplicate path correcty removed
len(duplicate_video_paths)
len(os.listdir(video_src_path))
len(all_vid_paths_no_duplicates)

166

217

166

In [13]:
%%time
# Read video files
videos = []
def load_videos(args):
    i, video_filename = args
    print(f"Process {i}-th file: {video_filename}\n", end="")
    video_filepath= os.path.join(video_src_path, video_filename)
    with open(video_filepath, "r") as json_file:
        video_data = json.load(json_file) # I/O-bound task
        for video in video_data:
            if not video:
                continue
            video["tags"] = [video["tags"]] # Convert 1D list to 2D list
            video = pd.DataFrame(video).rename(columns={ # Rename columns
                "channelTitle": "channel_title",
                "likeCount": "like_count", 
                "viewCount": "view_count", 
                "commentCount": "comment_count"
            })
            videos.append(video)
    
with concurrent.futures.ThreadPoolExecutor(max_workers=len(all_vid_paths_no_duplicates)) as executer:
    _ = executer.map(load_videos, enumerate(all_vid_paths_no_duplicates, start=1))

Process 1-th file: UCa7KELKe1u9mQ5jWMdIB-HQ_videos_new.json
Process 2-th file: UC9YOQFPfEUXbulKDtxeqqBA_videos_new.json
Process 3-th file: UCOhvluFPMWqPxgVH3edamuw_videos_new.json
Process 4-th file: UCgL6J6oL8F69vm-GcPScmwg_videos_new.json
Process 5-th file: UCWGGwv7agoMmd3BAu4Fv3jQ_videos_new.json
Process 6-th file: UCclEnpUoaP35pnvWvYmnODA_videos_new.json
Process 7-th file: UCJ6rlGjE5pO0SqMOfxBUrhQ_videos_new.json
Process 8-th file: UC0SkNQXPJ60hKEFubOz0fDA_videos_new.json
Process 9-th file: UCDrswN-SqWh7Kii62h9aXGA_videos_new.json
Process 10-th file: UCPwxSX0DYDMQxCvgfeVDv_g_videos_new.json
Process 11-th file: UCMIf0lFsh_US3VWCClK7IcQ_videos_new.json
Process 12-th file: UCESIRUtUaGfqWCgljB1EB2A_videos_new.json
Process 13-th file: UCMy0L5Y-h0-s8t4XZs49gXw_videos_new.json
Process 14-th file: UC0KD8LvHG4CgohsRZzzCQSw_videos_new.json
Process 15-th file: UC6ZHLoydvIPevb_A5M88L4A_videos_new.json
Process 16-th file: UCeB5UXWltaHtGoxkwGXLzpg_videos_new.json
Process 17-th file: UCZVCbj9weVNA

In [14]:
%%time
# Merge video files
videos = pd.concat(videos, ignore_index=True) # O(N)
# videos = functools.reduce(lambda left, right: pd.concat([left, right], axis="index", ignore_index=True), videos) # O(N ** 2): 55 min

CPU times: user 1min 4s, sys: 539 ms, total: 1min 4s
Wall time: 1min 4s


In [15]:
%%time
# Preprocess videos dataframe
# 1) Convert datetime
videos["published_at"] = pd.to_datetime(videos["published_at"]).dt.tz_localize(None)
videos["crawled_datetime"] = pd.to_datetime(videos["crawled_datetime"])

CPU times: user 287 ms, sys: 0 ns, total: 287 ms
Wall time: 285 ms


In [16]:
%%time
# 2) Clean duration column & Cast to pd.Timedelta
videos["duration"] = (
    pd.to_timedelta(videos["duration"].str.extract(r"(\d+)H")[0].astype(float), unit="hour", errors="coerce").fillna(pd.Timedelta(hours=0))
  + pd.to_timedelta(videos["duration"].str.extract(r"(\d+)M")[0].astype(float), unit="minute", errors="coerce").fillna(pd.Timedelta(minutes=0))
  + pd.to_timedelta(videos["duration"].str.extract(r"(\d+)S")[0].astype(float), unit="seconds", errors="coerce").fillna(pd.Timedelta(seconds=0))
).dt.total_seconds().astype(int)

CPU times: user 1.12 s, sys: 0 ns, total: 1.12 s
Wall time: 1.11 s


In [17]:
%%time
# 3) Cast non-string data types
videos["category_id"] = videos["category_id"].astype("category")
videos["default_language"] = videos["default_language"].astype("category")
videos["default_audio_language"] = videos["default_audio_language"].astype("category")
videos["topic_categories"] = videos["topic_categories"].astype("category")
videos["definition"] = videos["definition"].astype("category")
videos["upload_status"] = videos["upload_status"].astype("category")
videos["privacy_status"] = videos["privacy_status"].astype("category")

videos["like_count"] = videos["like_count"].astype(float).astype(pd.Int64Dtype())
videos["view_count"] = videos["view_count"].astype(float).astype(pd.Int64Dtype())
videos["comment_count"] = videos["comment_count"].astype(float).astype(pd.Int64Dtype())

videos["caption"] = videos["caption"].astype(bool)

CPU times: user 713 ms, sys: 0 ns, total: 713 ms
Wall time: 712 ms


In [18]:
# Check if becomes more & is subset of previous version
video_parquet = pd.read_parquet(os.path.join(video_des_path, "videos.parquet"))
video_parquet.shape
videos.shape
set(video_parquet["video_id"]).issubset(set(videos["video_id"]))

(124535, 24)

(131048, 24)

False

In [19]:
# Get missing videos
missing_videos = video_parquet[~video_parquet["video_id"].isin(videos["video_id"])].copy()
# Check if get all missing videos
len(missing_videos)
len(set(video_parquet["video_id"]).difference(set(videos["video_id"])))

13

13

In [20]:
set(videos["video_id"]).issubset(set(pd.concat([videos, missing_videos])["video_id"]))

True

In [21]:
# Add missing videos
videos = pd.concat([videos, missing_videos])

In [38]:
# Check if all channels' videos are captured
unique_channels = set(channels.title.sort_values())
video_channels = set(videos.channel_title.sort_values().unique())
video_channels.difference(unique_channels)
unique_channels.difference(video_channels)
unique_channels.symmetric_difference(video_channels)

set()

set()

set()

In [36]:
videos["channel_title"] = videos.channel_title.str.replace("Annie Singing", "魚乾")

In [39]:
# Save as parquet
video_des_filepath = os.path.join(video_des_path, "videos.parquet")
videos.to_parquet(video_des_filepath)

In [40]:
pd.read_parquet(os.path.join(video_des_path, "videos.parquet")).shape

(131061, 24)

# Merge & Save Comment Data as a Parquet File
Note that the comment data is not used in this research.<br>
The code below is only kept for reference.

In [30]:
# Set up paths to read from
new_comment_src_path = [comment_path for comment_path in os.listdir(comment_src_path) if comment_path.endswith("_new.json")]
duplicate_comment_paths = [new_comment_path[:-9] + ".json" for new_comment_path in new_comment_src_path]
all_comment_paths_no_duplicates = [comment_path for comment_path in os.listdir(comment_src_path) if comment_path not in duplicate_comment_paths]

In [31]:
# Check if duplicate path correcty removed
len(duplicate_comment_paths)
len(os.listdir(comment_src_path))
len(all_comment_paths_no_duplicates)

3

35

32

In [32]:
# Read comment files
comments = []
for i, comment_filename in enumerate(all_comment_paths_no_duplicates):
    print(f"Process {i}-th file: {comment_filename}")
    comment_filepath= os.path.join(comment_src_path, comment_filename)
    with open(comment_filepath, "r") as json_file:
        try:
            comment_data = json.load(json_file)
        except json.decoder.JSONDecodeError:
            print(f"JSONDecodeError while processing {i}-th file: {comment_filename}")
            continue
        for video_id, comment in comment_data:
            comment = pd.DataFrame(comment)
            comments.append(comment)

# Merge comment files
comments = functools.reduce(lambda left, right: pd.concat([left, right], axis="index", ignore_index=True), comments)

# Preprocess comments dataframe
# 1) Convert datetime
comments["published_at"] = pd.to_datetime(comments["published_at"]).dt.tz_localize(None)
comments["updated_at"] = pd.to_datetime(comments["updated_at"]).dt.tz_localize(None)
comments["crawled_datetime"] = pd.to_datetime(comments["crawled_datetime"])
# 2) Cast non-string data types
comments["reply_count"] = comments["reply_count"].astype(pd.Int16Dtype())
comments["like_count"] = comments["like_count"].astype(pd.Int64Dtype())

Process 0-th file: UCpSB70OL7IO7RnZCj_7MQDw_comments.json
Process 1-th file: UCuLKTkYGCXako3wMktwrbkQ_comments.json
Process 2-th file: UC6IMF6xi_MZ3jA1wRlPQDLA_comments.json
Process 3-th file: UCbIJeyl_va8MG2xx0q4Uobg_comments.json
Process 4-th file: UCxUzQ3wu0oJP_8YLWt71WgQ_comments.json
Process 5-th file: UCJWXJTaLqI5KBnJuaXbCaTg_comments.json
Process 6-th file: UCGpNjY0Xq2GJLXh4OOX1LOA_comments.json
Process 7-th file: UCEfetJrzg6OcXWWuX8uhdhw_comments.json
Process 8-th file: UCpgt8SEyAy5tbr9BzVK8Lsg_comments.json
Process 9-th file: UCvcZO0cvDT3rzKtfakL3R5Q_comments.json
Process 10-th file: UCjhwHd3mgmqm0ONm0bXKmng_comments.json
Process 11-th file: UCiWXd0nmBjlKROwzMyPV-Nw_comments.json
Process 12-th file: UCDrswN-SqWh7Kii62h9aXGA_comments.json
Process 13-th file: UC3LBFXbWtEBdOOUb8-qJm9Q_comments.json
Process 14-th file: UCr90FXGOO8nAE9B6FAUeTNA_comments.json
Process 15-th file: UC9i2Qgd5lizhVgJrdnxunKw_comments.json
Process 16-th file: UCeo3JwE3HezUWFdVcehQk9Q_comments.json
Process

In [33]:
# Check if become more & is subset of previous version
comment_parquet = pd.read_parquet(os.path.join(comment_des_path, "comments.parquet"))
comment_parquet.shape
comments.shape
set(comment_parquet["comment_id"]).issubset(set(comments["comment_id"]))

(721706, 11)

(1720579, 11)

False

In [35]:
# Get missing comments
missing_comments = comment_parquet[~comment_parquet["comment_id"].isin(comments["comment_id"])].copy()
# Check if get all missing comments
len(missing_comments)
len(set(comment_parquet["comment_id"]).difference(set(comments["comment_id"])))

2832

2832

In [44]:
comments.query(f"comment_id == \'{missing_comments['comment_id'].sample(1).values[0]}\'")

Unnamed: 0,comment_id,video_id,parent_id,reply_count,text_original,like_count,published_at,updated_at,crawled_datetime,author_channel_id,author_display_name


In [45]:
full_comments = pd.concat([comments, missing_comments])
len(full_comments) == len(missing_comments) + len(comments)
len(full_comments)
full_comments.comment_id.nunique()

True

1723411

1723411

In [47]:
# Save as parquet
comment_des_filepath = os.path.join(comment_des_path, "comments.parquet")
full_comments.to_parquet(comment_des_filepath)

In [48]:
pd.read_parquet(os.path.join(comment_des_path, "comments.parquet")).shape

(1723411, 11)