In [1]:
import os
from data_partitioner_jsonl import *
from lisa_tagging import *

In [2]:
file_name = 'corpus.jsonl'
dataset = "blogcorpus"

In [3]:
%time
# input_path = os.path.join(f'/shared/3/projects/hiatus/idiolect/data/full_pilot/{dataset}', file_name)
# partition_dir = os.path.join(f'/shared/3/projects/hiatus/tagged_data/lisa_partitions/{dataset}', file_name.split('.')[0]) 
# tag_dir = os.path.join(f'/shared/3/projects/hiatus/tagged_data/lisa_partitions/tagged-{dataset}', file_name.split('.')[0])
# output_dir= f'/shared/3/projects/hiatus/tagged_data/lisa-{dataset}'
input_path = os.path.join(f'/shared/3/projects/hiatus/idiolect/data/full_pilot/{dataset}', file_name)
partition_dir = os.path.join(f'/shared/3/projects/hiatus/tagged_data/lisa_partitions/{dataset}', file_name.split('.')[0]) 
tag_dir = os.path.join(f'/shared/3/projects/hiatus/tagged_data/lisa_partitions/tagged-{dataset}', file_name.split('.')[0])
output_dir= f'/shared/3/projects/hiatus/tagged_data/lisa-{dataset}'


# ensures all directories will exist
for directory in [partition_dir, tag_dir, output_dir]:
    os.makedirs(directory, exist_ok=True)


CPU times: user 4 µs, sys: 3 µs, total: 7 µs
Wall time: 17.2 µs


## Partition the file into 50 parts

In [4]:
%%time
partition_file(input_path, partition_dir, chunks=2)

INFO:root:158763 lines in file
INFO:root:Chunk size: 79382 lines
INFO:root:Saving /shared/3/projects/hiatus/tagged_data/lisa_partitions/blogcorpus/corpus/partition-1.jsonl
INFO:root:Saving /shared/3/projects/hiatus/tagged_data/lisa_partitions/blogcorpus/corpus/partition-2.jsonl


CPU times: user 7.92 s, sys: 1.6 s, total: 9.52 s
Wall time: 13.1 s


In [None]:
from tokenization_enc_t5 import EncT5Tokenizer
from modeling_enc_t5 import EncT5ForSequenceClassification
import os 
import json 

tokenizer = EncT5Tokenizer.from_pretrained("t5-base")
model = EncT5ForSequenceClassification.from_pretrained(
    "/shared/3/projects/hiatus/idiolect/models/stylegenome_lisa_sfam/lisa_checkpoint",
    num_labels=768, problem_type="regression"
)

def tag_lisa(obj):
    try:
        tokenized = tokenizer(
            [obj['fullText']],
            truncation=True, max_length=512, padding=True, return_tensors="pt"
        )
        print(f"Tokenized input for object: {obj['documentID']}")
        prediction = model.forward(**tokenized)[0][0].cpu().detach().float()
        print(f"Prediction for object {obj['documentID']}: {prediction}")
        vector = torch.clamp(prediction, min=0.0, max=1.0).numpy().tolist()
        if 'encodings' in obj:
            del obj['encodings']
        obj['lisa_vector'] = vector
    except Exception as e:
        print(f"Error processing object {obj['documentID']}: {e}")
    return obj

def tag_partition(input_file, output_file):
    print(f"Tagging file {input_file} with worker {os.getpid()}")
    tagged_objects = []

    try:
        with open(input_file, 'r') as reader:
            for line in reader:
                obj = json.loads(line.strip())
                tagged_object = tag_lisa(obj)
                tagged_objects.append(tagged_object)
                print(f"Tagged object {obj['documentID']} for worker {os.getpid()}. Count: {len(tagged_objects)}")

                if len(tagged_objects) % 10 == 0:
                    print(f"Appending chunk of 10 objects to {output_file}")
                    append_chunk(output_file, tagged_objects)
                    tagged_objects = []

        if tagged_objects:
            append_chunk(output_file, tagged_objects)
    except Exception as e:
        print(f"Error tagging partition {input_file}: {e}")

tag_partition('/shared/3/projects/hiatus/tagged_data/lisa_partitions/blogcorpus/corpus/partition-1.jsonl', 'test_tagged.jsonl')

## Tag each partition with 1 CPU 

Set nice value low so we don't hog the server


In [5]:
%%time
import logging
original_level = logging.getLogger().getEffectiveLevel()
logging.getLogger().setLevel(logging.ERROR)

tag_partitions(
               input_directory=partition_dir,
               output_directory=tag_dir,
               num_workers=2,
#                default_niceness=0
               )
logging.getLogger().setLevel(original_level)


Setting CPU affinity to use 2 CPUs
Starting multiprocessing pool...


The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'T5Tokenizer'. 
The class this function is called from is 'EncT5Tokenizer'.
The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'T5Tokenizer'. 
The class this function is called from is 'EncT5Tokenizer'.
You are using the default legacy behaviour of the <class 'tokenization_enc_t5.EncT5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transf

Worker 701992 initialized
Tagging file /shared/3/projects/hiatus/tagged_data/lisa_partitions/blogcorpus/corpus/partition-1.jsonl with worker 701992
Worker 701993 initialized
Tagging file /shared/3/projects/hiatus/tagged_data/lisa_partitions/blogcorpus/corpus/partition-2.jsonl with worker 701993Tokenized input for object: 39cc6cd5-f27a-44b7-accb-9c504fe8ff71

Tokenized input for object: 01f97b8b-a28f-4bb7-b303-dab4315893d0


KeyboardInterrupt: 

## Join the partitioned files

In [None]:
# join_tagged_files(input_directory=tag_dir,
#                   output_file=os.path.join(output_dir, file_name))

## Delete the temp directories

In [None]:
# delete_partitioned_files(partition_dir)
# delete_partitioned_files(tag_dir)