# 🧠 Assistant OS Factory v1.0
Includes the full DAG Flow Builder (Phases 1–4) with GPT assistance, flow save/load, and pipeline simulation.

In [12]:
# Attach Google Drive to the Colab runtime at /content/drive.
from google.colab import drive

drive.mount("/content/drive")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [13]:
import os

# Project root on the mounted Drive; every other location is derived from it.
BASE_DIR = "/content/drive/MyDrive/assistant_markdown/"

# Sub-folders and export target, all relative to BASE_DIR.
OUTPUT_DIR = os.path.join(BASE_DIR, "processed/")
LAUNCHER_DIR = os.path.join(BASE_DIR, "streamlit_ready/")
DAG_FLOW_DIR = os.path.join(BASE_DIR, "dag_flows/")
ZIP_EXPORT_PATH = os.path.join(BASE_DIR, "assistant_bundle.zip")

# Create the working folders up front; exist_ok=True keeps the cell safe to re-run.
for _working_dir in (OUTPUT_DIR, LAUNCHER_DIR, DAG_FLOW_DIR):
    os.makedirs(_working_dir, exist_ok=True)


In [14]:
import openai
from google.colab import userdata

# Read the API key from the Colab secret named "OPENAI" instead of hardcoding it.
openai.api_key = userdata.get("OPENAI")


In [15]:
def enhance_markdown(md_text):
    """Ask GPT-4 to clean up and complete a raw assistant markdown document.

    Args:
        md_text: The raw markdown to improve; it is embedded verbatim in the prompt.

    Returns:
        The text of the first completion choice, with surrounding whitespace stripped.
    """
    prompt = f"""You are an AI assistant markdown enhancer. Given the raw markdown below, clean and complete it by:
- Filling in missing fields (e.g., description, input/output, category)
- Ensuring clarity and correct formatting
- Suggesting a better title if needed

Return only the improved markdown.

### Raw Markdown:
{md_text}
"""
    # Single-turn conversation: the whole request travels in one user message.
    conversation = [{"role": "user", "content": prompt}]
    completion = openai.ChatCompletion.create(
        model="gpt-4",
        messages=conversation,
        temperature=0.3,
    )
    improved = completion.choices[0].message.content
    return improved.strip()


In [16]:
import re
def parse_markdown(md_text):
    """Extract assistant metadata from a markdown document.

    Args:
        md_text: Markdown text to scan.

    Returns:
        A dict with the keys ``title``, ``description``, ``category``, ``inputs``
        and ``outputs``. The first three hold the stripped text of the first
        match, or ``None`` when the field is absent. ``inputs`` and ``outputs``
        are lists with one entry per ``- Input: ...`` / ``- Output: ...`` line
        (empty when there are none).
    """
    def _first_group(match):
        # Stripped capture of a single-match field, or None when nothing matched.
        return match.group(1).strip() if match else None

    title = re.search(r"^# (.+)", md_text, re.MULTILINE)
    # The description runs until the next section or field marker. The trailing
    # `|\Z` lets it also end at the very end of the text, so a description that
    # is the last section of a file with no final newline is still captured.
    description = re.search(
        r"## Description\n([\s\S]+?)(?:\n(?:##|Category:|- Input:|- Output:|$)|\Z)",
        md_text,
    )
    category = re.search(r"Category: (.+)", md_text)

    return {
        "title": _first_group(title),
        "description": _first_group(description),
        "category": _first_group(category),
        "inputs": re.findall(r"- Input: (.+)", md_text),
        "outputs": re.findall(r"- Output: (.+)", md_text),
    }


