# Data cleaning

Now we will proceed to the data cleanin phase, the raw data that we have is not quite ready for analysis just yet. We have a couple of issues:
1) Nested structures
2) Duplicates
3) Missing cells
4) unfriendly names

If you have taken a peek at the `ndjson` generated in the extraction process one record may look like this:
```json
{
    "id": "id",
    "snippet": {
        "channelId": "channelId",
        "textDisplay": "textDisplay",
        "authorDisplayName": "authorDisplayName",
        "authorChannelId": {
            "value": "value"
            },
        "likeCount": 0,
        "publishedAt": "2021-06-15T10:57:22Z"
        },
    "totalReplyCount": 0, // missing in replies
    "videoId": "videoId", // missing in replies
    "parentId": "parentId" // missing in top level comments
}
```
Replies for example, are missing the `videoId` field, and have an additional column for `parentId`, the top level comment that the current reply belongs to.
We will address all of these issues in the current notebook, and save our clean dataset to a parquet file, a friendly format to handle tabular data.

## Imports and preliminary work
Similar to the previous notebook, we have to make some imports and configure our notebook so it can read all our files from the project. Here, we will be using utility functions from `src/preprocessing.py`, functions that will allow us to make some analysis further on easier, by preprocessing the text.

We will also work with channel handle in this notebook, so make sure that the `channel_handle` variable is configured in `cofing.py` at the root of the current project. The Paths will also provide some dinamyc pathing based on the channel handle and the date to be processed.

In [1]:
import polars as pl
from datetime import date
import json
import os
import gc
import sys
# load project directory to path
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

from src.preprocessing import *
from paths import Paths
import config

print(f"Working with the channel handle: {config.channel_handle}.")

Working with the channel handle: kurzgesagt.


By default, we are passing today's date to the `Paths` class, but you can change to perform cleaning on a file from a different day. You can give the specific date like
```python 
date_path = date(2025, 9, 7) # Year, Month, day
```
where the first number is the year, the second the month and the third the day.

In [2]:
date_path = date.today()
channel_paths = Paths(channel_handle=config.channel_handle, date_obj=date_path)

## Incorrect tabular format
The objects that are in the `ndjson` file from extraction have a `json` format. But for this notebook we will need our data to be strictly tabular. Let's take a look when we load directly the `ndjson` to a polars dataframe:

In [5]:
preview_df = pl.read_ndjson(channel_paths.raw_comments_file_path, n_rows=5)
preview_df.sample(5)

id,snippet,totalReplyCount,videoId
str,struct[7],i64,str
"""UghQJ7ZtN3UTlXgCoAEC""","{""UCsXVk37bltHxD1rDPwtNM8Q"",""omg so amazing"",null,""@adilsonfilho9622"",{""UCU0pfz9gGQOhL5-OV233bag""},0,""2016-08-10T06:20:05Z""}",0.0,"""ZW3aV7U-aik"""
"""UggDV5rwA11cPXgCoAEC""","{""UCsXVk37bltHxD1rDPwtNM8Q"",""I thought newtron stars where made of single newtrons... CLICKBAIT!!!"",null,""@mememachine1392"",{""UCwKJl2Wl6nd8maT-INzMh7g""},0,""2016-08-09T06:50:38Z""}",0.0,"""ZW3aV7U-aik"""
"""Ugj9OA7MtJ7hSHgCoAEC""","{""UCsXVk37bltHxD1rDPwtNM8Q"",""Im confused, so when a star dies it supernovas and then always becomes a neutron star? Or is neutron star only one option for a supernova'd star"",null,""@kaitlyngardner4478"",{""UCvOD2fgEyjA3Wf81_02A5Yg""},0,""2016-08-08T02:32:05Z""}",4.0,"""ZW3aV7U-aik"""
"""Ugj9OA7MtJ7hSHgCoAEC.8HIgs_KB8…","{""UCsXVk37bltHxD1rDPwtNM8Q"",""It's one possibility. It mostly depends on how big the star was when it stopped fusing things in its core. It's really quite fascinating stuff, I end up getting stuck reading about what we know of stars' lives a lot xD"",""Ugj9OA7MtJ7hSHgCoAEC"",""@WindsorMason"",{""UC1IWy8L9PQ0xPm9fbGucAqA""},0,""2016-08-08T03:07:32Z""}",,
"""Ugj9OA7MtJ7hSHgCoAEC.8HIgs_KB8…","{""UCsXVk37bltHxD1rDPwtNM8Q"",""A no-so-big star (like our sun) becomes a white dwarf, a bigger star becomes a neutron star, a even bigger star becomes a black hole. :-)"",""Ugj9OA7MtJ7hSHgCoAEC"",""@Matamune87"",{""UCSGOUpcTk2sVg17j0p_tyJg""},0,""2016-08-08T21:36:54Z""}",,


