<a href="https://colab.research.google.com/github/FuzzysTodd/Foundry-Fund-Me-2024/blob/main/Real_Time_Consensus_Engine_Mockup.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [17]:
# @title Configure Gemini API key

import google.generativeai as genai
from google.colab import userdata

GOOGLE_API_KEY_3 = gen-lang-client-0044081240 # @param {"type":"integer","placeholder":"gen-lang-client-0044081240"}

try:
  GOOGLE_API_KEY=userdata.get(gemini_api_secret_name)
  genai.configure(api_key=GOOGLE_API_KEY)
except userdata.SecretNotFoundError as e:
   print(f'Secret not found\n\nThis expects you to create a secret named {gemini_api_secret_name} in Colab\n\nVisit https://aistudio.google.com/app/apikey to create an API key\n\nStore that in the secrets section on the left side of the notebook (key icon)\n\nName the secret {gemini_api_secret_name}')
   raise e
except userdata.NotebookAccessError as e:
  print(f'You need to grant this notebook access to the {gemini_api_secret_name} secret in order for the notebook to access Gemini on your behalf.')
  raise e
except Exception as e:
  print(f"There was an unknown error. Ensure you have a secret {gemini_api_secret_name} stored in Colab and it's a valid key from https://aistudio.google.com/app/apikey")
  raise e

SyntaxError: leading zeros in decimal integer literals are not permitted; use an 0o prefix for octal integers (ipython-input-835573119.py, line 6)

In [None]:
from google.colab import userdata
userdata.get('secretName')

In [None]:
import time
import random
from datetime import datetime

# --- Configuration ---
# The core topic/scope the agents are currently analyzing and acting upon.
RTCE_TOPIC = "Micro-trading strategy for ETH/USD volatility"
# The decision threshold: 2 out of 3 votes required for consensus.
CONSENSUS_THRESHOLD = 2
# Time constraints for the one-minute loop
LOOP_SECONDS = 60
# Safety Flag: Set to False to stop the entire loop immediately
KILL_SWITCH = True

# --- Agent Definitions ---

class AnalyzerBot:
    """Simulates an independent AI agent that analyzes data and provides a conclusion."""
    def __init__(self, name, bias):
        self.name = name
        self.bias = bias # e.g., 'Bullish', 'Neutral', 'Bearish'

    def fetch_data(self):
        """Mock data fetching: Simulates querying a real-time API."""
        return random.randint(100, 999)

    def analyze_and_conclude(self, data_point):
        """Uses a simulated LLM/logic to generate a conclusion and an action."""
        # The logic here simulates the LLM reasoning based on its 'bias'
        if self.bias == 'Bullish':
            conclusion = f"Data Point {data_point} suggests an upward trend. Conclusion: Strong BUY."
            action = "BUY"
        elif self.bias == 'Bearish':
            conclusion = f"Data Point {data_point} shows a correction is due. Conclusion: Moderate SELL."
            action = "SELL"
        else: # Neutral
            conclusion = f"Data Point {data_point} is within the noise. Conclusion: HOLD."
            action = "HOLD"

        return {"agent": self.name, "conclusion": conclusion, "action": action}

class ConsensusManager:
    """Gathers agent results, checks for consensus, and manages permissions."""

    def __init__(self, agents):
        self.agents = agents

    def run_minute_cycle(self):
        print(f"\n--- {datetime.now().strftime('%H:%M:%S')} ---")
        print(f"[RTCE] Starting analysis for topic: {RTCE_TOPIC}")

        # 1. Parallel Analysis
        results = []
        data_point = self.agents[0].fetch_data() # Mock: all agents use the same fetched data point

        for agent in self.agents:
            # Simulate the parallel, fast execution of the analysis
            result = agent.analyze_and_conclude(data_point)
            print(f"  > {result['agent']} ({agent.bias}): {result['action']}")
            results.append(result)

        # 2. Consensus Vote
        vote_tally = {}
        for result in results:
            action = result['action']
            vote_tally[action] = vote_tally.get(action, 0) + 1

        # Determine the final consensus action
        final_action = "NO ACTION (DIVERGENCE)"
        max_votes = 0

        for action, count in vote_tally.items():
            if count >= CONSENSUS_THRESHOLD:
                final_action = action
                max_votes = count
                break # Consensus reached

        # 3. Final Conclusion and Permission Check
        if max_votes >= CONSENSUS_THRESHOLD:
            print(f"\n[CONSENSUS REACHED] Final Action: {final_action} ({max_votes}/{len(self.agents)} votes)")

            # --- Permission & Action Guardrail Check ---
            if final_action in ["BUY", "SELL"]:
                print("[GOVERNANCE] Action passes Guardrail Check (Micro-Trade size assumed).")
                self.execute_real_world_action(final_action)
            else:
                print("[GOVERNANCE] Action is HOLD/NO ACTION. Permission not required.")
        else:
            print("[DIVERGENCE] No consensus reached. Action is aborted for this minute.")

    def execute_real_world_action(self, action):
        """Simulates sending an API command with granted permissions."""
        print(f"\n*** ACTION EXECUTED: Sent API command to Real-World System: {action} ***")
        # In a real system, this would be an authenticated API POST request.

# --- Main Execution Loop ---

