In [22]:
import json
import hashlib
import time
import uuid
import spacy
import os
import re

# Load spaCy's "en_core_web_sm" pipeline once, at import time, and keep it in
# the module-level name `nlp`. labelstudio_to_prodigy uses it to split each
# document's text into tokens.
nlp = spacy.load("en_core_web_sm")

# Hash helper used to fill the "_input_hash" and "_task_hash" fields of the
# generated Prodigy entries.
def create_hash(text):
    """Return the MD5 digest of *text* reduced to an unsigned 32-bit integer.

    The string is encoded with the default codec (UTF-8), hashed with MD5, and
    the 128-bit digest is taken modulo 2**32, i.e. its low 32 bits are kept.
    """
    digest_hex = hashlib.md5(text.encode()).hexdigest()
    # Masking with 0xFFFFFFFF keeps the low 32 bits, same as `% (2**32)`.
    return int(digest_hex, 16) & 0xFFFFFFFF

def _tokens_for_prodigy(text):
    """Tokenize *text* with spaCy and return one token dict per token."""
    # Only token boundaries are needed, so run just the tokenizer instead of
    # the whole pipeline (tagger, parser, NER, ...).
    doc = nlp.make_doc(text)
    return [
        {
            "text": token.text,
            "start": token.idx,
            "end": token.idx + len(token.text),
            "id": i,
            "ws": token.whitespace_ != "",
        }
        for i, token in enumerate(doc)
    ]


def _token_range(tokens, start, end):
    """Return ``(token_start, token_end)`` indices for a character span.

    ``token_start`` is the token containing ``start``; ``token_end`` is the
    first token whose character range reaches ``end``. When an offset falls
    between tokens (e.g. on whitespace), the nearest token is used instead:
    the last token starting at or before ``start``, and the first token ending
    at or after ``end``. Either index may be ``None`` if no token qualifies.
    """
    token_start = None
    token_end = None

    for i, token in enumerate(tokens):
        if token_start is None and token["start"] <= start < token["end"]:
            token_start = i
        if token["start"] <= end <= token["end"]:
            token_end = i
            break

    if token_start is None:
        # No token contains `start`: keep the last token that begins before it.
        for i, token in enumerate(tokens):
            if token["start"] <= start:
                token_start = i
    if token_end is None:
        # No token contains `end`: take the first token that ends after it.
        for i, token in enumerate(tokens):
            if token["end"] >= end:
                token_end = i
                break

    return token_start, token_end


def _spans_for_document(document, tokens):
    """Collect span dicts from every annotation result of *document*."""
    spans = []
    for annotation in document.get("annotations") or []:
        for result in annotation.get("result") or []:
            value = result.get("value") or {}
            # Skip results that are not labelled character spans. An empty
            # "labels" list is skipped too instead of raising IndexError.
            if "start" not in value or "end" not in value or not value.get("labels"):
                continue

            # Newlines are replaced 1:1 by spaces, so the character offsets of
            # the original text are still valid for the flattened text.
            start = value["start"]
            end = value["end"]
            token_start, token_end = _token_range(tokens, start, end)

            # NOTE(review): start/end are not snapped to token boundaries, so a
            # span that begins or ends mid-token or on whitespace keeps its
            # original character offsets -- confirm this is what Prodigy needs.
            spans.append({
                "start": start,
                "end": end,
                "token_start": token_start,
                "token_end": token_end,
                "label": value["labels"][0],  # only the first label is kept
            })
    return spans


