<a href="https://colab.research.google.com/github/ghoshmoumita04/ProviderOnboarding/blob/main/Provider_onboarding_draft.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install scikit-learn




In [None]:
import os
import json
import re
import base64
import datetime
import numpy as np
from typing import List, Dict
from sklearn.ensemble import RandomForestClassifier


In [None]:
# Working directories, relative to the current working directory:
# decoded attachments and the per-request audit log files.
ATTACHMENT_DIR = "attachments"
AUDIT_LOG_DIR = "audit_logs"

# exist_ok=True makes re-running this cell harmless.
for _directory in (ATTACHMENT_DIR, AUDIT_LOG_DIR):
    os.makedirs(_directory, exist_ok=True)

print("Directories ready")


Directories ready


In [None]:
def log_audit(request_id: str, provider_name: str, action: str, details: dict, log_dir=None):
    """Append one audit entry to the JSON log file of a request.

    Every request has its own file, ``<log_dir>/<request_id>.json``, holding
    a JSON list of entries in the order they were recorded.

    Args:
        request_id: Identifier of the request. It also names the log file;
            characters other than letters, digits, ``.``, ``_`` and ``-``
            are replaced by ``_`` so the id cannot point outside ``log_dir``.
        provider_name: Provider the entry refers to.
        action: Short label of the step being recorded.
        details: JSON-like payload stored with the entry. Values the ``json``
            module cannot encode are stored as their ``str()`` form.
        log_dir: Directory to write to. Defaults to ``AUDIT_LOG_DIR``.

    Raises:
        json.JSONDecodeError: If an existing log file is not valid JSON.
        OSError: If the log file cannot be read or written.
    """
    target_dir = AUDIT_LOG_DIR if log_dir is None else log_dir
    os.makedirs(target_dir, exist_ok=True)

    # request_id comes from the incoming payload: strip path separators and
    # anything else unusual before using it as a file name.
    safe_id = re.sub(r"[^A-Za-z0-9._-]", "_", str(request_id))
    logfile = os.path.join(target_dir, f"{safe_id}.json")

    log_entry = {
        "timestamp": datetime.datetime.now().isoformat(),
        "request_id": request_id,
        "provider_name": provider_name,
        "action": action,
        "details": details
    }

    if os.path.exists(logfile):
        with open(logfile, "r", encoding="utf-8") as f:
            logs = json.load(f)
    else:
        logs = []

    logs.append(log_entry)

    # Serialise BEFORE opening the file for writing: if encoding fails, the
    # existing log is left untouched instead of being truncated.
    # default=str is deliberate: an audit entry with a stringified value is
    # preferable to aborting the request over an unencodable detail.
    serialized = json.dumps(logs, indent=2, default=str)

    with open(logfile, "w", encoding="utf-8") as f:
        f.write(serialized)


In [None]:
class AttachmentHandler:
    """Write request attachments to local disk.

    Args:
        attachment_dir: Directory the decoded files are written to. When
            omitted, the module-level ``ATTACHMENT_DIR`` is used.
    """

    def __init__(self, attachment_dir=None):
        self._attachment_dir = attachment_dir

    def _target_dir(self) -> str:
        """Return the directory attachments are written to."""
        if self._attachment_dir is None:
            return ATTACHMENT_DIR
        return self._attachment_dir

    @staticmethod
    def _safe_file_name(raw_name) -> str:
        """Reduce a client-supplied file name to its final path component."""
        # Treat backslashes as separators too, so Windows-style paths are
        # reduced the same way on every platform.
        name = os.path.basename(str(raw_name).replace("\\", "/"))
        if name in ("", ".", ".."):
            raise ValueError(f"Unusable attachment file name: {raw_name!r}")
        return name

    def save_small_attachments(self, small_attachments: List[Dict]) -> List[str]:
        """Decode base64 attachments and write them to the attachment directory.

        Args:
            small_attachments: Dicts with a ``file_name`` and a
                ``content_base64`` entry.

        Returns:
            Paths of the written files, in input order.

        Raises:
            ValueError: If a file name has no usable final component.
            binascii.Error: If the base64 content cannot be decoded.
        """
        target_dir = self._target_dir()
        os.makedirs(target_dir, exist_ok=True)

        saved = []
        for att in small_attachments:
            # file_name comes from the request payload; keeping only its
            # base name prevents writes outside the attachment directory.
            path = os.path.join(target_dir, self._safe_file_name(att["file_name"]))
            # Decode first so bad content does not leave an empty file behind.
            content = base64.b64decode(att["content_base64"])
            with open(path, "wb") as f:
                f.write(content)
            saved.append(path)
        return saved

    def process_all(self, small_attachments, large_attachments):
        """Save the small attachments and return the paths of the saved files.

        Large attachments are not downloaded here; a notice is printed
        instead. Either argument may be empty or ``None``.
        """
        files = []
        if small_attachments:
            files.extend(self.save_small_attachments(small_attachments))

        if large_attachments:
            print("Large attachment download skipped in Colab (handled via SN in prod)")

        return files


