In [None]:
import os
import random
import math
import pandas as pd
import torch
import torch.nn.functional as F
import gradio as gr
from datetime import datetime


In [None]:
# Intended location of the contractor dataset CSV.
DATA_PATH = "/mnt/data/realistic_contractors_100skills_100locations.csv"
# CSV file that log_job() writes workflow records to (relative to the working directory).
JOB_LOG_CSV = "nbs_job_log.csv"
# Seed Python's `random` module and torch's global RNG so simulated decisions are repeatable.
# NOTE: NumPy's RNG (used by pandas sampling) is not seeded here.
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)



<torch._C.Generator at 0x77ff8f573790>

In [None]:
# Load the contractor dataset. Prefer the configured DATA_PATH; fall back to a
# file of the same name in the working directory (the location this notebook
# originally read from) so existing setups keep working.
csv_path = DATA_PATH if os.path.exists(DATA_PATH) else os.path.basename(DATA_PATH)
df = pd.read_csv(csv_path)

# Ensure expected columns exist; fail early with the list of columns actually found.
expected_cols = ["ContractorID", "Name", "Skills", "Location", "Rating", "JobTypesHandled", "Availability"]
for c in expected_cols:
    if c not in df.columns:
        raise ValueError(f"Expected column '{c}' not found in CSV. Found columns: {df.columns.tolist()}")

# Preprocess skills and locations
def split_and_clean(s):
    """
    Turn a comma-separated value into a list of lower-cased, trimmed tokens.

    Missing values (as detected by ``pd.isna``) give an empty list; any other
    value is converted with ``str`` first. Tokens that are empty after
    trimming are dropped.
    """
    if pd.isna(s):
        return []
    cleaned = []
    for piece in str(s).split(","):
        trimmed = piece.strip()
        if trimmed:
            cleaned.append(trimmed.lower())
    return cleaned

# Token lists parsed from the comma-separated text columns.
df["skill_list"] = df["Skills"].map(split_and_clean)
df["jobtype_list"] = df["JobTypesHandled"].map(split_and_clean)
# Location as a trimmed, lower-cased string for case-insensitive matching.
df["location_clean"] = (
    df["Location"]
    .astype(str)
    .str.strip()
    .str.lower()
)


In [None]:
# Sorted vocabularies of every distinct skill and location in the dataset.
all_skills = sorted(set().union(*df["skill_list"]))
all_locations = sorted(set(df["location_clean"]))
# Map each vocabulary entry to its position (its column index in the feature matrices).
skill_to_idx = dict(zip(all_skills, range(len(all_skills))))
location_to_idx = dict(zip(all_locations, range(len(all_locations))))


In [None]:
# Dataset dimensions: rows in the contractor table and sizes of both vocabularies.
num_contractors, num_skills, num_locations = df.shape[0], len(all_skills), len(all_locations)


In [None]:
# One row per contractor; columns follow skill_to_idx / location_to_idx.
contractor_skill_matrix = torch.zeros(num_contractors, num_skills, dtype=torch.float32)
contractor_location_matrix = torch.zeros(num_contractors, num_locations, dtype=torch.float32)
# Ratings as float32, with missing values replaced by the column mean.
contractor_rating = torch.tensor(
    df["Rating"].fillna(df["Rating"].mean()).astype(float).values,
    dtype=torch.float32,
)
# Availability as lower-cased strings, in row order.
availability_list = df["Availability"].astype(str).str.lower().tolist()

# Fill the multi-hot skill rows and the one-hot location rows in a single pass.
for row_pos, (contractor_skills, contractor_loc) in enumerate(zip(df["skill_list"], df["location_clean"])):
    for skill_name in contractor_skills:
        skill_col = skill_to_idx.get(skill_name)
        if skill_col is not None:
            contractor_skill_matrix[row_pos, skill_col] = 1.0
    loc_col = location_to_idx.get(contractor_loc)
    if loc_col is not None:
        contractor_location_matrix[row_pos, loc_col] = 1.0


