In [None]:
from syft_rds.orchestra import setup_rds_stack
from syft_rds import RDS_NOTEBOOKS_PATH
from pathlib import Path

In [None]:
# `key` names this notebook's RDS stack; the same key is passed to
# remove_rds_stack_dir in the clean-up cell at the end.
key = "custom_functions_notebook"
# NOTE(review): reset=True presumably discards state left by a previous run — confirm.
stack = setup_rds_stack(key=key, log_level="INFO", reset=True)

In [None]:
# Two clients taken from the same stack: `do_client` drives the DO (data owner)
# steps below, `ds_client` drives the DS steps.
do_client = stack.do_rds_client
ds_client = stack.ds_rds_client

# Create a dataset as DO

In [None]:
# Base directory for everything this notebook writes: the dataset files and the
# custom function both live under CWD / "data".
CWD = RDS_NOTEBOOKS_PATH / "quickstart"

In [None]:
# Layout of the dataset on disk: private/ and mock/ copies plus a markdown
# description, all below CWD / "data" / <dataset_name>.
dataset_name = "sqlite-chat-logs"
dataset_root = CWD / "data" / dataset_name
private_dir = dataset_root / "private"
mock_dir = dataset_root / "mock"
markdown_path = dataset_root / "description.md"

# Create both data directories (no error if they already exist).
for directory in (private_dir, mock_dir):
    directory.mkdir(parents=True, exist_ok=True)

# Minimal dataset description, overwritten on every run.
markdown_path.write_text("# SQLite database")

print(f"Created directories: {private_dir}, {mock_dir}")

## Create a private and mock sqlite database

In [None]:
import sqlite3
import json