In [None]:
def read_documents(files: List[str]) -> str:
    """Read the given files as text and return their contents concatenated.

    Each file's content is followed by a newline. Files are decoded as UTF-8
    regardless of the platform locale; undecodable bytes are dropped.

    Args:
        files: Paths of the files to read, in the order they should appear.

    Returns:
        The combined text, or an empty string when ``files`` is empty.
    """
    parts = []
    for file in files:
        # Explicit encoding: without it the result depends on the machine's
        # default locale encoding.
        with open(file, "r", encoding="utf-8", errors="ignore") as f:
            parts.append(f.read() + "\n")
    return "".join(parts)


In [None]:
def extract_provider_data(text):
    """Extract labelled provider fields from document text.

    Each field is looked up as ``<Label>: <value>`` with the value on the
    same line as its label. The first occurrence of a label wins.

    Args:
        text: Document text to search.

    Returns:
        Dict with the keys ``provider_name``, ``npi``, ``license_number``,
        ``license_state``, ``license_expiry``, ``tax_id`` and ``specialty``.
        A value is ``None`` when its label is absent or has no value.
    """
    # Only spaces/tabs may separate a label from its value. Allowing any
    # whitespace (\s*) would step over the line break of an empty field and
    # capture text from the following line instead.
    field_patterns = {
        "provider_name": r"Provider Name:[ \t]*(.+)",
        "npi": r"NPI:[ \t]*(\d+)",
        "license_number": r"License Number:[ \t]*(\S+)",
        "license_state": r"License State:[ \t]*(\S+)",
        "license_expiry": r"License Expiry:[ \t]*(\S+)",
        "tax_id": r"Tax ID:[ \t]*(\S+)",
        "specialty": r"Specialty:[ \t]*(.+)",
    }

    data = {}
    for field, pattern in field_patterns.items():
        m = re.search(pattern, text)
        # ".+" also matches trailing blanks and "\r"; strip them so values
        # compare cleanly. A value that is blank after stripping is missing.
        value = m.group(1).strip() if m else None
        data[field] = value or None

    return data


In [None]:
def verify_npi(npi):
    """Return True when ``npi`` is a string of exactly ten ASCII digits.

    This is a format check only; it does not confirm the number is
    registered. Always returns a ``bool`` (``False`` for ``None`` or ``""``).
    """
    return isinstance(npi, str) and re.fullmatch(r"[0-9]{10}", npi) is not None

def verify_license(expiry, today=None):
    """Return True when the license expiry date lies after the reference date.

    Args:
        expiry: Expiry date as a ``YYYY-MM-DD`` string. Missing values and
            values that do not parse as such a date count as not valid.
        today: ``datetime.date`` to compare against. Defaults to the current
            local date, so the check does not rely on a fixed cutoff.

    Returns:
        ``True`` only if ``expiry`` parses and is strictly later than the
        reference date; otherwise ``False``.
    """
    if not expiry:
        return False

    # Compare real dates, not raw strings: with string comparison any text
    # that sorts after the cutoff (even a non-date) would pass.
    try:
        expiry_date = datetime.datetime.strptime(str(expiry).strip(), "%Y-%m-%d").date()
    except ValueError:
        return False

    reference = datetime.date.today() if today is None else today
    return expiry_date > reference

def oig_exclusion_check(npi):
    """Report whether the provider has an OIG exclusion.

    Placeholder: no lookup is performed and ``npi`` is ignored, so the
    result is always ``False``.
    """
    # External API in real life
    is_excluded = False
    return is_excluded

def validate_provider(data):
    """Run the compliance checks on extracted provider data.

    Args:
        data: Mapping read via ``.get`` for the ``npi`` and
            ``license_expiry`` entries.

    Returns:
        List of issue labels, in check order: ``"Invalid NPI"``,
        ``"License expired"``, ``"OIG exclusion"``. Empty when every check
        passes.
    """
    npi = data.get("npi")

    # (label, failed) pairs, evaluated in the order they are reported.
    findings = [
        ("Invalid NPI", not verify_npi(npi)),
        ("License expired", not verify_license(data.get("license_expiry"))),
        ("OIG exclusion", oig_exclusion_check(npi)),
    ]

    return [label for label, failed in findings if failed]


In [None]:

def build_risk_features(validation_issues, name_similarity, sanctions_flag):
    """Turn provider compliance signals into a single model input row.

    Args:
        validation_issues: Sized collection of validation failures; only its
            length is used.
        name_similarity: Similarity score; values below 0.85 set the
            name-mismatch flag.
        sanctions_flag: Regulatory risk indicator, converted with ``int``.

    Returns:
        Array of shape (1, 3): failure count, name-mismatch flag,
        sanctions flag.
    """
    failure_count = len(validation_issues)
    name_mismatch = int(name_similarity < 0.85)
    sanctioned = int(sanctions_flag)

    row = [failure_count, name_mismatch, sanctioned]
    return np.array([row])