The dataframe has four columns: `id`, `snippet`, `totalReplyCount` and `videoId`.
For now our problem is the `snippet` column, is of type `struct`. In order to keep working with the dataframe we would need to unnest the `struct` type, and unnest any inner `struct` columns.
We could keep working with with the current dataframe, but to make our lives easier, we will reload the dataframe again with another function.

### Flatten on load
We can use the function `pl.json_normalize` to completely flatten the data on load, this will unnest or undo any `json` levels and any deeper fields will be represented like `firstLevelField.secondLevelField.field`. We load the ndjson into an array first since we can't flatten exactly at load. Instead we will use the function `json_normalize` once the data is fully loaded into array. ⚠️ <span style="color: orange; font-weight: bold;">Be careful, since `pl.json_normalize` is an unstable function (according to polars documentation)</span>. If you run into any problems you can try `pandas.json_normalize` and convert the resulting dataframe from `pandas` to `polars`.

In [4]:
with open(channel_paths.raw_comments_file_path, 'r') as comments:
    data = [json.loads(comment) for comment in comments]

today_df = pl.json_normalize(data)

del data
gc.collect()

0

## Forward Fill videoID
As indicated in the introduction, the replies lack videoId, so in the dataframe they simply show as nulls. But a reply technically does have a videoId, it was simply not there at the extraction process. (The `videoId` for a comment doesn't make part of a `comments` resource in the YouTube Data API v3, if is there for top level comments, is because the `commentThreads` resource, the one responsible for top level comments, does contain `videoId`). Just before deduplication we will perform forward fill `videoId`. Since the data is loaded:
1) top level comment
2) any replies for top level comment

we can be sure that a reply has the top level comment it belongs to immediately above it (or above other replies for the same top level comment). So we can perform forward fill safely, the program will take the first non null `videoId` and propagate it forward (or to the records bellow), if it finds another `videoId` it will propagate the new one found.
Note that if the data is somehow reordered, this can lead to unexpected results.

In [7]:
today_df = today_df.with_columns([
    pl.col("videoId").forward_fill().alias("videoId")
])

## Deduplication
There is a chance of duplicates in the same day of data extraction. Duplicates are not useful for us here, they only indicate that something went wrong at insertion, but never hurts to check, even when we trust in the extraction process. We will perform two deduplication, one for local duplicates, that is, the duplicates that are in the current file. Another duplication will be global deduplication, and for that we will need information about datasets previously processed.

For local deduplication we can trust that the column `id` is the key for that row. If two rows in the dataset share the same `id` we can be sure that the rest of the information for both is going to be exactly the same, so we will deduplicate via the `id` subset.

In [8]:
print(f"Removing a total of {today_df.is_duplicated().sum()} local duplicates.")
today_df = today_df.unique(subset=["id"])

Removing a total of 2586 local duplicates.


### Global duplicates
This process involves checking that we don't process comments that we have processed in previous days. For this reason we will fetch all `comment_id`s from all available clean datasets.

In [None]:
processed_files = channel_paths.list_processed_files()  # Sorted list of parquet files
previous_ids_df = None
previous_ids = None

