In [6]:
from pathlib import Path
import re
from typing import List, Tuple
from datetime import datetime

# ---------- Patterns for common conversation formats ----------

USER_PATTERNS = [
    r"^\*\*You:\*\*",
    r"^\*\*User:\*\*",
    r"^User:",
    r"^You:",
    r"^##\s*User",
    r"^##\s*You",
]

ASSISTANT_PATTERNS = [
    r"^\*\*ChatGPT:\*\*",
    r"^\*\*Assistant:\*\*",
    r"^Assistant:",
    r"^ChatGPT:",
    r"^##\s*Assistant",
    r"^##\s*ChatGPT",
]

USER_RE = re.compile("|".join(USER_PATTERNS), re.IGNORECASE)
ASSISTANT_RE = re.compile("|".join(ASSISTANT_PATTERNS), re.IGNORECASE)

# ---------- Core Parsing: split into (user, assistant) ----------

def split_conversation(md_text: str) -> List[Tuple[str, str]]:
    lines = md_text.splitlines()

    blocks = []
    current_role = None
    current_user = []
    current_assistant = []

    def flush():
        if current_user or current_assistant:
            blocks.append((
                "\n".join(current_user).strip(),
                "\n".join(current_assistant).strip()
            ))

    for line in lines:
        line_stripped = line.strip()

        if USER_RE.match(line_stripped):
            if current_role is not None:
                flush()
                current_user = []
                current_assistant = []
            current_role = "user"
            continue

        if ASSISTANT_RE.match(line_stripped):
            current_role = "assistant"
            continue

        if current_role == "user":
            current_user.append(line)
        elif current_role == "assistant":
            current_assistant.append(line)

    flush()
    return [(u, a) for u, a in blocks if u.strip() or a.strip()]

# ---------- Extract REAL follow-up questions from assistant ----------

def extract_suggested_block(text: str) -> List[str]:
    """
    Extracts bullet suggestions after lines like:
    'If you want, next I can:'
    'Next I can:'
    'Suggested:'
    """
    lines = text.splitlines()
    collected = []

    for i, line in enumerate(lines):
        if re.search(r"(if you want|next i can|suggest|follow[- ]?up)", line, re.IGNORECASE):
            j = i + 1
            while j < len(lines):
                l = lines[j].rstrip()
                if l.strip().startswith(("*", "-", "‚Ä¢")):
                    collected.append(l.strip())
                    j += 1
                else:
                    break
            if collected:
                break

    return collected

# ---------- Writers ----------

def write_outputs(pairs: List[Tuple[str, str]], output_dir: Path):
    output_dir.mkdir(parents=True, exist_ok=True)

    # -------- questions_only.md --------
    q_only = output_dir / "questions_only.md"
    with q_only.open("w", encoding="utf-8") as f:
        f.write("# Questions\n\n")

        for i, (q, a) in enumerate(pairs, 1):
            f.write(f"## Q{i}\n\n")
            f.write(q.strip() + "\n\n")

            suggestions = extract_suggested_block(a)
            for j, s in enumerate(suggestions, 1):
                clean = re.sub(r"^[\-\*‚Ä¢]\s*", "", s)
                f.write(f"Q{i}.{j} {clean}\n")

            f.write("\n")

    # -------- Individual Q&A files --------
    for i, (q, a) in enumerate(pairs, 1):
        out = output_dir / f"Q{i:03d}.md"
        with out.open("w", encoding="utf-8") as f:
            f.write(f"# Question {i}\n\n")
            f.write("## User Question\n\n")
            f.write(q.strip() + "\n\n")
            f.write("## Assistant Answer\n\n")
            f.write(a.strip() + "\n")

# ---------- Main ----------

def main():
    path = input("Enter path to conversation .md file: ").strip().strip('"')
    file_path = Path(path)

    if not file_path.exists():
        print("‚ùå File not found!")
        return

    text = file_path.read_text(encoding="utf-8", errors="ignore")

    pairs = split_conversation(text)

    if not pairs:
        print("‚ö†Ô∏è No conversation blocks detected. The format may be unknown.")
        return

    base_dir = Path(r"D:\Balaji-workbench\synthetic data")
    run_name = datetime.now().strftime("output_%Y%m%d_%H%M%S")
    output_dir = base_dir / run_name

    write_outputs(pairs, output_dir)

    print("‚úÖ Done!")
    print(f"üìÑ Extracted {len(pairs)} Q&A pairs")
    print(f"üìÅ Output folder: {output_dir}")

if __name__ == "__main__":
    main()