def predict_provider_risk(validation_issues, name_similarity, sanctions_flag):
    """Predict the risk label for one provider.

    Builds a feature row with ``build_risk_features`` and returns the first
    (only) prediction of the module-level ``risk_model`` for it.
    """
    feature_row = build_risk_features(validation_issues, name_similarity, sanctions_flag)
    predictions = risk_model.predict(feature_row)
    return predictions[0]





In [None]:
# Toy training set for the demo risk model.
# Columns follow the feature rows built in this notebook:
# [number of validation issues, name-mismatch flag, sanctions flag].
# Label 0 is the only risk value onboarding_decision() will auto-approve.
X_train = np.array([
    [0, 0, 0],
    [1, 0, 0],
    [1, 1, 0],
    [2, 1, 1]
])
y_train = np.array([0, 0, 1, 1])

# Fixed seed: an unseeded forest is rebuilt differently on every run, so the
# same provider could receive different risk labels from one run to the next.
risk_model = RandomForestClassifier(random_state=42)
risk_model.fit(X_train, y_train)



In [None]:
def generate_risk_features(issues, name_mismatch):
    """Build a single model input row from validation results.

    Returns an array of shape (1, 3): the number of issues, the given
    name-mismatch flag, and a constant 0 in the third column.
    """
    row = [len(issues), name_mismatch, 0]
    return np.array([row])

In [None]:
def onboarding_decision(risk, issues):
    """Choose the onboarding outcome from the risk label and issue list.

    Returns ``"AUTO_APPROVED"`` only when ``risk`` equals 0 and ``issues``
    is empty; every other combination yields ``"MANUAL_REVIEW"``.
    """
    # Guard clause: any non-zero risk or any issue sends the request to a human.
    if risk != 0 or issues:
        return "MANUAL_REVIEW"
    return "AUTO_APPROVED"


In [None]:
class AIProcessor:
    """Runs extraction, validation and risk scoring for one request."""

    def process(self, request_id, provider_data, attachment_files):
        """Evaluate a provider from the text of its attachment files.

        Args:
            request_id: Identifier of the request (not used in the body).
            provider_data: Mapping with the submitted ``provider_name``.
            attachment_files: Paths of the files to read.

        Returns:
            Dict with ``decision``, ``risk`` (as ``int``), ``issues`` and
            ``extracted_data``.
        """
        extracted = extract_provider_data(read_documents(attachment_files))
        issues = validate_provider(extracted)

        # Exact string comparison between the extracted and submitted names.
        names_agree = extracted.get("provider_name") == provider_data.get("provider_name")
        if names_agree:
            name_mismatch = 0
        else:
            name_mismatch = 1

        feature_row = generate_risk_features(issues, name_mismatch)
        risk = risk_model.predict(feature_row)[0]
        decision = onboarding_decision(risk, issues)

        result = {}
        result["decision"] = decision
        result["risk"] = int(risk)
        result["issues"] = issues
        result["extracted_data"] = extracted
        return result


In [None]:
def create_provider_record(provider_data, ai_result):
    """Build the provider record for an auto-approved request.

    Args:
        provider_data: Mapping with ``provider_id`` and ``provider_name``.
        ai_result: Mapping with the ``decision`` of the AI step.

    Returns:
        ``{"status": "SKIPPED"}`` unless the decision is ``"AUTO_APPROVED"``;
        otherwise ``{"status": "CREATED", "record": ...}`` where the record
        is stamped with the current local time and marked ``"ACTIVE"``.
    """
    approved = ai_result["decision"] == "AUTO_APPROVED"
    if not approved:
        return dict(status="SKIPPED")

    record = dict(
        provider_id=provider_data["provider_id"],
        provider_name=provider_data["provider_name"],
        created_at=datetime.datetime.now().isoformat(),
        status="ACTIVE",
    )
    return dict(status="CREATED", record=record)


In [None]:
def handle_provider_request(payload):
    """Process one onboarding request end to end.

    Steps, each followed by an audit entry: receive the request, save the
    attachments, run the AI evaluation, build the provider record.

    Args:
        payload: Mapping with ``request_id`` and ``provider_data`` (which
            must contain ``provider_name``), plus optional
            ``small_attachments`` and ``large_attachments`` lists.

    Returns:
        Dict with ``request_id``, ``decision``, ``issues`` and
        ``provider_table`` (the result of the record-creation step).
    """
    request_id = payload["request_id"]
    provider_data = payload["provider_data"]
    provider_name = provider_data["provider_name"]

    def audit(action, details):
        # Every entry of this request shares the same id and provider name.
        log_audit(request_id, provider_name, action, details)

    audit("REQUEST_RECEIVED", payload)

    small = payload.get("small_attachments", [])
    large = payload.get("large_attachments", [])
    files = AttachmentHandler().process_all(small, large)
    audit("ATTACHMENTS_READY", files)

    ai_result = AIProcessor().process(request_id, provider_data, files)
    audit("AI_COMPLETED", ai_result)

    provider_insert = create_provider_record(provider_data, ai_result)
    audit("PROVIDER_TABLE_UPDATE", provider_insert)

    response = {
        "request_id": request_id,
        "decision": ai_result["decision"],
        "issues": ai_result["issues"],
        "provider_table": provider_insert
    }
    return response
