In [1]:
### IMPORTS ###

import pandas as pd                        # pandas for data analysis
pd.options.mode.chained_assignment = None  # default='warn'
import matplotlib.pyplot as plt            # matplotlib for data visualisation
import json
import zstandard as zstd

In [2]:
### PATHS ###

# Every dataset lives under this directory (trailing slash included).
DIR = "../data/"

# inputs
METADATA_PATH = f"{DIR}original_metadata.jsonl.zst"
PROCESSED_TIMESERIES_PATH = f"{DIR}processed_timeseries.tsv.zip"
SCORED_CHANNELS_PATH = f"{DIR}scored_channels.tsv.zip"

# outputs
FINAL_METADATA_PATH = f"{DIR}final_metadata.tsv.zip"
FINAL_TIMESERIES_PATH = f"{DIR}final_timeseries.tsv.zip"
FINAL_CHANNELS_PATH = f"{DIR}final_channels.tsv.zip"

In [3]:
### READS ###

# Load the processed time series first, then the scored channels,
# both with pandas' default CSV settings.
timeseries, channels = (
    pd.read_csv(input_path)
    for input_path in (PROCESSED_TIMESERIES_PATH, SCORED_CHANNELS_PATH)
)

In [4]:
#### READ AND SPLIT INTO SMALLER CSV FILES ###

