In [1]:
! pip install simpletransformers
! pip install tensorboardX
! pip install Unidecode
! pip install nltk

In [2]:
import torch
# Sanity check: displays True when PyTorch can see a CUDA device.
torch.cuda.is_available()

True

In [3]:
import numpy as np
import pandas as pd
from datasets import load_dataset
import cudf
import os


In [4]:
# Prefer the GPU when PyTorch reports one; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

device(type='cuda')

In [5]:
def get_wiki_paraquet_files(cache_dir=None, output_dir='wiki_pages_parquets', num_files=10):
    """Download the FEVER ``wiki_pages`` parquet shards and save each one as CSV.

    Shard ``{i}.parquet`` for each ``i`` in ``range(num_files)`` is loaded with
    Hugging Face ``datasets.load_dataset("parquet", ...)`` and written to
    ``{output_dir}/{i}_parquet_wiki.csv``.

    Args:
        cache_dir: Directory for the ``datasets`` download cache. ``None`` lets
            the library use its default cache location (previously this was a
            hard-coded personal path whose variable name was misspelled, so
            every call raised NameError).
        output_dir: Directory that receives the CSV files; created if missing.
        num_files: Number of shards to fetch; shards are numbered from 0.
    """
    base_url = "https://huggingface.co/api/datasets/fever/parquet/wiki_pages/wikipedia_pages/"
    os.makedirs(output_dir, exist_ok=True)  # Create the output directory if it doesn't exist

    for index in map(str, range(num_files)):
        data_files = {"wikipedia_pages": f"{base_url}{index}.parquet"}
        wiki = load_dataset("parquet", data_files=data_files, split="wikipedia_pages", cache_dir=cache_dir)

        wiki.to_csv(os.path.join(output_dir, f"{index}_parquet_wiki.csv"))

        # Release the shard before fetching the next one.
        del wiki

        print(f"completed downloading {index}")

In [6]:
# Download every FEVER wiki_pages shard (requires network access) and write one CSV per shard.
get_wiki_paraquet_files()

In [7]:
import re
from nltk.corpus import stopwords
from unidecode import unidecode

# Clean text
def remove_non_ascii(text):
    """Replace each character outside the 7-bit ASCII range with one space."""
    non_ascii_char = re.compile(r'[^\x00-\x7F]')
    return non_ascii_char.sub(' ', text)


def remove_punctuation(text):
    """Replace every non-word character (anything ``\\w`` does not match) with a space."""
    non_word = re.compile(r'\W')
    return non_word.sub(' ', text)

def remove_digits(text):
    """Delete every decimal digit character from ``text``."""
    digit = re.compile(r'\d')
    return digit.sub('', text)


def to_lowercase(text):
    """Return ``text`` converted to lower case."""
    lowered = text.lower()
    return lowered


def remove_extra_space(text):
    """Collapse each run of consecutive space characters into a single space.

    Only the ASCII space is affected; tabs and newlines are left alone.
    """
    space_run = re.compile(r' {2,}')
    return space_run.sub(' ', text)


def remove_url(text):
    """Replace each ``http`` token (``http`` plus following non-space chars) with a space."""
    url_token = re.compile(r'http\S+')
    return url_token.sub(' ', text)


def remove_underline(text):
    """Turn every underscore into a space."""
    return ' '.join(text.split('_'))


def remove_hyphen(text):
    """Turn every hyphen-minus into a space."""
    hyphen_to_space = str.maketrans('-', ' ')
    return text.translate(hyphen_to_space)


def remove_leading_whitespace(text):
    """Return ``text`` without its leading whitespace; trailing whitespace is kept."""
    stripped = text.lstrip()
    return stripped

# Lazily-built cache of NLTK's English stop words; filled on the first call
# that needs it, so the corpus is not re-read for every row.
_ENGLISH_STOP_WORDS = None

def remove_stopwords(text, stop_words=None):
    """Drop stop words from whitespace-tokenised ``text`` and rejoin with single spaces.

    Args:
        text: Input string; it is split on any whitespace.
        stop_words: Optional collection of words to remove (membership is
            tested with ``in``). Defaults to NLTK's English stop-word list,
            which requires the NLTK ``stopwords`` corpus to be downloaded.

    Returns:
        str: The remaining tokens joined by single spaces.
    """
    global _ENGLISH_STOP_WORDS
    if stop_words is None:
        # The original rebuilt this set on every call; this function is
        # typically applied row-by-row, so build it once and reuse it.
        if _ENGLISH_STOP_WORDS is None:
            _ENGLISH_STOP_WORDS = frozenset(stopwords.words('english'))
        stop_words = _ENGLISH_STOP_WORDS
    return ' '.join(word for word in text.split() if word not in stop_words)

def decode_special_chars(text):
    """Replace each hyphen-wrapped run of capitals (e.g. ``-LRB-``) with a space."""
    hyphen_token = re.compile(r'-[A-Z]+-')
    return hyphen_token.sub(' ', text)

def remove_newline(text):
    """Replace every newline character with a space."""
    return text.replace('\n', ' ')