‚úÖ Done!
üìÑ Extracted 10 Q&A pairs
üìÅ Output folder: D:\Balaji-workbench\synthetic data\output_20260208_220555


In [7]:
from pathlib import Path
import re
from datetime import datetime
from typing import List

# --------- Tune these keywords for "topology" meaning ----------
TOPOLOGY_KEYWORDS = [
    "topology", "connect", "connection", "connected", "flow", "communication",
    "drp", "emarker", "e-marker", "swap", "pr_swap", "dr_swap",
    "collision", "scenario", "cable", "device", "interoperability",
    "link", "network", "path", "routing", "role", "negotiation"
]

# ---------- Parse questions-only markdown ----------

def extract_questions_only(md_text: str) -> List[str]:
    """
    Extracts questions from a file formatted like:

    ## Q1
    question text

    ## Q2
    question text
    """
    lines = md_text.splitlines()
    questions = []
    current = []

    for line in lines:
        if re.match(r"^##\s*Q\d+", line.strip(), re.IGNORECASE):
            if current:
                questions.append("\n".join(current).strip())
                current = []
            continue

        # skip separators
        if line.strip().startswith("---"):
            continue

        if line.strip():
            current.append(line)

    if current:
        questions.append("\n".join(current).strip())

    return [q for q in questions if q.strip()]

# ---------- Topology filter ----------

def is_topology_question(text: str) -> bool:
    t = text.lower()
    return any(k.lower() in t for k in TOPOLOGY_KEYWORDS)

# ---------- Writers ----------

def write_outputs(questions: List[str], output_dir: Path):
    output_dir.mkdir(parents=True, exist_ok=True)

    # -------- topology_questions.md --------
    out_md = output_dir / "topology_questions.md"
    with out_md.open("w", encoding="utf-8") as f:
        f.write("# Topology Questions\n\n")
        for i, q in enumerate(questions, 1):
            f.write(f"## T{i}\n\n")
            f.write(q.strip() + "\n\n")

    # -------- Individual files --------
    for i, q in enumerate(questions, 1):
        out = output_dir / f"T{i:03d}.md"
        with out.open("w", encoding="utf-8") as f:
            f.write(f"# Topology Question {i}\n\n")
            f.write(q.strip() + "\n")

# ---------- Main ----------

def main():
    path = input("Enter path to questions .md file: ").strip().strip('"')
    file_path = Path(path)

    if not file_path.exists():
        print("‚ùå File not found!")
        return

    text = file_path.read_text(encoding="utf-8", errors="ignore")

    all_questions = extract_questions_only(text)

    if not all_questions:
        print("‚ö†Ô∏è No questions detected. Check the format (## Q1, ## Q2, ...).")
        return

    topo_questions = [q for q in all_questions if is_topology_question(q)]

    if not topo_questions:
        print("‚ö†Ô∏è No topology questions matched. Try adjusting TOPOLOGY_KEYWORDS.")
        return

    base_dir = Path(r"D:\Balaji-workbench\synthetic data")
    run_name = datetime.now().strftime("topology_output_%Y%m%d_%H%M%S")
    output_dir = base_dir / run_name

    write_outputs(topo_questions, output_dir)

    print("‚úÖ Done!")
    print(f"üìÑ Total questions: {len(all_questions)}")
    print(f"üß≠ Topology questions: {len(topo_questions)}")
    print(f"üìÅ Output folder: {output_dir}")

if __name__ == "__main__":
    main()


‚úÖ Done!
üìÑ Total questions: 1
üß≠ Topology questions: 1
üìÅ Output folder: D:\Balaji-workbench\synthetic data\topology_output_20260208_220938


In [11]:
import re
from pathlib import Path

# Ask user for input file path
input_path = input("Enter path to conversation.md: ").strip().strip('"').strip("'")


input_file = Path(input_path)

if not input_file.exists():
    print("‚ùå File not found:", input_file)
    exit(1)

text = input_file.read_text(encoding="utf-8")

# Split by User blocks
blocks = re.split(r"## User:\s*", text)
blocks = [b.strip() for b in blocks if b.strip()]

qa_pairs = []

for block in blocks:
    parts = re.split(r"## Assistant:\s*", block)
    if len(parts) != 2:
        continue

    user_q = parts[0].strip()
    assistant_text = parts[1].strip()

    # Extract suggested questions (lines starting with "- ")
    suggested = re.findall(r"- (.+)", assistant_text)

    # Remove suggested section from answer (simple cleanup)
    answer = re.sub(r"\*\*Suggested questions:\*\*[\s\S]*", "", assistant_text).strip()

    qa_pairs.append({
        "question": user_q,
        "answer": answer,
        "suggested": suggested
    })

