# Building a Feature Store with a MinIO Bucket and the Docker SDK
##### By: David Cannan Cdaprod

This guide shows how to run scripts inside Docker containers and keep a persistent log of their output. We will use the Docker Python SDK to manage the containers and the MinIO client to fetch the scripts.

### Prerequisites
1. Ensure the Docker daemon is reachable from the device running your notebook (this guide uses Juno Python notebooks on an iPhone).
2. Ensure you have the `docker` and `minio` Python packages installed:
    ```bash
    pip install docker minio
    ```

### 1. Create a Dockerfile for Your Script
Create a `Dockerfile` that defines the environment for running your scripts. For example:
```Dockerfile
# Use an official Python runtime as a parent image
FROM python:3.9-slim

# Set the working directory in the container
WORKDIR /usr/src/app

# Copy the script into the container
COPY your_script.sh /usr/src/app/your_script.sh

# Make the script executable
RUN chmod +x /usr/src/app/your_script.sh

# Define the command to run the script
CMD ["./your_script.sh"]
```
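
The same kind of Dockerfile can also be generated from Python, which is handy when the script name is only known at run time. The sketch below is one way to do that, assuming the script is a Python file rather than a shell script; `render_dockerfile` and its `packages` parameter are hypothetical names, not part of the Docker SDK.

```python
import os


def render_dockerfile(script_path, packages=(), base_image="python:3.9-slim"):
    """Return Dockerfile text that copies one script into the image and runs it."""
    script_name = os.path.basename(script_path)
    lines = [
        f"FROM {base_image}",
        "WORKDIR /usr/src/app",
        f"COPY {script_name} /usr/src/app/{script_name}",
    ]
    if packages:
        # Install dependencies only when some are requested
        lines.append("RUN pip install --no-cache-dir " + " ".join(packages))
    lines.append(f'CMD ["python", "/usr/src/app/{script_name}"]')
    return "\n".join(lines) + "\n"


dockerfile_text = render_dockerfile(
    "scripts/scrape_web_page.py", ["requests", "beautifulsoup4"]
)
print(dockerfile_text)
```

Only the base name of the script ends up in the `COPY` line, so the script has to sit at the top level of the build context.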

### 2. Write the Python Script to Fetch and Execute Scripts from MinIO
The runner that fetches a script from MinIO, builds a Docker image, runs the container, and logs the output is `run_feature.py`, shown under "Steps to Execute the Script". The script it fetches and executes is a small web scraper.

#### Python Script (`scrape_web_page.py`):

The script reads the URL to scrape from the `URL` environment variable, which the runner sets on the container, so no command-line argument parsing is needed:

```python
import os
import requests
from bs4 import BeautifulSoup

def scrape_web_page(url):
    """
    Scrapes the specified web page and returns the text content.

    Args:
        url (str): The URL of the web page to scrape.

    Returns:
        str: The text content of the web page.
    """
    try:
        response = requests.get(url)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'html.parser')
            page_text = soup.get_text(separator='\n')
            return page_text
        else:
            return f"Failed to retrieve the page. Status code: {response.status_code}"
    except requests.RequestException as e:
        return f"An error occurred while making the request: {str(e)}"

if __name__ == "__main__":
    url = os.environ.get("URL")
    if url:
        result = scrape_web_page(url)
        print(result)
    else:
        print("Error: No URL provided.")
```
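
Because the script above takes its target from the `URL` environment variable, it can be worth rejecting malformed values before making a request. The following is a minimal sketch using only the standard library; `read_target_url` is a hypothetical helper, and restricting schemes to `http` and `https` is an assumption about what the scraper should accept.

```python
import os
from urllib.parse import urlparse


def read_target_url(environ=os.environ, var_name="URL"):
    """Return the URL stored in `var_name`, or raise ValueError if it is unusable."""
    value = environ.get(var_name, "").strip()
    if not value:
        raise ValueError(f"environment variable {var_name} is not set")
    parsed = urlparse(value)
    # Accept only absolute http(s) URLs that name a host
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        raise ValueError(f"not an absolute http(s) URL: {value!r}")
    return value


print(read_target_url({"URL": "https://example.com"}))
```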

### Example Usage

You can run the script from the command line by setting the `URL` environment variable:

```bash
URL="https://example.com" python3 scrape_web_page.py
```

### Explanation

1. **scrape_web_page Function**:
   - Scrapes the specified web page and returns the text content.
   - Handles HTTP requests and parses the HTML content using BeautifulSoup.
   - Returns an error message string, rather than raising, when the request fails or the status code is not 200.

2. **Main Block**:
   - Reads the `URL` environment variable.
   - Calls `scrape_web_page` with that URL and prints the result, or prints an error if no URL was provided.

### Running the Script

Save the script as `scrape_web_page.py` and upload it to the `function-bucket` bucket in MinIO so the runner can fetch it. You can also run it locally as shown in the example usage.

This setup lets you set the URL at run time, either in your shell or through the container's environment, making the script easy to integrate with an AI or other systems.


### Steps to Execute the Script
1. Ensure your Docker daemon is set up to allow remote connections.
2. Save the below Python script (`run_feature.py`) on your iPhone within Juno iOS Python notebooks.
3. Run the Python script to fetch the script from MinIO, build the Docker image, and execute the script in a container.

This approach ensures that your script runs in a secure and isolated environment (Docker container) and logs the output in a stateful manner. The logs are saved to a file on the host machine, ensuring you can review the output after execution.

```python
# run_feature.py
import docker
from minio import Minio
from minio.error import S3Error
import os
import uuid
import datetime
import re

class FeatureStoreInitializer:
    def __init__(self, minio_config, docker_config):
        self.minio_client = Minio(**minio_config)
        self.docker_client = docker.DockerClient(**docker_config)
        self.bucket_name = "function-bucket"

    def create_bucket(self):
        try:
            if not self.minio_client.bucket_exists(self.bucket_name):
                self.minio_client.make_bucket(self.bucket_name, location="us-east")
                print(f"Bucket {self.bucket_name} created successfully.")
            else:
                print(f"Bucket {self.bucket_name} already exists.")
        except S3Error as exc:
            if exc.code in ['BucketAlreadyOwnedByYou', 'BucketAlreadyExists']:
                print(f"Bucket {self.bucket_name} already exists.")
            else:
                print(f"Error occurred: {exc}")

    def fetch_script(self, object_name, file_path):
        try:
            self.minio_client.fget_object(self.bucket_name, object_name, file_path)
            print(f"Script {object_name} fetched and saved to {file_path}")
        except S3Error as exc:
            print(f"Error occurred: {exc}")

    def build_docker_image(self, script_path, dockerfile_path='Dockerfile', image_name='script_executor'):
        if not os.path.exists(dockerfile_path):
            dockerfile_content = f"""
FROM python:3.9-slim
WORKDIR /usr/src/app
COPY {os.path.basename(script_path)} /usr/src/app/{os.path.basename(script_path)}
RUN pip install requests beautifulsoup4
RUN chmod +x /usr/src/app/{os.path.basename(script_path)}
CMD ["python", "/usr/src/app/{os.path.basename(script_path)}"]
"""
            with open(dockerfile_path, 'w') as dockerfile:
                dockerfile.write(dockerfile_content.strip())
            print(f"Generated default Dockerfile at {dockerfile_path}")
        else:
            print(f"Using existing Dockerfile at {dockerfile_path}")

        image, logs = self.docker_client.images.build(path='.', tag=image_name, dockerfile=dockerfile_path, nocache=True)
        for log in logs:
            print(log)
        return image

    def clean_up_old_containers(self):
        for container in self.docker_client.containers.list(all=True):
            if 'script_container_' in container.name:
                container.remove(force=True)
                print(f"Removed old container: {container.name}")

    def clean_url(self, url):
        cleaned_url = re.sub(r'https?://', '', url)  # Remove http:// or https://
        cleaned_url = cleaned_url.replace('.com', '')  # Remove .com
        return re.sub(r'\W+', '_', cleaned_url)

    def run_docker_container(self, image_name, environment):
        container_name = f'script_container_{uuid.uuid4()}'
        container = self.docker_client.containers.run(image_name, name=container_name, detach=True, environment=environment)
        container_id = container.id[:3]

        cleaned_url = self.clean_url(environment["URL"])
        log_file_name = f'scrape_{cleaned_url}_{container_id}_logs.txt'

        logs = container.logs(stream=True)

        log_file_path = os.path.join(os.getcwd(), log_file_name)
        with open(log_file_path, 'w') as log_file:
            for log in logs:
                log_line = log.decode('utf-8').strip()
                if log_line:
                    log_file.write(f"{log_line}\n")
                    print(log_line)

        container.wait()
        container.remove()
        print(f"Logs saved to {log_file_path}")

    def initialize(self, script_name, environment):
        script_path = os.path.join(os.getcwd(), script_name)
        self.create_bucket()
        self.fetch_script(script_name, script_path)
        self.build_docker_image(script_path)
        self.clean_up_old_containers()
        self.run_docker_container('script_executor', environment)

def scrape_url(url):
    minio_config = {
        "endpoint": "192.168.0.25:9000",
        "access_key": "cda_cdaprod",
        "secret_key": "cda_cdaprod",
        "secure": False,
        "region": "us-east"
    }

    docker_config = {
        "base_url": 'tcp://rpi4-2.local:2375'
    }

    initializer = FeatureStoreInitializer(minio_config, docker_config)
    initializer.initialize("scrape_web_page.py", {"URL": url})

# Example usage within the notebook
scrape_url("https://github.com/cdaprod/cdaprod")  # Replace this with the actual URL you want to scrape
```

```
Bucket function-bucket already exists.
Script scrape_web_page.py fetched and saved to /private/var/mobile/Library/Mobile Documents/iCloud~com~rationalmatter~junoapp/Documents/My Scripts/docker-sdk/minio-fs/scrape_web_page.py
Using existing Dockerfile at Dockerfile
{'stream': 'Step 1/6 : FROM python:3.9-slim'}
{'stream': '\n'}
{'stream': ' ---> 0473f6c1be93\n'}
{'stream': 'Step 2/6 : WORKDIR /usr/src/app'}
{'stream': '\n'}
{'stream': ' ---> Running in 805243454de8\n'}
{'stream': ' ---> ccc510ba5294\n'}
{'stream': 'Step 3/6 : COPY scrape_web_page.py /usr/src/app/scrape_web_page.py'}
{'stream': '\n'}
{'stream': ' ---> d1ce8999a6ee\n'}
{'stream': 'Step 4/6 : RUN pip install requests beautifulsoup4'}
{'stream': '\n'}
{'stream': ' ---> Running in d0bbac47d0d1\n'}
{'stream': 'Collecting requests\n'}
{'stream': '  Downloading requests-2.32.3-py3-none-any.whl (64 kB)\n'}
{'stream': '     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 64.9/64.9 kB 1.1 MB/s eta 0:00:00'}
{'stream': '\n'}
{'stream': 'Co

12
MinIO Networking with Overlay Networks
Overlay networks enable seamless multi-host deployments for MinIO’s cloud-native S3-compatible storage solutions. Emphasizing security, scalability, and robust container networking, these technologies streamline complex cloud architectures.
2024-03-29
Link
11
Disaster Proof MinIO with GitOps
When disaster strikes, the power of GitOps shines, transforming potential chaos into a choreographed comeback. Learn how strategic automation, redundancy, and Docker and GitHub integration ensure swift recovery, turning system wipes into minor setbacks.
2024-03-19
Link
10
Powering AI/ML Innovation: Building Feature Stores with MinIO’s High-Performance Object Storage
MinIO’s high-performance object storage is key for AI innovation, offering scalability and integration for feature stores. Its capabilities enable seamless ML workflows, enhancing data management for AI development and deployment, impacting sectors like e-commerce and healthcare.
2024-03-12
Link

Logs saved to /private/var/mobile/Library/Mobile Documents/iCloud~com~rationalmatter~junoapp/Documents/My Scripts/docker-sdk/minio-fs/scrape_github_cdaprod_cdaprod_c88_logs.txt
```
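
The log file name in the last line of the output follows from the `clean_url` helper in `run_feature.py`. As a quick check, the standalone sketch below reproduces that transformation outside the class; `log_file_name` is a hypothetical wrapper, and `c88` is simply the three-character container ID prefix that appears in the output above.

```python
import re


def clean_url(url):
    """Turn a URL into a file-name-safe token, mirroring run_feature.py."""
    cleaned = re.sub(r"https?://", "", url)  # drop the scheme
    cleaned = cleaned.replace(".com", "")    # drop ".com"
    return re.sub(r"\W+", "_", cleaned)      # collapse other characters to "_"


def log_file_name(url, container_id):
    """Build the log file name from the cleaned URL and a short container ID."""
    return f"scrape_{clean_url(url)}_{container_id[:3]}_logs.txt"


print(log_file_name("https://github.com/cdaprod/cdaprod", "c88"))
# prints: scrape_github_cdaprod_cdaprod_c88_logs.txt
```

Note that only `.com` is stripped, so other top-level domains stay in the name (with the dot turned into an underscore), and a URL ending in a slash produces a trailing underscore.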


---

# Additional Development Documentation

- Function Serialization with cloudpickle
- Packaging Python Module
- Logging with Prometheus Labels
- Dashboard with Grafana
- Frontend with Google Mesop
- Metastore with Python (tbc)

---

# CloudPickle & Docker SDK Method

Using `cloudpickle` to serialize functions and the Docker SDK to run these functions in isolated environments is a powerful combination for a flexible and dynamic AI orchestration layer. Here’s how you can integrate `cloudpickle` with Docker SDK to achieve this:

### High-Level Approach

1. **Serialize Functions with `cloudpickle`**: Serialize your Python functions and upload them to MinIO.
2. **Fetch and Deserialize Functions**: Fetch the serialized functions from MinIO and deserialize them using `cloudpickle`.
3. **Run Functions in Docker Containers**: Use Docker SDK to create and run containers that execute the deserialized functions.

### Step-by-Step Implementation

#### 1. Serialize and Upload Functions to MinIO

```python
import cloudpickle
from minio import Minio
import io

def store_function_in_minio(minio_client, bucket_name, function, function_name):
    serialized_function = cloudpickle.dumps(function)
    minio_client.put_object(
        bucket_name,
        function_name,
        data=io.BytesIO(serialized_function),
        length=len(serialized_function)
    )

# Initialize MinIO client
minio_client = Minio(
    "play.min.io",
    access_key="YOUR_ACCESS_KEY",
    secret_key="YOUR_SECRET_KEY",
    secure=True
)

bucket_name = "function-bucket"
if not minio_client.bucket_exists(bucket_name):
    minio_client.make_bucket(bucket_name)

# Example function
def sample_function(data):
    return data * 2

store_function_in_minio(minio_client, bucket_name, sample_function, "sample_function.pkl")
```

#### 2. Fetch and Deserialize Functions

```python
import cloudpickle
from minio import Minio

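def fetch_function_from_minio(minio_client, bucket_name, function_name):
    response = minio_client.get_object(bucket_name, function_name)
    try:
        serialized_function = response.read()
    finally:
        # Release the HTTP connection once the data has been read
        response.close()
        response.release_conn()
    return cloudpickle.loads(serialized_function)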

# Fetch the function
sample_function = fetch_function_from_minio(minio_client, bucket_name, "sample_function.pkl")
```
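
Deserializing with `cloudpickle.loads` runs whatever code the stored bytes describe, so anyone who can write to the bucket can execute code wherever the function is loaded. One possible safeguard, sketched below with the standard library only, is to prepend an HMAC tag when storing and to verify it before deserializing. `sign_payload`, `verify_payload`, and the shared `SIGNING_KEY` are hypothetical and not part of `cloudpickle` or the MinIO SDK.

```python
import hashlib
import hmac

TAG_SIZE = hashlib.sha256().digest_size  # 32 bytes for SHA-256


def sign_payload(payload: bytes, key: bytes) -> bytes:
    """Return the payload prefixed with its HMAC-SHA256 tag."""
    tag = hmac.new(key, payload, hashlib.sha256).digest()
    return tag + payload


def verify_payload(signed: bytes, key: bytes) -> bytes:
    """Return the original payload, or raise ValueError if the tag does not match."""
    tag, payload = signed[:TAG_SIZE], signed[TAG_SIZE:]
    expected = hmac.new(key, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(tag, expected):
        raise ValueError("signature mismatch: refusing to deserialize")
    return payload


SIGNING_KEY = b"replace-with-a-real-secret"  # assumption: shared out of band
signed = sign_payload(b"serialized function bytes", SIGNING_KEY)
print(verify_payload(signed, SIGNING_KEY))
```

Under this scheme, the storing side would upload `sign_payload(cloudpickle.dumps(function), key)` and the fetching side would call `cloudpickle.loads(verify_payload(data, key))`.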

#### 3. Run Functions in Docker Containers

Create a Docker container that can fetch and execute the serialized functions.

**Dockerfile**:

```Dockerfile
FROM python:3.9-slim
WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY script.py ./
CMD ["python", "./script.py"]
```

**requirements.txt**:

```
minio
cloudpickle
```

**script.py**:

```python
import os
import cloudpickle
from minio import Minio

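def fetch_function_from_minio(minio_client, bucket_name, function_name):
    response = minio_client.get_object(bucket_name, function_name)
    try:
        serialized_function = response.read()
    finally:
        # Release the HTTP connection once the data has been read
        response.close()
        response.release_conn()
    return cloudpickle.loads(serialized_function)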

if __name__ == "__main__":
    minio_client = Minio(
        "play.min.io",
        access_key=os.getenv("MINIO_ACCESS_KEY"),
        secret_key=os.getenv("MINIO_SECRET_KEY"),
        secure=True
    )

    bucket_name = "function-bucket"
    function_name = os.getenv("FUNCTION_NAME")

    # Fetch and execute the function
    function = fetch_function_from_minio(minio_client, bucket_name, function_name)
    result = function(5)  # Example input
    print(result)
```
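
The script above calls the function with a fixed input of `5`. One way to make the input configurable, in keeping with how `FUNCTION_NAME` is passed, is to read it from an environment variable as JSON. This is only a sketch: `FUNCTION_INPUT` is a hypothetical variable name, and JSON is an assumed encoding.

```python
import json
import os


def load_function_input(environ=os.environ, var_name="FUNCTION_INPUT", default=None):
    """Decode the JSON value stored in `var_name`, falling back to `default`."""
    raw = environ.get(var_name)
    if raw is None:
        return default
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError(f"{var_name} is not valid JSON: {exc}") from exc


print(load_function_input({"FUNCTION_INPUT": "5"}))
print(load_function_input({"FUNCTION_INPUT": '{"data": [1, 2, 3]}'}))
```

Inside the container script, `function(load_function_input(default=5))` would then take the place of the hard-coded call.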

#### 4. Orchestration with Docker SDK

Use Docker SDK to build and run the container that fetches and executes the serialized function.

```python
import docker
import os

def build_and_run_docker_container(script_path, dockerfile_path='Dockerfile', image_name='function_executor'):
    client = docker.from_env()

    # Build Docker image
    image, logs = client.images.build(path='.', tag=image_name)
    for log in logs:
        print(log)

    # Run Docker container
    container = client.containers.run(
        image_name,
        environment={
            "MINIO_ACCESS_KEY": "YOUR_ACCESS_KEY",
            "MINIO_SECRET_KEY": "YOUR_SECRET_KEY",
            "FUNCTION_NAME": "sample_function.pkl"
        },
        detach=True
    )

    # Stream logs
    for log in container.logs(stream=True):
        print(log.decode('utf-8').strip())

    # Wait for container to finish and get the logs
    container.wait()
    container.remove()

# Example usage
build_and_run_docker_container('script.py')
```

### Conclusion

This approach combines `cloudpickle` for function serialization, MinIO for object storage, and Docker SDK for isolated execution environments. It allows you to dynamically fetch and run functions in a flexible and scalable way, suitable for building complex AI workflows.

This setup can be further integrated with LangChain and LCEL for enhanced orchestration and management of AI tasks.

---

# Packaging with Python

This entire setup can be packaged as a Python package for easy distribution and installation. Here’s how you can structure and implement it:

### Directory Structure

```
ai_orchestration/
├── ai_orchestration/
│   ├── __init__.py
│   ├── orchestrator.py
│   ├── minio_utils.py
│   ├── docker_utils.py
├── scripts/
│   ├── Dockerfile
│   ├── requirements.txt
│   └── script.py
├── tests/
│   ├── __init__.py
│   └── test_orchestrator.py
├── README.md
├── setup.py
└── MANIFEST.in
```

### Implementing the Package

#### 1. `ai_orchestration/__init__.py`

Initialize the package.

```python
# ai_orchestration/__init__.py

from .orchestrator import OrchestrationManager
from .minio_utils import store_function_in_minio, fetch_function_from_minio
from .docker_utils import build_and_run_docker_container
```

#### 2. `ai_orchestration/orchestrator.py`

The orchestration manager.

```python
# ai_orchestration/orchestrator.py

from .minio_utils import fetch_function_from_minio

class OrchestrationManager:
    def __init__(self, minio_client, bucket_name):
        self.minio_client = minio_client
        self.bucket_name = bucket_name
        self.tasks = []

    def add_task(self, function_name, *args, **kwargs):
        task = {"function_name": function_name, "args": args, "kwargs": kwargs}
        self.tasks.append(task)

    def run(self, initial_input):
        input_data = initial_input
        for task in self.tasks:
            function = fetch_function_from_minio(self.minio_client, self.bucket_name, task["function_name"])
            input_data = function(input_data, *task["args"], **task["kwargs"])
        return input_data
```
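
To see how `run` threads data through the task list, the sketch below reproduces the same loop with an in-memory dictionary standing in for MinIO. `LocalOrchestrationManager` and its `registry` argument are hypothetical and exist only for illustration; the real class fetches each function from the bucket instead.

```python
class LocalOrchestrationManager:
    """Same task loop as OrchestrationManager, but functions come from a dict."""

    def __init__(self, registry):
        self.registry = registry  # maps function name -> callable
        self.tasks = []

    def add_task(self, function_name, *args, **kwargs):
        self.tasks.append({"function_name": function_name, "args": args, "kwargs": kwargs})

    def run(self, initial_input):
        data = initial_input
        for task in self.tasks:
            function = self.registry[task["function_name"]]
            # Each task receives the previous task's output as its first argument
            data = function(data, *task["args"], **task["kwargs"])
        return data


def double(value):
    return value * 2


def add(value, amount=0):
    return value + amount


manager = LocalOrchestrationManager({"double": double, "add": add})
manager.add_task("double")
manager.add_task("add", amount=3)
result = manager.run(5)
print(result)  # (5 * 2) + 3 = 13
```

Because each output becomes the next input, every stored function has to accept the previous task's result as its first positional argument.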

#### 3. `ai_orchestration/minio_utils.py`

Utilities for MinIO operations.

```python
# ai_orchestration/minio_utils.py

import cloudpickle
import io
from minio import Minio

def store_function_in_minio(minio_client, bucket_name, function, function_name):
    serialized_function = cloudpickle.dumps(function)
    minio_client.put_object(
        bucket_name,
        function_name,
        data=io.BytesIO(serialized_function),
        length=len(serialized_function)
    )

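def fetch_function_from_minio(minio_client, bucket_name, function_name):
    response = minio_client.get_object(bucket_name, function_name)
    try:
        serialized_function = response.read()
    finally:
        # Release the HTTP connection once the data has been read
        response.close()
        response.release_conn()
    return cloudpickle.loads(serialized_function)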
```

#### 4. `ai_orchestration/docker_utils.py`

Utilities for Docker operations.

```python
# ai_orchestration/docker_utils.py

import docker

def build_and_run_docker_container(script_path, dockerfile_path='scripts/Dockerfile', image_name='function_executor'):
    client = docker.from_env()

    # Build Docker image
    image, logs = client.images.build(path='scripts', tag=image_name)
    for log in logs:
        print(log)

    # Run Docker container
    container = client.containers.run(
        image_name,
        environment={
            "MINIO_ACCESS_KEY": "YOUR_ACCESS_KEY",
            "MINIO_SECRET_KEY": "YOUR_SECRET_KEY",
            "FUNCTION_NAME": "sample_function.pkl"
        },
        detach=True
    )

    # Stream logs
    for log in container.logs(stream=True):
        print(log.decode('utf-8').strip())

    # Wait for the container to finish, capture its logs, then remove it
    # (logs cannot be read once the container has been removed)
    container.wait()
    output = container.logs()
    container.remove()

    return output
```

#### 5. `scripts/Dockerfile`

The Dockerfile used to create the execution environment.

```Dockerfile
# scripts/Dockerfile

FROM python:3.9-slim
WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
COPY script.py ./
CMD ["python", "./script.py"]
```

#### 6. `scripts/requirements.txt`

The requirements file for the Docker container.

```
minio
cloudpickle
```

#### 7. `scripts/script.py`

The script that runs inside the Docker container.

```python
# scripts/script.py

import os
import cloudpickle
from minio import Minio

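def fetch_function_from_minio(minio_client, bucket_name, function_name):
    response = minio_client.get_object(bucket_name, function_name)
    try:
        serialized_function = response.read()
    finally:
        # Release the HTTP connection once the data has been read
        response.close()
        response.release_conn()
    return cloudpickle.loads(serialized_function)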

if __name__ == "__main__":
    minio_client = Minio(
        "play.min.io",
        access_key=os.getenv("MINIO_ACCESS_KEY"),
        secret_key=os.getenv("MINIO_SECRET_KEY"),
        secure=True
    )

    bucket_name = "function-bucket"
    function_name = os.getenv("FUNCTION_NAME")

    function = fetch_function_from_minio(minio_client, bucket_name, function_name)
    result = function(5)
    print(result)
```

#### 8. `setup.py`

The setup script for the package.

```python
# setup.py

from setuptools import setup, find_packages

setup(
    name='ai_orchestration',
    version='0.1.0',
    description='A Python package for AI orchestration using MinIO and Docker',
    author='Your Name',
    author_email='your.email@example.com',
    url='https://github.com/yourusername/ai_orchestration',
    packages=find_packages(),
    install_requires=[
        'minio',
        'cloudpickle',
        'docker',
    ],
    # No console_scripts entry point is declared: the package does not define a `main` function.
)
```

#### 9. `README.md`

The readme file for the package.

````markdown
# AI Orchestration

A Python package for AI orchestration using MinIO and Docker.

## Installation

```bash
pip install ai_orchestration
```

## Usage

```python
from minio import Minio
from ai_orchestration import OrchestrationManager, store_function_in_minio

# Initialize MinIO client
minio_client = Minio(
    "play.min.io",
    access_key="YOUR_ACCESS_KEY",
    secret_key="YOUR_SECRET_KEY",
    secure=True
)

bucket_name = "function-bucket"
if not minio_client.bucket_exists(bucket_name):
    minio_client.make_bucket(bucket_name)

# Store a function in MinIO
def sample_function(data):
    return data * 2

store_function_in_minio(minio_client, bucket_name, sample_function, "sample_function.pkl")

# Initialize Orchestration Manager
manager = OrchestrationManager(minio_client, bucket_name)

# Add tasks
manager.add_task("sample_function.pkl")

# Run the orchestration with initial input
# (sample_function multiplies its input, so pass a number rather than a dict)
result = manager.run(5)
print(result)
```
````

### Packaging the Python Package

To package and distribute your Python package, follow these steps:

1. **Build the package**:

```bash
pip install build
python -m build
```

2. **Upload to PyPI**:

```bash
pip install twine
twine upload dist/*
```

3. **Install the package**:

```bash
pip install ai_orchestration
```

This setup allows you to dynamically fetch, deserialize, and execute functions using MinIO and Docker, all packaged neatly into a Python package. This approach provides a flexible, scalable, and reusable solution for AI orchestration.

---

# With Prometheus Labels

To use Prometheus for monitoring and incorporate labels for better granularity and organization of your metrics, follow these steps:

### Step-by-Step Implementation

#### 1. Install Prometheus Client

First, ensure that the Prometheus client library is installed in your Python environment.

```bash
pip install prometheus_client
```

#### 2. Set Up Prometheus Metrics in Your Application

Create a file `metrics.py` in your `ai_orchestration` package to handle Prometheus metrics.

```python
# ai_orchestration/metrics.py

from prometheus_client import Counter, Histogram, Gauge

# Define your metrics
REQUEST_COUNT = Counter(
    'request_count', 'Total number of requests', ['endpoint']
)
REQUEST_LATENCY = Histogram(
    'request_latency_seconds', 'Request latency in seconds', ['endpoint']
)
ACTIVE_USERS = Gauge(
    'active_users', 'Number of active users'
)

def increment_request_count(endpoint: str):
    REQUEST_COUNT.labels(endpoint=endpoint).inc()

def observe_request_latency(endpoint: str, latency: float):
    REQUEST_LATENCY.labels(endpoint=endpoint).observe(latency)

def set_active_users(count: int):
    ACTIVE_USERS.set(count)
```

#### 3. Integrate Prometheus Metrics into Your Orchestration Manager

Update your `OrchestrationManager` to include Prometheus metrics.

```python
# ai_orchestration/orchestrator.py

import time
from .minio_utils import fetch_function_from_minio
from .metrics import increment_request_count, observe_request_latency

class OrchestrationManager:
    def __init__(self, minio_client, bucket_name):
        self.minio_client = minio_client
        self.bucket_name = bucket_name
        self.tasks = []

    def add_task(self, function_name, *args, **kwargs):
        task = {"function_name": function_name, "args": args, "kwargs": kwargs}
        self.tasks.append(task)

    def run(self, initial_input):
        input_data = initial_input
        for task in self.tasks:
            start_time = time.time()
            function = fetch_function_from_minio(self.minio_client, self.bucket_name, task["function_name"])
            input_data = function(input_data, *task["args"], **task["kwargs"])
            latency = time.time() - start_time
            increment_request_count(task["function_name"])
            observe_request_latency(task["function_name"], latency)
        return input_data
```
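
In the `run` method above, a task that raises an exception is never counted or timed, because the metric calls come after the function call. A context manager is one way to record latency on both the success and failure paths. The sketch below is standalone: `timed` is a hypothetical helper, and `record` stands in for a callback such as `observe_request_latency`.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label, record):
    """Call record(label, seconds) when the block exits, even if it raises."""
    start = time.perf_counter()  # monotonic clock, unaffected by wall-clock changes
    try:
        yield
    finally:
        record(label, time.perf_counter() - start)


observations = []


def remember(label, seconds):
    observations.append((label, seconds))


with timed("sample_function.pkl", remember):
    sum(range(1000))  # stand-in for fetching and calling the function

print(observations)
```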

#### 4. Expose Prometheus Metrics

Set up an endpoint to expose Prometheus metrics. This is typically done in your main application script.

```python
# main.py or your main application script

from flask import Flask, Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
from ai_orchestration import OrchestrationManager, store_function_in_minio

app = Flask(__name__)

@app.route('/metrics')
def metrics():
    return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)
```

#### 5. Using Prometheus Labels in Your Workflow

You can use Prometheus labels to differentiate between different endpoints or tasks in your orchestration. This helps in providing more granular monitoring and analysis of your application’s performance.

### Example Usage

Here’s how you can use the updated `OrchestrationManager` with Prometheus metrics.

```python
from minio import Minio
from ai_orchestration import OrchestrationManager, store_function_in_minio

# Initialize MinIO client
minio_client = Minio(
    "play.min.io",
    access_key="YOUR_ACCESS_KEY",
    secret_key="YOUR_SECRET_KEY",
    secure=True
)

bucket_name = "function-bucket"
if not minio_client.bucket_exists(bucket_name):
    minio_client.make_bucket(bucket_name)

# Store a function in MinIO
def sample_function(data):
    return data * 2

store_function_in_minio(minio_client, bucket_name, sample_function, "sample_function.pkl")

# Initialize Orchestration Manager
manager = OrchestrationManager(minio_client, bucket_name)

# Add tasks
manager.add_task("sample_function.pkl")

# Run the orchestration with initial input
# (sample_function multiplies its input, so pass a number rather than a dict)
result = manager.run(5)
print(result)
```

### Conclusion

Integrating Prometheus metrics with labels into your AI orchestration layer enhances monitoring capabilities by providing detailed insights into the performance and behavior of different tasks and endpoints. This setup helps in identifying bottlenecks and optimizing the overall workflow.

For more detailed documentation on Prometheus and its client libraries, refer to:
- [Prometheus Documentation](https://prometheus.io/docs/introduction/overview/)
- [Prometheus Python Client Documentation](https://github.com/prometheus/client_python)

---

# With Grafana Dashboard

To monitor your function store and run functions in the `function-bucket` using Grafana, you need to set up a monitoring stack that includes Prometheus to collect the metrics and Grafana to visualize them. Here’s a detailed guide to achieve this:

### Step-by-Step Guide

#### 1. Set Up Prometheus

First, you need to set up Prometheus to scrape the metrics from your application.

**prometheus.yml**:

```yaml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'your_app'
    static_configs:
      - targets: ['localhost:5000']  # Flask app on port 5000; when Prometheus runs in Docker, replace localhost with an address the container can reach
```

Start Prometheus using Docker:

```bash
docker run -d --name prometheus -p 9090:9090 -v /path/to/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus
```

#### 2. Set Up Grafana

Next, set up Grafana to visualize the metrics collected by Prometheus.

Start Grafana using Docker:

```bash
docker run -d --name=grafana -p 3000:3000 grafana/grafana
```

Open Grafana in your browser (`http://localhost:3000`), and follow these steps:

1. **Add Data Source**:
   - Go to Configuration (the gear icon) > Data Sources.
   - Click "Add data source" and select Prometheus.
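   - Set the URL to the address where Grafana can reach Prometheus and click "Save & Test". `http://localhost:9090` works only when Grafana and Prometheus share a network namespace; with the two separate containers started above, use an address reachable from the Grafana container.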

2. **Create a Dashboard**:
   - Go to Create (the plus icon) > Dashboard.
   - Click "Add new panel".

3. **Configure Panels**:
   - For example, to monitor the request rate for each endpoint (the Python client exposes the `request_count` counter as `request_count_total`):
     - **Query**: `sum(rate(request_count_total[1m])) by (endpoint)`
     - **Visualization**: Choose a suitable visualization, such as a graph or bar chart.
   - To monitor request latency:
     - **Query**: `histogram_quantile(0.95, sum(rate(request_latency_seconds_bucket[5m])) by (le, endpoint))`
     - **Visualization**: Choose a suitable visualization.

4. **Save the Dashboard**:
   - Click "Save dashboard" (the disk icon), give it a name, and save.

### Example Dashboard Panels

Here are some example panels you can add to your Grafana dashboard:

**1. Total Requests Per Endpoint**:

- **Title**: Total Requests Per Endpoint
- **Query**: `sum(rate(request_count_total[1m])) by (endpoint)`
- **Visualization**: Graph or Bar Chart

**2. Request Latency (95th Percentile)**:

- **Title**: Request Latency (95th Percentile)
- **Query**: `histogram_quantile(0.95, sum(rate(request_latency_seconds_bucket[5m])) by (le, endpoint))`
- **Visualization**: Heatmap or Bar Chart

**3. Active Users**:

- **Title**: Active Users
- **Query**: `active_users`
- **Visualization**: Gauge or Single Stat
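
Before building panels, it can help to run a query directly against the Prometheus HTTP API and confirm that it returns data. The sketch below builds the request URL for the instant-query endpoint `/api/v1/query`; `build_query_url` and `run_query` are hypothetical helpers, and `http://localhost:9090` assumes the port mapping used earlier. It also assumes the counter is exposed as `request_count_total`, since the Python client appends `_total` to counter names.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen


def build_query_url(base_url, promql):
    """Return the instant-query URL for a PromQL expression."""
    return f"{base_url.rstrip('/')}/api/v1/query?{urlencode({'query': promql})}"


def run_query(base_url, promql, timeout=5):
    """Execute the query and return the decoded JSON (needs a running Prometheus)."""
    with urlopen(build_query_url(base_url, promql), timeout=timeout) as response:
        return json.load(response)


url = build_query_url(
    "http://localhost:9090",
    "sum(rate(request_count_total[1m])) by (endpoint)",
)
print(url)
```

Calling `run_query` with the same arguments returns the parsed response, which should list one series per `endpoint` label once tasks have run.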

### Monitoring the Function Store

To monitor specific details about your function store (e.g., number of functions stored, function executions), you can define additional Prometheus metrics and visualize them in Grafana.

**Define Custom Metrics**:

```python
# ai_orchestration/metrics.py

from prometheus_client import Counter, Histogram, Gauge

# Existing metrics
REQUEST_COUNT = Counter('request_count', 'Total number of requests', ['endpoint'])
REQUEST_LATENCY = Histogram('request_latency_seconds', 'Request latency in seconds', ['endpoint'])
ACTIVE_USERS = Gauge('active_users', 'Number of active users')

# New metrics for function store
FUNCTION_STORE_COUNT = Gauge('function_store_count', 'Number of functions in the store')
FUNCTION_EXECUTIONS = Counter('function_executions', 'Number of function executions', ['function_name'])

def increment_function_executions(function_name: str):
    FUNCTION_EXECUTIONS.labels(function_name=function_name).inc()

def set_function_store_count(count: int):
    FUNCTION_STORE_COUNT.set(count)
```

**Update Orchestration Manager**:

```python
# ai_orchestration/orchestrator.py

import time
from .minio_utils import fetch_function_from_minio
from .metrics import increment_request_count, observe_request_latency, increment_function_executions, set_function_store_count

class OrchestrationManager:
    def __init__(self, minio_client, bucket_name):
        self.minio_client = minio_client
        self.bucket_name = bucket_name
        self.tasks = []

        # Set the initial function count
        function_count = len(list(self.minio_client.list_objects(bucket_name)))
        set_function_store_count(function_count)

    def add_task(self, function_name, *args, **kwargs):
        task = {"function_name": function_name, "args": args, "kwargs": kwargs}
        self.tasks.append(task)

    def run(self, initial_input):
        input_data = initial_input
        for task in self.tasks:
            start_time = time.time()
            function = fetch_function_from_minio(self.minio_client, self.bucket_name, task["function_name"])
            input_data = function(input_data, *task["args"], **task["kwargs"])
            latency = time.time() - start_time
            increment_request_count(task["function_name"])
            observe_request_latency(task["function_name"], latency)
            increment_function_executions(task["function_name"])
        return input_data
```
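
The chaining behaviour of `run` (each task receives the previous task's output) can be tried without MinIO or Prometheus. This is a minimal sketch under stated assumptions: `FUNCTION_STORE` is a hypothetical in-memory stand-in for the bucket, and `run_pipeline` is an illustrative name, not part of the orchestration package.

```python
import time
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical in-memory stand-in for the MinIO bucket: object name -> callable.
FUNCTION_STORE: Dict[str, Callable[..., Any]] = {
    "double.pkl": lambda data: data * 2,
    "add.pkl": lambda data, amount: data + amount,
}

def run_pipeline(tasks: List[dict], initial_input: Any) -> Tuple[Any, List[Tuple[str, float]]]:
    """Run tasks in order, feeding each result into the next task."""
    data = initial_input
    timings = []
    for task in tasks:
        start = time.perf_counter()
        function = FUNCTION_STORE[task["function_name"]]
        data = function(data, *task["args"], **task["kwargs"])
        # Record the per-task latency, as the manager does for its histogram.
        timings.append((task["function_name"], time.perf_counter() - start))
    return data, timings

tasks = [
    {"function_name": "double.pkl", "args": (), "kwargs": {}},
    {"function_name": "add.pkl", "args": (3,), "kwargs": {}},
]
result, timings = run_pipeline(tasks, 5)  # (5 * 2) + 3
```

Because every task takes the running value as its first positional argument, the order in which tasks are added determines the final result.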

### Conclusion

By integrating Prometheus and Grafana, you can effectively monitor your AI orchestration layer, including specific metrics related to your function store. This setup provides detailed insights into your system’s performance and helps in identifying bottlenecks and optimizing workflows. 

For further customization, refer to the official documentation:
- [Prometheus Documentation](https://prometheus.io/docs/introduction/overview/)
- [Grafana Documentation](https://grafana.com/docs/)

---

# Simplify Tooling to Receive Output

To simplify the input and output processes for your AI tooling, you can design a more user-friendly API or CLI that abstracts the complexity of orchestrating functions, running containers, and handling metrics. Here’s a step-by-step guide to achieving this:

### Simplified Input/Output with an API

1. **Create a Flask API**: Use Flask to create a simple API that accepts requests and returns results.

**app.py**:
```python
import cloudpickle
from flask import Flask, Response, jsonify, request
from minio import Minio
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest
from ai_orchestration import OrchestrationManager, store_function_in_minio

app = Flask(__name__)

# Initialize MinIO client
minio_client = Minio(
    "play.min.io",
    access_key="YOUR_ACCESS_KEY",
    secret_key="YOUR_SECRET_KEY",
    secure=True
)
bucket_name = "function-bucket"
if not minio_client.bucket_exists(bucket_name):
    minio_client.make_bucket(bucket_name)

# Initialize Orchestration Manager
manager = OrchestrationManager(minio_client, bucket_name)

@app.route('/add_task', methods=['POST'])
def add_task():
    data = request.json
    function_name = data.get('function_name')
    args = data.get('args', [])
    kwargs = data.get('kwargs', {})
    manager.add_task(function_name, *args, **kwargs)
    return jsonify({"status": "Task added"}), 200

@app.route('/run', methods=['POST'])
def run():
    initial_input = request.json.get('initial_input', {})
    result = manager.run(initial_input)
    return jsonify(result), 200

@app.route('/upload_function', methods=['POST'])
def upload_function():
    file = request.files['file']
    function_name = file.filename
    store_function_in_minio(minio_client, bucket_name, cloudpickle.loads(file.read()), function_name)
    return jsonify({"status": "Function uploaded"}), 200

@app.route('/metrics')
def metrics():
    return Response(generate_latest(), mimetype=CONTENT_TYPE_LATEST)

if __name__ == "__main__":
    app.run(host='0.0.0.0', port=5000)
```
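
The `/add_task` route above queues whatever it receives, so a body without `function_name` would only fail later, during `run`. A small validation helper can reject such bodies up front. The sketch below uses only the standard library; `parse_add_task_payload` is a hypothetical name, and how you map the `ValueError` to an HTTP response is left to the route.

```python
from typing import Any, Dict, List, Tuple

def parse_add_task_payload(data: Any) -> Tuple[str, List[Any], Dict[str, Any]]:
    """Validate an /add_task JSON body and return (function_name, args, kwargs)."""
    if not isinstance(data, dict):
        raise ValueError("request body must be a JSON object")
    function_name = data.get("function_name")
    if not isinstance(function_name, str) or not function_name:
        raise ValueError("'function_name' must be a non-empty string")
    args = data.get("args", [])
    if not isinstance(args, list):
        raise ValueError("'args' must be a JSON array")
    kwargs = data.get("kwargs", {})
    if not isinstance(kwargs, dict):
        raise ValueError("'kwargs' must be a JSON object")
    return function_name, args, kwargs

# Example: a well-formed body, and a malformed one that is rejected.
parsed = parse_add_task_payload({"function_name": "sample_function.pkl", "args": [5]})
try:
    parse_add_task_payload({"args": [5]})
    rejected = False
except ValueError:
    rejected = True
```

Inside the route, the call could be wrapped in `try`/`except ValueError` and answered with a 400 status and the error message.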

### Simplified Input/Output with a CLI

1. **Create a CLI Tool**: Use `click` to create a command-line interface for managing tasks and running orchestrations.

**cli.py**:
```python
import json

import click
import cloudpickle
from minio import Minio
from ai_orchestration import OrchestrationManager, store_function_in_minio

minio_client = Minio(
    "play.min.io",
    access_key="YOUR_ACCESS_KEY",
    secret_key="YOUR_SECRET_KEY",
    secure=True
)
bucket_name = "function-bucket"
if not minio_client.bucket_exists(bucket_name):
    minio_client.make_bucket(bucket_name)

manager = OrchestrationManager(minio_client, bucket_name)

@click.group()
def cli():
    pass

# Explicit names keep the underscore; Click would otherwise expose "add-task"
@click.command(name='add_task')
@click.argument('function_name')
@click.argument('args', nargs=-1)
@click.option('--kwargs', default='{}', help='Keyword arguments as a JSON object')
def add_task(function_name, args, kwargs):
    # Positional args arrive as strings; kwargs is parsed from JSON
    manager.add_task(function_name, *args, **json.loads(kwargs))
    click.echo(f"Task {function_name} added.")

@click.command(name='run')
@click.option('--initial_input', default='{}', help='Initial input as JSON')
def run(initial_input):
    result = manager.run(json.loads(initial_input))
    click.echo(f"Result: {result}")

@click.command(name='upload_function')
@click.argument('file_path')
def upload_function(file_path):
    with open(file_path, 'rb') as file:
        function = cloudpickle.load(file)
    function_name = file_path.split('/')[-1]
    store_function_in_minio(minio_client, bucket_name, function, function_name)
    click.echo(f"Function {function_name} uploaded.")

cli.add_command(add_task)
cli.add_command(run)
cli.add_command(upload_function)

if __name__ == "__main__":
    cli()
```

### Running the CLI Tool

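You can use the CLI to add tasks, run the orchestration, and upload functions. Positional arguments follow the function name directly. Note that each invocation is a separate process and the task list lives in memory, so a task added by `add_task` is not visible to a later `run` unless the tasks are persisted between calls:

```bash
python cli.py add_task sample_function.pkl 5
python cli.py run --initial_input '{"data": 5}'
python cli.py upload_function /path/to/your_function.pkl
```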
```
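
Since every CLI call starts a fresh process, a task queue kept only in memory is gone by the time the next command runs. One possible way around this is to keep the queue in a small JSON file. The sketch below is an assumption-laden illustration: `load_tasks`, `append_task`, and the `tasks.json` file name are hypothetical and not part of the original tool.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List

def load_tasks(path: Path) -> List[Dict[str, Any]]:
    """Return the saved task list, or an empty list if nothing was saved yet."""
    if not path.exists():
        return []
    return json.loads(path.read_text(encoding="utf-8"))

def append_task(path: Path, function_name: str, args: list, kwargs: dict) -> None:
    """Append one task to the JSON file so a later process can pick it up."""
    tasks = load_tasks(path)
    tasks.append({"function_name": function_name, "args": args, "kwargs": kwargs})
    path.write_text(json.dumps(tasks, indent=2), encoding="utf-8")

# Demo: two separate steps share state through the file instead of memory.
with tempfile.TemporaryDirectory() as tmp:
    queue_file = Path(tmp) / "tasks.json"
    before = load_tasks(queue_file)
    append_task(queue_file, "sample_function.pkl", [5], {})
    saved = load_tasks(queue_file)
```

With helpers like these, the `add_task` command would call `append_task`, and `run` would replay `load_tasks` into `manager.add_task` before executing.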

### Example Flask API Request

Using `curl` to interact with the Flask API:

```bash
curl -X POST http://localhost:5000/add_task -H "Content-Type: application/json" -d '{"function_name": "sample_function.pkl", "args": [5], "kwargs": {}}'
curl -X POST http://localhost:5000/run -H "Content-Type: application/json" -d '{"initial_input": {"data": 5}}'
curl -F 'file=@/path/to/your_function.pkl' http://localhost:5000/upload_function
```

### Conclusion

By creating a simplified API or CLI, you abstract the complexities of managing tasks, running orchestrations, and handling metrics. This makes it easier for users to interact with your AI tooling without needing to understand the underlying implementation details. This approach improves usability and allows for easy integration into various workflows and systems.

---

# Google's Mesop (Frontend with Python)

**Integrating Your AI Tooling with Google Mesop**

To interface your AI orchestration layer with a Google Mesop chatbot, you can leverage Mesop's capabilities to build a UI and handle user interactions seamlessly. Mesop is a Python-based UI framework developed by Google for quickly building web applications using idiomatic Python code. This framework can help you create interactive interfaces without diving into JavaScript, CSS, or HTML.

### Key Features of Mesop

1. **Component-Based Architecture**: Mesop allows you to build UIs using reusable components, which are essentially Python functions. This makes the UI development process straightforward and modular.
2. **Hot Reload**: It supports hot reloading, which means changes in your code will reflect immediately in the browser without restarting the server.
3. **Integration with Python Ecosystem**: Mesop integrates well with other Python libraries and frameworks, making it a good choice for extending existing Python projects ([Quickstart](https://google.github.io/mesop/getting_started/quickstart/), [GitHub repository](https://github.com/google/mesop), [Mesop](https://google.github.io/mesop/)).

### Setting Up Mesop

First, you need to install Mesop. Follow these steps:

1. **Install Mesop**:

   ```bash
   pip install mesop
   ```

2. **Create a Simple Mesop Application**:

   Create a `main.py` file:

   ```python
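   import mesop as me

   @me.stateclass
   class State:
       result: str = ""

   def run_orchestration():
       # Placeholder for orchestration logic
       return "Orchestration run successfully!"

   def on_run(e: me.ClickEvent):
       # Event handlers update state; Mesop then re-renders the page
       me.state(State).result = run_orchestration()

   @me.page(path="/")
   def app():
       me.text("Hello, this is your AI orchestration interface!")
       # Buttons take an on_click handler rather than returning a boolean
       me.button("Run Orchestration", on_click=on_run)
       state = me.state(State)
       if state.result:
           me.text(f"Result: {state.result}")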
   ```

3. **Run the Application**:

   ```bash
   mesop main.py
   ```

   Navigate to `http://localhost:32123` (Mesop's default port) to see your application.

### Integrating AI Tooling with Mesop

You can expand this basic setup to include the functionality of your AI orchestration layer. Here’s how you can integrate the orchestration tasks and function executions:

1. **Expand the Orchestration Logic**:

   Update `run_orchestration` to interact with your AI orchestration layer.

   ```python
   import cloudpickle
   import mesop as me
   from minio import Minio
   from ai_orchestration import OrchestrationManager, store_function_in_minio

   # Initialize MinIO client
   minio_client = Minio(
       "play.min.io",
       access_key="YOUR_ACCESS_KEY",
       secret_key="YOUR_SECRET_KEY",
       secure=True
   )
   bucket_name = "function-bucket"
   if not minio_client.bucket_exists(bucket_name):
       minio_client.make_bucket(bucket_name)

   manager = OrchestrationManager(minio_client, bucket_name)

   @me.stateclass
   class State:
       result: str = ""

   def run_orchestration():
       manager.add_task("sample_function.pkl")
       result = manager.run({"data": 5})
       return result

   def on_run(e: me.ClickEvent):
       # Store the result as a string so it can be rendered with me.text
       me.state(State).result = str(run_orchestration())

   @me.page(path="/")
   def app():
       me.text("Hello, this is your AI orchestration interface!")
       me.button("Run Orchestration", on_click=on_run)
       state = me.state(State)
       if state.result:
           me.text(f"Result: {state.result}")
   ```

2. **Add Task and Run Functions via the UI**:

   Extend the Mesop UI to allow uploading functions and adding tasks dynamically.

   ```python
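   @me.stateclass
   class State:
       result: str = ""
       status: str = ""
       function_name: str = ""
       args: str = ""

   def on_upload(e: me.UploadEvent):
       # Deserialize the uploaded pickle and store it (only accept trusted files)
       function = cloudpickle.loads(e.file.getvalue())
       store_function_in_minio(minio_client, bucket_name, function, e.file.name)
       me.state(State).status = f"Function {e.file.name} uploaded successfully."

   @me.page(path="/upload")
   def upload_function():
       me.text("Upload a new function")
       me.uploader(label="Choose a function file", on_upload=on_upload)
       me.text(me.state(State).status)

   def on_function_name(e: me.InputBlurEvent):
       me.state(State).function_name = e.value

   def on_args(e: me.InputBlurEvent):
       me.state(State).args = e.value

   def on_add_task(e: me.ClickEvent):
       state = me.state(State)
       args_list = [a.strip() for a in state.args.split(",") if a.strip()]
       manager.add_task(state.function_name, *args_list)
       state.status = f"Task {state.function_name} added with args {args_list}."

   @me.page(path="/add_task")
   def add_task():
       me.text("Add a new task")
       me.input(label="Function name", on_blur=on_function_name)
       me.input(label="Arguments (comma-separated)", on_blur=on_args)
       me.button("Add Task", on_click=on_add_task)
       me.text(me.state(State).status)

   def on_run(e: me.ClickEvent):
       me.state(State).result = str(run_orchestration())

   # Replaces the earlier "/" page
   @me.page(path="/")
   def main_app():
       me.text("AI Orchestration Dashboard")
       me.link(text="Upload Function", url="/upload")
       me.link(text="Add Task", url="/add_task")
       me.button("Run Orchestration", on_click=on_run)
       me.text(f"Result: {me.state(State).result}")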
   ```
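
The comma-separated arguments field on the Add Task page yields strings, so a task entered as `5` reaches the function as `"5"`. A small parser can convert simple literals back to Python values. This is a sketch using only the standard library; `parse_args` is a hypothetical helper, and the plain comma split means values that themselves contain commas (lists, quoted text) are not supported.

```python
import ast
from typing import Any, List

def parse_args(raw: str) -> List[Any]:
    """Split a comma-separated string and convert each piece to a Python literal when possible."""
    values: List[Any] = []
    for piece in raw.split(","):
        piece = piece.strip()
        if not piece:
            continue  # ignore empty entries such as a trailing comma
        try:
            values.append(ast.literal_eval(piece))
        except (ValueError, SyntaxError):
            values.append(piece)  # keep anything that is not a literal as a string
    return values

parsed = parse_args("5, 2.5, hello, True")
```

`ast.literal_eval` only evaluates literals (numbers, strings, booleans, and similar), so it is a safer choice than `eval` for user-supplied text.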

### Conclusion

By integrating Mesop with your AI orchestration layer, you create a user-friendly interface that allows users to upload functions, add tasks, and run orchestrations directly from a web application. Mesop’s simplicity and Python-first approach make it an excellent choice for extending your existing Python projects with interactive UIs.

For more details on Mesop and its capabilities, refer to the [Mesop documentation](https://google.github.io/mesop/) and the [Why Mesop?](https://google.github.io/mesop/blog/2024/05/13/why-mesop/) blog post.

---

# Syncing MinIO Objectstore & Weaviate Metastore 

To synchronize the MinIO bucket and the Weaviate "Feature" schema class as we built previously with Golang, we need to ensure that the Python implementation correctly handles both the upload to MinIO and the metadata synchronization with Weaviate. Here’s how to extend the existing Python implementation to achieve this:

### Synchronize MinIO and Weaviate in Python

Below is the updated code to include synchronization with the Weaviate "Feature" schema class:

#### Dependencies
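Install the required dependencies. The code below uses the Weaviate v3 Python client API (`weaviate.Client`), so `weaviate-client` is pinned below version 4:
```bash
pip install minio "weaviate-client<4" docker cloudpickle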
```

#### Initializer Class with Weaviate Synchronization

```python
# feature_store_initializer.py

import io
import os
import uuid

import cloudpickle
import docker
from minio import Minio
from minio.error import S3Error
from weaviate import Client as WeaviateClient, AuthClientPassword

class FeatureStoreInitializer:
    def __init__(self, minio_config, weaviate_config, docker_config):
        self.minio_client = Minio(**minio_config)
        self.weaviate_client = WeaviateClient(**weaviate_config)
        self.docker_client = docker.DockerClient(**docker_config)
        self.bucket_name = "function-bucket"
        # Keep the base URL so object paths can be built without private client attributes
        scheme = "https" if minio_config.get("secure", True) else "http"
        self.minio_base_url = f"{scheme}://{minio_config['endpoint']}"

    def create_bucket(self):
        try:
            if not self.minio_client.bucket_exists(self.bucket_name):
                self.minio_client.make_bucket(self.bucket_name, location="us-east-1")
                print(f"Bucket {self.bucket_name} created successfully.")
            else:
                print(f"Bucket {self.bucket_name} already exists.")
        except S3Error as exc:
            print(f"Error occurred: {exc}")

    def upload_script(self, script_name, script_content):
        try:
            # put_object expects a readable stream, so wrap the bytes in BytesIO
            self.minio_client.put_object(
                self.bucket_name, script_name, io.BytesIO(script_content), len(script_content)
            )
            print(f"Script {script_name} uploaded successfully.")
        except S3Error as exc:
            print(f"Error occurred: {exc}")

    def fetch_script(self, object_name, file_path):
        try:
            self.minio_client.fget_object(self.bucket_name, object_name, file_path)
            print(f"Script {object_name} fetched and saved to {file_path}")
        except S3Error as exc:
            print(f"Error occurred: {exc}")

    def store_function_in_minio(self, function, function_name):
        serialized_function = cloudpickle.dumps(function)
        self.upload_script(function_name, serialized_function)
        # Synchronize with Weaviate
        self.sync_with_weaviate(function_name)

    def sync_with_weaviate(self, function_name):
        # Metadata object mirrored into the Weaviate "Feature" class
        properties = {
            "name": function_name,
            "description": "A serialized function",
            "libraries": "cloudpickle",
            "input_source": "N/A",
            "output_source": "N/A",
            "category": "function",
            "docker_image": "N/A",
            "object_path": f"{self.minio_base_url}/{self.bucket_name}/{function_name}",
            "version": "1.0"
        }
        self.weaviate_client.data_object.create(properties, "Feature")
        print(f"Synchronized {function_name} with Weaviate")

    def build_docker_image(self, script_path, dockerfile_path='Dockerfile', image_name='script_executor'):
        if not os.path.exists(dockerfile_path):
            dockerfile_content = f"""
FROM python:3.9-slim
WORKDIR /usr/src/app
COPY {os.path.basename(script_path)} /usr/src/app/{os.path.basename(script_path)}
RUN pip install requests beautifulsoup4
RUN chmod +x /usr/src/app/{os.path.basename(script_path)}
CMD ["python", "/usr/src/app/{os.path.basename(script_path)}"]
"""
            with open(dockerfile_path, 'w') as dockerfile:
                dockerfile.write(dockerfile_content.strip())
            print(f"Generated default Dockerfile at {dockerfile_path}")
        else:
            print(f"Using existing Dockerfile at {dockerfile_path}")

        image, logs = self.docker_client.images.build(path='.', tag=image_name, dockerfile=dockerfile_path, nocache=True)
        for log in logs:
            print(log)
        return image

    def run_docker_container(self, image_name, environment):
        container_name = f'script_container_{uuid.uuid4()}'
        container = self.docker_client.containers.run(image_name, name=container_name, detach=True, environment=environment)
        
        logs = container.logs(stream=True)
        
        for log in logs:
            print(log.decode('utf-8').strip())
        
        container.wait()
        container.remove()
        print(f"Container {container_name} finished execution.")

    def initialize(self, function_name, environment):
        self.create_bucket()
        self.store_function_in_minio(self.sample_function, function_name)
        script_path = os.path.join(os.getcwd(), function_name)
        self.fetch_script(function_name, script_path)
        self.build_docker_image(script_path)
        self.run_docker_container('script_executor', environment)
    
    @staticmethod
    def sample_function(data):
        # Static so that pickling it does not pull the client objects along via self
        return data * 2

def scrape_url(url):
    minio_config = {
        "endpoint": "192.168.0.25:9000",
        "access_key": "cda_cdaprod",
        "secret_key": "cda_cdaprod",
        "secure": False
    }

    weaviate_config = {
        "url": "http://localhost:8080",
        "auth_client_secret": AuthClientPassword(username="user", password="password")
    }

    docker_config = {
        "base_url": 'tcp://rpi4-2.local:2375'
    }

    initializer = FeatureStoreInitializer(minio_config, weaviate_config, docker_config)
    initializer.initialize("sample_function.pkl", {"URL": url})

# Example usage within the notebook
scrape_url("https://github.com/cdaprod/cdaprod")  # Replace this with the actual URL you want to scrape
```

### Explanation:

1. **FeatureStoreInitializer Class**:
   - **Initialization**: Sets up MinIO, Weaviate, and Docker clients.
   - **Bucket Management**: Creates and checks for the existence of a MinIO bucket.
   - **Script Management**: Uploads and fetches scripts to/from MinIO.
   - **Function Serialization**: Serializes functions using `cloudpickle` and stores them in MinIO.
   - **Weaviate Synchronization**: Synchronizes the uploaded function with the Weaviate "Feature" schema class by creating a corresponding object in Weaviate with the required properties.
   - **Docker Image Management**: Builds Docker images from provided scripts.
   - **Container Execution**: Runs Docker containers and logs their outputs.

2. **scrape_url Function**:
   - Configures MinIO, Weaviate, and Docker settings.
   - Initializes the FeatureStoreInitializer and runs a sample function.
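
The generated Dockerfile starts the fetched object with `python`, which suits a plain script but not a pickled function such as `sample_function.pkl`. One option is a small loader that unpickles the callable and invokes it. The sketch below is an illustration only: `run_pickled` is a hypothetical name, reading the input from the `URL` environment variable is an assumption, and it relies on the fact that files written by `cloudpickle` can be read with the standard `pickle` module as long as `cloudpickle` is installed where they are loaded.

```python
import os
import pickle
import tempfile

def run_pickled(path: str, payload):
    """Load a pickled callable from `path` and call it with `payload`."""
    # Only unpickle files you trust: loading a pickle can execute arbitrary code.
    with open(path, "rb") as handle:
        function = pickle.load(handle)
    return function(payload)

# Demo with a builtin so the sketch runs without MinIO or Docker.
with tempfile.TemporaryDirectory() as tmp:
    pickle_path = os.path.join(tmp, "length.pkl")
    with open(pickle_path, "wb") as handle:
        pickle.dump(len, handle)
    # Inside the container, the input could come from the URL variable instead.
    payload = os.environ.get("URL", "https://example.com")
    output = run_pickled(pickle_path, payload)
```

A loader like this would be copied into the image alongside the `.pkl` file and named in the Dockerfile's `CMD` in place of the pickle itself.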

### Additional Considerations

- **Weaviate Schema and Data Management**: Define the schema and manage metadata in Weaviate, similar to the previous example but with integration into this Python class.
- **Testing and Validation**: Test the integration end to end and handle errors gracefully, for example a failed MinIO upload that should not be followed by a Weaviate write.
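
For the schema side, the "Feature" class needs properties matching the keys written by `sync_with_weaviate`. A minimal sketch of such a class definition follows; treating every property as `text` and the class description are assumptions, and the commented call shows how it could be registered with the v3 Python client.

```python
# Property names mirror the keys written by sync_with_weaviate.
FEATURE_PROPERTIES = [
    "name", "description", "libraries", "input_source", "output_source",
    "category", "docker_image", "object_path", "version",
]

feature_class = {
    "class": "Feature",
    "description": "Metadata for a function stored in MinIO",
    # Assumption: every property is stored as plain text.
    "properties": [{"name": prop, "dataType": ["text"]} for prop in FEATURE_PROPERTIES],
}

# With the v3 Python client this could be registered once at startup, e.g.:
#   weaviate_client.schema.create_class(feature_class)
```

Defining the class explicitly avoids relying on automatic schema inference for the first object that is written.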

This setup simplifies the interaction between MinIO and Weaviate while maintaining essential functionalities and compatibility with your current system.