if processed_files:
    # Read only the 'id' column from all files and concatenate
    previous_ids = [
        pl.read_parquet(file, columns=["comment_id"]) for file in processed_files
    ]
    previous_ids_df = pl.concat(previous_ids).unique()
else:
    previous_ids_df = pl.DataFrame({"comment_id": pl.Series("comment_id", [], pl.Utf8)})

del previous_ids
gc.collect()

7

With all the `comment_id`s at our disposal, we will proceed and perform an `antijoin`. An `antijoin` selects all the data of the first dataset THAT ARE NOT in the second dataset, and that is exactly what we want.

In [10]:
# count gobal duplicates
removed_rows_count = today_df.join(
    previous_ids_df,
    left_on="id",
    right_on="comment_id",
    how="semi"
).height

print(f"Removing a total of {removed_rows_count} global duplicates.")

# remove global duplicates
today_df = today_df.join(
    previous_ids_df,
    left_on="id",
    right_on="comment_id",
    how="anti"
)

Removing a total of 22 global duplicates.


## Renaming & Re-typing
Now we will worry about the names of our variables and their corresponding types. Polars did a good job at infering the types of the data, mostly. There are a couple of issues
with the dates still.

In [11]:
today_df.schema

Schema([('id', String),
        ('totalReplyCount', Int64),
        ('videoId', String),
        ('snippet.channelId', String),
        ('snippet.textDisplay', String),
        ('snippet.authorDisplayName', String),
        ('snippet.authorChannelId.value', String),
        ('snippet.likeCount', Int64),
        ('snippet.publishedAt', String),
        ('snippet.parentId', String)])

We start by removing the names that the normalization caused to our data, remove the `snippet.` prefix, and the `.value` suffix.

In [12]:
def clean_column_names(name: str) -> str:
    return name.replace("snippet.", "").replace(".value", "")

today_df = today_df.rename({col: clean_column_names(col) for col in today_df.columns})

Some python friendly renaming

In [13]:
rename_map = {
    "id": "comment_id",
    "videoId": "video_id",
    "channelId": "channel_id",
    "totalReplyCount": "reply_count",
    "textDisplay": "comment",
    "authorDisplayName": "author",
    "authorChannelId": "author_id",
    "likeCount": "likes",
    "publishedAt": "published_at",
    "parentId": "parent_id"
}

today_df = today_df.rename(rename_map)

Now we fix the date for `published_at`, we set the time zone for "Zulu" which is a synonim for "UTC" since all our dates contain time zone info (the `z` at the end of the string).
We will fill with 0 the `reply_count`, a reply by default lacks that column, so we fill with 0.

(You may wonder if YouTube handles replies for replies, but that is not the case as of for now. You can only post a reply for a top level comment and if you need to "respond" to a reply, users usually tag the user they are responding to, eg. "@user that is great!")

In [14]:
today_df = today_df.with_columns([
    pl.col("published_at").str.to_datetime(time_unit="ms", time_zone="Zulu").alias("published_at")
])

today_df = today_df.with_columns([
    pl.col("reply_count").fill_null(0)
])

In [15]:
today_df.describe()