def create_chat_log_db(base_dir, messages: list[dict]):
    """Create a fresh SQLite chat-log database and its ``settings.json`` in ``base_dir``.

    Any existing ``chat_logs.db`` in ``base_dir`` is deleted first, so calling
    this twice never appends to an older database.

    Args:
        base_dir: Directory (str or Path) to write into; created if missing.
        messages: Rows to insert. Each dict must provide the keys
            ``"username"`` and ``"message"``. May be empty, in which case only
            the empty ``chat_log`` table is created.

    Raises:
        KeyError: If a message lacks ``"username"`` or ``"message"``.
        sqlite3.Error: If the database cannot be created or written.
    """
    base_dir = Path(base_dir)
    base_dir.mkdir(parents=True, exist_ok=True)

    # Write a settings.json, so the user knows how to connect to the database
    db_config = {
        "database": "chat_logs.db",
    }
    (base_dir / "settings.json").write_text(json.dumps(db_config, indent=2))

    # Derive the path from the config so the file on disk always matches
    # what settings.json advertises.
    db_path = base_dir / db_config["database"]
    db_path.unlink(missing_ok=True)

    # Build the rows before touching the database so a malformed message fails
    # early, before anything is inserted.
    rows = [(message["username"], message["message"]) for message in messages]

    conn = sqlite3.connect(db_path)
    try:
        # `with conn` commits on success and rolls back on error; it does NOT
        # close the connection, hence the surrounding try/finally.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS chat_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT NOT NULL,
                    message TEXT NOT NULL
                )
            """)
            conn.executemany(
                "INSERT INTO chat_log (username, message) VALUES (?, ?)",
                rows,
            )
    finally:
        conn.close()

    print(f"Database created at {db_path}")

In [None]:
# Chat rows for the mock copy of the dataset.
mock_messages = [
    {"username": user, "message": text}
    for user, text in (
        ("alice", "Hello, this is a test message!"),
        ("bob", "Hi Alice, message received!"),
    )
]

# Chat rows for the private copy of the dataset.
private_messages = [
    {"username": user, "message": text}
    for user, text in (
        ("alice", "Let's keep this conversation private."),
        ("bob", "Sure, I won't share it with anyone."),
    )
]

# Build one database per directory: mock first, then private.
for target_dir, chat_messages in (
    (mock_dir, mock_messages),
    (private_dir, private_messages),
):
    create_chat_log_db(target_dir, chat_messages)

In [None]:
# Register the dataset with the DO client. Re-running the notebook without a
# reset may hit an "already exists" error; that case is reported and skipped so
# the cell stays re-runnable, while every other error is propagated.
try:
    data = do_client.dataset.create(
        name=dataset_name,
        path=private_dir,
        mock_path=mock_dir,
        summary="sqlite dataset",
        description_path=markdown_path,
    )
except Exception as e:
    if "already exists" in str(e):
        print(e)
    else:
        # Bare `raise` re-raises the active exception with its original traceback.
        raise

In [None]:
# Look the dataset up by name (works whether it was just created or already
# existed) and call describe() on it.
do_client.dataset.get(name=dataset_name).describe()

# Submit Custom function as DO

In [None]:
# Directory holding the custom function's source file and its README; both
# paths are passed to do_client.custom_function.submit further down.
custom_function_dir = CWD / "data" / "custom_function"
custom_function_dir.mkdir(parents=True, exist_ok=True)
custom_function_path = custom_function_dir / "sqlite_custom_function.py"
custom_function_readme_path = custom_function_dir / "README.md"

# Note: %%writefile is a magic command to write the content of the cell to a file.

# In the following cell, we create a custom function that loads a query from a file,
# executes it against a SQLite database and writes the result to a file.
# The %%writefile command then writes the cell to custom_function_path.

In [None]:
%%writefile $custom_function_path

import os
import sqlite3
from pathlib import Path
import json

# All three locations come from environment variables; a missing variable
# raises KeyError as soon as this file is executed.
# NOTE(review): presumably set by the job runner — confirm.
DATA_DIR = Path(os.environ["DATA_DIR"])
CODE_DIR = Path(os.environ["CODE_DIR"])
OUTPUT_DIR = Path(os.environ["OUTPUT_DIR"])

def execute_user_query(code_dir: Path, data_dir: Path, output_dir: Path):
    """Run the user-supplied SQL query against the dataset's SQLite database.

    Steps:
    - Read ``settings.json`` in ``data_dir`` to find the database file name.
    - Read ``user_params.json`` in ``code_dir`` to get the ``db_query`` string.
    - Execute the query and fetch all rows.
    - Write the rows to ``results.json`` in ``output_dir`` (each row becomes a
      JSON array).

    Args:
        code_dir: Directory containing ``user_params.json``.
        data_dir: Directory containing ``settings.json`` and the database file.
        output_dir: Existing directory that receives ``results.json``.

    Raises:
        FileNotFoundError: If ``settings.json`` or ``user_params.json`` is missing.
        KeyError: If ``"database"`` or ``"db_query"`` is absent from its file.
        sqlite3.Error: If the query cannot be executed.
        TypeError: If a returned value (e.g. a BLOB) is not JSON serializable.
    """
    # Accept plain strings as well as Path objects.
    code_dir, data_dir, output_dir = Path(code_dir), Path(data_dir), Path(output_dir)

    with open(data_dir / "settings.json", "r") as f:
        db_config = json.load(f)
    db_path = data_dir / db_config["database"]

    with open(code_dir / "user_params.json", "r") as f:
        user_params = json.load(f)
    # NOTE: the query is executed verbatim; it is not validated or restricted here.
    db_query = user_params["db_query"]

    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute(db_query).fetchall()
    finally:
        # Always release the database handle, even when the query fails.
        conn.close()

    with open(output_dir / "results.json", "w") as f:
        json.dump(rows, f, indent=2)

# Runs unconditionally when this file is executed: there is no __main__ guard.
execute_user_query(CODE_DIR, DATA_DIR, OUTPUT_DIR)

In [None]:
# Create a README for the custom function. It is written to
# custom_function_readme_path and passed as readme_path when the function is
# submitted in the next cell.

readme = """
# SQLite Custom Function
This function executes a user-defined query on an SQLite database.