if not qa_pairs:
    print("‚ö†Ô∏è No Q&A pairs found. Check your file format.")
    exit(1)

# Output files in same directory as input
output_dir = input_file.parent

# 1) Write questions_only.md
q_only_lines = ["# Questions\n"]
for qa in qa_pairs:
    q_only_lines.append(f"- {qa['question']}")
    for s in qa["suggested"]:
        q_only_lines.append(f"  - Suggested: {s}")
    q_only_lines.append("")

(output_dir / "questions_only.md").write_text("\n".join(q_only_lines), encoding="utf-8")

# 2) Write many files: one Q&A per file
qa_dir = output_dir / "qa_files"
qa_dir.mkdir(exist_ok=True)

for i, qa in enumerate(qa_pairs, start=1):
    content = (
        f"# Question\n{qa['question']}\n\n"
        f"# Answer\n{qa['answer']}\n"
    )
    (qa_dir / f"q{i}.md").write_text(content, encoding="utf-8")

print("‚úÖ Done!")
print("Generated:")
print("-", output_dir / "questions_only.md")
print("-", qa_dir, "(q1.md, q2.md, ...)")


‚ö†Ô∏è No Q&A pairs found. Check your file format.
‚úÖ Done!
Generated:
- C:\Users\GRL\Downloads\questions_only.md
- C:\Users\GRL\Downloads\qa_files (q1.md, q2.md, ...)


In [1]:
from pathlib import Path
import re
from typing import List, Tuple

# ---------- Patterns for common conversation formats ----------

USER_PATTERNS = [
    r"^\*\*You:\*\*",
    r"^\*\*User:\*\*",
    r"^User:",
    r"^You:",
    r"^##\s*User",
    r"^##\s*You",
]

ASSISTANT_PATTERNS = [
    r"^\*\*ChatGPT:\*\*",
    r"^\*\*Assistant:\*\*",
    r"^Assistant:",
    r"^ChatGPT:",
    r"^##\s*Assistant",
    r"^##\s*ChatGPT",
]

USER_RE = re.compile("|".join(USER_PATTERNS), re.IGNORECASE)
ASSISTANT_RE = re.compile("|".join(ASSISTANT_PATTERNS), re.IGNORECASE)

# ---------- Conversation Splitter ----------

def split_conversation(md_text: str) -> List[Tuple[str, str]]:
    lines = md_text.splitlines()

    blocks = []
    current_role = None
    current_user = []
    current_assistant = []

    def flush():
        if current_user or current_assistant:
            blocks.append((
                "\n".join(current_user).strip(),
                "\n".join(current_assistant).strip()
            ))

    for line in lines:
        line_stripped = line.strip()

        if USER_RE.match(line_stripped):
            if current_role == "assistant":
                flush()
                current_user = []
                current_assistant = []
            current_role = "user"
            continue

        if ASSISTANT_RE.match(line_stripped):
            current_role = "assistant"
            continue

        if current_role == "user":
            current_user.append(line)
        elif current_role == "assistant":
            current_assistant.append(line)

    flush()
    return [(u, a) for u, a in blocks if u.strip() or a.strip()]

# ---------- Precise Suggested Question Extraction ----------

SECTION_TRIGGERS = [
    "suggested questions",
    "follow-up questions",
    "follow up questions",
    "next questions",
    "you can ask",
    "you might ask",
    "consider asking",
    "if you want next",
]

def extract_suggested_questions(answer_text: str) -> List[str]:
    suggestions = []
    seen = set()

    lines = [l.strip() for l in answer_text.splitlines() if l.strip()]

    capture_mode = False

    # Verbs that often indicate "offer-style" suggestions
    OFFER_VERBS = (
        "show", "help", "design", "draft", "map", "explain", "build",
        "create", "generate", "analyze", "analyse", "walk", "outline"
    )

    for line in lines:
        low = line.lower()

        # Detect section headers like "If you want next, I can:"
        if any(t in low for t in SECTION_TRIGGERS) or low.endswith("i can:") or low.endswith("i can"):
            capture_mode = True
            continue

        # Bullet or numbered list items
        is_list_item = bool(
            re.match(r"^[-*‚Ä¢]\s+", line) or re.match(r"^\d+[\.\)]\s+", line)
        )

        if is_list_item:
            clean = re.sub(r"^([-*‚Ä¢]|\d+[\.\)])\s*", "", line).strip()
            if len(clean) > 5:
                # Case 1: already a real question
                if "?" in clean:
                    q = clean if clean.endswith("?") else clean + "?"
                else:
                    # Case 2: offer-style suggestion ‚Üí convert to question
                    first_word = clean.split()[0].lower()
                    if first_word in OFFER_VERBS:
                        # "Show X" -> "Can you show X?"
                        q = "Can you " + clean[0].lower() + clean[1:]
                        if not q.endswith("?"):
                            q += "?"
                    else:
                        continue

                key = q.lower()
                if key not in seen:
                    seen.add(key)
                    suggestions.append(q)
            continue

        # If we are in a "suggestions" section, also capture plain question sentences
        if capture_mode:
            if line.endswith("?"):
                key = line.lower()
                if key not in seen:
                    seen.add(key)
                    suggestions.append(line)
            else:
                # stop when normal paragraph resumes
                capture_mode = False

    return suggestions