def start_rtce_engine():
    global KILL_SWITCH

    # Initialize the three independent analyzer bots
    bots = [
        AnalyzerBot("Agent-Alpha", "Bullish"),
        AnalyzerBot("Agent-Beta", "Bearish"),
        AnalyzerBot("Agent-Gamma", "Neutral")
    ]

    manager = ConsensusManager(bots)

    print("--- REAL-TIME CONSENSUS ENGINE (RTCE) STARTING ---")

    cycle_count = 1
    while KILL_SWITCH:
        start_time = time.time()

        manager.run_minute_cycle()

        # Calculate time taken and wait for the remainder of the minute
        elapsed_time = time.time() - start_time
        wait_time = max(0, LOOP_SECONDS - elapsed_time)

        print(f"\n[LOOP] Cycle {cycle_count} complete in {elapsed_time:.2f}s. Waiting {wait_time:.2f}s...")
        time.sleep(wait_time)
        cycle_count += 1

        # Self-governance stop after 5 cycles for simulation purposes
        if cycle_count > 5:
            KILL_SWITCH = False
            print("\n[GOVERNANCE] Simulation complete. RTCE shutting down via KILL_SWITCH.")


if __name__ == "__main__":
    start_rtce_engine()

# New Section

In [None]:
from google.colab import auth
PROJECT_ID = "12333121" # @param {type: "string"}
auth.authenticate_user(project_id=PROJECT_ID)

In [20]:
from IPython.display import display, Javascript
from google.colab.output import eval_js
from base64 import b64decode

def take_photo(filename='photo.jpg', quality=0.8):
  js = Javascript('''
    async function takePhoto(quality) {
      const div = document.createElement('div');
      const capture = document.createElement('button');
      capture.textContent = 'Capture';
      div.appendChild(capture);

      const video = document.createElement('video');
      video.style.display = 'block';
      const stream = await navigator.mediaDevices.getUserMedia({video: true});

      document.body.appendChild(div);
      div.appendChild(video);
      video.srcObject = stream;
      await video.play();

      // Resize the output to fit the video element.
      google.colab.output.setIframeHeight(document.documentElement.scrollHeight, true);

      // Wait for Capture to be clicked.
      await new Promise((resolve) => capture.onclick = resolve);

      const canvas = document.createElement('canvas');
      canvas.width = video.videoWidth;
      canvas.height = video.videoHeight;
      canvas.getContext('2d').drawImage(video, 0, 0);
      stream.getVideoTracks()[0].stop();
      div.remove();
      return canvas.toDataURL('image/jpeg', quality);
    }
    ''')
  display(js)
  data = eval_js('takePhoto({})'.format(quality))
  binary = b64decode(data.split(',')[1])
  with open(filename, 'wb') as f:
    f.write(binary)
  return filename

In [None]:
%%javascript
const listenerChannel = new BroadcastChannel('channel');
listenerChannel.onmessage = (msg) => {
  const div = document.createElement('div');
  div.textContent = msg.data;
  document.body.appendChild(div);
};

In [18]:
import IPython
js_code = '''
document.querySelector("#output-area").appendChild(document.createTextNode("hello world!"));
'''
display(IPython.display.Javascript(js_code))

<IPython.core.display.Javascript object>

In [19]:
# Import PyDrive and associated libraries.
# This only needs to be done once per notebook.
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client.
# This only needs to be done once per notebook.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# Download a file based on its file ID.
#
# A file ID looks like: laggVyWshwcyP6kEI-y_W3P8D26sz
file_id = 'REPLACE_WITH_YOUR_FILE_ID'
downloaded = drive.CreateFile({'id': file_id})
print('Downloaded content "{}"'.format(downloaded.GetContentString()))

ApiRequestError: <HttpError 404 when requesting https://www.googleapis.com/drive/v2/files/REPLACE_WITH_YOUR_FILE_ID?supportsAllDrives=true&alt=json returned "File not found: REPLACE_WITH_YOUR_FILE_ID". Details: "[{'message': 'File not found: REPLACE_WITH_YOUR_FILE_ID', 'domain': 'global', 'reason': 'notFound', 'location': 'file', 'locationType': 'other'}]">

In [None]:
# Import PyDrive and associated libraries.
# This only needs to be done once per notebook.
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client.
# This only needs to be done once per notebook.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# Download a file based on its file ID.
#
# A file ID looks like: laggVyWshwcyP6kEI-y_W3P8D26sz
file_id = 'REPLACE_WITH_YOUR_FILE_ID'
downloaded = drive.CreateFile({'id': file_id})
print('Downloaded content "{}"'.format(downloaded.GetContentString()))

In [None]:
import IPython
from google.colab import output

display(IPython.display.Javascript('''
  window.someValue = new Promise(resolve => {
    setTimeout(() => {
      resolve("hello world!");
    }, 100);
  });
'''))


value = output.eval_js('someValue');
value

In [None]:
# @title Connect to the API and send an example message

text = 'What is the velocity of an falling super nova' # @param {type: "string"}

model = genai.GenerativeModel('gemini-2.0-flash')
chat = model.start_chat(history=[])

response = chat.send_message(text)
response.text

This second cell will be in a separate sandboxed iframe.


In [None]:
%%javascript
const senderChannel = new BroadcastChannel('channel');
senderChannel.postMessage('Hello world!');

In [None]:
from IPython.display import Image
try:
  filename = take_photo()
  print('Saved to {}'.format(filename))

  # Show the image which was just taken.
  display(Image(filename))
except Exception as err:
  # Errors will be thrown if the user does not have a webcam or if they do not
  # grant the page permission to access it.
  print(str(err))

In [None]:
# @title Example form fields
# @markdown Forms support many types of fields.

no_type_checking = "son"  # @param
string_type = 'example'  # @param {type: "string"}
slider_value = 198  # @param {type: "slider", min: 100, max: 200}
number = 102  # @param {type: "number"}
date = '2010-11-05'  # @param {type: "date"}
pick_me = "monday"  # @param ['monday', 'tuesday', 'wednesday', 'thursday']
select_or_input = "oranges" # @param ["apples", "bananas", "oranges"] {allow-input: true}
# @markdown ---

# New Section