def remove_tabs(text):
    """Delete every tab character."""
    return text.replace('\t', '')

def remove_intext_tabs(text):
    """Replace tabs with spaces, except tabs that directly follow a digit."""
    tab_not_after_digit = re.compile(r'(?<!\d)\t')
    return tab_not_after_digit.sub(' ', text)

def split_wiki_lines(lines):
    """
    Split a wiki page's ``lines`` string into a list of line texts.

    The string is cut at every run of digits immediately followed by a tab
    character (how the line indices are recognised). The first and last
    pieces are discarded; every remaining piece keeps whatever followed its
    index, including a trailing newline and any further tab-separated text.

    NOTE(review): presumably the first piece is the empty text before index 0
    and the last piece is an empty final line -- confirm against the data,
    since a non-empty final line would be dropped. The pattern also matches a
    number followed by a tab inside a line, which would split that line.

    @param lines - ``lines`` value (a str) from one wikipedia pages row.
    ______
    Returns list[str]: the pieces between the first and last split points.
    """
    pieces = re.split(r'\d+\t', lines)
    return pieces[1:-1]

def remove_special_tokens(text):
    """Delete each hyphen-wrapped run of capitals (e.g. ``-LRB-``) entirely."""
    hyphen_token = re.compile(r'-[A-Z]+-')
    return hyphen_token.sub('', text)

def remove_quotes(text):
    """Strip tokenised quote marks.

    First removes every double backtick and every ``' '`` (quote, space,
    quote) sequence, then removes every remaining pair of single quotes.
    """
    for quote_pattern in (r'(``|\' \')', r"''"):
        text = re.sub(quote_pattern, '', text)
    return text

def remove_empty_lines(lines):
    """Return a new list without the elements that are exactly ``'\\n'``."""
    return list(filter(lambda line: line != '\n', lines))


def clean_text(df: pd.DataFrame, column: str):
    """Run the wiki-page cleaning pipeline over ``df[column]`` and return ``df``.

    The column is overwritten step by step with:
    remove_special_tokens -> remove_extra_space -> remove_quotes ->
    split_wiki_lines -> remove_empty_lines.
    After the split step each cell holds a list of line strings rather than a
    single string. The DataFrame passed in is mutated and returned.
    """
    steps = (
        remove_special_tokens,
        remove_extra_space,
        remove_quotes,
        split_wiki_lines,
        remove_empty_lines,
    )
    for step in steps:
        df[column] = df[column].apply(step)
    return df

In [8]:
def process_single_file(file_index, uid_start, input_dir="wiki_pages_parquets"):
    """Load one downloaded wiki CSV shard, clean it and copy it to the GPU.

    Steps: drop the ``text`` column, drop rows with any NaN, clean ``lines``
    with ``clean_text``, drop rows whose ``id`` is empty, rename ``id`` to
    ``title`` and number the index from ``uid_start``.

    Args:
        file_index: Shard number (int or str) used in the CSV file name.
        uid_start: First index label assigned to this shard's rows.
        input_dir: Directory holding ``{file_index}_parquet_wiki.csv``.

    Returns:
        tuple: ``(cudf.DataFrame, next_uid)`` where ``next_uid`` equals
        ``uid_start + number_of_rows`` (just ``uid_start`` when no row
        survives cleaning).
    """
    print(f"Started processing file - {file_index}")
    wiki_raw = pd.read_csv(os.path.join(input_dir, f"{file_index}_parquet_wiki.csv"))

    # Drop the raw "text" column (only "id" and "lines" are kept), then incomplete rows.
    wiki_lines = wiki_raw.drop(columns=['text']).dropna()
    del wiki_raw

    wiki_clean = clean_text(df=wiki_lines, column='lines')
    del wiki_lines

    # Build a new frame rather than editing in place: `df['id'].replace(..., inplace=True)`
    # is chained assignment (a no-op under pandas Copy-on-Write), and in-place
    # `rename`/`reset_index` on a filtered slice trigger SettingWithCopyWarning.
    wiki_titled = (
        wiki_clean
        .assign(id=wiki_clean['id'].replace('', np.nan))
        .dropna(subset=['id'])
        .rename(columns={'id': 'title'})
        .reset_index(drop=True)
    )
    del wiki_clean

    # Shift the 0-based index so labels start at uid_start.
    wiki_titled.index += uid_start

    wiki_gpu = cudf.DataFrame.from_pandas(wiki_titled)

    # Use len() instead of index[-1] + 1 so an empty shard does not raise IndexError.
    return wiki_gpu, uid_start + len(wiki_titled)


In [9]:
# Global counter for generating unique IDs
global_unique_id = 0

