In [10]:
import pandas as pd
import numpy as np
from ast import literal_eval

In [11]:
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every expression statement in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"


# Lift pandas' display limits (None = no limit) so frames and long text cells
# are rendered without truncation.
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)
# Echo the current max_rows setting as the cell output.
pd.options.display.max_rows

In [15]:
cd D:\Project\Toolkit_for_Preprocessing_MXH\ViHOS

D:\Project\Toolkit_for_Preprocessing_MXH\ViHOS


In [16]:
# Locations of the raw train/dev/test CSV files. The paths are relative and use
# Windows separators, so they resolve against the current working directory.
train_path = r"Data\Raw_data\train.csv"
dev_path = r"Data\Raw_data\dev.csv"
test_path = r"Data\Raw_data\test.csv"

In [17]:
headers = ['Unnamed: 0',  'content', 'index_spans']

# Read the three raw splits.
train, dev, test = (
    pd.read_csv(csv_path) for csv_path in (train_path, dev_path, test_path)
)

for split in (test, train, dev):
    # The CSV stores index_spans as text; literal_eval turns each cell into
    # the Python object it spells out.
    split['index_spans'] = split['index_spans'].apply(literal_eval)
    # Give every split the same column names.
    split.columns = headers

test.head(2)

Unnamed: 0.1,Unnamed: 0,content,index_spans
0,0,Anh bar .,[]
1,1,Hello thầy,[]


# Pre-processing

In [18]:
from Code.Preprocessing import unicode
# Run the project's `unicode` helper over the 'content' column of every split.
for split in (test, train, dev):
    split['content'] = split['content'].apply(unicode)


In [19]:
import os
folder_path = "spans_text"
# Create the output folder; exist_ok=True makes this a no-op when it is already
# there, so no separate existence check is needed.
os.makedirs(folder_path, exist_ok=True)

In [20]:
# Each split is written as a two-column CSV ('spans', 'text'); these are the
# files that load_data() reads back later.
exports = (
    (train, r'spans_text\df_train.csv'),
    (dev, r'spans_text\df_dev.csv'),
    (test, r'spans_text\df_test.csv'),
)
for split, csv_path in exports:
    df = split[['index_spans', 'content']]
    df.columns = ['spans', 'text']
    df.to_csv(csv_path)

# Tokenizer

In [21]:
# Word segmenter used by the tokenization cells below; only the "wseg"
# annotator is requested.
# NOTE(review): the JAR location is an absolute, machine-specific path, and the
# raw string doubles every backslash, so the value contains literal "\\" pairs —
# confirm this is intended.

from vncorenlp import VnCoreNLP

# Ensure that the JAR file path is correct
annotator = VnCoreNLP(r"D:\\Project\\Toolkit_for_Preprocessing_MXH\\ViHOS\\Code\\VnCoreNLP\\VnCoreNLP-1.1.1.jar", annotators="wseg", max_heap_size='-Xmx500m')


In [22]:
# Segment one example and flatten the per-sentence token lists into one list.
text = test['content'][325]
annotator_text = annotator.tokenize(text)
tokens = [token for sentence in annotator_text for token in sentence]
tokens


['A', 'méo', 'nên', 'về', 'nước', 'thì', 'mới', 'đúng']

In [23]:
# import more_itertools as mit
from itertools import groupby
from operator import itemgetter

def find_ranges(span):
    """Collapse a list of indices into (first, last) pairs, one per consecutive run.

    Example: [0, 1, 3, 20, 21] --> [(0, 1), (3, 3), (20, 21)]
    """
    ranges = []
    # Inside a run of consecutive values, (position - value) stays constant,
    # so grouping on that difference isolates each run.
    for _, members in groupby(enumerate(span), key=lambda pair: pair[0] - pair[1]):
        run = [value for _, value in members]
        ranges.append((run[0], run[-1]))
    return ranges