In [None]:
# L2-normalise each contractor's skill row (dim=1) so that a dot product with a
# unit-length job vector is a cosine similarity.
contractor_skill_matrix = F.normalize(contractor_skill_matrix, p=2, dim=1)


In [None]:
def match_contractors(required_skills, job_location, job_type=None, top_k=5, location_boost=1.3):
    """
    Rank contractors for a job and return the best ``top_k`` as dicts.

    The score is built in this order:
      1. dot product of the unit-length job skill vector with each row of
         ``contractor_skill_matrix`` (a cosine similarity once the matrix rows
         have been L2-normalised);
      2. plus ``location_boost - 1.0`` for contractors located in ``job_location``;
      3. plus ``0.15`` for contractors whose job-type list contains ``job_type``;
      4. times ``0.6`` for contractors whose availability is not "yes";
      5. times a rating multiplier that maps [min rating, max rating] linearly
         onto [0.9, 1.1] (1.0 for everyone when all ratings are equal).

    required_skills: iterable of skill strings (matched case-insensitively after
        trimming); ``None`` is treated as no skills
    job_location: location string, or ``None``/``""`` to skip the location boost
    job_type: optional job type string
    top_k: maximum number of contractors to return
    location_boost: same-location bonus, expressed as 1 + the additive bonus

    Returns a list (best first) of dicts with keys ContractorID, Name, Location,
    Skills, Rating, Availability and Score (rounded to 4 decimals).
    """
    if num_contractors == 0:
        # Nothing to rank; min()/max() on an empty rating tensor would raise.
        return []

    # Build the multi-hot job skill vector; skills outside the vocabulary are ignored.
    job_vec = torch.zeros((num_skills,), dtype=torch.float32)
    for s in required_skills or []:
        idx = skill_to_idx.get(str(s).lower().strip())
        if idx is not None:
            job_vec[idx] = 1.0
    # F.normalize divides by max(norm, eps), so an all-zero vector stays zero and
    # needs no special case. Replacing it with a uniform vector (as before) made
    # every contractor score sqrt(k / num_skills) for k listed skills, which
    # ranked generalists first for jobs whose skills could not be matched at all.
    job_vec = F.normalize(job_vec.unsqueeze(0), p=2, dim=1).squeeze(0)

    # Skill similarity, shape: (num_contractors,)
    sims = torch.matmul(contractor_skill_matrix, job_vec)

    # Location matching boost
    loc_idx = location_to_idx.get(job_location.lower().strip()) if job_location else None
    if loc_idx is not None:
        same_loc_mask = contractor_location_matrix[:, loc_idx] == 1.0
        sims = sims + same_loc_mask.float() * (location_boost - 1.0)

    # If job_type is given, prefer contractors that handle that job type
    # (exact membership in the contractor's cleaned job-type list).
    if job_type:
        job_type_clean = job_type.lower().strip()
        handles_type_mask = torch.tensor(
            [1.0 if job_type_clean in jt else 0.0 for jt in df["jobtype_list"]],
            dtype=torch.float32,
        )
        sims = sims + handles_type_mask * 0.15  # small bump

    # Reduce score for unavailable contractors: available keep 100%, others 60%.
    avail_mask = torch.tensor(
        [1.0 if (str(a).strip().lower() == "yes") else 0.0 for a in availability_list],
        dtype=torch.float32,
    )
    sims = sims * (0.6 + 0.4 * avail_mask)

    # Factor in rating (scale rating [min..max] to a [0.9..1.1] multiplier)
    min_r, max_r = contractor_rating.min().item(), contractor_rating.max().item()
    if max_r - min_r < 1e-6:
        rating_mult = torch.ones_like(contractor_rating)
    else:
        rating_norm = (contractor_rating - min_r) / (max_r - min_r)  # 0..1
        rating_mult = 0.9 + 0.2 * rating_norm  # 0.9..1.1
    final_score = sims * rating_mult

    # Clamp k into [0, num_contractors]; torch.topk rejects out-of-range values.
    k = max(0, min(int(top_k), num_contractors))
    topk_scores, topk_idx = torch.topk(final_score, k=k)
    topk = []
    for score, idx in zip(topk_scores.tolist(), topk_idx.tolist()):
        row = df.iloc[idx]
        topk.append({
            "ContractorID": row["ContractorID"],
            "Name": row["Name"],
            "Location": row["Location"],
            "Skills": row["Skills"],
            "Rating": row["Rating"],
            "Availability": row["Availability"],
            "Score": round(float(score), 4)
        })
    return topk


