In [1]:
import json
import os
import sys
import threading
import time
from typing import Dict, List, Callable

# Add the project source directory to the path if needed
import sys
sys.path.append('src')

# Import controllers from the project
from obs_switch.obs_controller import OBSController
from obs_switch.switch_controller import SwitchController
from obs_switch.switch_handler import translate_code_to_macros

# Create the OBS controller; later cells issue recording requests and
# disconnect through this `obs` object.
obs = OBSController()

# Create the Switch controller; later cells send button presses and stick
# tilts through this `switch` object.
switch = SwitchController()

# Settings passed to obs.connect() further down in this cell.
obs_host = "localhost"
obs_port = 4455
# Read the password from the OBS_PASSWORD environment variable so it never
# has to be typed into (and saved with) the notebook. When the variable is
# unset this falls back to the previous default of an empty string.
obs_password = os.environ.get("OBS_PASSWORD", "")

# Register a callback for connection status
def on_obs_connection_status(data):
    """Print the OBS WebSocket connection state carried by ``data``.

    ``data`` is the event payload dict. A truthy ``'connected'`` entry is
    reported as connected. A falsy or missing one is reported as
    disconnected, followed by the ``'error'`` entry when the payload has one.
    """
    # .get() keeps a payload without a 'connected' key from raising KeyError
    # inside the callback, matching on_switch_connection_status.
    if data.get('connected', False):
        print("Connected to OBS WebSocket server")
        return

    print("Disconnected from OBS WebSocket server")
    if 'error' in data:
        print(f"Error: {data['error']}")

# Route OBS "connection_status" events to on_obs_connection_status; this is
# registered before the connection is opened.
obs.register_event_callback("connection_status", on_obs_connection_status)

# Connect to OBS
# NOTE(review): in the recorded output the result line below is printed
# before the "Connected to OBS WebSocket server" callback line, so a truthy
# return presumably does not mean the session is fully established - confirm.
connected = obs.connect(obs_host, obs_port, obs_password)
print(f"Connection attempt result: {'Success' if connected else 'Failed'}")

# Connect to NXBT webapp
# URL of the NXBT webapp; passed to switch.connect() later in this cell.
switch_url = "http://localhost:8000"

# Register a callback for connection status
def on_switch_connection_status(data):
    """Print whether the NXBT webapp connection is up.

    A missing or falsy ``'connected'`` entry in ``data`` is reported as
    disconnected.
    """
    is_up = data.get('connected', False)
    message = "Connected to the NXBT webapp." if is_up else "Disconnected from the NXBT webapp."
    print(message)

# Register a callback for controller creation
def on_create_controller(data):
    """Print the ``'index'`` entry of a controller-created event payload.

    A payload without an ``'index'`` key prints ``None`` as the index.
    """
    print(f"Controller created with index: {data.get('index')}")

# Register both Switch event printers before the connection is opened.
switch.register_event_callback("connection_status", on_switch_connection_status)
switch.register_event_callback("controller_created", on_create_controller)

# Connect to Switch
# NOTE: `connected` is reused here and overwrites the OBS result assigned
# earlier in this cell; the printed line is identical for both attempts.
connected = switch.connect(switch_url)
print(f"Connection attempt result: {'Success' if connected else 'Failed'}")

Connection attempt result: Success
WebSocket connection opened
Successfully identified with OBS WebSocket
Connected to OBS WebSocket server
Connected to the NXBT webapp.
Connected to the NXBT webapp.
Connection attempt result: Success
Controller created with index: 3
Controller created with index: 3


packet queue is empty, aborting


Disconnected from the NXBT webapp.
Disconnected from the NXBT webapp.
WebSocket closed: None - None
Disconnected from OBS WebSocket server