def tokenize_word(text, pos):
    """Tokenize `text` and align every token with a slice of `pos`.

    Parameters
    ----------
    text : str
        Text handed to the module-level `annotator`.
    pos : list
        Sequence sliced with character offsets into `text`; presumably it holds
        the original-text index of each character — confirm against the caller.

    Returns
    -------
    (tokens, alignment) : tuple of two lists of equal length
        `alignment[k]` is the slice of `pos` covered by `tokens[k]`, or an
        empty list when the token could not be located in `text`.
    """
    tokens = [token for sentence in annotator.tokenize(text) for token in sentence]
    alignment, start = [], 0

    for token in tokens:
        # A lone "_" is searched verbatim. Any other token has its leading
        # underscores stripped and the remaining ones turned into spaces before
        # it is looked up in the text.
        surface = token if token == "_" else token.lstrip("_").replace("_", " ")
        res = text.find(surface, start)

        if res == -1:
            # Token not found after the cursor: record an empty alignment and
            # leave the cursor untouched. Slicing with -1 would produce a bogus
            # slice and move the cursor back to the start of the text, which
            # misaligns every following token.
            alignment.append([])
            continue

        alignment.append(pos[res:res + len(surface)])
        start = res + len(surface)

    assert len(tokens) == len(alignment)
    return tokens, alignment

def annotate(spans, alignment, tokens):
    """Tag tokens with 'B-T' / 'I-T' / 'O' according to character spans.

    Parameters
    ----------
    spans : list
        Spans to mark; the first and last element of each span are used as its
        inclusive start and end positions.
    alignment : list of lists
        Positions covered by each token, parallel to `tokens`. An empty entry
        means the token has no known position and keeps the 'O' tag.
    tokens : list
        Tokens to tag.

    Returns
    -------
    pandas.Series
        The 'Tag' column, one entry per token.
    """
    annotations = pd.DataFrame({'Tokens': tokens, 'Tag': ['O'] * len(tokens)})

    for span in spans:
        for i, align in enumerate(alignment):
            if not align:
                # Nothing to compare against; indexing align[-1] would raise.
                continue
            if align[-1] < span[0]:
                # Token ends before the span starts.
                continue
            elif align[0] <= span[0] <= align[-1]:
                # Token contains the first position of the span.
                annotations.at[i, 'Tag'] = 'B-T'
            elif span[0] < align[0] <= span[-1]:
                # Token starts inside the span.
                annotations.at[i, 'Tag'] = 'I-T'
            elif align[0] > span[-1]:
                # Token starts after the span: nothing further can match.
                break

    return annotations['Tag']

def load_data(path):
    """Read a CSV with 'text' and 'spans' columns into a list of record dicts.

    Each record holds the row's text ('text'), the consecutive runs of its span
    indices as lists ('spans'), and the substrings of the text covered by those
    runs ('text_spans').
    """
    frame = pd.read_csv(path)
    frame['spans'] = frame['spans'].apply(literal_eval)

    records = []
    for _, row in frame.iterrows():
        text = row['text']
        indices = row['spans']
        segments = find_ranges(indices) if indices else []

        spans = []
        for seg in segments:
            if len(seg) > 1:
                spans.append([seg[0], seg[-1]])
            else:
                spans.append([seg[0]])

        # Segment ends are inclusive, hence the + 1 when slicing.
        covered = [text[seg[0]:seg[-1] + 1] for seg in segments]

        records.append({'text': text, 'spans': spans, 'text_spans': covered})

    return records



In [24]:
# Read the exported span/text CSVs back as lists of record dicts.
test_data = load_data(r'spans_text\df_test.csv')
train_data = load_data(r'spans_text\df_train.csv')
dev_data = load_data(r'spans_text\df_dev.csv')
# Show one example record.
test_data[30]

{'text': 'đ m, thầy giáo cũng sống ảo',
 'spans': [[0, 2]],
 'text_spans': ['đ m']}

# Annotate BIO