# ---------- Writers ----------

def write_outputs(pairs: List[Tuple[str, str]], output_dir: Path):
    output_dir.mkdir(exist_ok=True)

    # questions_only.md
    q_only = output_dir / "questions_only.md"
    with q_only.open("w", encoding="utf-8") as f:
        f.write("# Suggested / Follow-up Questions Only\n\n")

        for i, (q, a) in enumerate(pairs, 1):
            suggestions = extract_suggested_questions(a)

            if not suggestions:
                continue

            f.write(f"## Q{i}\n\n")
            f.write(q.strip() + "\n\n")

            for j, s in enumerate(suggestions, 1):
                f.write(f"- Q{i}.{j} {s}\n")

            f.write("\n")

    # Individual QA files
    for i, (q, a) in enumerate(pairs, 1):
        out = output_dir / f"Q{i:03d}.md"
        with out.open("w", encoding="utf-8") as f:
            f.write(f"# Question {i}\n\n")
            f.write("## User Question\n\n")
            f.write(q.strip() + "\n\n")
            f.write("## Assistant Answer\n\n")
            f.write(a.strip() + "\n\n")

            suggestions = extract_suggested_questions(a)
            if suggestions:
                f.write("## Suggested / Follow-up Questions\n\n")
                for j, s in enumerate(suggestions, 1):
                    f.write(f"- Q{i}.{j} {s}\n")

# ---------- Main ----------

def main():
    path = input("Enter path to conversation .md file: ").strip().strip('"')
    file_path = Path(path)

    if not file_path.exists():
        print("‚ùå File not found!")
        return

    text = file_path.read_text(encoding="utf-8", errors="ignore")

    pairs = split_conversation(text)

    if not pairs:
        print("‚ö†Ô∏è No conversation blocks detected. The format may be unknown.")
        return

    output_dir = Path(r"D:\Balaji-workbench\synthetic data")

    write_outputs(pairs, output_dir)

    print("‚úÖ Done!")
    print(f"üìÑ Processed {len(pairs)} Q&A pairs")
    print(f"üìÅ Output folder: {output_dir}")

if __name__ == "__main__":
    main()


‚úÖ Done!
üìÑ Processed 9 Q&A pairs
üìÅ Output folder: D:\Balaji-workbench\synthetic data


In [2]:
from pathlib import Path
import re
from typing import List, Tuple

# ---------- Patterns for common conversation formats ----------

USER_PATTERNS = [
    r"^\*\*You:\*\*",
    r"^\*\*User:\*\*",
    r"^User:",
    r"^You:",
    r"^##\s*User",
    r"^##\s*You",
]

ASSISTANT_PATTERNS = [
    r"^\*\*ChatGPT:\*\*",
    r"^\*\*Assistant:\*\*",
    r"^Assistant:",
    r"^ChatGPT:",
    r"^##\s*Assistant",
    r"^##\s*ChatGPT",
]

USER_RE = re.compile("|".join(USER_PATTERNS), re.IGNORECASE)
ASSISTANT_RE = re.compile("|".join(ASSISTANT_PATTERNS), re.IGNORECASE)

# ---------- Conversation Splitter ----------

def split_conversation(md_text: str) -> List[Tuple[str, str]]:
    lines = md_text.splitlines()

    blocks = []
    current_role = None
    current_user = []
    current_assistant = []

    def flush():
        if current_user or current_assistant:
            blocks.append((
                "\n".join(current_user).strip(),
                "\n".join(current_assistant).strip()
            ))

    for line in lines:
        line_stripped = line.strip()

        if USER_RE.match(line_stripped):
            if current_role == "assistant":
                flush()
                current_user = []
                current_assistant = []
            current_role = "user"
            continue

        if ASSISTANT_RE.match(line_stripped):
            current_role = "assistant"
            continue

        if current_role == "user":
            current_user.append(line)
        elif current_role == "assistant":
            current_assistant.append(line)

    flush()
    return [(u, a) for u, a in blocks if u.strip() or a.strip()]

