In [None]:
!pip install -q google-genai pandas tqdm

In [None]:
from google.colab import userdata

# Gemini API key, read from Colab's user secrets (google.colab.userdata) under "gemini_key".
api_key = userdata.get(
    'gemini_key'
)

GENERATING WIKI ARTICLES


In [None]:
import os
import json
import requests
import time
from tqdm import tqdm

# ==========================================
# 1. SETUP
# ==========================================
BASE_DIR = "/content/drive/MyDrive/rr_mlops/leftover_batches"
# exist_ok=True makes this a no-op when the folder already exists and avoids the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(BASE_DIR, exist_ok=True)

OUTPUT_FILE = os.path.join(BASE_DIR, "wiki_source.jsonl")

# ==========================================
# 2. THE CORE KNOWLEDGE SYLLABUS
# ==========================================
# Wikipedia article titles to fetch. The API matches titles exactly (redirects are
# followed), so spelling and punctuation matter, e.g. the en dash in "Red–black tree".
TOPICS = [
    # --- 1. THE "MISSING" BASICS (Languages & Runtimes) ---
    "Python (programming language)", "C++", "Java (programming language)",
    "JavaScript", "TypeScript", "Go (programming language)", "Rust (programming language)",
    "SQL", "Bash (Unix shell)", "Node.js", "Java Virtual Machine", "Kotlin", "Swift (programming language)",
    "C#", "Ruby (programming language)", "Scala (programming language)", "Solidity", "Dart (programming language)",

    # --- 2. CLASSICAL ML ALGORITHMS ---
    "Random forest", "Gradient boosting", "XGBoost", "Decision tree learning",
    "Support vector machine", "K-nearest neighbors algorithm", "Naive Bayes classifier",
    "Linear regression", "Logistic regression", "Principal component analysis",
    "K-means clustering", "DBSCAN", "Ensemble learning", "Dimensionality reduction",
    "Time series analysis", "Bayesian statistics", "Causal inference",

    # --- 3. DEEP LEARNING & AI ---
    "Neural network", "Convolutional neural network", "Recurrent neural network",
    "Transformer (machine learning)", "Attention (machine learning)", "Backpropagation",
    "Large language model", "Generative pre-trained transformer", "BERT (language model)",
    "Fine-tuning (machine learning)", "Transfer learning", "Embedding",
    "Generative adversarial network", "Diffusion model", "Reinforcement learning",
    "Computer vision", "Natural language processing", "Explainable AI",

    # --- 4. DATA STRUCTURES ---
    "Array (data structure)", "Linked list", "Hash table", "Binary search tree",
    "Red–black tree", "AVL tree", "B-tree", "Heap (data structure)", "Trie",
    "Graph (abstract data type)", "Stack (abstract data type)", "Queue (abstract data type)",
    "Bloom filter", "Disjoint-set data structure",

    # --- 5. ALGORITHMS ---
    "Sorting algorithm", "Quicksort", "Merge sort", "Binary search algorithm",
    "Dynamic programming", "Dijkstra's algorithm", "A* search algorithm",
    "Depth-first search", "Breadth-first search", "Greedy algorithm",
    "Pathfinding", "Simultaneous localization and mapping",

    # --- 6. SYSTEM DESIGN & ARCHITECTURE ---
    "Microservices", "Monolithic application", "Load balancing (computing)",
    "Database sharding", "Caching (computing)", "Content delivery network",
    "CAP theorem", "ACID", "Consistent hashing",
    "Replication (computing)", "Leader election", "Distributed system",
    "Event-driven architecture", "REST", "GraphQL", "gRPC", "WebSocket",
    "Serverless computing", "Service mesh", "API gateway", "OAuth", "JWT",

    # --- 7. MLOps & DEPLOYMENT ---
    "CI/CD", "DevOps", "MLOps", "Containerization (computing)", "Kubernetes",
    "Docker (software)", "Infrastructure as code", "Terraform (software)",
    "Model serving", "Feature store", "Data lineage", "A/B testing",
    "Model monitoring", "Experiment tracking", "Data version control",

    # --- 8. COMMON TOOLS & FRAMEWORKS ---
    "Librosa", "Streamlit", "Flask (web framework)", "Django (web framework)",
    "FastAPI", "Spring Boot", "React (software)", "Angular (web framework)",
    "Apache Kafka", "RabbitMQ", "Redis", "PostgreSQL", "MongoDB", "Elasticsearch",
    "Apache Spark", "Hadoop", "Pandas (software)", "NumPy", "Scikit-learn",

    # ==============================================================================
    # NEW EXPANSIONS BASED ON JOB DESCRIPTIONS
    # ==============================================================================

    # --- 9. GENERATIVE AI & LLM STACK ---
    "Retrieval-augmented generation", "LangChain", "LlamaIndex", "Vector database",
    "Pinecone (database)", "Weaviate", "Prompt engineering", "Parameter-efficient fine-tuning",
    "Low-rank adaptation", "Hugging Face Transformers", "Stable Diffusion", "Synthetic data",
    "Hallucination (artificial intelligence)", "Model alignment",

    # --- 10. MODERN DATA ENGINEERING & ANALYTICS ---
    "Data warehouse", "Data lake", "Data mesh", "Snowflake (software)", "Google BigQuery",
    "Amazon Redshift", "Databricks", "Apache Airflow", "dbt (software)", "ETL", "ELT",
    "Apache Flink", "Star schema", "Slowly changing dimension", "Data governance",

    # --- 11. CLOUD NATIVE & OBSERVABILITY ---
    "Amazon Web Services", "Microsoft Azure", "Google Cloud Platform", "Helm (software)",
    "Argo CD", "Prometheus (software)", "Grafana", "OpenTelemetry", "Istio",
    "Chaos engineering", "Site reliability engineering", "Infrastructure as a service",
    "Platform as a Service", "GitOps",

    # --- 12. FRONTEND & MOBILE DEVELOPMENT ---
    "Vue.js", "Next.js", "Tailwind CSS", "Redux (JavaScript library)", "Webpack",
    "React Native", "Flutter (software)", "SwiftUI", "Jetpack Compose", "Android Studio",
    "Xcode", "Progressive web app", "Single-page application", "WebAssembly",

    # --- 13. GAME DEV & GRAPHICS ---
    "Unity (game engine)", "Unreal Engine", "OpenGL", "Vulkan (API)", "DirectX",
    "Shader", "Ray tracing (graphics)", "3D computer graphics", "Physics engine",
    "Augmented reality", "Virtual reality", "WebGL",

    # --- 14. SECURITY & BLOCKCHAIN ---
    "Cybersecurity", "Penetration test", "Identity management", "Zero trust security model",
    "OWASP", "Smart contract", "Ethereum", "Decentralized application", "Web3",
    "Cryptography", "Public-key cryptography", "Consensus algorithm",

    # --- 15. EMBEDDED & HARDWARE ---
    "Embedded system", "Real-time operating system", "Microcontroller", "Firmware",
    "Internet of things", "Field-programmable gate array", "System on a chip",
    "Communication protocol", "I2C", "SPI", "UART"
]

# ==========================================
# 3. WIKI FETCHER ENGINE (FIXED)
# ==========================================
def fetch_wiki_content(title):
    """Fetch the plain-text extract of one English Wikipedia article.

    Queries the MediaWiki Action API with ``prop=extracts`` and ``explaintext``
    and follows redirects. It sends a descriptive User-Agent because Wikipedia
    blocks clients without one.

    Args:
        title: Article title to look up.

    Returns:
        The article text, or ``None`` in any of these cases: the page does not
        exist, the HTTP status is not 200, the network or JSON decoding fails,
        or the response has no extract. Failures are printed rather than
        raised, so a bulk run keeps going.
    """
    url = "https://en.wikipedia.org/w/api.php"

    # Wikipedia requires a User-Agent or it blocks the request.
    headers = {
        "User-Agent": "CareerKnowledgeGraphBot/1.0 (educational_project@example.com)"
    }

    params = {
        "action": "query",
        "format": "json",
        "titles": title,
        "prop": "extracts",
        "explaintext": True,
        "redirects": 1
    }

    try:
        response = requests.get(url, params=params, headers=headers, timeout=10)
        if response.status_code != 200:
            print(f"⚠️ HTTP Error {response.status_code} for {title}")
            return None
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # RequestException: network/timeout; ValueError: body is not valid JSON.
        print(f"⚠️ Error fetching {title}: {e}")
        return None

    query = data.get("query", {}) if isinstance(data, dict) else {}
    pages = query.get("pages", {}) if isinstance(query, dict) else {}

    for page_id, page_data in pages.items():
        # Missing pages come back under id "-1" and/or carry a "missing" flag.
        # No search fallback is attempted; the topic is simply skipped.
        if page_id == "-1" or "missing" in page_data:
            print(f"   Note: Direct lookup failed for '{title}', skipping...")
            return None
        extract = page_data.get("extract")
        if extract:
            return extract

    return None