In [43]:
def annotate(spans, alignment, tokens):
    """Tag tokens with 'B-T' / 'I-T' / 'O' according to character spans.

    Parameters
    ----------
    spans : list
        Spans to mark; the first and last element of each span are used as its
        inclusive start and end positions.
    alignment : list of lists
        Positions covered by each token, parallel to `tokens`. An empty entry
        means the token has no known position and keeps the 'O' tag.
    tokens : list
        Tokens to tag.

    Returns
    -------
    pandas.Series
        The 'Tag' column, one entry per token.
    """
    # Initialize the DataFrame with tokens and default "O" tags
    annotations = pd.DataFrame({'Tokens': tokens, 'Tag': ['O'] * len(tokens)})

    # Nothing to align against: every token keeps its default tag.
    if not alignment:
        return annotations['Tag']

    for span in spans:
        span_start, span_end = span[0], span[-1]

        for i, align in enumerate(alignment):
            # Only an empty alignment is unusable. A one-element alignment is a
            # single-character token and must still be taggable.
            if not align:
                continue

            align_start, align_end = align[0], align[-1]

            if align_end < span_start:
                # Token ends before the span starts.
                continue
            elif align_start <= span_start <= align_end:
                annotations.at[i, 'Tag'] = 'B-T'  # Beginning of the tag
            elif span_start < align_start <= span_end:
                annotations.at[i, 'Tag'] = 'I-T'  # Inside of the tag
            elif align_start > span_end:
                # Token starts after the span ends: stop scanning this span.
                break

    return annotations['Tag']

In [44]:
import pandas as pd
import numpy as np
from Code.Preprocessing import dupplicate_punctuation


def data_BIO(data):
    """Convert span-annotated records into one token-level BIO table.

    Parameters
    ----------
    data : list of dict
        Records with a 'text' entry and a 'spans' entry, as read by this
        function.

    Returns
    -------
    pandas.DataFrame
        Columns 'Word', 'Tag' and 'sentence_id' (int64), one row per token.
        The row index has a one-row gap after each record, where that record's
        separator row was dropped.
    """
    rows = []

    for sentence, record in enumerate(data):
        text = record['text']
        pos = list(range(len(text)))  # One position per character
        text, pos = dupplicate_punctuation(text, pos)  # Clean up punctuation
        tokens, alignment = tokenize_word(text, pos)  # Tokenize the cleaned text
        tags = annotate(record['spans'], alignment, tokens)  # Tag the tokens

        # The sentence id is attached while building the rows, so it does not
        # depend on how pandas stores the separator marker afterwards.
        rows.extend((token, tag, sentence) for token, tag in zip(tokens, tags))
        # Separator row: dropped below, but it keeps the index gap between
        # consecutive records.
        rows.append((None, None, None))

    bio = pd.DataFrame(rows, columns=['Word', 'Tag', 'sentence_id'])
    bio = bio.dropna()  # Remove the separator rows
    bio['sentence_id'] = bio['sentence_id'].astype("int64")

    return bio


In [45]:
# Convert every split into its token-level BIO table.
test_IBO = data_BIO(test_data)
train_IBO = data_BIO(train_data)
dev_IBO = data_BIO(dev_data)
# Preview the first rows of the test table.
test_IBO.head()

In [None]:
# Show the BIO rows of sentence 30. Plain test_IBO[30] is a column lookup on a
# DataFrame and raises KeyError, because there is no column named 30.
test_IBO[test_IBO['sentence_id'] == 30]

In [46]:
# Replace the index with a fresh 0..n-1 range; the previous index values are
# kept as a regular 'index' column.
test_IBO = test_IBO.reset_index()
dev_IBO = dev_IBO.reset_index()
train_IBO = train_IBO.reset_index()

In [8]:
import os
folder_path = "BIO_data"
# Create the output folder; exist_ok=True makes this a no-op when it is already
# there, so no separate existence check is needed.
os.makedirs(folder_path, exist_ok=True)

In [9]:
# Write each BIO table to its CSV file, without the row index.
for bio_frame, csv_path in (
    (train_IBO, r'BIO_data\train_BIO.csv'),
    (dev_IBO, r'BIO_data\dev_BIO.csv'),
    (test_IBO, r'BIO_data\test_BIO.csv'),
):
    bio_frame.to_csv(csv_path, index=False)

NameError: name 'train_IBO' is not defined