class zreader:
    """Stream the lines of a zstandard-compressed UTF-8 text file.

    The file is decompressed chunk by chunk, so only the current chunk and the
    line being assembled are held in memory. Instances can be used as context
    managers (`with zreader(path) as r:`) so the file handle is released.
    """

    def __init__(self, file, chunk_size=16384):
        """Open `file` for streaming decompression.

        file:       path of the zstandard-compressed file to read.
        chunk_size: number of decompressed bytes requested per read.
        """
        self.fh = open(file, 'rb')
        self.chunk_size = chunk_size
        self.dctx = zstd.ZstdDecompressor()
        self.reader = self.dctx.stream_reader(self.fh)
        # Raw bytes of the line that is still incomplete. Kept as bytes so a
        # multi-byte character split across two chunks is never decoded in halves.
        self.buffer = b''

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the decompression stream and the underlying file handle."""
        try:
            self.reader.close()
        finally:
            self.fh.close()

    def readlines(self):
        """Yield every line of the decompressed file as `str`, newline stripped.

        Lines are split on the raw b"\\n" byte (which never occurs inside a
        multi-byte UTF-8 sequence) and only complete lines are decoded, so no
        character is lost at chunk boundaries. Undecodable bytes are ignored.
        A final line without a trailing newline is yielded as well.
        """
        while True:
            chunk = self.reader.read(self.chunk_size)
            # Test the raw bytes: an empty read is the only reliable EOF signal.
            if not chunk:
                break
            lines = (self.buffer + chunk).split(b"\n")

            for line in lines[:-1]:
                yield line.decode("utf-8", errors="ignore")

            self.buffer = lines[-1]

        # Flush what is left when the file does not end with a newline.
        if self.buffer:
            last_line = self.buffer.decode("utf-8", errors="ignore")
            self.buffer = b''
            yield last_line

# Number of buffered records that triggers a conversion to a DataFrame, and
# number of DataFrame rows that triggers writing a part file to disk.
RECORDS_PER_STORE = 1000000
ROWS_PER_SAVE = 10000000


def store_records(frame, records, store_idx):
    """Return `frame` extended with `records` (a list of dicts, one per video)."""
    if store_idx < 9 : print(" - STORE", store_idx)
    return pd.concat([frame, pd.DataFrame(records)])


def save_raw_part(frame, part_idx):
    """Write `frame` to disk as part number `part_idx` of the raw metadata split."""
    print(" - SAVE ", part_idx)
    part_path = DIR + "metadata/_raw_yt_metadata" + str(part_idx) + ".tsv.zip"
    frame.to_csv(part_path, index=False, compression={'method':'zip'})


reader = zreader(METADATA_PATH)
metadata = []                      # records kept since the last store
df_metadata = pd.DataFrame([])     # rows stored since the last save

idx = 0        # lines read so far
store_idx = 0  # stores done since the last save
save_idx = 0   # part files written so far (later cells iterate over range(save_idx))

# Set lookup is O(1); testing against the numpy array rescanned every channel
# for every single input line.
kept_channels = set(channels['channel'].values)

min_upload_date, max_upload_date = pd.to_datetime("01-01-2015"), pd.to_datetime("09-30-2019")
try:
    for line in reader.readlines():
        line_dict = json.loads(line)

        # Cheap channel test first; the upload date is parsed once, and only
        # for videos of a kept channel. Both date bounds are exclusive.
        if line_dict['channel_id'] in kept_channels and \
           min_upload_date < pd.to_datetime(line_dict["upload_date"]) < max_upload_date:

            del line_dict['crawl_date']
            del line_dict['categories']

            metadata.append(line_dict)

        idx += 1
        if idx % 100000 == 0:
            print(idx)

        # store in a dataframe every 1 million kept records
        if len(metadata) >= RECORDS_PER_STORE:
            df_metadata = store_records(df_metadata, metadata, store_idx)
            metadata = []
            store_idx += 1

            # save dataframe every 10 million rows (it only grows on a store)
            if len(df_metadata) >= ROWS_PER_SAVE:
                save_raw_part(df_metadata, save_idx)
                df_metadata = pd.DataFrame([])
                store_idx = 0
                save_idx += 1
finally:
    # Release the input file even if a line fails to parse.
    reader.fh.close()

# Flush what is left after the last full batch. Nothing is written when there
# is nothing left, so save_idx always equals the number of part files on disk.
if metadata:
    df_metadata = store_records(df_metadata, metadata, store_idx)
    metadata = []
    store_idx += 1

if len(df_metadata) > 0:
    save_raw_part(df_metadata, save_idx)
    save_idx += 1

df_metadata = pd.DataFrame([])
store_idx = 0

100000
200000
300000
400000
500000
600000
700000
800000
900000
1000000
1100000
1200000
1300000
1400000
1500000


In [None]:
# TREAT THE METADATA FILES
# Each raw part is read, its 'channel_id' column renamed to 'channel',
# and the result written next to it as an "ent_metadata" part.
for i in range(save_idx):
    print("Start metadata ", i)

    # PATH
    # A dedicated name is used for the raw part path: reassigning METADATA_PATH
    # here would make a re-run of the split cell open a part file instead of
    # the original compressed dump.
    raw_part_path = DIR + "metadata/_raw_yt_metadata" + str(i) + ".tsv.zip"
    ENT_METADATA_PATH = DIR + "metadata/ent_metadata" + str(i) + ".tsv.zip"
    print("Path done - ")

    # READ
    metadata = pd.read_csv(raw_part_path)
    print("Read done - ")

    # TREATMENT
    ent_metadata = metadata.rename(columns={'channel_id':'channel'})
    print("Treatment done - ")

    # WRITE
    ent_metadata.to_csv(ENT_METADATA_PATH, index=False, compression={'method':'zip'})
    print("Write done -")

    print("Done metadata ", i)

Start metadata  0
Path done - 
Read done - 
Treatment done - 
Write done -
Done metadata  0
Start metadata  1
Path done - 


FileNotFoundError: [Errno 2] No such file or directory: '../data/metadata/_raw_yt_metadata1.tsv.zip'

In [None]:
### STORE ALL DATAFRAMES IN ONLY ONE ###

# The parts are collected in a list and concatenated once: concatenating
# inside the loop copies all previously read rows again at every iteration.
ent_parts = []
for i in range(save_idx):
    print("Start metadata ", i)

    # PATH
    # Local name on purpose, so the METADATA_PATH constant keeps pointing to
    # the original compressed dump.
    ent_part_path = DIR + "metadata/ent_metadata" + str(i) + ".tsv.zip"
    print("Path done - ")

    # READ
    metadata = pd.read_csv(ent_part_path)
    ent_parts.append(metadata)
    print("Read done - ")

# CONCAT
# pd.concat raises on an empty list, so fall back to an empty frame.
ent_metadata = pd.concat(ent_parts, ignore_index=True) if ent_parts else pd.DataFrame()
print("Concat done - ")


# WRITE
ENT_METADATA_PATH = DIR + "ent_metadata_en.tsv.zip"
ent_metadata.to_csv(ENT_METADATA_PATH, index=False, compression={'method':'zip'})
print("Write done -")

Start metadata  0
Path done - 
Read done - 
Concat done - 
Start metadata  1
Path done - 


FileNotFoundError: [Errno 2] No such file or directory: '../data/metadata/ent_metadata1.tsv.zip'

In [None]:
### KEEP COMMON CHANNELS BETWEEN ALL DATAFRAMES ###

#treatment
# The channel list must come from the full metadata (ent_metadata). The
# `metadata` variable only holds the last part read by the previous loop, so
# using it dropped every channel that appears only in earlier parts.
channel_ids = ent_metadata[['channel']].drop_duplicates()

# Inner joins: keep only channels / time series rows whose channel has metadata.
channels = pd.merge(channels, channel_ids, on='channel')
timeseries = pd.merge(timeseries, channel_ids, on='channel')

# Then keep only the metadata rows whose channel survived in `channels`.
channel_ids = channels[['channel']].drop_duplicates()
ent_metadata = pd.merge(ent_metadata, channel_ids, on='channel')

In [None]:
### WRITES ###

# Persist the three filtered tables, each as a zip-compressed CSV without its index.
for final_frame, final_path in (
    (channels, FINAL_CHANNELS_PATH),
    (timeseries, FINAL_TIMESERIES_PATH),
    (ent_metadata, FINAL_METADATA_PATH),
):
    final_frame.to_csv(final_path, index=False, compression={'method':'zip'})