# ==========================================
# 4. EXECUTION
# ==========================================
print(f"🚀 Initializing Knowledge Injection for {len(TOPICS)} core topics...")
print(f"📂 Output Target: {OUTPUT_FILE}")

# 'w' mode: every run rebuilds wiki_source.jsonl from scratch, one JSON object per line.
success_count = 0
with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
    for topic in tqdm(TOPICS, desc="Fetching Wiki Articles"):
        content = fetch_wiki_content(topic)  # logs its own failures, returns None

        if content:
            entry = {
                "title": topic,
                "text": content,
                "source": "wikipedia_core_concepts",
                "keyword": "Computer Science Fundamentals"
            }
            f.write(json.dumps(entry) + "\n")
            success_count += 1

        # Polite rate limiting after every request, failed ones included.
        time.sleep(0.5)

print("\n" + "="*50)
print(f"✅ SUCCESSFULLY FETCHED: {success_count} / {len(TOPICS)} Articles")
print(f"📝 Saved to: {OUTPUT_FILE}")

EXTRACTING TRIPLETS WITH GEMINI, CLEANING THEM & GENERATING CYPHER

In [None]:
import os
import json
import shutil
import time
from google import genai
from google.genai import types
from tqdm.notebook import tqdm

# ==========================================
# 1. CONFIGURATION
# ==========================================
class Config:
    """Settings for the Gemini triplet-extraction run over the wiki articles."""

    # Read from the Colab secret loaded into `api_key` by an earlier cell.
    API_KEY = api_key
    MODEL_NAME = "gemini-2.5-flash"

    # Paths (Pointing to the new LEFTOVER batch location)
    BASE_DIR = "/content/drive/MyDrive/rr_mlops/leftover_batches"

    # Input: the wiki_source.jsonl written by the Wikipedia fetch cell.
    PATH_WIKI = os.path.join(BASE_DIR, "wiki_source.jsonl")

    # Batching
    # Max characters of formatted article text packed into one model request.
    BATCH_SIZE_CHARS = 53500  # Increased for Wiki articles (Context rich)
    # 1-based batch number to start at; earlier batches are skipped so an
    # interrupted run can be resumed. Set to 1 for a fresh run.
    RESUME_FROM_BATCH = 39

    # Output. Results are appended directly to DRIVE_OUTPUT_PATH; there is no
    # local temp file. LOCAL_OUTPUT_FILE is only the file name.
    LOCAL_OUTPUT_FILE = "extracted_leftover_DAY6.jsonl"
    DRIVE_OUTPUT_PATH = os.path.join(BASE_DIR, LOCAL_OUTPUT_FILE)

# ==========================================
# 2. THE BRAIN (STRICT SYSTEM PROMPT)
# ==========================================
# Instruction text prepended to every extraction request. It is plain text sent to
# the model, so it must stay free of mis-encoded characters.
SYSTEM_PROMPT = """
You are a Knowledge Graph Extraction Engine for a Career Advisory System.

Your task is to extract structured knowledge that enables EXPLANATION, not just storage.

This knowledge graph must support:
1. WHY a job is the best match for a candidate
2. WHY other roles are second or third best
3. WHAT specific skills or concepts the candidate is missing
4. HOW skills transfer across roles via shared concepts

You must STRICTLY follow the ontology, rules, and output format below.
If something does not fit the rules, DO NOT output it.

--------------------------------------------------
ONTOLOGY CONTRACT (STRICT)
--------------------------------------------------

ALLOWED NODE TYPES:
JobPosting
JobRole
Skill
ProgrammingLanguage
Framework
Tool
Platform
CloudService
Database
Concept
Company
Location

--------------------------------------------------
CORE SEMANTIC PRINCIPLE (MANDATORY)
--------------------------------------------------

The graph MUST separate:
- EXECUTION (Skill)
- EXPLANATION (Concept)

This separation is NON-NEGOTIABLE.

--------------------------------------------------
SKILL vs CONCEPT CLASSIFICATION (CRITICAL RULE)
--------------------------------------------------

Skill = "Can this be directly practiced, implemented, or demonstrated?"

Concept = "Does this explain WHY or WHAT principle something is based on?"

HARD CONSTRAINT:
If a Skill candidate can be rewritten as
"Understanding of X" or "Knowledge of X",
it MUST be classified as Concept.

ANCHORING RULE:
Every Skill emitted for a JobRole SHOULD be supported
by at least one Concept via either:
- JobRole REQUIRES_CONCEPT
- Tool IMPLEMENTS_CONCEPT

If no Concept exists, add one according to your knowledge.
If the current wiki article seems short (the text may be split across batches), add relations for the current node from your own knowledge when only a few can be made from the available content.

VALID Skill examples:
- SQL Query Writing
- REST API Development
- Feature Engineering
- Model Deployment
- Prompt Engineering
- Kubernetes Troubleshooting
- CI/CD Pipeline Implementation
- Data Visualization
- Unit Testing
- API Integration

INVALID as Skill (MUST be Concept):
- Machine Learning
- Deep Learning
- Object-Oriented Programming
- System Design
- Cloud Architecture
- Distributed Systems
- Microservices Architecture
- Natural Language Processing
- Security Best Practices
- Data Modeling
- Statistics

RULES:
- If a term explains WHY → Concept
- If a term explains HOW → Skill
- If uncertain → Concept
- NEVER attach vague domains as Skill

--------------------------------------------------
TOOL vs FRAMEWORK vs PLATFORM (STRICT DEFINITIONS)
--------------------------------------------------
Classify technical entities using these rules:

1. FRAMEWORK: "Inverts control". It calls your code. It provides a scaffold.
   - EXAMPLES: React, Angular, Vue, Django, Spring Boot, TensorFlow, PyTorch, Flutter.
   - RULE: If it dictates the architecture of the app, it is a Framework.

2. TOOL: A library, utility, or software used to perform a task. You call it.
   - EXAMPLES: Pandas (Library -> Tool), NumPy (Library -> Tool), Git (CLI Tool), Docker (Container Tool), Jenkins (Build Tool), Jira (Project Tool).
   - RULE: If you import it to do math/data manipulation, it is a Tool (Library).

3. PLATFORM: An environment where software runs or is hosted.
   - EXAMPLES: iOS, Android, Linux, Windows, GitHub (SaaS Platform), GitLab.

--------------------------------------------------
ALLOWED RELATIONS (SEMANTIC)
--------------------------------------------------

-- Job Structure --
POSTED_BY          (JobPosting -> Company)
LOCATED_IN         (Company -> Location)
IS_FOR_ROLE        (JobPosting -> JobRole)
RELATED_ROLE       (JobRole -> JobRole)

-- Role Requirements (EXPLANATION-FIRST) --
REQUIRES_SKILL     (JobRole -> Skill)
REQUIRES_LANGUAGE  (JobRole -> ProgrammingLanguage)
REQUIRES_TOOL      (JobRole -> Tool | Framework | Database | Platform | CloudService)
REQUIRES_CONCEPT   (JobRole -> Concept)

-- Technical Reasoning (WHY / HOW CHAINS) --
IMPLEMENTS_CONCEPT (Tool | Framework | ProgrammingLanguage -> Concept)
USES_LANGUAGE      (Framework | Tool -> ProgrammingLanguage)
BUILT_WITH         (Tool -> ProgrammingLanguage)
IS_SIMILAR_TO      (Tool | Framework | Database -> same type)
CREATED_BY         (ProgrammingLanguage | Framework | Tool -> Company)

--------------------------------------------------
ALLOWED TRIPLE PATTERNS (MANDATORY)
--------------------------------------------------

You may emit a triplet ONLY if it matches one of these exact patterns:

-- Job Posting Context --
(JobPosting) IS_FOR_ROLE (JobRole)
(JobPosting) POSTED_BY (Company)
(Company) LOCATED_IN (Location)

-- Role Definition Context --
(JobRole) REQUIRES_SKILL (Skill)
(JobRole) REQUIRES_LANGUAGE (ProgrammingLanguage)
(JobRole) REQUIRES_TOOL (Tool | Framework | Database | Platform | CloudService)
(JobRole) REQUIRES_CONCEPT (Concept)
(JobRole) RELATED_ROLE (JobRole)

-- Technical Knowledge Context --
(Framework | Tool | ProgrammingLanguage) IMPLEMENTS_CONCEPT (Concept)
(Framework | Tool) USES_LANGUAGE (ProgrammingLanguage)
(Tool | Framework | Database) IS_SIMILAR_TO (same type)
(Tool) BUILT_WITH (ProgrammingLanguage)
(ProgrammingLanguage | Framework | Tool) CREATED_BY (Company)

If a candidate triplet does NOT match these patterns → DISCARD IT.

--------------------------------------------------
FORBIDDEN OUTPUT (NEVER EMIT)
--------------------------------------------------

Do NOT output:
- Responsibilities or actions
- Soft skills (communication, leadership, teamwork)
- Abstract fillers (systems, solutions, techniques)
- Version numbers or editions
- Salary, experience ranges, dates
- Metrics or percentages
- Multi-sentence nodes

--------------------------------------------------
SOURCE-AWARE RULES
--------------------------------------------------

IF INPUT SOURCE IS JOB DESCRIPTION (CSV):
- Treat each record as a JobPosting.
- ALWAYS map JobPosting → JobRole using IS_FOR_ROLE.
- Extract ONLY explicitly mentioned technical requirements.
- Do NOT infer skills not written in text.

IF INPUT SOURCE IS ROLE DEFINITION (JSON):
- Subject is the JobRole.
- Extract REQUIRED skills, tools, languages, and concepts.
- Extract adjacent or alternative roles using RELATED_ROLE.

IF INPUT SOURCE IS WIKI ARTICLE:
- DO NOT emit JobPosting or JobRole.
- Focus only on ProgrammingLanguage, Framework, Tool, Concept.
- Emit Company ONLY if it is the creator (CREATED_BY).
- Emit EXACTLY ONE definition object for the primary entity.

--------------------------------------------------
WIKI-SPECIFIC: DEFINITION EXTRACTION (MANDATORY)
--------------------------------------------------

If and ONLY if the input source is a Wiki article, return ONE definition object.

Definition MUST focus on UTILITY for career understanding.

Schema:
- definition: ONE sentence explaining what it is and why it is useful.
- summary: 2 sentences explaining where and why it is used.
- key_characteristics: 3–5 short, technical bullet points.
- source: provenance string (e.g. "wikipedia").

Do NOT include history, dates, or trivia.

--------------------------------------------------
ENTITY NORMALIZATION RULES
--------------------------------------------------

- Canonical names only (JS → JavaScript).
- Singular form only.
- Title Case for entities.
- Disambiguate using context.
- Prefer CloudService over Platform for AWS, Azure, GCP.

--------------------------------------------------
OUTPUT FORMAT (STRICT JSON ONLY)
--------------------------------------------------

Return ONLY a valid JSON object. No markdown. No explanations.

{
  "triplets": [
    {
      "subject": "string",
      "subject_type": "string",
      "relation": "string",
      "object": "string",
      "object_type": "string"
    }
  ],
  "definitions": [
    {
      "name": "string",
      "node_type": "string",
      "definition": "string",
      "summary": "string",
      "key_characteristics": ["string"],
      "source": "string"
    }
  ]
}

"""