# ---------- Suggested Question Extraction ----------

SECTION_TRIGGERS = [
    "suggested questions",
    "follow-up questions",
    "follow up questions",
    "next questions",
    "you can ask",
    "you might ask",
    "consider asking",
    "if you want next",
    "i can:",
]

def strip_markdown(text: str) -> str:
    # Remove **bold**, *italic*, __underline__, etc.
    text = re.sub(r"(\*\*|\*|__|_)", "", text)
    return text.strip()

def extract_suggested_questions(answer_text: str) -> List[str]:
    suggestions = []
    seen = set()

    lines = [l.rstrip() for l in answer_text.splitlines() if l.strip()]

    capture_mode = False

    for line in lines:
        low = line.lower().strip()

        # Detect section headers like "If you want next, I can:"
        if any(t in low for t in SECTION_TRIGGERS):
            capture_mode = True
            continue

        # Bullet or numbered list items
        is_list_item = bool(
            re.match(r"^\s*[-*‚Ä¢]\s+", line) or re.match(r"^\s*\d+[\.\)]\s+", line)
        )

        if capture_mode and is_list_item:
            clean = re.sub(r"^\s*([-*‚Ä¢]|\d+[\.\)])\s*", "", line).strip()
            clean = strip_markdown(clean)

            # Remove leading "or "
            if clean.lower().startswith("or "):
                clean = clean[3:].strip()

            if len(clean) > 5:
                # Turn into a question
                # Make first letter lowercase after "Can you"
                q_body = clean[0].lower() + clean[1:]
                q = "Can you " + q_body
                if not q.endswith("?"):
                    q += "?"

                key = q.lower()
                if key not in seen:
                    seen.add(key)
                    suggestions.append(q)
            continue

        # If we leave the bullet list, stop capture mode
        if capture_mode and not is_list_item:
            capture_mode = False

    return suggestions

# ---------- Writers ----------

def write_outputs(pairs: List[Tuple[str, str]], output_dir: Path):
    output_dir.mkdir(exist_ok=True)

    # questions_only.md
    q_only = output_dir / "questions_only.md"
    with q_only.open("w", encoding="utf-8") as f:
        f.write("# Suggested / Follow-up Questions Only\n\n")

        for i, (q, a) in enumerate(pairs, 1):
            suggestions = extract_suggested_questions(a)

            if not suggestions:
                continue

            f.write(f"## Q{i}\n\n")
            f.write(q.strip() + "\n\n")

            for j, s in enumerate(suggestions, 1):
                f.write(f"- Q{i}.{j} {s}\n")

            f.write("\n")

    # Individual QA files
    for i, (q, a) in enumerate(pairs, 1):
        out = output_dir / f"Q{i:03d}.md"
        with out.open("w", encoding="utf-8") as f:
            f.write(f"# Question {i}\n\n")
            f.write("## User Question\n\n")
            f.write(q.strip() + "\n\n")
            f.write("## Assistant Answer\n\n")
            f.write(a.strip() + "\n\n")

            suggestions = extract_suggested_questions(a)
            if suggestions:
                f.write("## Suggested / Follow-up Questions\n\n")
                for j, s in enumerate(suggestions, 1):
                    f.write(f"- Q{i}.{j} {s}\n")

# ---------- Main ----------

def main():
    path = input("Enter path to conversation .md file: ").strip().strip('"')
    file_path = Path(path)

    if not file_path.exists():
        print("‚ùå File not found!")
        return

    text = file_path.read_text(encoding="utf-8", errors="ignore")

    pairs = split_conversation(text)

    if not pairs:
        print("‚ö†Ô∏è No conversation blocks detected. The format may be unknown.")
        return

    output_dir = Path(r"D:\Balaji-workbench\synthetic data")

    write_outputs(pairs, output_dir)

    print("‚úÖ Done!")
    print(f"üìÑ Processed {len(pairs)} Q&A pairs")
    print(f"üìÅ Output folder: {output_dir}")

if __name__ == "__main__":
    main()


‚úÖ Done!
üìÑ Processed 9 Q&A pairs
üìÅ Output folder: D:\Balaji-workbench\synthetic data