statistic,comment_id,reply_count,video_id,channel_id,comment,author,author_id,likes,published_at,parent_id
str,str,f64,str,str,str,str,str,f64,str,str
"""count""","""300508""",300508.0,"""300508""","""300508""","""300508""","""300508""","""300508""",300508.0,"""300508""","""160633"""
"""null_count""","""0""",0.0,"""0""","""0""","""0""","""0""","""0""",0.0,"""0""","""139875"""
"""mean""",,0.534538,,,,,,6.635491,"""2019-01-06 22:53:13.401000+00:…",
"""std""",,5.927343,,,,,,178.873217,,
"""min""","""Ugg--0aepx3adXgCoAEC""",0.0,"""21eFwbb48sE""","""UCsXVk37bltHxD1rDPwtNM8Q""","""""","""""","""UC--1tqVJ3uxVrl4EkEhT3xQ""",0.0,"""2013-07-11 15:53:10+00:00""","""Ugg-2ZMgvgmpX3gCoAEC"""
"""25%""",,0.0,,,,,,0.0,"""2016-06-06 15:11:09+00:00""",
"""50%""",,0.0,,,,,,0.0,"""2018-11-21 04:10:27+00:00""",
"""75%""",,0.0,,,,,,1.0,"""2021-03-11 09:15:05+00:00""",
"""max""","""UgzzzsC0zqAUii_PKzZ4AaABAg""",652.0,"""zQGOcOUBi6s""","""UCsXVk37bltHxD1rDPwtNM8Q""","""𥿽""","""@희망의요정""","""UCzzzoJBD8L5LQONWNh2sciA""",44756.0,"""2025-09-10 16:06:02+00:00""","""Ugzzzh2HzhsnmA4AKMl4AaABAg"""


## Derived Columns
Now that our dataframe is mostly in place, we will derive some columns. Our scripts from `src/preprocessing.py` will be handy here.
We will derive the following columns:
- is_reply, for comments that are replies = True
- comment_length, length of the comment text
- word_count, count of the words in comment text
- script, variable to detect the script of the text, for now it only works with `latin` and `korean`
- comment_emojis, list of emojis from the comment text
- emoji_count, count of emojis in comment text (or length of the previous column)

In [16]:
today_df = today_df.with_columns([
    # 1. Is it a reply? (parent_id not null)
    pl.col("parent_id").is_not_null().alias("is_reply"),

    # 2. Comment length in characters
    pl.col("comment").str.len_chars().alias("comment_length"),

    # 3. Word count (count non-space sequences)
    pl.col("comment").str.count_matches(r"\S+").alias("word_count"),

    # 4. Script detection
    pl.col("comment").map_elements(detect_script).alias("script"),

    # 5. Extract emojis
    pl.col("comment").map_elements(extract_emojis, return_dtype=pl.List(pl.Utf8)).alias("comment_emojis"),
])

# 6. Count emojis (length of list column)
today_df = today_df.with_columns([
    pl.col("comment_emojis").list.len().alias("emoji_count")
])

## Saving our clean file
Now we are almost done, we will save our clean file to parquet. Our handy `Paths` class will provide us with the object that has the correct path for the clean file for a specific YouTube channel and specific day. `paths.clean_comments_file` holds the correct path.

In [17]:
today_df.write_parquet(channel_paths.clean_comments_file_path)

# FAQ
- **Why is the notebook using polars?, why not pandas?**

A: In the development of this notebook there were several concerns regarding the use of memory by pandas, sometimes reaching up to 95% in a 16GB system. The decision to split the analyses into different days and using polars were part of the solution. Polars specially is incredibly memory friendly and can use parallelization with CPU. Not only memory friendly, but also faster. Not to mention that polars was written in Rust.

- **Why deduplicate after performing the forward fill?**

A: There were some unnexpected behaviors wen we deduplicated first and then performed the forward fill later. Like replies without a top level comment.

- **What is the purpose of `del` and `gc.collect()`?**

A: To free memory explicitly. The files handled here can be somewhat large for relatively low memory systems, so, if we don't need an object anymore we simply delete the refferences to it and force python to run the garbage collection process (since `del` only removes the pointers to an object, but sometimes the memory is not freed immediately).

- **Why latin script and Korean?**

A: One of the first channels that were tested with the current notebook had numerous Korean comments (specially Korean). The process of tokenization for the Korean language is different for that of latin languages, so the tokenizer checks the script first, if it is hangul it runs the Korean tokenizer, if it is not, it runs the latin tokenizer. The performace of the Korean tokenizer remains to be tested, due to the lack of Korean speakers during the development. Support for more languages and scripts (like Japanese, Chinese, Hindi, Arabic, etc) may be added in the future.