def labelstudio_to_prodigy(ls_data):
    """Convert a Label Studio export into a single Prodigy ``ner_manual`` entry.

    Args:
        ls_data: A list of Label Studio task dicts (each with ``data.text`` and
            optionally ``annotations[].result[]``), or a single task dict.

    Returns:
        A dict describing the first task that has ``data.text``: the text with
        newlines replaced by spaces, its tokens, the labelled spans from all of
        its annotations, hashes, a timestamp and freshly generated annotator
        and session UUIDs. Returns ``None`` when no task has ``data.text``.

    Note:
        Only the FIRST convertible task is returned; any later tasks in
        ``ls_data`` are ignored. This matches the historical behaviour that
        callers rely on (one JSON object per input file).
    """
    # Accept a bare task dict as well as the usual list of tasks.
    if isinstance(ls_data, dict):
        ls_data = [ls_data]

    for document in ls_data:
        data = document.get("data") if isinstance(document, dict) else None
        if not isinstance(data, dict) or "text" not in data:
            continue

        # Flatten the text to a single line; offsets are unaffected because
        # each newline becomes exactly one space.
        text = data["text"].replace("\n", " ")
        tokens = _tokens_for_prodigy(text)
        spans = _spans_for_document(document, tokens)

        return {
            "text": text,
            "_input_hash": create_hash(text),
            "_task_hash": create_hash(str(spans)),
            "_is_binary": False,
            "tokens": tokens,
            "_view_id": "ner_manual",
            "spans": spans,
            "answer": "accept",
            "_timestamp": int(time.time()),
            "_annotator_id": str(uuid.uuid4()),
            "_session_id": str(uuid.uuid4()),
        }

    return None

def convert_files(input_dir, output_dir):
    """Convert Label-Studio JSON files to Prodigy JSONL format.

    Walks *input_dir* recursively. Every ``*.json`` file is loaded, passed to
    ``labelstudio_to_prodigy`` and written to the same relative path under
    *output_dir* with a ``.jsonl`` extension, one compact JSON object per line.

    Failures are handled per file so that one bad input does not stop the
    batch: files that are not valid JSON are skipped, files that yield no
    entry are skipped, and any other error is reported with the file path.

    Args:
        input_dir: Root directory containing Label Studio ``.json`` exports.
        output_dir: Root directory for the generated ``.jsonl`` files;
            sub-directories are created as needed.
    """
    for root, _, files in os.walk(input_dir):
        for filename in files:
            if not filename.endswith(".json"):  # process JSON files only
                continue

            input_path = os.path.join(root, filename)
            relative_stem = os.path.splitext(os.path.relpath(input_path, input_dir))[0]
            output_path = os.path.join(output_dir, relative_stem + ".jsonl")

            try:
                with open(input_path, "r", encoding="utf-8") as src:
                    ls_data = json.load(src)
            except json.JSONDecodeError as e:
                print(f"Skipping invalid file {input_path}: {e}")
                continue
            except (OSError, UnicodeDecodeError) as e:
                print(f"Error reading {input_path}: {e}")
                continue

            try:
                converted = labelstudio_to_prodigy(ls_data)  # LS JSON -> Prodigy entry
                if converted is None:
                    # Nothing to write; avoid emitting a file containing "null".
                    print(f"Skipping {input_path}: no document with text found")
                    continue

                # Accept either a single entry or a list of entries.
                entries = converted if isinstance(converted, list) else [converted]

                target_dir = os.path.dirname(output_path)
                if target_dir:
                    os.makedirs(target_dir, exist_ok=True)
                with open(output_path, "w", encoding="utf-8") as dst:
                    for entry in entries:
                        dst.write(json.dumps(entry, ensure_ascii=False, separators=(',', ':')))
                        dst.write("\n")  # JSONL: newline-terminated records
            except Exception as e:
                # Best-effort batch: report the failing file and move on.
                print(f"Error converting {input_path}: {e}")
                continue

            print(f"Conversion complete! File saved to {output_path}")

def main():
    """Convert the annotated Label-Studio JSON reports into the Prodigy tree.

    Both directories are hard-coded and relative to the current working
    directory.
    """
    convert_files(
        "../Label-Studio_Annotations//JSON_Reports_Annotated",
        "../Prodigy_Annotations/Label-Studio_to_Prodigy/JSON_Reports_Annotated",
    )

# Run the conversion only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()



Conversion complete! File saved to ../Prodigy_Annotations/Label-Studio_to_Prodigy/JSON_Reports_Annotated\9014961272\602581.jsonl
Conversion complete! File saved to ../Prodigy_Annotations/Label-Studio_to_Prodigy/JSON_Reports_Annotated\9014961272\649968.jsonl