In [31]:
def recording_state(st: str):
    """Send one recording request to OBS, selected by ``st``.

    Recognised values are ``'get_status'``, ``'start_recording'`` and
    ``'stop_recording'``; any other value does nothing. The status request
    is followed by a 1 s pause and the start request by a 3 s pause.
    """
    # (state name, progress message, OBS request, pause in seconds afterwards)
    steps = (
        ('get_status', "Getting current recording status...",
         lambda: obs.get_recording_status(), 1),
        ('start_recording', "Starting recording...",
         lambda: obs.start_recording(), 3),
        ('stop_recording', "Stopping recording...",
         lambda: obs.stop_recording(), 0),
    )
    for name, message, request, pause in steps:
        if st != name:
            continue
        print(message)
        request()
        if pause:
            time.sleep(pause)
        return

def press_button():
    """Run a fixed demo sequence of controller inputs on the Switch.

    In order: press A, press X and Y together, tilt the left stick with
    (100, 0), then tilt it with (0, 100). Each step prints a message first
    and is followed by a one-second pause.
    """
    demo_steps = (
        ("Pressing A button...",
         lambda: switch.press_and_release('A', delay=0.2)),
        ("Pressing X and Y buttons...",
         lambda: switch.press_and_release(['X', 'Y'], delay=0.3)),
        ("Moving left stick right...",
         lambda: switch.tilt_stick('L_STICK', 100, 0, delay=0.5)),
        ("Moving left stick up...",
         lambda: switch.tilt_stick('L_STICK', 0, 100, delay=0.5)),
    )
    for message, send_input in demo_steps:
        print(message)
        send_input()
        time.sleep(1)

# Function to enter a replay code
def enter_replay_code(code: str):
    """Type a replay code on the Switch, one button press per macro key.

    ``code`` is translated with ``translate_code_to_macros`` into sequences
    of macro keys. Each key is converted to a controller button name with
    ``macro_to_controller`` before it is pressed; the macro keys are keyboard
    keys, not button names, so pressing them unmapped sends the wrong button.

    Every key is mapped before the first press, so an unmapped key aborts the
    entry without sending a partial code. Failures are reported with a
    printed message instead of being raised.
    """
    try:
        # Pass 1: translate everything up front so a bad key is caught
        # before any input reaches the console.
        button_sequences = []
        for sequence in translate_code_to_macros(code):
            if not sequence:
                continue
            buttons = []
            for key in sequence:
                if not key:
                    continue
                button = macro_to_controller(key)
                if button is None:
                    raise ValueError(f"no controller button mapped to macro key {key!r}")
                buttons.append(button)
            button_sequences.append(buttons)

        # Pass 2: send the presses.
        for buttons in button_sequences:
            for button in buttons:
                switch.press_and_release([button], 0.1)
                time.sleep(0.2)  # Add delay between button presses

            time.sleep(0.5)  # Add delay between sequences
    except Exception as e:
        print(f"Error entering replay code: {e}")

# Lookup table behind macro_to_controller: macro key -> controller input name.
_MACRO_KEY_TO_BUTTON = {
    # Face buttons
    'L': 'A',
    'K': 'B',
    'I': 'X',
    'J': 'Y',
    # Shoulder buttons and triggers
    '8': 'R',
    '9': 'ZR',
    '1': 'ZL',
    '2': 'L',
    # Minus / plus
    '7': 'MINUS',
    '6': 'PLUS',
    # Stick clicks
    'T': 'L_STICK_PRESS',
    'Y': 'R_STICK_PRESS',
    # Home / capture
    '[': 'HOME',
    ']': 'CAPTURE',
    # D-pad
    'B': 'DPAD_DOWN',
    'G': 'DPAD_UP',
    'V': 'DPAD_LEFT',
    'N': 'DPAD_RIGHT',
    # Left stick directions
    'W': 'LEFT_STICK_UP',
    'S': 'LEFT_STICK_DOWN',
    'A': 'LEFT_STICK_LEFT',
    'D': 'LEFT_STICK_RIGHT',
    # Right stick directions
    # NOTE(review): ArrowDown -> RIGHT_STICK_UP and ArrowUp -> RIGHT_STICK_DOWN
    # look inverted compared with the W/S rows above - confirm this is intended.
    'ArrowDown': 'RIGHT_STICK_UP',
    'ArrowUp': 'RIGHT_STICK_DOWN',
    'ArrowLeft': 'RIGHT_STICK_LEFT',
    'ArrowRight': 'RIGHT_STICK_RIGHT',
}