## Usage
Create a `user_params.json`, and submit this as a Job with this custom function.
User params should include a `db_query` with the SQL query you want to execute:
```json
{
    "db_query": "SELECT * FROM chat_log"
}
```

Alternatively, you can submit a Job directly to this function:
```
dataset = client.dataset.get(name="sqlite-chat-logs")
custom_function = client.custom_function.get(name="execute_user_query")
job = custom_function.submit_job(dataset_name=dataset.name, db_query="SELECT * FROM chat_log")
```

This will create `user_params.json` automatically and submit the job for you.
"""

# strip() removes the leading/trailing newlines introduced by the triple-quoted literal.
custom_function_readme_path.write_text(readme.strip())

In [None]:
# Submit the custom function as the DO. The name given here is the one the DS
# uses later to look the function up.
do_client.custom_function.submit(
    name="execute_user_query",
    code_path=custom_function_path,
    readme_path=custom_function_readme_path,
)

# Use the dataset and custom function as DS

In [None]:
# Query the custom functions through the DS client; the result is shown as the cell output.
ds_client.custom_function.get_all()

In [None]:
# The DS fetches the custom function by name and inspects it before
# submitting a query for execution in the next cell.

custom_func = ds_client.custom_function.get(name="execute_user_query")
custom_func.describe()

In [None]:
# Submit a job as the DS: the SQL text is passed as the `db_query` keyword,
# which is the key the custom function reads from user_params.json.
job = ds_client.job.submit_with_params(
    dataset_name=dataset_name,
    custom_function=custom_func,
    db_query="SELECT * FROM chat_log WHERE username = 'alice'",
)

# Alternatively, this is equivalent to the above:
# job = custom_func.submit_job(
#     dataset_name=dataset_name,
#     db_query="SELECT * FROM chat_log WHERE username = 'alice'",
# )

# DO reviews

In [None]:
# As the DO, list only the jobs whose status is "pending_code_review".
jobs = do_client.job.get_all(status="pending_code_review")
jobs

In [None]:
# Take the last job in the list.
# NOTE(review): assumes the list is non-empty and that its last entry is the job
# submitted above — confirm the ordering get_all returns.
job = jobs[-1]

job.describe()

In [None]:
# Inspect the code attached to the job before running it on the private data.
job.user_code.describe()

In [None]:
# Run the job through the DO client; `res_job` is what share_results receives later.
res_job = do_client.run_private(job)

In [None]:
# Review
# NOTE: review_results is only available to the data owner; it looks at the job
# outputs that have not yet been shared to SyftBox.
results_to_review = do_client.job.review_results(job)

results_to_review.describe()

# The original message had no f-prefix, so the placeholder was printed literally.
# NOTE(review): the job's identifier attribute is not visible in this notebook;
# `id` is tried first, then `uid`, then the job object itself — confirm the name.
job_ref = getattr(job, "id", getattr(job, "uid", job))
print(f"Loading all output files for job {job_ref}...")
for output_name, output in results_to_review.outputs.items():
    print(f"{output_name}: {output}")

In [None]:
# Share: after the review above, the DO shares the results of the job that was run.
do_client.job.share_results(res_job)

# DS views the results

In [None]:
# Rebinds `job` (previously the DO-side object) to the last job returned for the DS client.
# NOTE(review): assumes the last entry is the job submitted above — confirm ordering.
job = ds_client.job.get_all()[-1]

In [None]:
# After the DO has shared the results, the DS can see files in the output path.
job.describe()

In [None]:
# Fetch the results as the DS, then print every named output.
job_results = ds_client.job.get_results(job)

job_results.describe()
for output_name, output in job_results.outputs.items():
    print(f"{output_name}: {output}")

## Clean up

In [None]:
# Imported here because it is only needed for clean-up.
from syft_rds.orchestra import remove_rds_stack_dir

# Stop the stack first, then remove its directory using the same `key` it was created with.
stack.stop()
remove_rds_stack_dir(key=key)