In [None]:
def simulate_acceptance(contractor_id, score):
    """
    Randomly decide whether a contractor accepts a job.

    The match score is clamped to [0, 1] and mapped to a probability between
    0.3 and 0.9; contractors whose Availability is not "yes" have that
    probability multiplied by 0.4. One ``random.random()`` draw decides.

    Returns ``(accepted, info)``; ``(False, "Contractor not found")`` when no
    row of ``df`` has the given ContractorID.
    """
    row_labels = df.index[df["ContractorID"] == contractor_id].tolist()
    if len(row_labels) == 0:
        return False, "Contractor not found"
    label = row_labels[0]
    is_available = str(df.at[label, "Availability"]).strip().lower() == "yes"
    # Score can exceed 1 because of boosts, so clamp it before use.
    floored = max(score, 0.0)
    base_prob = min(floored, 1.0)
    prob = 0.3 + 0.6 * base_prob  # between 0.3 and 0.9
    prob = prob if is_available else prob * 0.4
    draw = random.random()
    return draw < prob, f"Acceptance probability: {prob:.2f}"


In [None]:
def generate_invoice(job_id, client_name, contractor, base_rate=100.0, complexity=1.0):
    """
    Build an invoice dict for a completed job.

    job_id: job identifier; the invoice ID is "INV-<job_id>"
    client_name: name stored under "Client"
    contractor: dict with Name, ContractorID and (optionally) Rating
    base_rate: base price estimate
    complexity: multiplier based on job difficulty

    Pricing: ``base_rate * complexity * (1 + (5.0 - rating) * 0.05)``, i.e. each
    rating point below 5 adds 5% to the price. VAT of 23% is added on top.
    A missing, None, NaN or non-numeric rating falls back to 3.0 so the invoice
    never contains NaN amounts.

    Returns a dict with InvoiceID, Client, Contractor, ContractorID, Subtotal,
    VAT, Total (amounts rounded to 2 decimals) and Date ("%Y-%m-%d %H:%M:%S").
    """
    default_rating = 3.0
    try:
        rating = float(contractor.get("Rating", default_rating))
    except (TypeError, ValueError):
        # None or a non-numeric value
        rating = default_rating
    if math.isnan(rating):
        # Ratings copied straight from the CSV can be NaN
        rating = default_rating

    price = base_rate * complexity * (1 + (5.0 - rating) * 0.05)  # small inverse effect of rating
    vat = price * 0.23  # example VAT 23%
    total = price + vat
    invoice = {
        "InvoiceID": f"INV-{job_id}",
        "Client": client_name,
        "Contractor": contractor["Name"],
        "ContractorID": contractor["ContractorID"],
        "Subtotal": round(price, 2),
        "VAT": round(vat, 2),
        "Total": round(total, 2),
        "Date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    return invoice


In [None]:
def log_job(job_record):
    """
    Add one record (a flat dict) as a row of the CSV at ``JOB_LOG_CSV``.

    Records written by different workflow steps carry different keys, so a
    blind header-less append would put values under the wrong columns. Instead:
      * if the file is missing or empty, it is created with a header;
      * if every key of the record is already a column, the row is appended
        in the file's column order (absent fields are left blank);
      * otherwise the file is rewritten with the new columns added at the end,
        existing cells being kept verbatim as text.
    """
    new_row = pd.DataFrame([job_record])

    # First write (or an empty placeholder file): emit header + row.
    if not os.path.exists(JOB_LOG_CSV) or os.path.getsize(JOB_LOG_CSV) == 0:
        new_row.to_csv(JOB_LOG_CSV, index=False)
        return

    # nrows=0 reads only the header line.
    existing_cols = pd.read_csv(JOB_LOG_CSV, nrows=0).columns.tolist()
    if set(new_row.columns) <= set(existing_cols):
        aligned = new_row.reindex(columns=existing_cols)
        aligned.to_csv(JOB_LOG_CSV, mode="a", header=False, index=False)
        return

    # The record introduces new fields: widen the file so the header stays valid.
    # dtype=str / keep_default_na=False keep existing cells exactly as written.
    existing = pd.read_csv(JOB_LOG_CSV, dtype=str, keep_default_na=False)
    combined = pd.concat([existing, new_row], ignore_index=True)
    combined.to_csv(JOB_LOG_CSV, index=False)



In [None]:
# In-memory state of the job currently moving through steps 1-6.
# Keys used by the step functions: "job", "matches", "assigned_contractor", "invoice".
SESSION_STATE = {}

def step1_report_issue(client_name, job_title, job_description, job_skills_raw, job_location, job_type):
    """
    STEP 1: Client reports issue

    Creates a new job ID ("JOB-<unix timestamp>-<3 random digits>") and stores
    the job under SESSION_STATE["job"]. Required skills come from
    ``job_skills_raw`` when given, otherwise from splitting ``job_description``
    on commas. The location is stored trimmed and lower-cased.

    Any state left from a previous job (matches, assigned contractor, invoice)
    is discarded first, so a new report can never be completed with the
    previous job's contractor or invoice.

    Returns ``(message, job_id)``.
    """
    # record time and basic details
    job_id = f"JOB-{int(datetime.now().timestamp())}-{random.randint(100,999)}"
    required_skills = split_and_clean(job_skills_raw) if job_skills_raw else split_and_clean(job_description)
    # A new report starts a new job cycle: drop leftovers from the previous one.
    SESSION_STATE.clear()
    SESSION_STATE["job"] = {
        "JobID": job_id,
        "ClientName": client_name,
        "JobTitle": job_title,
        "JobDescription": job_description,
        "RequiredSkills": required_skills,
        "Location": job_location.lower().strip() if job_location else "",
        "JobType": job_type
    }
    msg = f"Recorded job {job_id} for client '{client_name}'. Required skills: {required_skills}"
    return msg, job_id

def step2_record_job():
    """
    STEP 2: Record the job.

    Writes the job held in SESSION_STATE to the job log with status
    "Recorded" and returns ``(message, logged_record)``. When no job has been
    reported yet, returns an explanatory message and an empty dict.
    """
    current = SESSION_STATE.get("job")
    if not current:
        return "No job recorded yet. Please report the issue first.", {}

    # Copy the plain fields first, then add the derived ones in log-column order.
    entry = {field: current[field] for field in ("JobID", "ClientName", "JobTitle", "JobDescription")}
    entry["RequiredSkills"] = ";".join(current["RequiredSkills"])
    entry["Location"] = current["Location"]
    entry["JobType"] = current["JobType"]
    entry["Status"] = "Recorded"
    entry["Timestamp"] = datetime.now().isoformat()

    log_job(entry)
    return f"Job {current['JobID']} recorded to log.", entry

def step3_find_contractor(top_k=5):
    """
    STEP 3: Find a contractor.

    Runs ``match_contractors`` for the job in SESSION_STATE, stores the result
    under SESSION_STATE["matches"] and returns ``(text_listing, matches)``.
    When no job has been reported yet, returns a message and an empty list.
    """
    job = SESSION_STATE.get("job")
    if not job:
        return "No job recorded yet. Please report the issue first.", []

    ranked = match_contractors(
        job["RequiredSkills"],
        job["Location"],
        job_type=job["JobType"],
        top_k=top_k,
    )
    SESSION_STATE["matches"] = ranked

    # One numbered line per candidate, best first.
    lines = []
    for position, m in enumerate(ranked, start=1):
        lines.append(f"{position}. {m['Name']} ({m['ContractorID']}) - Score: {m['Score']} - Availability: {m['Availability']}")
    listing = "\n".join(lines)
    return f"Top {len(ranked)} matches:\n{listing}", ranked

def step4_contract_accepts(selected_contractor_id):
    """
    STEP 4: Contractor accepts the job (simulated).

    Looks the selected ID up in the last match list, simulates the
    contractor's decision, logs it, and stores the contractor under
    SESSION_STATE["assigned_contractor"] (``None`` when rejected).

    Returns ``(message, contractor_or_None)``.
    """
    candidates = SESSION_STATE.get("matches", [])
    chosen = next((c for c in candidates if c["ContractorID"] == selected_contractor_id), None)
    if not chosen:
        return "Selected contractor not in last matched list. Please pick from the matched contractors.", None

    accepted, info = simulate_acceptance(chosen["ContractorID"], chosen["Score"])
    job = SESSION_STATE["job"]
    if accepted:
        decision, assigned = "Accepted", chosen
    else:
        decision, assigned = "Rejected", None

    log_job({
        "JobID": job["JobID"],
        "ContractorID": chosen["ContractorID"],
        "ContractorName": chosen["Name"],
        "Decision": decision,
        "DecisionInfo": info,
        "Timestamp": datetime.now().isoformat()
    })
    SESSION_STATE["assigned_contractor"] = assigned
    return f"Contractor {chosen['Name']} - {decision}. ({info})", assigned

def step5_work_completion(simulate_complexity=1.0):
    """
    STEP 5: Work completion (simulated).

    Requires an assigned contractor. Generates an invoice at a base rate of
    120.0 with the given complexity (floored at 0.5), logs a "Completed"
    record and stores the invoice under SESSION_STATE["invoice"].

    Returns ``(message, invoice)``, or ``(message, None)`` when no contractor
    has accepted the job.
    """
    contractor = SESSION_STATE.get("assigned_contractor")
    if not contractor:
        return "No contractor has accepted the job yet.", None

    current_job = SESSION_STATE["job"]
    job_id = current_job["JobID"]
    effective_complexity = max(0.5, float(simulate_complexity))
    bill = generate_invoice(
        job_id,
        current_job["ClientName"],
        contractor,
        base_rate=120.0,
        complexity=effective_complexity,
    )

    # Mark the job as completed in the log.
    log_job({
        "JobID": job_id,
        "ContractorID": contractor["ContractorID"],
        "Status": "Completed",
        "Complexity": effective_complexity,
        "InvoiceID": bill["InvoiceID"],
        "Total": bill["Total"],
        "Timestamp": datetime.now().isoformat()
    })
    SESSION_STATE["invoice"] = bill
    return f"Work completed by {contractor['Name']}. Invoice {bill['InvoiceID']} generated. Total: {bill['Total']}", bill

def step6_job_cycle_completion():
    """
    STEP 6: Job cycle completion - finalize

    Requires a job, an assigned contractor and an invoice in SESSION_STATE.
    Logs a "Closed" record, clears SESSION_STATE and returns
    ``(message, summary)`` where summary holds JobID, Client, Contractor and
    the invoice dict.

    When the job is not fully processed, returns ``(message, None)``. The
    result is always a 2-tuple, so callers can unpack it unconditionally.
    """
    invoice = SESSION_STATE.get("invoice", None)
    job = SESSION_STATE.get("job", None)
    assigned = SESSION_STATE.get("assigned_contractor", None)
    if not (job and assigned and invoice):
        # Same shape as the success path; a bare string here made callers that
        # unpack two values raise ValueError.
        return "Job not fully processed yet. Make sure a contractor accepted and work is completed.", None
    # Final record
    final_record = {
        "JobID": job["JobID"],
        "Status": "Closed",
        "ContractorID": assigned["ContractorID"],
        "InvoiceID": invoice["InvoiceID"],
        "TotalPaid": invoice["Total"],
        "ClosedAt": datetime.now().isoformat()
    }
    log_job(final_record)
    # Return summary
    summary = {
        "JobID": job["JobID"],
        "Client": job["ClientName"],
        "Contractor": assigned["Name"],
        "Invoice": invoice
    }
    # Clear session state so the next job starts from scratch
    SESSION_STATE.clear()
    return "Job cycle completed and closed.", summary



In [None]:
# Gradio UI: a form for the job details on the left, one button per workflow
# step, and dataset stats / log location on the right.
with gr.Blocks(title="NBS Workflow Process (PyTorch-based matcher)") as demo:
    gr.Markdown("## NBS Workflow Process — Steps 1 to 6\nFill the form to report a job and follow through the steps. (This demo uses a simple PyTorch-based matching on skills & location.)")
    with gr.Row():
        with gr.Column(scale=2):
            client_name = gr.Textbox(label="Client Name", value="Alice")
            job_title = gr.Textbox(label="Job Title", value="Repair broken wall")
            job_description = gr.Textbox(label="Job Description (optional)", lines=3, value="Cracked plaster and minor masonry repair")
            job_skills_raw = gr.Textbox(label="Required Skills (comma-separated) - optional", value="bricklaying, painting")
            job_location = gr.Textbox(label="Job Location (town/city)", value="Dublin")
            job_type = gr.Textbox(label="Job Type (Maintenance/Repair/Emergency etc.)", value="Maintenance")
            report_btn = gr.Button("STEP 1: Report Issue")
            record_btn = gr.Button("STEP 2: Record Job")
            find_btn = gr.Button("STEP 3: Find Contractor")
            matches_out = gr.Textbox(label="Matches Output", interactive=False)
            matches_dropdown = gr.Dropdown(label="Select ContractorID from matches (for STEP 4)", choices=[], value=None)
            accept_btn = gr.Button("STEP 4: Contractor Accepts (simulate)")
            accept_out = gr.Textbox(label="Accept Output", interactive=False)
            complete_btn = gr.Button("STEP 5: Work Completion (simulate & invoice)")
            complete_out = gr.Textbox(label="Completion Output", interactive=False)
            finalize_btn = gr.Button("STEP 6: Job Cycle Completion")
            finalize_out = gr.Textbox(label="Finalization Output", interactive=False)
        with gr.Column(scale=1):
            gr.Markdown("### Latest Dataset Stats")
            stats = gr.Textbox(value=f"Contractors: {num_contractors}\nUnique skills: {num_skills}\nUnique locations: {num_locations}", interactive=False)
            gr.Markdown("### Logs")
            logs_btn = gr.Button("Open job log CSV (if exists)")
            log_file = gr.Textbox(label="Job log file path", value=JOB_LOG_CSV, interactive=False)

    # Callbacks. Each returns exactly one value per wired output component.
    def on_report_click(name, title, desc, skills, location, jtype):
        """STEP 1: store the reported job; show the confirmation message."""
        msg, jobid = step1_report_issue(name, title, desc, skills, location, jtype)
        return msg

    def on_record_click():
        """STEP 2: write the job to the log; show the confirmation message."""
        msg, record = step2_record_job()
        return msg

    def on_find_click(k=5):
        """STEP 3: rank contractors; fill the text listing and the ID dropdown."""
        msg, matched = step3_find_contractor(top_k=k)
        # prepare dropdown choices
        choices = [m["ContractorID"] for m in matched]
        md = msg + ("\n\n(Select a ContractorID in the dropdown and proceed to STEP 4.)" if choices else "")
        return md, gr.update(choices=choices, value=choices[0] if choices else None)

    def on_accept_click(selected_cid):
        """STEP 4: simulate the selected contractor's decision; show the outcome."""
        if not selected_cid:
            # Single output component, so return a single string (not a tuple).
            return "Please select a ContractorID from the dropdown."
        msg, contractor = step4_contract_accepts(selected_cid)
        return msg

    def on_complete_click(complexity=1.0):
        """STEP 5: simulate completion and invoicing; show the outcome."""
        msg, invoice = step5_work_completion(simulate_complexity=complexity)
        return msg

    def on_finalize_click():
        """STEP 6: close the job; show the message followed by the summary, if any."""
        msg, summary = step6_job_cycle_completion()
        pretty = msg + "\n\n" + (str(summary) if summary else "")
        return pretty

    def on_logs_click():
        """Show the log path when the file exists, otherwise a placeholder message."""
        return JOB_LOG_CSV if os.path.exists(JOB_LOG_CSV) else "No log file yet."

    # Wire buttons to callbacks
    report_btn.click(on_report_click, inputs=[client_name, job_title, job_description, job_skills_raw, job_location, job_type], outputs=[matches_out])
    record_btn.click(on_record_click, inputs=None, outputs=[matches_out])
    find_btn.click(on_find_click, inputs=None, outputs=[matches_out, matches_dropdown])
    accept_btn.click(on_accept_click, inputs=[matches_dropdown], outputs=[accept_out])
    complete_btn.click(on_complete_click, inputs=None, outputs=[complete_out])
    finalize_btn.click(on_finalize_click, inputs=None, outputs=[finalize_out])
    logs_btn.click(on_logs_click, inputs=None, outputs=[log_file])



In [None]:
# Start the Gradio app when this code runs as the main module.
if __name__ == "__main__":
    demo.launch(share=False)  # set share=True in Colab if you want a public link

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Note: opening Chrome Inspector may crash demo inside Colab notebooks.
* To create a public link, set `share=True` in `launch()`.


<IPython.core.display.Javascript object>

In [None]:
import random
import torch
import pandas as pd

# ---------- FIXED MATCH FUNCTION ----------
def match_contractors(required_skills, job_location, job_type=None, top_k=5):
    """
    Match contractors based on skills, location, and job type.

    Score per contractor:
      * fraction of ``required_skills`` found in the contractor's skill list;
      * plus 0.2 when ``job_location`` (trimmed, lower-cased) occurs as a
        substring of the contractor's cleaned location;
      * plus 0.15 when ``job_type`` is in the contractor's job-type list;
      * finally all scores are divided by the best score (when it is > 0), so
        the top match scores 1.0.

    Returns up to ``top_k`` contractors, best first. Each result is the
    contractor's full row as a dict plus ``match_score`` (rounded to 3
    decimals). The same value is also stored under ``Score``: this definition
    replaces the earlier ``match_contractors`` in the notebook namespace, and
    the workflow steps read ``m["Score"]`` from the results.
    """

    # Convert skills to lowercase for matching
    required_skills = [s.lower().strip() for s in required_skills] if required_skills else []
    required_set = set(required_skills)  # built once, reused for every contractor
    denominator = max(len(required_skills), 1)

    # Compute skill match (contractors without skills overlap nothing -> 0.0)
    sims = torch.tensor(
        [len(required_set & set(skills)) / denominator for skills in df["skill_list"]],
        dtype=torch.float32,
    )

    # Location boost
    if job_location is not None and isinstance(job_location, str) and job_location.strip():
        loc_clean = job_location.lower().strip()
        loc_mask = torch.tensor(
            [1.0 if loc_clean in str(l) else 0.0 for l in df["location_clean"]],
            dtype=torch.float32
        )
        sims = sims + loc_mask * 0.2

    # Job type boost
    if job_type is not None and isinstance(job_type, str) and job_type.strip():
        job_type_clean = job_type.lower().strip()
        type_mask = torch.tensor(
            [1.0 if job_type_clean in jt else 0.0 for jt in df["jobtype_list"]],
            dtype=torch.float32
        )
        sims = sims + type_mask * 0.15

    # Normalize scores (max() on an empty tensor raises, hence the numel guard)
    if sims.numel() > 0 and sims.max() > 0:
        sims = sims / sims.max()

    # Top-k selection
    k = max(0, min(int(top_k), len(sims)))
    topk_vals, topk_idx = torch.topk(sims, k)
    results = []
    for idx, val in zip(topk_idx.tolist(), topk_vals.tolist()):
        contractor = df.iloc[idx].to_dict()
        contractor["match_score"] = round(float(val), 3)
        # Alias under the key the first implementation's callers expect.
        contractor["Score"] = contractor["match_score"]
        results.append(contractor)

    return results


# ---------- SYNTHETIC JOB GENERATOR ----------
def generate_synthetic_jobs(df, n_jobs=200):
    """
    Generate a synthetic test set of jobs, each derived from one contractor.

    For every job a contractor row is picked at random; the job requires 1-3
    of that contractor's skills, is located where the contractor is, and has
    one of the contractor's job types (``None`` if the contractor lists none).
    The contractor's ID is stored as ``TrueContractorID``.

    All randomness comes from Python's ``random`` module, so the output is
    reproducible after ``random.seed(...)``. JobIDs are sequential
    ("TEST-1000", "TEST-1001", ...) and therefore unique.

    df: contractor table with ContractorID, skill_list, location_clean and
        jobtype_list columns
    n_jobs: number of jobs to generate

    Returns a DataFrame with columns JobID, RequiredSkills, Location, JobType,
    TrueContractorID. Raises ValueError when jobs are requested from an empty
    contractor table.
    """
    columns = ["JobID", "RequiredSkills", "Location", "JobType", "TrueContractorID"]
    if n_jobs > 0 and len(df) == 0:
        raise ValueError("Cannot generate synthetic jobs from an empty contractor table")

    jobs = []
    for job_number in range(n_jobs):
        # random.randrange (not df.sample) keeps the draw tied to the seeded RNG.
        row = df.iloc[random.randrange(len(df))]

        contractor_skills = list(row["skill_list"])
        if contractor_skills:
            n_required = min(len(contractor_skills), random.randint(1, 3))
            required_skills = random.sample(contractor_skills, n_required)
        else:
            required_skills = []

        job_types = list(row["jobtype_list"])
        job_type = random.choice(job_types) if job_types else None

        jobs.append({
            "JobID": f"TEST-{1000 + job_number}",
            "RequiredSkills": required_skills,
            "Location": row["location_clean"],
            "JobType": job_type,
            "TrueContractorID": row["ContractorID"]
        })
    return pd.DataFrame(jobs, columns=columns)


# ---------- ACCURACY EVALUATION ----------
def evaluate_accuracy(test_df, top_k=5):
    """
    Evaluate Top-1 and Top-k accuracy on synthetic jobs.

    For every row of ``test_df`` (columns RequiredSkills, Location, JobType,
    TrueContractorID) the matcher is asked for ``top_k`` contractors. A job
    counts as a Top-1 hit when the true contractor is ranked first, and as a
    Top-k hit when it appears anywhere in the returned list.

    Returns ``{"Top-1 Accuracy": ..., "Top-<top_k> Accuracy": ...}`` rounded to
    3 decimals. An empty ``test_df`` yields 0.0 for both instead of dividing
    by zero.
    """
    correct_top1, correct_topk = 0, 0
    total = len(test_df)

    for _, row in test_df.iterrows():
        preds = match_contractors(row["RequiredSkills"], row["Location"], row["JobType"], top_k=top_k)
        pred_ids = [p["ContractorID"] for p in preds]

        if pred_ids and pred_ids[0] == row["TrueContractorID"]:
            correct_top1 += 1
        if row["TrueContractorID"] in pred_ids:
            correct_topk += 1

    if total == 0:
        top1_accuracy = topk_accuracy = 0.0
    else:
        top1_accuracy = round(correct_top1 / total, 3)
        topk_accuracy = round(correct_topk / total, 3)

    return {
        "Top-1 Accuracy": top1_accuracy,
        f"Top-{top_k} Accuracy": topk_accuracy
    }


# ---------- RUN TEST ----------
# Build 200 synthetic jobs from the contractor table, then measure how often the
# matcher ranks each job's source contractor first / within the top 5.
synthetic_test = generate_synthetic_jobs(df, n_jobs=200)
accuracy_results = evaluate_accuracy(synthetic_test, top_k=5)
print("Synthetic Accuracy Results:", accuracy_results)


Synthetic Accuracy Results: {'Top-1 Accuracy': 0.74, 'Top-5 Accuracy': 0.99}