def process_and_store_parquet_files():
    global global_unique_id  # Ensure we are using the global variable
    ids = [str(i) for i in range(10)]  # 10 files
    output_dir = "processed_wiki"  # Directory to store processed Parquet files
    os.makedirs(output_dir, exist_ok=True)  # Create the output directory if it doesn't exist
    
    # Process each file and store the processed DataFrame as a separate Parquet file
    for index in ids:
        uid_start = global_unique_id  # Start from the current global_unique_id
        df_processed, _ = process_single_file(index, uid_start)
        
        exploded_df = df_processed.explode('lines').reset_index(drop=True)
        del df_processed
        
        # Generate unique IDs for each row within the Parquet file
        exploded_df['unique_id'] = cudf.Series(range(uid_start, uid_start + len(exploded_df)))
        
        output_filename = os.path.join(output_dir, f"{index}.parquet")
        exploded_df.to_parquet(output_filename, index=False)
        torch.cuda.empty_cache()

        # Update global_unique_id for the next file
        global_unique_id += len(exploded_df)
        del exploded_df

    print("Processing and storing Parquet files complete")


In [10]:
# Clean each CSV shard, explode pages into one row per line, and write processed_wiki/{i}.parquet.
process_and_store_parquet_files()

Started processing file - 0
Started processing file - 1
Started processing file - 2
Started processing file - 3
Started processing file - 4
Started processing file - 5
Started processing file - 6
Started processing file - 7
Started processing file - 8
Started processing file - 9
Processing and storing Parquet files complete


In [13]:
def process_parquet_files_to_json(read_dir="processed_wiki", output_dir="processed_wiki_jsons"):
    """Convert each processed Parquet shard into a JSON file of ``{"id", "contents"}`` records.

    For every ``*.parquet`` file in ``read_dir`` (sorted by name), ``unique_id``
    is cast to ``str`` and renamed to ``id``, ``lines`` is renamed to
    ``contents``, ``title`` is dropped, and the rows are written with
    ``to_json(orient='records')`` to ``{output_dir}/{stem}.json``.

    Args:
        read_dir: Directory containing the processed Parquet shards.
        output_dir: Directory that receives the JSON files; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)  # Create the output directory if it doesn't exist

    # Only read real Parquet shards: stray entries (e.g. a Jupyter
    # `.ipynb_checkpoints` directory) would otherwise make read_parquet fail.
    parquet_files = sorted(name for name in os.listdir(read_dir) if name.endswith(".parquet"))

    for filename in parquet_files:
        wiki_df = pd.read_parquet(os.path.join(read_dir, filename))

        wiki_df = (
            wiki_df
            .assign(unique_id=wiki_df['unique_id'].astype(str))
            .rename(columns={'lines': 'contents', 'unique_id': 'id'})
            .drop(columns=['title'])
        )

        # splitext keeps multi-dot stems intact (split(".")[0] would truncate them).
        json_filename = os.path.splitext(filename)[0] + ".json"
        wiki_df.to_json(os.path.join(output_dir, json_filename), orient='records')

        print(f"Processed {filename} and saved as {json_filename}")

    print("Conversion completed")


In [14]:
# Convert each processed Parquet shard into a JSON records file under processed_wiki_jsons/.
process_parquet_files_to_json()

Processed 0.parquet and saved as 0.json
Processed 1.parquet and saved as 1.json
Processed 2.parquet and saved as 2.json
Processed 3.parquet and saved as 3.json
Processed 4.parquet and saved as 4.json
Processed 5.parquet and saved as 5.json
Processed 6.parquet and saved as 6.json
Processed 7.parquet and saved as 7.json
Processed 8.parquet and saved as 8.json
Processed 9.parquet and saved as 9.json
Conversion completed


In [None]:
# The previous version failed due to cuDF (GPU) memory constraints: every
# per-batch frame stayed on the GPU until a final cudf.concat, so at least two
# full copies were resident. This version concatenates in host memory and
# copies to the GPU once; the complete result must still fit in GPU memory.

def concatenate_processed_parquet_files(input_dir="processed_data", remove_files=True):
    """Concatenate every Parquet file in ``input_dir`` into one cuDF DataFrame.

    Files are read with pandas, concatenated on the host in sorted filename
    order, and copied to the GPU with a single ``cudf.DataFrame.from_pandas``.
    Each file's index is kept as-is, so index labels may repeat across files.

    NOTE(review): process_and_store_parquet_files writes to "processed_wiki",
    not "processed_data"; pass ``input_dir`` explicitly to combine that output.

    Args:
        input_dir: Directory containing the Parquet files to combine.
        remove_files: When True (the original behavior), delete the input
            files and then the directory after the result is built.

    Returns:
        cudf.DataFrame: All rows of all files.

    Raises:
        ValueError: If ``input_dir`` contains no ``.parquet`` files.
    """
    parquet_files = sorted(name for name in os.listdir(input_dir) if name.endswith(".parquet"))
    if not parquet_files:
        raise ValueError(f"No .parquet files found in {input_dir!r}")
    paths = [os.path.join(input_dir, name) for name in parquet_files]

    # One concat over the full list (the old loop re-concatenated a growing frame: quadratic).
    host_df = pd.concat([pd.read_parquet(path) for path in paths], axis=0)
    final_df = cudf.DataFrame.from_pandas(host_df)
    del host_df

    print("Reading and concatenating processed Parquet files complete")

    if remove_files:
        for path in paths:
            os.remove(path)
        os.rmdir(input_dir)  # raises OSError if anything else remains in the directory

    return final_df
