<a href="https://colab.research.google.com/github/horasan/eng_to_sql_ner/blob/main/NER_A_2_1_syntetic_data_generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import csv
import itertools
import random
import json

In [None]:
from google.colab import drive
# Mount Google Drive and build the base folder path used for all file I/O below.
drive.mount('/content/drive')
FOLDER_PATH = "NER_for_SQL"
FULL_PATH = f"/content/drive/My Drive/Colab Notebooks/{FOLDER_PATH}/"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# utils

In [None]:
import random
import re

def generate_synthetic_queries(queries, num_samples_per_query=10):
    """Generate synthetic sentences with character-level entity spans.

    For each query type, a template is picked at random and every
    ``{placeholder}`` in it is replaced by a randomly chosen value for that
    placeholder. The position of each substituted value in the final text is
    recorded as an entity span.

    Args:
        queries: Mapping of query key -> dict with two entries:
            ``"templates"``: list of template strings containing
            ``{name}`` placeholders, and
            ``"params"``: mapping of placeholder name -> list of candidate
            values. Non-string values are converted with ``str()``.
        num_samples_per_query: Number of samples generated per query key.

    Returns:
        A list of dicts, each with:
            ``"text"``: the filled-in sentence,
            ``"entities"``: list of ``(start, end, param_name)`` tuples in
            left-to-right order, where ``text[start:end]`` is the value,
            ``"query_type"``: the query key the sample was generated from.

    Raises:
        KeyError: If a template uses a placeholder that has no entry in the
            query's ``"params"``.
    """
    # Compiled once; the pattern does not depend on the query or template.
    placeholder_pattern = re.compile(r"\{(\w+)\}")
    synthetic_data = []

    for query_key, query_info in queries.items():
        templates = query_info["templates"]
        params = query_info["params"]

        for _ in range(num_samples_per_query):
            template = random.choice(templates)
            # A value is drawn for every parameter of the query, including
            # ones the chosen template does not reference.
            chosen_params = {
                param: random.choice(values)
                for param, values in params.items()
            }

            pieces = []
            entity_spans = []
            offset = 0    # length of the text assembled so far
            last_idx = 0  # position in the template after the last placeholder

            for match in placeholder_pattern.finditer(template):
                literal = template[last_idx:match.start()]
                pieces.append(literal)
                offset += len(literal)

                param_name = match.group(1)
                # str() lets numeric values from the templates file be used.
                value = str(chosen_params[param_name])
                entity_spans.append((offset, offset + len(value), param_name))
                pieces.append(value)
                offset += len(value)
                last_idx = match.end()

            pieces.append(template[last_idx:])

            # Spans are appended left to right, so they are already ordered
            # by start index and need no extra sort.
            synthetic_data.append({
                "text": "".join(pieces),
                "entities": entity_spans,
                "query_type": query_key
            })

    return synthetic_data

In [None]:
import re

def tokenize_with_char_spans(text):
    """Split text on whitespace and report where each token sits.

    Returns a pair ``(tokens, spans)``: ``tokens`` is the list of
    non-whitespace runs in order, and ``spans`` is the parallel list of
    ``(start_char, end_char)`` offsets into ``text`` (end exclusive).
    """
    matches = list(re.finditer(r'\S+', text))
    tokens = [m.group() for m in matches]
    spans = [m.span() for m in matches]
    return tokens, spans

def convert_to_bio_tags(samples):
    """Convert text and character-level entity spans to token-level BIO tags.

    Each sample's text is split with ``tokenize_with_char_spans`` and every
    token that overlaps an entity span is tagged ``B-<label>`` (first tagged
    token of that entity) or ``I-<label>`` (subsequent tokens). All other
    tokens keep the tag ``"O"``.

    Args:
        samples: Iterable of dicts with ``"text"`` (str) and ``"entities"``
            (iterable of ``(start, end, label)`` triples, end exclusive;
            tuples or lists both work).

    Returns:
        A list of dicts with ``"text"``, ``"tokens"`` and ``"tags"``, where
        ``tokens`` and ``tags`` have the same length.
    """
    bio_tagged = []

    for sample in samples:
        text = sample["text"]
        tokens, spans = tokenize_with_char_spans(text)
        tags = ["O"] * len(tokens)

        for ent_start, ent_end, label in sample["entities"]:
            if ent_end <= ent_start:
                continue  # Empty span: nothing to tag

            first_token = True
            for i, (tok_start, tok_end) in enumerate(spans):
                if tok_end <= ent_start:
                    continue  # Token is before entity
                if tok_start >= ent_end:
                    break  # Token is after entity

                # The token overlaps the entity. If it begins before the
                # entity (e.g. a quote or bracket glued to the value) it is
                # only claimed when no earlier entity has tagged it already.
                if tok_start < ent_start and tags[i] != "O":
                    continue

                # The first token tagged for an entity always gets B-, so an
                # I- tag is never emitted without a preceding B-.
                tags[i] = f"B-{label}" if first_token else f"I-{label}"
                first_token = False

        bio_tagged.append({
            "text": text,
            "tokens": tokens,
            "tags": tags
        })

    return bio_tagged


In [None]:
def save_bio_tagged_data(bio_tagged_data, filename):
    """Write BIO-tagged samples to a UTF-8 text file.

    Each token is written on its own line as ``token<TAB>tag``; a blank line
    follows every sample to separate sentences.
    """
    with open(filename, "w", encoding="utf-8") as out:
        for sample in bio_tagged_data:
            token_tag_pairs = zip(sample["tokens"], sample["tags"])
            out.writelines(f"{token}\t{tag}\n" for token, tag in token_tag_pairs)
            # Sentence separator
            out.write("\n")

# 1) Read synthetic templates (with data)

In [None]:
# Load the query templates and their candidate parameter values from Drive.
# The file name below is a runtime path and is kept exactly as spelled.
synthetic_data_query_templates = 'syntetic_data_query_templates.json'
# Read as UTF-8 explicitly instead of relying on the platform default
# encoding, so non-ASCII template text is decoded the same way everywhere.
with open(FULL_PATH + synthetic_data_query_templates, 'r', encoding='utf-8') as f:
    queries = json.load(f)

# 2) Generate synthetic sentences

In [None]:
"""
synthetic_samples will be a list of dictionaries, each containing:
- "text": the generated query text
- "entities": a list of tuples (start, end, entity_type) indicating the spans of entities in the text
- "query_type": the type of query (e.g., "query1", "query2", etc.)
"""
synthetic_samples = generate_synthetic_queries(queries, num_samples_per_query=300)

# Persist the generated samples as pretty-printed JSON in the Drive folder.
synthetic_samples_output_file = "synthetic_queries_300.json"
with open(f"{FULL_PATH}{synthetic_samples_output_file}", "w") as f:
    f.write(json.dumps(synthetic_samples, indent=2))


# 3) Generate BIO tagged data

In [None]:
# Reload the generated samples from their JSON file, then derive
# token-level BIO tags from the character-level entity spans.
with open(f"{FULL_PATH}{synthetic_samples_output_file}", "r") as f:
    samples_from_file = json.loads(f.read())

bio_tagged_syntetic_data = convert_to_bio_tags(samples_from_file)



In [None]:
# Write the BIO-tagged samples to a text file in the Drive folder.
big_tagged_file_name = "synthetic_queries_300_bio_tagged.txt"
save_bio_tagged_data(bio_tagged_syntetic_data, f"{FULL_PATH}{big_tagged_file_name}")