def macro_to_controller(macro_input: str):
    """Return the controller input name for a macro key.

    The lookup is case-sensitive. Returns ``None`` when ``macro_input`` has
    no entry in the table.
    """
    return _MACRO_KEY_TO_BUTTON.get(macro_input)

In [23]:
# Button names, addressed by list position when passed to
# switch.press_and_release in this notebook.
controls = [
    # 0-3: d-pad
    "DPAD_UP", "DPAD_LEFT", "DPAD_RIGHT", "DPAD_DOWN",
    # 4-7: L, ZL, R, ZR
    "L", "ZL", "R", "ZR",
    # 8-11: plus, minus, home, capture
    "PLUS", "MINUS", "HOME", "CAPTURE",
    # 12-15: face buttons
    "Y", "X", "B", "A",
]

# Manual probe: press controls[1] ("DPAD_LEFT"). The commented-out lines are
# alternative probes left in place for quick toggling.
# switch.is_connected()
switch.press_and_release(controls[1], delay=0.2)
# switch.press_and_release(['ZL','ZR'], delay=1)
# switch.press_and_release([''], delay=2)
# switch.tilt_stick('L_STICK', 0, 100, delay=0.5)


True

In [36]:
# Press sequence described by the original note as "From matchine to replays"
# (sic - TODO confirm the intended wording). Each inner list holds the
# `controls` indices pressed together in one step.
# Defined here, not left in a comment, so the loop below does not depend on a
# `schema` variable left over in the kernel from an earlier run.
schema = [[2], [2], [2], [15]]

for action in schema:
    switch.press_and_release([controls[i] for i in action], delay=0.2)

In [89]:
# Press controls[14], which is "B" in the `controls` list.
switch.press_and_release(controls[14], delay=0.2)

True

In [90]:
# Enter a replay code
# Codes are entered in list order; the commented entry is a spare example.
replay_codes = [
    # "1234-5678-9012"
    "R2TA-RJN3-BNYQ-LCTS"
]
for replay_code in replay_codes:
    print(f"Entering replay code: {replay_code}")
    # A list of lists: one inner list of NXBT keyboard keys per replay char
    macro_sequences = translate_code_to_macros(replay_code)
    nxbt_keys = (nxbt_key for char_keys in macro_sequences for nxbt_key in char_keys)
    # map() is lazy, so each key is translated right before it is pressed.
    # NOTE(review): macro_to_controller returns None for a key it does not
    # know, and that None would be passed straight to press_and_release.
    for button in map(macro_to_controller, nxbt_keys):
        switch.press_and_release(button, delay=0.1)
        # This pause is required to avoid mixing signals
        time.sleep(0.5)
    


Entering replay code: R2TA-RJN3-BNYQ-LCTS


packet queue is empty, aborting


Disconnected from the NXBT webapp.
Disconnected from the NXBT webapp.
Connected to the NXBT webapp.
Connected to the NXBT webapp.
Controller created with index: 6
Controller created with index: 6


In [91]:
# Disconnect block
# The Switch disconnect sits in `finally` so it still runs when the OBS
# disconnect raises; the error then propagates after both were attempted.
try:
    # Disconnect from OBS
    print("Disconnecting from OBS...")
    obs.disconnect()
finally:
    # Disconnect from Switch
    print("Disconnecting from Switch...")
    switch.disconnect()

print("Cleanup complete!")

Disconnecting from OBS...
Disconnected from OBS WebSocket server
Disconnecting from Switch...
Disconnected from the NXBT webapp.
Disconnected from the NXBT webapp.
Disconnected from the NXBT webapp.
Cleanup complete!
