# Signal Annotation on documents with Pandas and Modin

High-quality, annotated data is key to pre-training LLMs. In general, you need to assemble a large corpus of web data and filter it based on annotated signals. In this demo, we show how you can extract a few signals from text data to annotate each document.

Since large volumes of data need to be processed, the annotation step can take a very long time. For reference, modern LLM training often requires several trillion tokens. [Snowflake Arctic](https://www.snowflake.com/en/data-cloud/arctic/) was trained on 3.5 trillion tokens sourced from the public domain, encompassing web content, code & SQL, STEM, and more.

We demonstrate how you can use Modin to parallelize these operations without rewriting your code or setting up additional infrastructure, so your annotation code speeds up with minimal changes.

👉 To learn more about recipes and best practices for pre-training, fine-tuning, and deploying LLMs, check out our [Snowflake Arctic Cookbook](https://medium.com/snowflake/snowflake-arctic-cookbook-series-arctics-approach-to-data-b81a8a0958bd).

In [1]:
import warnings
warnings.filterwarnings("ignore")

# Annotation: Extracting signals from text

In [2]:
def annotate_Uppercase_Letter_Fraction(json):
    """
    Ratio between number of uppercase letters and total number of
    characters in each line
    """
    if "text" in json:
        if len(json["text"]) == 0:
            return 0
        else:
            score = sum(map(str.isupper, json["text"])) / len(json["text"])
            score = round(score, 8)
            return score
    return 0

In [3]:
def annotate_Num_Words(json):
    """
    Number of words in each line
    """
    if "text" in json:
        score = len(json["text"].split())
        return score
    return 0

In [4]:
def annotate_Num_Chars_Fraction(json):
    """
    Ratio between number of numerical characters and total number of
    characters in each line.
    """
    if "text" in json:
        if len(json["text"]) == 0:
            return 0
        else:
            score = sum(map(str.isnumeric, json["text"])) / len(json["text"])
            return score
    return 0

In [5]:
def annotate(json):
    # Extracting signals from text to annotate each document
    upper_frac = annotate_Uppercase_Letter_Fraction(json)
    num_words = annotate_Num_Words(json)
    numeric_frac = annotate_Num_Chars_Fraction(json)
    return [upper_frac,num_words,numeric_frac]
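As a quick sanity check, the three signals can be worked out by hand for a tiny document. The sketch below condenses the functions above into a single helper; the name `signals_for` and the sample document are hypothetical and not part of the notebook.

```python
def signals_for(doc):
    """Condensed, illustrative version of annotate(); `signals_for` is a hypothetical name."""
    text = doc.get("text", "")
    if not text:
        # Missing or empty text yields zero for every signal, as in the functions above.
        return [0, 0, 0]
    upper_frac = round(sum(map(str.isupper, text)) / len(text), 8)
    num_words = len(text.split())
    numeric_frac = sum(map(str.isnumeric, text)) / len(text)
    return [upper_frac, num_words, numeric_frac]


# Made-up document: 10 characters, of which 2 are uppercase and 2 are digits; 3 words.
sample_doc = {"text": "ABcd 12 ef"}
sample_signals = signals_for(sample_doc)
print(sample_signals)  # [0.2, 3, 0.2]
```

Both fractions are divided by the total character count, including spaces, so long runs of whitespace lower them.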

# Data processing with pandas

Using pandas, we load a JSON Lines file containing a large number of documents. We then use `apply` to run the signal annotation code on each document and compute the three signals. pandas is single-threaded, so documents are processed one at a time.

In the examples GitHub repo, we have attached an example dataset, `documents.jsonl`. The full file, `job_0_part_1_3100000.jsonl`, is over 900 MB and contains 310,000 documents in a similar format.
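To make the load-then-apply pattern concrete, here is a minimal sketch on two made-up documents held in memory rather than in a file. The URLs and text are invented for illustration; only `read_json(..., lines=True)` and `apply(..., axis=1)` mirror what the notebook does.

```python
import io

import pandas

# Two made-up documents in JSON Lines form: one JSON object per line.
jsonl_text = (
    '{"url": "http://example.com/a", "text": "Hello World"}\n'
    '{"url": "http://example.com/b", "text": "2024 was a year"}\n'
)

# lines=True tells pandas to parse each line as a separate record (one row each).
docs = pandas.read_json(io.StringIO(jsonl_text), lines=True)

# axis=1 passes each row to the function as a Series, so row["text"]
# (and a check like `"text" in row`) behaves much like a dict lookup.
word_counts = docs.apply(lambda row: len(row["text"].split()), axis=1)
print(word_counts.to_list())  # [2, 4]
```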

In [6]:
import pandas
json_list = pandas.read_json(path_or_buf="job_0_part_1_3100000.jsonl", lines=True)

In [7]:
import time
start = time.time()
annotated_json =  json_list.apply(annotate,axis=1)
end = time.time()

In [8]:
pandas_time = end-start
print(f"Time to process {len(json_list)} documents with pandas: {pandas_time:.2f}s")

Time to process 310000 documents with pandas: 25.43s


In [9]:
signals = pandas.DataFrame(annotated_json.to_list(),columns=["Uppercase_Letter_Fraction","Num_Words","Num_Chars_Fraction"])
pandas_df = pandas.concat([json_list,signals],axis=1)
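An alternative to building the signals frame from `to_list()` is to let `apply` expand the returned lists into columns itself via `result_type="expand"`. The sketch below uses a toy frame and a hypothetical stand-in function, `toy_annotate`, so it is an illustration of the option rather than the notebook's method.

```python
import pandas


def toy_annotate(row):
    # Hypothetical stand-in for annotate(): returns [uppercase count, word count].
    text = row["text"]
    return [sum(map(str.isupper, text)), len(text.split())]


toy_docs = pandas.DataFrame({"text": ["Hello World", "no caps here"]})

# result_type="expand" asks pandas to spread each returned list across columns.
toy_signals = toy_docs.apply(toy_annotate, axis=1, result_type="expand")
toy_signals.columns = ["Num_Upper", "Num_Words"]

# Same concat step as above: original columns first, then the signals.
toy_annotated = pandas.concat([toy_docs, toy_signals], axis=1)
print(toy_annotated)
```

If the returned list mixes integers and floats, it is worth checking the resulting dtypes, since the expanded columns may not keep a separate type per signal.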

In [10]:
pandas_df.head()

| | url | timestamp | dump | segment | image_urls | text | Uppercase_Letter_Fraction | Num_Words | Num_Chars_Fraction |
|---|---|---|---|---|---|---|---|---|---|
| 0 | http://100parts.wordpress.com/2012/08/04/astra... | 2013-05-18 10:42:00 | CC-MAIN-2013-20 | 1368696382261 | [] | these birches can be found in many places in E... | 0.027237 | 93 | 0.007782 |
| 1 | http://100percentwinnersblog.com/watch-survivo... | 2013-05-18 11:02:03 | CC-MAIN-2013-20 | 1368696382261 | [] | Watch Survivor Redemption Island Season 22 Epi... | 0.042616 | 1885 | 0.009166 |
| 2 | http://101squadron.com/blog/2007/05/pesky-pecu... | 2013-05-18 10:21:35 | CC-MAIN-2013-20 | 1368696382261 | [[http://101squadron.com/uploaded_images/Conge... | Pesky?\nthis was a high school project for a p... | 0.005128 | 37 | 0.0 |
| 3 | http://1037theloon.com/tags/scorpions/ | 2013-05-18 10:21:51 | CC-MAIN-2013-20 | 1368696382261 | [] | metalkingdom.net [ 80′s @ 8 Feature Video – Bi... | 0.091816 | 92 | 0.027944 |
| 4 | http://1063thebuzz.com/category/reviews/page/7/ | 2013-05-18 10:31:09 | CC-MAIN-2013-20 | 1368696382261 | [[https://s3.amazonaws.com/tsm-images/global/1... | Splice Review\nBlack Ops Escalation Map Pack [... | 0.19 | 32 | 0.025 |


# Data processing with Modin

Using Modin, we perform the same operation. Modin parallelizes the `apply` operation so that different documents are annotated in parallel. You get the benefit of parallelism by changing a single import line!

In [11]:
import modin.pandas as pd
json_list = pd.read_json(path_or_buf="job_0_part_1_3100000.jsonl", lines=True)

2024-06-05 16:25:59,823	INFO worker.py:1752 -- Started a local Ray instance.


In [12]:
start = time.time()
annotated_json =  json_list.apply(annotate,axis=1)
end = time.time()

In [13]:
modin_time = end-start
print(f"Time to process {len(json_list)} documents with modin: {modin_time:.2f}s")
print(f"Modin is {pandas_time/modin_time:.1f}X faster than pandas")

Time to process 310000 documents with modin: 7.86s
Modin is 3.2X faster than pandas
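The same start/end timing pattern appears in both runs, so it can be wrapped in a small helper. The sketch below is one possible way to do that with the standard library; the `timed` context manager and the `timings` dict are hypothetical names, and `time.perf_counter` is used here in place of `time.time` because it is intended for measuring elapsed intervals.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(results, label):
    """Record the wall-clock duration of the enclosed block in results[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start


timings = {}
with timed(timings, "demo"):
    sum(range(100_000))  # placeholder workload standing in for json_list.apply(...)

# Speedup is the baseline time divided by the new time,
# here using the figures reported above: 25.43 s / 7.86 s.
speedup = 25.43 / 7.86
print(f"{speedup:.1f}X")  # 3.2X
```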


In [14]:
signals = pd.DataFrame(annotated_json.to_list(),columns=["Uppercase_Letter_Fraction","Num_Words","Num_Chars_Fraction"])
modin_df = pd.concat([json_list,signals],axis=1)

In [15]:
modin_df.head()

| | url | timestamp | dump | segment | image_urls | text | Uppercase_Letter_Fraction | Num_Words | Num_Chars_Fraction |
|---|---|---|---|---|---|---|---|---|---|
| 0 | http://100parts.wordpress.com/2012/08/04/astra... | 2013-05-18 10:42:00 | CC-MAIN-2013-20 | 1368696000000.0 | [] | these birches can be found in many places in E... | 0.027237 | 93 | 0.007782 |
| 1 | http://100percentwinnersblog.com/watch-survivo... | 2013-05-18 11:02:03 | CC-MAIN-2013-20 | 1368696000000.0 | [] | Watch Survivor Redemption Island Season 22 Epi... | 0.042616 | 1885 | 0.009166 |
| 2 | http://101squadron.com/blog/2007/05/pesky-pecu... | 2013-05-18 10:21:35 | CC-MAIN-2013-20 | 1368696000000.0 | [[http://101squadron.com/uploaded_images/Conge... | Pesky?\nthis was a high school project for a p... | 0.005128 | 37 | 0.0 |
| 3 | http://1037theloon.com/tags/scorpions/ | 2013-05-18 10:21:51 | CC-MAIN-2013-20 | 1368696000000.0 | [] | metalkingdom.net [ 80′s @ 8 Feature Video – Bi... | 0.091816 | 92 | 0.027944 |
| 4 | http://1063thebuzz.com/category/reviews/page/7/ | 2013-05-18 10:31:09 | CC-MAIN-2013-20 | 1368696000000.0 | [[https://s3.amazonaws.com/tsm-images/global/1... | Splice Review\nBlack Ops Escalation Map Pack [... | 0.19 | 32 | 0.025 |


#### Results are based on running the following versions of pandas and Modin on an 8-core, 16 GB MacBook Air.

In [16]:
print(f"pandas version: {pandas.__version__}")
print(f"Modin version: {pd.__version__}")

pandas version: 2.2.1
Modin version: 0.28.1


# Filtering corpus based on signals

Let's use Arctic to help us filter the text corpus based on the signals we computed. You can access the Arctic chatbot (built on Streamlit! ✨) [here](https://arctic.streamlit.app/).

Ask Arctic: 

```
For dataframe named pandas_df with columns Uppercase_Letter_Fraction Num_Words Num_Chars_Fraction,
filter to num_words > 50 and Uppercase_Letter_Fraction>0
```

In [17]:
filtered_df = pandas_df[(pandas_df['Num_Words'] > 50) & (pandas_df['Uppercase_Letter_Fraction'] > 0)]

Ask Arctic: 

```
Now do the same for a dataframe named modin_df but scale up my code with Modin
```

In [18]:
filtered_df = modin_df[(modin_df['Num_Words'] > 50) & (modin_df['Uppercase_Letter_Fraction'] > 0)]

Ask Arctic: 

```
compute and print out fraction of filter_df with original dataframe pandas_df
```

In [19]:
original_size = len(pandas_df)
filtered_size = len(filtered_df)
fraction = filtered_size / original_size
print("Fraction of filtered_df with respect to pandas_df:", fraction)

Fraction of filtered_df with respect to pandas_df: 0.9558354838709677


Ask Arctic: 
```
now do the same for modin_df
```

In [20]:
original_size = len(modin_df)
filtered_size = len(filtered_df)
fraction = filtered_size / original_size
print("Fraction of filtered_df with respect to modin_df:", fraction)

Fraction of filtered_df with respect to modin_df: 0.9558354838709677
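The kept fraction can also be read directly off the boolean mask, since the mean of a boolean Series is the share of `True` values. The sketch below illustrates this on a made-up four-row frame; `toy_df` and its values are hypothetical and only the column names and thresholds come from the filter above.

```python
import pandas

# Made-up signal values for four documents.
toy_df = pandas.DataFrame({
    "Num_Words": [10, 60, 75, 120],
    "Uppercase_Letter_Fraction": [0.10, 0.00, 0.05, 0.02],
})

# Same condition as the filter above: more than 50 words and at least one uppercase letter.
keep = (toy_df["Num_Words"] > 50) & (toy_df["Uppercase_Letter_Fraction"] > 0)

# Only the last two rows satisfy both conditions, so half the rows are kept.
fraction_kept = keep.mean()
print(fraction_kept)  # 0.5
```

This avoids materializing the filtered frame when only the ratio is needed.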