# ==========================================
# 3. DATA INGESTOR
# ==========================================
class DataIngestor:
    """Turns stored source files into prompt-ready text chunks."""

    @staticmethod
    def read_wiki(path, max_words=2500):
        """Yield one formatted prompt chunk per article in a wiki JSONL file.

        Each line of *path* is expected to be a JSON object with ``title`` and
        ``text`` keys, as written by the Wikipedia fetch cell. The text is cut
        to its first *max_words* whitespace-separated words.

        Lines are skipped when they are blank, not valid JSON, not a JSON object,
        or have no text. A missing file is reported and yields nothing.

        Args:
            path: Path to the UTF-8 JSONL file.
            max_words: Word cap per article (default 2500).

        Yields:
            ``"SOURCE: WIKI_ARTICLE\\nPRIMARY_ENTITY: <title>\\nCONTENT: <text>\\n\\n"``
        """
        print(f"Loading Wiki from: {path}")
        if not os.path.exists(path):
            print(f"Wiki file not found, nothing to ingest: {path}")
            return
        # The fetch cell writes this file as UTF-8; don't rely on the platform default.
        with open(path, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    item = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if not isinstance(item, dict):
                    continue

                title = item.get('title', 'Unknown')
                # `or ''` also covers an explicit "text": null.
                content = item.get('text') or ''
                if not isinstance(content, str) or not content.strip():
                    continue  # an empty article would only waste model tokens

                capped_content = " ".join(content.split()[:max_words])
                yield (
                    f"SOURCE: WIKI_ARTICLE\n"
                    f"PRIMARY_ENTITY: {title}\n"
                    f"CONTENT: {capped_content}\n\n"
                )

def batch_generator(iterable, max_chars):
    """Greedily pack text chunks into batches of at most *max_chars* characters.

    Chunks are never split. A single chunk longer than *max_chars* becomes its
    own oversized batch. Empty batches are never yielded.

    Args:
        iterable: Iterable of strings, e.g. ``DataIngestor.read_wiki(...)``.
        max_chars: Soft character budget per batch.

    Yields:
        The concatenation of consecutive chunks for each batch.
    """
    current_batch = []
    current_len = 0
    for text_chunk in iterable:
        chunk_len = len(text_chunk)
        # Flush before overflowing, but only when there is something to flush.
        # Otherwise an oversized first chunk would emit an empty "" batch.
        if current_batch and current_len + chunk_len > max_chars:
            yield "".join(current_batch)
            current_batch = []
            current_len = 0
        current_batch.append(text_chunk)
        current_len += chunk_len
    if current_batch:
        yield "".join(current_batch)

# ==========================================
# 4. MAIN EXECUTION
# ==========================================
def main():
    """Run Gemini triplet extraction over batched wiki articles.

    Steps:
    1. Read ``Config.PATH_WIKI`` through ``DataIngestor.read_wiki``.
    2. Pack the articles into batches of about ``Config.BATCH_SIZE_CHARS`` characters.
    3. Skip batches numbered below ``Config.RESUME_FROM_BATCH`` (1-based).
    4. Send each remaining batch to ``Config.MODEL_NAME`` with ``SYSTEM_PROMPT``.
    5. Append each valid JSON response as a single line to ``Config.DRIVE_OUTPUT_PATH``.

    A failed call (API error or unparseable JSON) is retried a few times. A batch
    that keeps failing is logged and skipped so the run can continue.
    """
    client = genai.Client(api_key=Config.API_KEY)

    wiki_gen = DataIngestor.read_wiki(Config.PATH_WIKI)
    all_batches = batch_generator(wiki_gen, Config.BATCH_SIZE_CHARS)

    # Attempts per batch before giving up on it.
    max_attempts = 3

    print(f"\n🚀 Starting Knowledge Extraction (Batch Size: {Config.BATCH_SIZE_CHARS})...")

    # Append mode creates the file if it is missing and never overwrites earlier
    # runs, which is what makes RESUME_FROM_BATCH safe.
    with open(Config.DRIVE_OUTPUT_PATH, 'a', encoding='utf-8') as f_out:
        for i, batch_text in enumerate(tqdm(all_batches), 1):
            if i < Config.RESUME_FROM_BATCH:
                continue

            for attempt in range(1, max_attempts + 1):
                try:
                    response = client.models.generate_content(
                        model=Config.MODEL_NAME,
                        contents=f"{SYSTEM_PROMPT}\n\nDATA TO EXTRACT:\n{batch_text}",
                        config=types.GenerateContentConfig(response_mime_type="application/json")
                    )
                    raw = response.text
                    # Validate before writing so the output never holds broken JSON.
                    parsed = json.loads(raw) if raw else None
                except Exception as e:
                    print(f"❌ Batch {i} Error (attempt {attempt}/{max_attempts}): {e}")
                    time.sleep(30)
                    continue

                if raw:
                    # Re-serialise onto one line so the output is true JSONL even
                    # when the model pretty-prints its answer.
                    f_out.write(json.dumps(parsed, ensure_ascii=False) + "\n")
                    f_out.flush()
                    print(f"✅ Batch {i} saved to Drive.")
                else:
                    print(f"⚠️ Batch {i} returned an empty response; nothing saved.")
                break
            else:
                print(f"❌ Batch {i} skipped after {max_attempts} failed attempts.")

            # Pause between batches (request throttling).
            time.sleep(15)

    print(f"\n🎉 Extraction Complete! Saved to: {Config.DRIVE_OUTPUT_PATH}")

if __name__ == "__main__":
    main()

In [None]:
import json
import re
import os
from collections import defaultdict

# ==========================================
# 1. CONFIGURATION
# ==========================================
BASE_DIR = "/content/drive/MyDrive/rr_mlops/leftover_batches"
INPUT_FILE = os.path.join(BASE_DIR, "extracted_leftover_DAY6.jsonl")
CLEAN_FILE = os.path.join(BASE_DIR, "clean_leftover_DAY6.jsonl")
CYPHER_FILE = os.path.join(BASE_DIR, "ingest_leftover_DAY6.cypher")

# Max items per UNWIND statement in the generated Cypher.
BATCH_SIZE = 1000

# ==========================================
# 2. ONTOLOGY
# ==========================================
# A. Forced node types, keyed by canonical display name (wins over the LLM's type).
TYPE_OVERRIDES = {
    # Data Science / ML
    "Pandas": "Tool", "NumPy": "Tool", "Scikit-Learn": "Framework",
    "Keras": "Framework", "PyTorch": "Framework", "TensorFlow": "Framework",
    "OpenCV": "Tool", "NLTK": "Tool", "SpaCy": "Tool", # libraries -> Tool
    "Matplotlib": "Tool", "Seaborn": "Tool", "D3.js": "Tool", "MLflow": "Tool",
    # DevOps / Cloud
    "Docker": "Tool", "Kubernetes": "Tool", "Terraform": "Tool",
    "Jenkins": "Tool", "Git": "Tool", "Ansible": "Tool",
    "AWS": "CloudService", "Azure": "CloudService", "GCP": "CloudService",
    # Concepts (Must NEVER be Skills)
    "Statistics": "Concept", "Probability": "Concept", "Linear Algebra": "Concept",
    "Calculus": "Concept", "Machine Learning": "Concept", "Deep Learning": "Concept",
    "Data Science": "Concept", "Computer Vision": "Concept", "NLP": "Concept",
    "CI/CD": "Concept", "DevOps": "Concept", "MLOps": "Concept",
    "Agile": "Concept", "Scrum": "Concept", "REST": "Concept",
    "Microservices": "Concept", "System Design": "Concept", "Algorithms": "Concept",
    "Data Structures": "Concept", "Object-Oriented Programming": "Concept"
}

# B. Canonical Name Mapping (The "Alias Crusher")
# Maps lower-cased variations -> single official name.
CANONICAL_NAMES = {
    # JS Ecosystem
    "react.js": "React", "reactjs": "React",
    "vue.js": "Vue", "vuejs": "Vue", "vue": "Vue",
    "node.js": "Node.js", "nodejs": "Node.js", "node": "Node.js",
    "express.js": "Express", "expressjs": "Express",
    "next.js": "Next.js", "nextjs": "Next.js",
    # Cloud
    "amazon web services": "AWS", "aws": "AWS",
    "google cloud platform": "GCP", "google cloud": "GCP", "gcp": "GCP",
    "microsoft azure": "Azure", "azure": "Azure",
    # Databases
    "postgresql": "PostgreSQL", "postgres": "PostgreSQL",
    "mongodb": "MongoDB", "mongo": "MongoDB",
    "mssql": "SQL Server", "sql server": "SQL Server",
    # ML/Python
    "tensorflow": "TensorFlow", "tf": "TensorFlow",
    "pytorch": "PyTorch",
    "scikit-learn": "Scikit-Learn", "sklearn": "Scikit-Learn",
    "mlflow": "MLflow",
    # General
    "golang": "Go", "go lang": "Go",
    "c#": "C#", "c sharp": "C#", "c++": "C++",
    "dotnet": ".NET", ".net": ".NET", ".net core": ".NET Core"
}

# C. Vague Phrase Detector: regexes for filler phrases that should not become entity names.
BAD_PATTERNS = [
    r"\bability to\b", r"\bexperience in\b", r"\bknowledge of\b",
    r"\bresponsible for\b", r"\bworking with\b", r"\bunderstanding of\b",
    r"\bfamiliarity with\b", r"\bproven track record\b", r"\bproficient in\b"
]

# D. Adjective Stripper: leading qualifiers / trailing suffixes removed from names.
ADJECTIVES = [
    r"^Strong\s+", r"^Advanced\s+", r"^Basic\s+", r"^Senior\s+", r"^Junior\s+",
    r"^Hands-on\s+", r"^Expertise in\s+", r"\s+Skills$", r"\s+Development$"
]

# E. Knowledge Injection: extra edges added whenever one of these names appears.
INJECTED_EDGES = {
    "PyTorch": [("IMPLEMENTS_CONCEPT", "Deep Learning", "Concept")],
    "TensorFlow": [("IMPLEMENTS_CONCEPT", "Deep Learning", "Concept")],
    "Scikit-Learn": [("IMPLEMENTS_CONCEPT", "Machine Learning", "Concept")],
    "React": [("IMPLEMENTS_CONCEPT", "Frontend Development", "Concept"), ("IMPLEMENTS_CONCEPT", "Component-Based Architecture", "Concept")],
    "Docker": [("IMPLEMENTS_CONCEPT", "Containerization", "Concept")],
    "Kubernetes": [("IMPLEMENTS_CONCEPT", "Orchestration", "Concept")],
    "PostgreSQL": [("IMPLEMENTS_CONCEPT", "Relational Database", "Concept")],
    "MongoDB": [("IMPLEMENTS_CONCEPT", "NoSQL", "Concept")],
    "REST": [("IS_SUBTOPIC_OF", "Architectural Style", "Concept")],
    "CI/CD": [("IS_SUBTOPIC_OF", "DevOps Practice", "Concept")]
}

# F. Allowed Pattern Schema: (subject_type, relation, object_type).
# Kept in sync with the "ALLOWED TRIPLE PATTERNS" section of the extraction prompt,
# so relations the model is asked to emit are not silently discarded here.
ALLOWED_PATTERNS = {
    # -- Job structure --
    ("JobPosting", "IS_FOR_ROLE", "JobRole"),
    ("JobPosting", "POSTED_BY", "Company"),
    ("Company", "LOCATED_IN", "Location"),
    ("JobRole", "RELATED_ROLE", "JobRole"),
    # -- Role requirements --
    ("JobRole", "REQUIRES_SKILL", "Skill"),
    ("JobRole", "REQUIRES_KNOWLEDGE", "Concept"),
    ("JobRole", "REQUIRES_CONCEPT", "Concept"),
    ("JobRole", "REQUIRES_TOOL", "Tool"),
    ("JobRole", "REQUIRES_TOOL", "Framework"),
    ("JobRole", "REQUIRES_TOOL", "Database"),
    ("JobRole", "REQUIRES_TOOL", "Platform"),
    ("JobRole", "REQUIRES_TOOL", "CloudService"),
    ("JobRole", "REQUIRES_LANGUAGE", "ProgrammingLanguage"),
    # -- Technical knowledge --
    ("Tool", "IMPLEMENTS_CONCEPT", "Concept"),
    ("Framework", "IMPLEMENTS_CONCEPT", "Concept"),
    ("ProgrammingLanguage", "IMPLEMENTS_CONCEPT", "Concept"),
    ("Tool", "USES_LANGUAGE", "ProgrammingLanguage"),
    ("Framework", "USES_LANGUAGE", "ProgrammingLanguage"),
    ("Tool", "BUILT_WITH", "ProgrammingLanguage"),
    ("Tool", "IS_SIMILAR_TO", "Tool"),
    ("Framework", "IS_SIMILAR_TO", "Framework"),
    ("Database", "IS_SIMILAR_TO", "Database"),
    ("ProgrammingLanguage", "CREATED_BY", "Company"),
    ("Framework", "CREATED_BY", "Company"),
    ("Tool", "CREATED_BY", "Company"),
    ("Concept", "IS_SUBTOPIC_OF", "Concept")
}
# ==========================================
# 3. HELPER FUNCTIONS
# ==========================================
# Matches a run of letters, optionally followed by an apostrophe and more letters,
# so "dijkstra's" is one word (str.title() would produce "Dijkstra'S").
_TITLE_WORD_RE = re.compile(r"[^\W\d_]+(?:['’][^\W\d_]+)?")


def _title_case(text):
    """Capitalise each word like str.title(), but keep letters after apostrophes lower-case."""
    return _TITLE_WORD_RE.sub(lambda m: m.group(0).capitalize(), text)


def _known_casings():
    """Map lower-cased names to their preferred spelling.

    Names already spelled out in TYPE_OVERRIDES / INJECTED_EDGES / the values of
    CANONICAL_NAMES keep that exact casing. Explicit aliases in CANONICAL_NAMES
    take precedence.
    """
    casings = {
        display.lower(): display
        for display in (*TYPE_OVERRIDES, *INJECTED_EDGES, *CANONICAL_NAMES.values())
    }
    casings.update(CANONICAL_NAMES)
    return casings


def clean_name(name):
    """Normalise an entity name for the graph.

    Steps:
    1. Strip surrounding whitespace.
    2. Remove the qualifiers/suffixes listed in ADJECTIVES.
    3. Resolve the name case-insensitively, in this order:
       a. the alias table plus known display names, e.g. "numpy" -> "NumPy", "ci/cd" -> "CI/CD";
       b. the short acronym list, which is upper-cased;
       c. otherwise, apostrophe-safe title case.

    Step 3a matters because TYPE_OVERRIDES and INJECTED_EDGES are keyed by
    these exact spellings, so plain str.title() output would never match them.

    Returns:
        The normalised name, or ``None`` for non-string or empty input.
    """
    if not isinstance(name, str):
        return None
    name = name.strip()
    for pattern in ADJECTIVES:
        name = re.sub(pattern, "", name, flags=re.IGNORECASE)
    if not name:
        return None

    lower_name = name.lower()
    casings = _known_casings()
    if lower_name in casings:
        return casings[lower_name]
    if lower_name in ("sql", "api", "etl", "elt", "bi"):
        return name.upper()
    return _title_case(name)

def _normalise_node_type(value):
    """Return *value* as a Cypher-label-safe type name, ``"Concept"`` if absent, or None.

    Whitespace is removed ("Programming Language" -> "ProgrammingLanguage").
    Anything that is then not a plain identifier is rejected (returns None),
    because node types are interpolated into Cypher labels.
    """
    if value is None:
        return "Concept"
    if not isinstance(value, str):
        return None
    compact = re.sub(r"\s+", "", value)
    if not compact:
        return "Concept"
    return compact if re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", compact) else None


def validate_triplet(t):
    """Normalise one raw LLM triplet and check it against the ontology.

    Accepts either ``subject``/``object``/``relation`` keys or the alternative
    ``subject_node``/``object_node``/``relationship`` spellings.

    Normalisation:
    - Names go through ``clean_name``.
    - The relation is upper-cased, with spaces and hyphens turned into ``_``.
    - Node types come from ``TYPE_OVERRIDES`` first, then from the triplet's own
      type, then default to ``"Concept"``.

    Returns:
        A normalised triplet dict, or ``None`` when any of these holds:
        - the input is not a dict;
        - a name is missing, non-string, or vague (matches ``BAD_PATTERNS``);
        - a type is unusable as a label;
        - the (subject_type, relation, object_type) pattern is not in
          ``ALLOWED_PATTERNS``. As a fallback for wiki output,
          ``IMPLEMENTS_CONCEPT`` and ``IS_SUBTOPIC_OF`` are accepted for any types.
    """
    if not isinstance(t, dict):
        return None

    # Flexible key access
    s = t.get('subject') or t.get('subject_node')
    o = t.get('object') or t.get('object_node')
    r = t.get('relation') or t.get('relationship') or 'RELATED_TO'
    if not isinstance(s, str) or not isinstance(o, str) or not isinstance(r, str):
        return None
    # Canonicalise spelling so "implements concept" matches IMPLEMENTS_CONCEPT.
    r = re.sub(r"[\s\-]+", "_", r.strip()).upper()

    s_clean = clean_name(s)
    o_clean = clean_name(o)
    if not s_clean or not o_clean:
        return None
    # Drop filler like "Knowledge Of Kubernetes" instead of storing it as a node.
    for entity in (s_clean, o_clean):
        if any(re.search(p, entity, flags=re.IGNORECASE) for p in BAD_PATTERNS):
            return None

    # `or` rather than a .get() default, so explicit null/"" types also fall back.
    s_type = _normalise_node_type(TYPE_OVERRIDES.get(s_clean) or t.get('subject_type'))
    o_type = _normalise_node_type(TYPE_OVERRIDES.get(o_clean) or t.get('object_type'))
    if s_type is None or o_type is None:
        return None

    # Pattern Logic (with a permissive fallback for wiki concept edges)
    if (s_type, r, o_type) not in ALLOWED_PATTERNS and r not in ("IMPLEMENTS_CONCEPT", "IS_SUBTOPIC_OF"):
        return None

    return {
        "subject": s_clean, "subject_type": s_type,
        "relation": r,
        "object": o_clean, "object_type": o_type
    }

def chunker(seq, size):
    """Split any iterable into consecutive lists holding at most *size* elements.

    The input is materialised first, so sets and generators are accepted.
    """
    materialized = list(seq)
    yield from (
        materialized[start:start + size]
        for start in range(0, len(materialized), size)
    )

def generate_json_blocks(file_content):
    """
    Lazily decode a string that holds several JSON values written back to back.

    Whitespace and newlines between values are skipped, so this handles both
    one-per-line JSONL and pretty-printed objects stacked one after another.
    If no value can be decoded at the current position, the scan moves one
    character forward and tries again. Unparseable text is skipped, not raised.
    """
    decoder = json.JSONDecoder()
    cursor = 0
    end = len(file_content)
    while True:
        # Advance past the whitespace that separates two values.
        while cursor < end and file_content[cursor].isspace():
            cursor += 1
        if cursor >= end:
            return
        try:
            value, next_cursor = decoder.raw_decode(file_content, idx=cursor)
        except json.JSONDecodeError:
            # No valid value starts here; resynchronise one character later.
            cursor += 1
            continue
        yield value
        cursor = next_cursor

# ==========================================
# 4. MAIN EXECUTION
# ==========================================
def _add_unique(triplet, seen_hashes, valid_triplets):
    """Append *triplet* unless its subject|relation|object key was already seen."""
    key = f"{triplet['subject']}|{triplet['relation']}|{triplet['object']}"
    if key not in seen_hashes:
        seen_hashes.add(key)
        valid_triplets.append(triplet)


def _has_label_types(triplet):
    """True when both node types and the relation are non-empty strings (usable in Cypher)."""
    return all(
        isinstance(triplet.get(k), str) and triplet[k].strip()
        for k in ("subject_type", "object_type", "relation")
    )


def _collect_valid_triplets(raw_text):
    """Parse stacked JSON blocks, validate and dedupe triplets, and add injected edges.

    Returns:
        ``(valid_triplets, block_count)``.
    """
    valid_triplets = []
    seen_hashes = set()
    block_count = 0

    for data in generate_json_blocks(raw_text):
        block_count += 1

        # Normalize inputs: a full response object, or a bare list of triplets.
        if isinstance(data, dict):
            triplets_to_process = data.get("triplets") or []
        elif isinstance(data, list):
            triplets_to_process = data
        else:
            triplets_to_process = []
        if not isinstance(triplets_to_process, list):
            continue

        for raw in triplets_to_process:
            # Resynchronised parsing can surface stray strings/lists; skip them.
            if not isinstance(raw, dict):
                continue
            try:
                clean_t = validate_triplet(raw)
            except (AttributeError, TypeError) as e:
                # One malformed LLM record must not abort the whole run.
                print(f"⚠️ Skipping malformed triplet {raw!r}: {e}")
                continue
            if not clean_t or not _has_label_types(clean_t):
                continue

            _add_unique(clean_t, seen_hashes, valid_triplets)

            # Inject Edges (Hydration)
            for side in ('subject', 'object'):
                token = clean_t[side]
                for rel, target, target_type in INJECTED_EDGES.get(token, []):
                    _add_unique({
                        "subject": token,
                        "subject_type": clean_t[f'{side}_type'],
                        "relation": rel,
                        "object": target,
                        "object_type": target_type
                    }, seen_hashes, valid_triplets)

    return valid_triplets, block_count


def _cypher_label(name):
    """Backtick-quote a label or relationship type, doubling embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def _cypher_constraint_name(type_name):
    """Derive a plain-identifier constraint name such as ``programminglanguage_uniq``."""
    return re.sub(r"[^0-9a-z_]", "_", type_name.lower()) + "_uniq"


def _cypher_pair_list(pairs):
    """Render ``[{s: "...", o: "..."}, ...]`` as a Cypher list of maps.

    Map keys are left unquoted, as Cypher map literals require. Values are
    json.dumps()-escaped, which produces valid Cypher string literals.
    """
    rows = (f"{{s: {json.dumps(p['s'])}, o: {json.dumps(p['o'])}}}" for p in pairs)
    return "[" + ", ".join(rows) + "]"


def _build_cypher_statements(valid_triplets):
    """Return constraint, node and relationship statements for *valid_triplets*."""
    statements = ["// INGESTION SCRIPT\n"]

    # 1. Constraints
    all_types = {t['subject_type'] for t in valid_triplets} | {t['object_type'] for t in valid_triplets}
    for t in sorted(all_types):
        statements.append(
            f"CREATE CONSTRAINT {_cypher_constraint_name(t)} IF NOT EXISTS "
            f"FOR (n:{_cypher_label(t)}) REQUIRE n.name IS UNIQUE;"
        )

    # 2. Nodes
    nodes = defaultdict(set)
    for t in valid_triplets:
        nodes[t['subject_type']].add(t['subject'])
        nodes[t['object_type']].add(t['object'])

    statements.append("\n// --- NODES ---")
    for n_type, names in nodes.items():
        # Sorted so the generated script is identical across runs.
        for batch in chunker(sorted(names), BATCH_SIZE):
            # json.dumps alone escapes quotes/backslashes correctly; escaping again
            # first would corrupt names and break the relationship MATCH below.
            statements.append(
                f"UNWIND {json.dumps(batch)} AS name MERGE (n:{_cypher_label(n_type)} {{name: name}});"
            )

    # 3. Relationships
    statements.append("\n// --- RELATIONSHIPS ---")
    rels = defaultdict(list)
    for t in valid_triplets:
        key = (t['subject_type'], t['relation'], t['object_type'])
        rels[key].append({"s": t['subject'], "o": t['object']})

    for (st, rel, ot), pairs in rels.items():
        for batch in chunker(pairs, BATCH_SIZE):
            statements.append(
                f"UNWIND {_cypher_pair_list(batch)} AS row "
                f"MATCH (s:{_cypher_label(st)} {{name: row.s}}) MATCH (o:{_cypher_label(ot)} {{name: row.o}}) "
                f"MERGE (s)-[:{_cypher_label(rel)}]->(o);"
            )

    return statements


def main_validation_and_cypher():
    """Validate extracted triplets, save them as clean JSONL, and write a Cypher script.

    Phase 1:
    - Read INPUT_FILE (stacked JSON objects, one per model response).
    - Validate and normalise every triplet with validate_triplet.
    - Dedupe on subject|relation|object and add INJECTED_EDGES.
    - Write the result to CLEAN_FILE.

    Phase 2: write batched UNWIND ... MERGE statements to CYPHER_FILE.

    Problems are reported with print() and end the run early.
    """
    print(f"🚀 Processing: {INPUT_FILE}")

    # --- PHASE 1: STREAM PARSING & VALIDATION ---
    if not os.path.exists(INPUT_FILE):
        print(f"❌ Error: {INPUT_FILE} not found.")
        return

    try:
        with open(INPUT_FILE, 'r', encoding='utf-8') as f:
            raw_text = f.read()

        valid_triplets, block_count = _collect_valid_triplets(raw_text)
        print(f"🔍 Successfully parsed {block_count} JSON blocks.")

        # Save Clean JSONL
        with open(CLEAN_FILE, 'w', encoding='utf-8') as f_out:
            for t in valid_triplets:
                f_out.write(json.dumps(t) + "\n")

        print(f"✅ Cleaned {len(valid_triplets)} unique triplets.")

    except Exception as e:
        print(f"❌ Critical parsing error: {e}")
        return

    # --- PHASE 2: CYPHER GENERATION ---
    if not valid_triplets:
        print("⚠️ No data to ingest.")
        return

    print("⚙️ Generating Cypher...")
    statements = _build_cypher_statements(valid_triplets)

    with open(CYPHER_FILE, 'w', encoding='utf-8') as f:
        f.write("\n".join(statements))

    print(f"🎉 Success! Final script ready at: {CYPHER_FILE}")

if __name__ == "__main__":
    main_validation_and_cypher()

In [None]:
import json
import os

# ==========================================
# 1. CONFIGURATION
# ==========================================
# Input (validated triplets) and output (Cypher script) both live in the batch folder.
BASE_DIR = "/content/drive/MyDrive/rr_mlops/leftover_batches"
INPUT_FILE, OUTPUT_FILE = (
    os.path.join(BASE_DIR, file_name)
    for file_name in ("clean_leftover_DAY6.jsonl", "ingest_leftover_DAY6.cypher")
)
# Rows per UNWIND statement; the author notes this as a safe size for AuraDB.
BATCH_SIZE = 1000

# ==========================================
# 2. HELPER: CHUNKING
# ==========================================
def chunker(seq, size):
    """Yield consecutive chunks of at most *size* items from *seq*.

    Sequences (list, tuple, str, range, ...) are sliced, so each chunk has the
    same type as *seq*. Any other iterable (e.g. a set of names) is first
    materialised into a list, and list chunks are yielded.

    Raises:
        ValueError: if *size* is less than 1 (raised when iteration starts).
    """
    from collections.abc import Sequence

    if size < 1:
        # A zero step would crash range() and a negative one would silently yield nothing.
        raise ValueError(f"chunk size must be >= 1, got {size}")
    if not isinstance(seq, Sequence):
        # Sets and generators have no len()/slicing; iterate them once into a list.
        seq = list(seq)
    for start in range(0, len(seq), size):
        yield seq[start:start + size]

# ==========================================
# 3. GENERATORS
# ==========================================
def generate_constraints(types):
    """Return one ``CREATE CONSTRAINT ... IF NOT EXISTS`` statement per node label.

    Each constraint makes ``n.name`` unique for its label. Labels are
    de-duplicated and emitted in sorted order so the script is reproducible.

    Args:
        types: iterable of node label strings.

    Returns:
        List of Cypher statements, each terminated with ``;``.
    """
    statements = []
    for t in sorted(set(types)):
        # Backtick-quote both the constraint name and the label, doubling any
        # embedded backtick, so labels with spaces, hyphens or reserved words
        # still produce valid Cypher.
        label = t.replace("`", "``")
        constraint_name = f"{t.lower()}_uniq".replace("`", "``")
        statements.append(
            f"CREATE CONSTRAINT `{constraint_name}` IF NOT EXISTS "
            f"FOR (n:`{label}`) REQUIRE n.name IS UNIQUE;"
        )
    return statements

def generate_node_cypher(triplets, batch_size=None):
    """Build batched ``UNWIND ... MERGE`` statements creating one node per unique name.

    Args:
        triplets: iterable of dicts with 'subject', 'subject_type', 'object'
            and 'object_type' keys.
        batch_size: names per UNWIND statement; when omitted, the module-level
            BATCH_SIZE is used (looked up at call time).

    Returns:
        A list whose first element is the section-header comment, followed by
        one statement per batch, grouped by node label in first-seen order.
    """
    size = BATCH_SIZE if batch_size is None else batch_size

    # Dict keys serve as an insertion-ordered set: the generated script is then
    # identical across runs, unlike iterating a plain set of strings, whose
    # order depends on the per-process hash seed.
    nodes = {}
    for t in triplets:
        for side in ('subject', 'object'):
            nodes.setdefault(t[f'{side}_type'], {})[t[side]] = None

    statements = ["// --- 2. NODE CREATION ---"]
    for n_type, names in nodes.items():
        # Backticks inside a quoted Cypher identifier are escaped by doubling them.
        label = str(n_type).replace("`", "``")
        all_names = list(names)
        for start in range(0, len(all_names), size):
            batch = all_names[start:start + size]
            statements.append(
                f"// Batch: Create {n_type} nodes\n"
                f"UNWIND {json.dumps(batch)} AS name "
                f"MERGE (n:`{label}` {{name: name}});"
            )
    return statements

def generate_rel_cypher(triplets, batch_size=None):
    """Build batched ``UNWIND ... MATCH ... MERGE`` statements for relationships.

    Triplets are grouped by (subject_type, relation, object_type) so each
    statement has fixed labels and a fixed relationship type.

    Args:
        triplets: iterable of dicts with 'subject', 'subject_type', 'relation',
            'object' and 'object_type' keys.
        batch_size: rows per UNWIND statement; when omitted, the module-level
            BATCH_SIZE is used (looked up at call time).

    Returns:
        A list whose first element is the section-header comment, followed by
        one statement per batch.
    """
    size = BATCH_SIZE if batch_size is None else batch_size

    rels = {}
    for t in triplets:
        key = (t['subject_type'], t['relation'], t['object_type'])
        rels.setdefault(key, []).append((t['subject'], t['object']))

    statements = ["// --- 3. RELATIONSHIP CREATION ---"]
    for (st, rel, ot), pairs in rels.items():
        # Backtick-quote labels and the relationship type (doubling embedded
        # backticks) so names with spaces, hyphens or reserved words stay valid.
        s_label, rel_type, o_label = (str(x).replace("`", "``") for x in (st, rel, ot))
        for start in range(0, len(pairs), size):
            # Cypher map keys must be bare identifiers: json.dumps({"s": ...})
            # would emit quoted keys, which Cypher rejects. Only the values are
            # JSON-encoded; JSON string escapes are also valid Cypher escapes.
            rows = ", ".join(
                f"{{s: {json.dumps(s)}, o: {json.dumps(o)}}}"
                for s, o in pairs[start:start + size]
            )
            statements.append(
                f"// Batch: {st} -> {rel} -> {ot}\n"
                f"UNWIND [{rows}] AS row "
                f"MATCH (s:`{s_label}` {{name: row.s}}) "
                f"MATCH (o:`{o_label}` {{name: row.o}}) "
                f"MERGE (s)-[:`{rel_type}`]->(o);"
            )
    return statements

# ==========================================
# 4. MAIN
# ==========================================
def main():
    """Convert the cleaned triplet JSONL at INPUT_FILE into a batched Cypher script at OUTPUT_FILE.

    Blank lines are ignored. Lines that are not valid JSON, and records that
    are not dicts with non-empty 'subject', 'subject_type', 'relation',
    'object' and 'object_type' values, are skipped and counted instead of
    aborting the run. Progress and a summary are printed; returns None.
    """
    print("‚öôÔ∏è Generating Optimized Cypher Script...")
    required_keys = ("subject", "subject_type", "relation", "object", "object_type")
    triplets = []
    skipped = 0

    # Guard only the input read: a FileNotFoundError raised while writing the
    # output must not be misreported as a missing input file.
    try:
        with open(INPUT_FILE, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    skipped += 1
                    continue
                # The generators index these keys directly; an incomplete record
                # would otherwise abort the whole run with a KeyError.
                if isinstance(record, dict) and all(record.get(k) for k in required_keys):
                    triplets.append(record)
                else:
                    skipped += 1
    except FileNotFoundError:
        print("‚ùå Error: Input file not found. Run validation script first.")
        return

    print(f"üìä Loaded {len(triplets)} triplets.")
    if skipped:
        print(f"   - Skipped {skipped} malformed or incomplete lines.")

    if not triplets:
        print("‚ö†Ô∏è Warning: No triplets loaded. Check your input file.")
        return

    # 1. Generate Components
    all_types = {t['subject_type'] for t in triplets} | {t['object_type'] for t in triplets}
    constraints = generate_constraints(all_types)
    node_queries = generate_node_cypher(triplets)
    rel_queries = generate_rel_cypher(triplets)

    # 2. Write to File with Instructions
    with open(OUTPUT_FILE, 'w', encoding='utf-8') as f:
        # Header Instructions
        f.write("// ======================================================\n")
        f.write("// INSTRUCTIONS FOR NEO4J BROWSER:\n")
        f.write("// 1. Ensure 'Connect result nodes' is unchecked in settings (optional, for speed).\n")
        f.write("// 2. Ensure 'Enable multi-statement query editor' is CHECKED.\n")
        f.write("// 3. Copy-paste this entire file and run it.\n")
        f.write("// ======================================================\n\n")

        f.write("// --- 1. CONSTRAINTS ---\n")
        f.write("\n".join(constraints) + "\n\n")

        # Double newline (\n\n) ensures clear separation for the Browser parser
        f.write("\n\n".join(node_queries) + "\n\n")
        f.write("\n\n".join(rel_queries) + "\n")

    print(f"‚úÖ Success! Script saved to: {OUTPUT_FILE}")
    # Each generator's list starts with one section-header comment, which is not a batch.
    print(f"   - Node Batches: {len(node_queries) - 1}")
    print(f"   - Relationship Batches: {len(rel_queries) - 1}")


if __name__ == "__main__":
    main()

PUSHING ALL CLEANED TRIPLETS (.jsonl) TO DB

In [None]:
!pip install neo4j~=5.28

In [None]:
from google.colab import userdata

# AuraDB password, read from Colab's secret store under the key "neo4j_rr_DB".
NEO4J_PASSWORD = userdata.get("neo4j_rr_DB")

In [None]:
import os
import glob
import json
import time
from neo4j import GraphDatabase, exceptions

# ==========================================
# 1. CONFIGURATION
# ==========================================
class Config:
    """Static settings for the Neo4j ingestion cell: paths, credentials, batching, schema."""

    # Directory whose *.jsonl files are loaded for ingestion.
    SOURCE_DIR = "/content/drive/MyDrive/rr_mlops/leftover_batches"

    # AuraDB connection. The password comes from the notebook-level
    # NEO4J_PASSWORD variable (read from Colab secrets in an earlier cell).
    NEO4J_URI = "neo4j+s://0c60999b.databases.neo4j.io"
    NEO4J_USER = "neo4j"
    NEO4J_PASSWORD = NEO4J_PASSWORD

    # Rows per UNWIND batch, and default number of attempts in run_with_retry.
    BATCH_SIZE = 1000
    MAX_RETRIES = 3

    # Labels that receive a `name` uniqueness constraint.
    ONTOLOGY_LABELS = (
        "JobPosting JobRole Skill Concept "
        "ProgrammingLanguage Framework Tool "
        "Platform CloudService Database "
        "Company Location"
    ).split()

# ==========================================
# 2. ROBUST UTILS
# ==========================================
def run_with_retry(session, query, parameters, retries=Config.MAX_RETRIES):
    """Run *query* in an auto-commit transaction, retrying on transient failures.

    Args:
        session: an open neo4j Session.
        query: Cypher text.
        parameters: dict of query parameters.
        retries: total number of attempts. Values below 1 are treated as 1, so
            the query always runs at least once.

    Returns:
        The ResultSummary from ``Result.consume()``.

    Raises:
        The last ServiceUnavailable / SessionExpired / TransientError once all
        attempts are exhausted. Any other error is raised immediately.
    """
    attempts = max(1, retries)
    for attempt in range(1, attempts + 1):
        try:
            # consume() drains the result, so server-side errors surface here,
            # inside the retry loop.
            return session.run(query, parameters).consume()
        except (exceptions.ServiceUnavailable, exceptions.SessionExpired, exceptions.TransientError):
            if attempt == attempts:
                raise
            sleep_time = 2 * attempt  # linear back-off: 2s, 4s, ...
            print(f"      ‚ö†Ô∏è Transient Error (Attempt {attempt}/{attempts}). Retrying in {sleep_time}s...")
            time.sleep(sleep_time)

# ==========================================
# 3. SCHEMA SAFETY (Constraints)
# ==========================================
def ensure_constraints(driver):
    """Create a `name` uniqueness constraint for every label in Config.ONTOLOGY_LABELS.

    ``IF NOT EXISTS`` makes re-runs harmless. Each label is attempted
    independently: a failure is printed and the remaining labels are still
    processed.
    """
    print("\nüõ°Ô∏è Verifying Constraints (Speed & Integrity)...")
    with driver.session() as session:
        for label in Config.ONTOLOGY_LABELS:
            query = f"CREATE CONSTRAINT {label.lower()}_uniq IF NOT EXISTS FOR (n:`{label}`) REQUIRE n.name IS UNIQUE"
            try:
                # The driver fetches results lazily. consume() waits for the
                # statement to finish, so a failure is raised here and attributed
                # to this label. Otherwise it could surface on a later run() or
                # when the session closes.
                session.run(query).consume()
                print(f"   ‚úÖ Constraint active: :{label}")
            except Exception as e:
                print(f"   ‚ùå Failed to set constraint for :{label} -> {e}")

# ==========================================
# 4. DATA PREPARATION (Signature Grouping)
# ==========================================
def load_and_group_data(source_dir):
    """Read every ``*.jsonl`` file in *source_dir* and group triplets for batched ingestion.

    Each non-blank line may be one of:
      - a single triplet dict,
      - a dict holding a "triplets" list,
      - a bare JSON list of triplets.

    A triplet is kept only if it meets all of these:
      - it is a dict,
      - its five fields are all non-empty,
      - its labels and relation are strings,
      - its subject and object are JSON scalars.

    Undecodable lines, other shapes and malformed triplets are skipped and
    counted; they no longer abort the load.

    Returns:
        (nodes_by_label, edges_by_signature) where
          nodes_by_label = {label: set of names}
          edges_by_signature = {(subject_label, relation, object_label):
                                [{"s": subject, "o": object}, ...]}
        Both are empty dicts when no ``*.jsonl`` file is found.
    """
    print(f"\nüìñ Scanning {source_dir} for cleaned data...")

    # NOTE(review): every *.jsonl in the directory is read; files are not
    # filtered or prioritised by name. Non-triplet files are ignored only
    # because their records fail the shape checks below.
    files = sorted(glob.glob(os.path.join(source_dir, "*.jsonl")))

    if not files:
        print("‚ùå No .jsonl files found! Check your path.")
        return {}, {}

    print(f"   found {len(files)} files.")

    triplet_keys = ("subject", "subject_type", "relation", "object", "object_type")
    nodes_by_label = {}       # {label: {name, ...}}
    edges_by_signature = {}   # {(subject_label, relation, object_label): [{"s": ..., "o": ...}, ...]}
    triplet_count = 0
    skipped = 0

    for f_path in files:
        print(f"   ... processing {os.path.basename(f_path)}")

        with open(f_path, 'r', encoding='utf-8') as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError:
                    skipped += 1
                    continue

                # Normalise the accepted line shapes into a list of candidates.
                if isinstance(data, list):
                    items = data
                elif isinstance(data, dict) and isinstance(data.get("triplets"), list):
                    items = data["triplets"]
                elif isinstance(data, dict) and "subject" in data:
                    items = [data]
                else:
                    skipped += 1
                    continue

                for t in items:
                    if not isinstance(t, dict):
                        skipped += 1
                        continue
                    s, st, r, o, ot = (t.get(k) for k in triplet_keys)
                    # Labels and the relation are later interpolated into Cypher
                    # as label / relationship-type names, so they must be strings.
                    # Names are stored in sets, so they must be hashable JSON
                    # scalars (not lists or objects).
                    if not (
                        all([s, st, r, o, ot])
                        and all(isinstance(v, str) for v in (st, r, ot))
                        and all(isinstance(v, (str, int, float)) for v in (s, o))
                    ):
                        skipped += 1
                        continue

                    nodes_by_label.setdefault(st, set()).add(s)
                    nodes_by_label.setdefault(ot, set()).add(o)
                    edges_by_signature.setdefault((st, r, ot), []).append({"s": s, "o": o})
                    triplet_count += 1

    print(f"‚úÖ Parsed {triplet_count} valid triplets.")
    if skipped:
        print(f"   - {skipped} lines/records skipped (not valid triplets)")
    print(f"   - {len(nodes_by_label)} Node Labels found")
    print(f"   - {len(edges_by_signature)} Unique Relationship Patterns found")

    return nodes_by_label, edges_by_signature

# ==========================================
# 5. INGESTION ENGINE
# ==========================================
def ingest_data(driver, nodes_map, edges_map):
    """MERGE all nodes, then all relationships, in batches of Config.BATCH_SIZE.

    Args:
        driver: an open neo4j Driver.
        nodes_map: {label: iterable of names}, as built by load_and_group_data.
        edges_map: {(subject_label, rel_type, object_label): [{"s": ..., "o": ...}, ...]}.

    Ordering: nodes are written first because the relationship queries MATCH
    existing endpoints; a row whose endpoint is missing produces no
    relationship.

    Failures: a failing batch is reported and skipped so the rest of the load
    continues, and the number of failed batches is printed at the end.
    """

    def quote(identifier):
        # Labels and relationship types come from the data files. Doubling any
        # embedded backtick keeps the value inside the quoted identifier.
        return "`" + str(identifier).replace("`", "``") + "`"

    failed = 0
    with driver.session() as session:

        # --- PHASE 1: NODES (The Foundation) ---
        print("\nüèóÔ∏è Phase 1: Ingesting Nodes...")
        for label, names in nodes_map.items():
            name_list = list(names)
            total = len(name_list)
            print(f"   üîπ :{label} ({total} items)")

            # Label is fixed per query; the names travel as the $batch parameter.
            query = f"UNWIND $batch AS name MERGE (n:{quote(label)} {{name: name}})"
            for i in range(0, total, Config.BATCH_SIZE):
                batch = name_list[i:i + Config.BATCH_SIZE]
                try:
                    run_with_retry(session, query, {"batch": batch})
                except Exception as e:
                    failed += 1
                    print(f"      ‚ùå Failed batch {i}: {e}")

        # --- PHASE 2: RELATIONSHIPS (The Context) ---
        print("\nüîó Phase 2: Ingesting Relationships...")
        for (s_label, rel_type, o_label), edge_list in edges_map.items():
            total = len(edge_list)
            print(f"   üî∏ ({s_label}) -[:{rel_type}]-> ({o_label}) : {total} items")

            query = (
                f"UNWIND $batch AS row "
                f"MATCH (s:{quote(s_label)} {{name: row.s}}) "
                f"MATCH (o:{quote(o_label)} {{name: row.o}}) "
                f"MERGE (s)-[:{quote(rel_type)}]->(o)"
            )
            for i in range(0, total, Config.BATCH_SIZE):
                batch = edge_list[i:i + Config.BATCH_SIZE]
                try:
                    run_with_retry(session, query, {"batch": batch})
                except Exception as e:
                    failed += 1
                    print(f"      ‚ùå Failed batch {i}: {e}")

    if failed:
        print(f"\n   ! {failed} batch(es) failed - see the errors above.")
    print("\nüéâ Ingestion Pipeline Complete!")

# ==========================================
# 6. MAIN
# ==========================================
def main():
    """Connect to AuraDB, ensure constraints, load the .jsonl triplets and ingest them.

    Any exception raised along the way is caught and printed, and the driver
    is always closed.
    """
    driver = None
    auth = (Config.NEO4J_USER, Config.NEO4J_PASSWORD)
    try:
        driver = GraphDatabase.driver(Config.NEO4J_URI, auth=auth)
        driver.verify_connectivity()
        print("üîå Connected to Neo4j AuraDB")

        # Schema first, so the MERGEs below can use the uniqueness indexes.
        ensure_constraints(driver)

        nodes, edges = load_and_group_data(Config.SOURCE_DIR)
        if nodes:
            ingest_data(driver, nodes, edges)
        else:
            print("‚ö†Ô∏è No data found to ingest. Exiting.")

    except Exception as e:
        print(f"üíÄ Critical Error: {e}")
    finally:
        if driver is not None:
            driver.close()


if __name__ == "__main__":
    main()

In [None]:
import json
import os

# ==========================================
# 1. CONFIGURATION (Match your Extraction Config)
# ==========================================
WIKI_PATH = "/content/drive/MyDrive/rr_mlops/leftover_batches/wiki_source.jsonl"
BATCH_SIZE_CHARS = 53500


def count_batches(path=None, batch_size_chars=None, word_cap=2500):
    """Estimate how many LLM requests the wiki extraction will need.

    Replays the batching rule over the JSONL article dump:
      1. Truncate each article to ``word_cap`` whitespace-separated words.
      2. Wrap it in the SOURCE/PRIMARY_ENTITY/CONTENT envelope this script
         assumes is sent to the model.
      3. Pack articles greedily into batches of at most ``batch_size_chars``
         characters.
    An article that alone exceeds the limit still occupies exactly one batch.

    Args:
        path: JSONL file to analyse; defaults to WIKI_PATH.
        batch_size_chars: per-batch character budget; defaults to BATCH_SIZE_CHARS.
        word_cap: words kept per article.
            NOTE(review): must match the extraction ingestor's cap - confirm
            2500 is current.

    Returns:
        The estimated batch count, or None if the file does not exist.
    """
    path = WIKI_PATH if path is None else path
    limit = BATCH_SIZE_CHARS if batch_size_chars is None else batch_size_chars

    if not os.path.exists(path):
        print(f"‚ùå File not found: {path}")
        return None

    print(f"üìä Analyzing: {os.path.basename(path)}")

    total_chars = 0
    total_items = 0
    batch_count = 0
    current_batch_len = 0

    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            try:
                item = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(item, dict):
                continue

            # Apply the word cap, then format the text like it is sent to the LLM.
            capped_content = " ".join((item.get('text') or '').split()[:word_cap])
            chunk_text = (
                f"SOURCE: WIKI_ARTICLE\nPRIMARY_ENTITY: {item.get('title', '')}\n"
                f"CONTENT: {capped_content}\n\n"
            )

            chunk_len = len(chunk_text)
            total_chars += chunk_len
            total_items += 1

            # Close the current batch only if it already holds something.
            # Otherwise an oversized first article would be counted as an
            # extra, empty batch.
            if current_batch_len and current_batch_len + chunk_len > limit:
                batch_count += 1
                current_batch_len = chunk_len
            else:
                current_batch_len += chunk_len

    # Don't forget the last remaining batch
    if current_batch_len > 0:
        batch_count += 1

    print("-" * 40)
    print(f"üìù Total Articles:    {total_items}")
    print(f"üî† Total Characters:  {total_chars:,}")
    print(f"üì¶ Total LLM Batches: {batch_count}")
    print(f"‚è±Ô∏è Est. Time:        ~{batch_count * 15 / 60:.1f} minutes (at 15s/batch)")
    print("-" * 40)
    return batch_count


if __name__ == "__main__":
    count_batches()