In [20]:
def generate_streamlit_files(data, enhanced_md, output_folder):
    """Write the file bundle for one assistant and return its folder.

    A folder named after the assistant title is created under ``output_folder``
    and filled with:

    * ``<name>.md``      - the enhanced markdown, verbatim
    * ``<name>.py``      - a minimal Streamlit module exposing ``run_ui()``
    * ``manifest.json``  - title, category, GPT usage flag and folder path
    * ``README.md``      - title, description, category, inputs and outputs

    Args:
        data: Metadata dict with the keys produced by ``parse_markdown``
            (``title``, ``description``, ``category``, ``inputs``, ``outputs``).
            Missing keys and ``None`` values both fall back to defaults.
        enhanced_md: Markdown text saved alongside the generated files.
        output_folder: Directory under which the assistant folder is created.

    Returns:
        Path of the assistant folder.
    """
    import os
    import json

    # parse_markdown stores None for fields it could not find, so a plain
    # `.get(key, default)` never reaches the default; use `or` to cover both
    # the missing-key and the None/empty case.
    title = data.get("title") or "Untitled"
    description = data.get("description")
    category = data.get("category") or "Uncategorized"
    inputs = data.get("inputs") or []
    outputs = data.get("outputs") or []

    name = title.lower().replace(" ", "_")
    # A path separator in the title would otherwise point the file names below
    # into sub-directories that do not exist.
    for separator in ("/", "\\"):
        name = name.replace(separator, "_")

    folder = os.path.join(output_folder, name)
    os.makedirs(folder, exist_ok=True)

    # Save enhanced markdown (explicit UTF-8: the content routinely holds emoji).
    with open(os.path.join(folder, f"{name}.md"), "w", encoding="utf-8") as f:
        f.write(enhanced_md)

    # Save .py with run_ui. The strings are embedded via repr() so quotes,
    # backslashes and newlines in the metadata still yield valid Python source.
    ui_description = description or "No description provided."
    module_source = (
        "import streamlit as st\n"
        "\n"
        "def run_ui():\n"
        f"    st.title({title!r})\n"
        f"    st.write({ui_description!r})\n"
        "    # Add UI logic here\n"
        "\n"
        "if __name__ == '__main__':\n"
        "    run_ui()\n"
    )
    with open(os.path.join(folder, f"{name}.py"), "w", encoding="utf-8") as f:
        f.write(module_source)

    # Save manifest.json
    manifest = {
        "title": title,
        "category": category,
        "uses_gpt": "gpt" in enhanced_md.lower(),
        "has_run_ui": True,
        "path": folder,
    }
    with open(os.path.join(folder, "manifest.json"), "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)

    # Save README.md
    with open(os.path.join(folder, "README.md"), "w", encoding="utf-8") as f:
        f.write(f"# {title}\n\n")
        f.write(f"## Description\n{description or 'N/A'}\n\n")
        f.write(f"### Category\n{category}\n\n")
        f.write("### Inputs\n")
        for input_item in inputs:
            f.write(f"- {input_item}\n")
        f.write("\n### Outputs\n")
        for output_item in outputs:
            f.write(f"- {output_item}\n")

    return folder

In [21]:
# ✅ DAG Phase 1 source.
# The string below is the complete text of a Streamlit page: it maps five mock
# assistants to category-styled Task nodes (icon + colour from CATEGORY_STYLES),
# connects them with two hard-coded edges and renders the graph with Dag.
# This cell only stores the text and writes it to disk; it does not run it.
# NOTE(review): the page imports Dag/Task from `streamlit_dag` — confirm that
# package is installed in the environment where the script is launched.
dag_code = """import streamlit as st
from streamlit_dag import Dag, Task
import json
import os

st.set_page_config(page_title="🧱 DAG Flow Builder", layout="wide")
st.title("🧱 DAG Flow Builder – Phase 1: Visual Enhancements")

# === CONFIGURATION ===
CATEGORY_STYLES = {
    "Data Cleaning": {"color": "#4BA3C7", "icon": "🧹"},
    "Data Structuring": {"color": "#FFD166", "icon": "🧱"},
    "Data Enrichment": {"color": "#EF476F", "icon": "✨"},
    "Validation & QA": {"color": "#06D6A0", "icon": "✅"},
    "Export": {"color": "#118AB2", "icon": "📤"},
    "Uncategorized": {"color": "#CCCCCC", "icon": "❓"},
}

# === MOCKED ASSISTANTS (can be loaded dynamically later) ===
assistants = [
    {"id": "clean_nulls", "title": "Clean Nulls", "category": "Data Cleaning"},
    {"id": "normalize_columns", "title": "Normalize Columns", "category": "Data Structuring"},
    {"id": "detect_outliers", "title": "Detect Outliers", "category": "Validation & QA"},
    {"id": "merge_datasets", "title": "Merge Datasets", "category": "Data Structuring"},
    {"id": "export_to_api", "title": "Export to API", "category": "Export"},
]

# === BUILD TASKS WITH STYLES ===
def style_node(node):
    cat = node.get("category", "Uncategorized")
    style = CATEGORY_STYLES.get(cat, CATEGORY_STYLES["Uncategorized"])
    return Task(
        id=node["id"],
        label=f"{style['icon']} {node['title']}",
        style={"backgroundColor": style["color"], "borderRadius": "10px", "boxShadow": "2px 2px 5px rgba(0,0,0,0.2)"},
    )

nodes = [style_node(a) for a in assistants]
edges = [("clean_nulls", "normalize_columns"), ("normalize_columns", "detect_outliers")]

# === LAYOUT ===
st.subheader("🎨 Styled Assistant Graph")
with Dag(nodes, edges, direction="LR", node_spacing=60, layer_spacing=80) as result:
    st.write("📦 Current flow:", result)

# === HOVER TOOLTIP MOCKUP ===
st.markdown("ℹ️ Hover over each node to view assistant category and color.")
"""
# Persist the Phase 1 script into the Streamlit launcher folder.
dag_path = os.path.join(LAUNCHER_DIR, "pipeline_designer_phase1.py")
# The script text contains emoji and an en dash, so pin the encoding rather
# than relying on the platform default.
with open(dag_path, "w", encoding="utf-8") as f:
    f.write(dag_code)
print("✅ pipeline_designer_phase1.py written to:", dag_path)


✅ pipeline_designer_phase1.py written to: /content/drive/MyDrive/assistant_markdown/streamlit_ready/pipeline_designer_phase1.py


In [22]:

# ✅ DAG Phase 2 source.
# The string below is the complete text of a Streamlit page that adds three
# GPT-4 calls to the styled graph: a sidebar "goal -> suggested pipeline"
# generator (result kept in st.session_state), a "what comes next?" suggestion
# for a selected node, and a one-click pipeline summary.
# The page reads its key from the OPENAI_API_KEY environment variable and falls
# back to the placeholder "sk-REPLACE_ME".
# This cell only stores the text and writes it to disk; it does not run it.
# NOTE(review): the page uses the legacy `openai.ChatCompletion` interface and
# the `streamlit_dag` package — confirm both match the launch environment.
dag_code = """import streamlit as st
from streamlit_dag import Dag, Task
import openai
import os

st.set_page_config(page_title="🧠 GPT DAG Builder", layout="wide")
st.title("🧠 DAG Flow Builder – Phase 2: GPT Enhancements")

# --- SETUP ---
openai.api_key = os.getenv("OPENAI_API_KEY", "sk-REPLACE_ME")  # Use your actual key or load from env

CATEGORY_STYLES = {
    "Data Cleaning": {"color": "#4BA3C7", "icon": "🧹"},
    "Data Structuring": {"color": "#FFD166", "icon": "🧱"},
    "Data Enrichment": {"color": "#EF476F", "icon": "✨"},
    "Validation & QA": {"color": "#06D6A0", "icon": "✅"},
    "Export": {"color": "#118AB2", "icon": "📤"},
    "Uncategorized": {"color": "#CCCCCC", "icon": "❓"},
}

# --- SIDEBAR: GPT GOAL FLOW ---
st.sidebar.title("🪄 GPT Auto-Builder")
goal = st.sidebar.text_input("Describe your pipeline goal:")
if st.sidebar.button("⚡ Generate DAG from Goal"):
    try:
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{
                "role": "user",
                "content": f"Given the goal '{goal}', suggest a 3-5 step assistant pipeline with titles and categories."
            }],
            temperature=0.4
        )
        st.session_state['gpt_dag_result'] = response.choices[0].message.content.strip()
    except Exception as e:
        st.sidebar.error(f"GPT error: {e}")

if 'gpt_dag_result' in st.session_state:
    st.sidebar.markdown("### 🧠 Suggested Flow:")
    st.sidebar.code(st.session_state['gpt_dag_result'])

# --- MOCKED DATA (later replaced by dynamic flow)
assistants = [
    {"id": "clean_nulls", "title": "Clean Nulls", "category": "Data Cleaning"},
    {"id": "normalize_columns", "title": "Normalize Columns", "category": "Data Structuring"},
    {"id": "detect_outliers", "title": "Detect Outliers", "category": "Validation & QA"},
    {"id": "merge_datasets", "title": "Merge Datasets", "category": "Data Structuring"},
    {"id": "export_to_api", "title": "Export to API", "category": "Export"},
]

def style_node(node):
    cat = node.get("category", "Uncategorized")
    style = CATEGORY_STYLES.get(cat, CATEGORY_STYLES["Uncategorized"])
    return Task(
        id=node["id"],
        label=f"{style['icon']} {node['title']}",
        style={"backgroundColor": style["color"], "borderRadius": "10px", "boxShadow": "2px 2px 5px rgba(0,0,0,0.2)"}
    )

nodes = [style_node(a) for a in assistants]
edges = [("clean_nulls", "normalize_columns"), ("normalize_columns", "detect_outliers")]

# --- MAIN DAG DISPLAY ---
st.subheader("🎨 Styled DAG with GPT Support")
with Dag(nodes, edges, direction="LR", node_spacing=60, layer_spacing=80) as result:
    st.write("📦 Current flow:", result)

# --- GPT Assistant Suggestion (per node) ---
st.markdown("### 🔮 Suggest Next Assistant")
selected_node = st.selectbox("Select current node", [a['title'] for a in assistants])
if st.button("💡 GPT: What comes next?"):
    try:
        prompt = f"I'm building a data pipeline. After the step '{selected_node}', what assistant should come next?"
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.5
        )
        st.success(response.choices[0].message.content.strip())
    except Exception as e:
        st.error(f"GPT error: {e}")

# --- PIPELINE SUMMARY ---
if st.button("🧠 Summarize Pipeline"):
    try:
        steps = ", then ".join([a["title"] for a in assistants])
        prompt = f"Summarize the purpose of this pipeline: {steps}."
        response = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3
        )
        st.info(response.choices[0].message.content.strip())
    except Exception as e:
        st.error(f"GPT error: {e}")
"""
# Persist the Phase 2 script into the Streamlit launcher folder.
dag_path = os.path.join(LAUNCHER_DIR, "pipeline_designer_phase2.py")
# The script text contains emoji and an en dash, so pin the encoding rather
# than relying on the platform default.
with open(dag_path, "w", encoding="utf-8") as f:
    f.write(dag_code)
print("✅ pipeline_designer_phase2.py written to:", dag_path)


✅ pipeline_designer_phase2.py written to: /content/drive/MyDrive/assistant_markdown/streamlit_ready/pipeline_designer_phase2.py


In [23]:
# ✅ DAG Phase 3 source.
# The string below is the complete text of a Streamlit page that keeps the flow
# (nodes + edges) in st.session_state and adds three actions: save the flow as a
# timestamped JSON file in FLOW_DIR, load a previously saved JSON flow, and
# export a markdown summary through a download button.
# This cell only stores the text and writes it to disk; it does not run it.
#
# The literal MUST be raw (r"""..."""): the embedded source contains "\n"
# escapes inside its own string literals. In a non-raw literal they are turned
# into real line breaks here, which splits those strings across lines and makes
# the written script a SyntaxError.
# NOTE(review): in the page, the load step runs after the canvas is drawn, so a
# loaded flow presumably only shows on the next rerun — confirm and reorder if so.
dag_code = r"""import streamlit as st
from streamlit_dag import Dag, Task
import os
import json
from datetime import datetime

st.set_page_config(page_title="💾 DAG Builder – Phase 3", layout="wide")
st.title("💾 DAG Builder – Phase 3: Save, Load, Export")

# --- Path setup ---
FLOW_DIR = "/content/drive/MyDrive/assistant_markdown/dag_flows/"
os.makedirs(FLOW_DIR, exist_ok=True)

# --- Assistant templates ---
CATEGORY_STYLES = {
    "Data Cleaning": {"color": "#4BA3C7", "icon": "🧹"},
    "Data Structuring": {"color": "#FFD166", "icon": "🧱"},
    "Data Enrichment": {"color": "#EF476F", "icon": "✨"},
    "Validation & QA": {"color": "#06D6A0", "icon": "✅"},
    "Export": {"color": "#118AB2", "icon": "📤"},
    "Uncategorized": {"color": "#CCCCCC", "icon": "❓"},
}

# Default mock flow
default_assistants = [
    {"id": "clean_nulls", "title": "Clean Nulls", "category": "Data Cleaning"},
    {"id": "normalize_columns", "title": "Normalize Columns", "category": "Data Structuring"},
    {"id": "detect_outliers", "title": "Detect Outliers", "category": "Validation & QA"},
    {"id": "merge_datasets", "title": "Merge Datasets", "category": "Data Structuring"},
    {"id": "export_to_api", "title": "Export to API", "category": "Export"},
]
default_edges = [("clean_nulls", "normalize_columns"), ("normalize_columns", "detect_outliers")]

# Session state
if "nodes" not in st.session_state:
    st.session_state.nodes = default_assistants
if "edges" not in st.session_state:
    st.session_state.edges = default_edges

# Build styled DAG nodes
def style_node(node):
    cat = node.get("category", "Uncategorized")
    style = CATEGORY_STYLES.get(cat, CATEGORY_STYLES["Uncategorized"])
    return Task(
        id=node["id"],
        label=f"{style['icon']} {node['title']}",
        style={"backgroundColor": style["color"], "borderRadius": "10px", "boxShadow": "2px 2px 5px rgba(0,0,0,0.2)"}
    )

styled_nodes = [style_node(n) for n in st.session_state.nodes]

# UI: DAG Display
st.subheader("🧱 DAG Canvas")
with Dag(styled_nodes, st.session_state.edges, direction="LR", node_spacing=60, layer_spacing=80) as result:
    st.write("🔗 Flow result:", result)

# 💾 SAVE FLOW
if st.button("💾 Save Flow as JSON"):
    flow_data = {
        "nodes": st.session_state.nodes,
        "edges": st.session_state.edges
    }
    filename = f"dag_flow_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(os.path.join(FLOW_DIR, filename), "w") as f:
        json.dump(flow_data, f, indent=2)
    st.success(f"Saved to {filename}")

# 📂 LOAD FLOW
flow_files = [f for f in os.listdir(FLOW_DIR) if f.endswith(".json")]
selected_flow = st.selectbox("📂 Load existing flow", ["-- Select --"] + flow_files)

if selected_flow != "-- Select --":
    with open(os.path.join(FLOW_DIR, selected_flow)) as f:
        loaded = json.load(f)
        st.session_state.nodes = loaded["nodes"]
        st.session_state.edges = loaded["edges"]
    st.success(f"Loaded flow: {selected_flow}")

# 📝 EXPORT AS MARKDOWN
if st.button("📄 Export Flow as Markdown"):
    lines = ["# DAG Flow Summary\n"]
    for n in st.session_state.nodes:
        lines.append(f"## {n['title']}")
        lines.append(f"- ID: `{n['id']}`")
        lines.append(f"- Category: `{n.get('category', 'N/A')}`\n")
    st.download_button("📥 Download Markdown", data="\n".join(lines), file_name="dag_flow_summary.md")
"""
# Persist the Phase 3 script into the Streamlit launcher folder.
dag_path = os.path.join(LAUNCHER_DIR, "pipeline_designer_phase3.py")
# The script text contains emoji and an en dash, so pin the encoding rather
# than relying on the platform default.
with open(dag_path, "w", encoding="utf-8") as f:
    f.write(dag_code)
print("✅ pipeline_designer_phase3.py written to:", dag_path)


✅ pipeline_designer_phase3.py written to: /content/drive/MyDrive/assistant_markdown/streamlit_ready/pipeline_designer_phase3.py


In [24]:
# ✅ DAG Phase 4 source.
# The string below is the complete text of a Streamlit page that starts from an
# empty flow and lets the user build it: a searchable sidebar node library with
# one "Add" button per entry, an Undo button that restores the last snapshot
# pushed to st.session_state.history, a canvas whose reported added edges are
# appended to the stored edge list, and a GPT-4 "Run Simulation" button.
# This cell only stores the text and writes it to disk; it does not run it.
# NOTE(review): the sidebar section is labelled "Undo/Redo" but only an Undo
# button is defined; `result["added_edges"]` assumes the Dag result exposes
# that key — confirm against the `streamlit_dag` package.
dag_code = """import streamlit as st
from streamlit_dag import Dag, Task
import openai
import json
import os
from copy import deepcopy

st.set_page_config(page_title="🛠️ DAG Builder – Phase 4", layout="wide")
st.title("🛠️ DAG Builder – Phase 4: Full UX + GPT Simulation")

# GPT Setup
openai.api_key = os.getenv("OPENAI_API_KEY", "sk-REPLACE_ME")

# State
if "nodes" not in st.session_state:
    st.session_state.nodes = []
if "edges" not in st.session_state:
    st.session_state.edges = []
if "history" not in st.session_state:
    st.session_state.history = []

CATEGORY_STYLES = {
    "Data Cleaning": {"color": "#4BA3C7", "icon": "🧹"},
    "Data Structuring": {"color": "#FFD166", "icon": "🧱"},
    "Data Enrichment": {"color": "#EF476F", "icon": "✨"},
    "Validation & QA": {"color": "#06D6A0", "icon": "✅"},
    "Export": {"color": "#118AB2", "icon": "📤"},
    "Uncategorized": {"color": "#CCCCCC", "icon": "❓"},
}

# Node Library (static for now)
node_library = [
    {"title": "Clean Nulls", "category": "Data Cleaning"},
    {"title": "Normalize Columns", "category": "Data Structuring"},
    {"title": "Detect Outliers", "category": "Validation & QA"},
    {"title": "Merge Datasets", "category": "Data Structuring"},
    {"title": "Export to API", "category": "Export"},
]

def generate_node_id(title):
    return title.lower().replace(" ", "_") + "_" + str(len(st.session_state.nodes))

# Sidebar: Node Library
st.sidebar.title("📚 Node Library")
search = st.sidebar.text_input("Search Assistants")
filtered = [n for n in node_library if search.lower() in n["title"].lower()]

for item in filtered:
    if st.sidebar.button(f"➕ Add: {item['title']}"):
        new_id = generate_node_id(item["title"])
        st.session_state.history.append((deepcopy(st.session_state.nodes), deepcopy(st.session_state.edges)))
        st.session_state.nodes.append({
            "id": new_id,
            "title": item["title"],
            "category": item["category"]
        })

# Sidebar: Undo/Redo
st.sidebar.markdown("---")
if st.sidebar.button("↩️ Undo"):
    if st.session_state.history:
        last_nodes, last_edges = st.session_state.history.pop()
        st.session_state.nodes = last_nodes
        st.session_state.edges = last_edges

# Build Task objects
def style_node(node):
    cat = node.get("category", "Uncategorized")
    style = CATEGORY_STYLES.get(cat, CATEGORY_STYLES["Uncategorized"])
    return Task(
        id=node["id"],
        label=f"{style['icon']} {node['title']}",
        style={"backgroundColor": style["color"], "borderRadius": "10px", "boxShadow": "2px 2px 5px rgba(0,0,0,0.2)"}
    )

tasks = [style_node(n) for n in st.session_state.nodes]

st.subheader("🎯 DAG Canvas")
with Dag(tasks, st.session_state.edges, direction="LR", node_spacing=60, layer_spacing=80) as result:
    st.write("🧩 Flow result:", result)
    # Allow edge creation through canvas
    if result["added_edges"]:
        st.session_state.edges += result["added_edges"]

# GPT Simulation
if st.button("🧪 Run Simulation"):
    if not st.session_state.nodes:
        st.warning("No nodes to simulate.")
    else:
        try:
            steps = ", then ".join([n["title"] for n in st.session_state.nodes])
            prompt = f"Simulate what this assistant pipeline will do: {steps}."
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.4
            )
            st.info(response.choices[0].message.content.strip())
        except Exception as e:
            st.error(f"GPT error: {e}")
"""
# Persist the Phase 4 script into the Streamlit launcher folder.
dag_path = os.path.join(LAUNCHER_DIR, "pipeline_designer_phase4.py")
# The script text contains emoji and an en dash, so pin the encoding rather
# than relying on the platform default.
with open(dag_path, "w", encoding="utf-8") as f:
    f.write(dag_code)
print("✅ pipeline_designer_phase4.py written to:", dag_path)


✅ pipeline_designer_phase4.py written to: /content/drive/MyDrive/assistant_markdown/streamlit_ready/pipeline_designer_phase4.py
