In [None]:
#------ ASLTranslator 1.0 ---------

In [2]:
# Libraries
%pip install google-genai watchdog pandas pyserial

Defaulting to user installation because normal site-packages is not writeable
Collecting pyserial
  Using cached pyserial-3.5-py2.py3-none-any.whl.metadata (1.6 kB)
Using cached pyserial-3.5-py2.py3-none-any.whl (90 kB)
Installing collected packages: pyserial
Successfully installed pyserial-3.5
Note: you may need to restart the kernel to use updated packages.




In [2]:
# Imports
import os
import time
import pandas as pd
import json
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from google import genai
from google.genai import types
from datetime import datetime, timedelta

In [None]:
# Gemini API Key
# Read the key from the GEMINI_API_KEY environment variable so the secret never
# has to be typed into (and saved with) the notebook. When the variable is not
# set this falls back to an empty string, exactly as before.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY", "")
client = genai.Client(api_key=GEMINI_API_KEY)

In [4]:
# Write a small sample file that the translation cell below reads as its input
input_filename = "input.txt"

sample_sentences = [
    "The duck quacked.\n",
    "We are using the Gemini API for this project.\n",
    "This notebook provides a great environment for prototyping.\n",
]

# "w" mode replaces any previous input.txt in the working directory
with open(input_filename, "w", encoding="utf-8") as sample_file:
    sample_file.writelines(sample_sentences)

print(f"Created file: {input_filename}")

Created file: input.txt


In [5]:
# Configs
# Gemini model identifier passed to client.models.generate_content
model_name = "gemini-2.5-flash"  # Fast and capable model for translation
# File that receives the translated ASL gloss (relative path, so it is resolved
# against the current working directory)
output_filename = "gemini_output_log.txt"

In [6]:
## Read input, translate it with Gemini, and save the result

# 1. Read the text to translate
try:
    with open(input_filename, "r", encoding="utf-8") as f:
        file_content = f.read()
except FileNotFoundError:
    print(f"Error: Input file '{input_filename}' not found.")
    # Re-raise instead of calling exit(): exit() is an interactive helper that
    # ends the whole session, whereas an exception only stops this cell.
    raise

# 2. Craft the translation prompt
# Using a clear system instruction and user prompt ensures high quality and clean output.
system_instruction = "You are an expert ASL translator who translates textual and spoken English input into ASL gloss. Do not include any extra commentary, headers, or surrounding text; only provide the translated text."

prompt = f"Translate the following text into ASL gloss:\n\n---\n{file_content}\n---"

print(f"Sending content of '{input_filename}' to Gemini...")

# 3. Call the Gemini API
response = client.models.generate_content(
    model=model_name,
    contents=prompt,
    config={
        "system_instruction": system_instruction
    }
)

# 4. Extract the translated text
# response.text can be None when the response carries no text part, so guard
# before calling .strip() on it.
translated_text = (response.text or "").strip()

if translated_text:
    print("Translation received. Writing to file...")

    # 5. Write the translated text to the output file ("w" replaces earlier content)
    with open(output_filename, "w", encoding="utf-8") as f:
        f.write(translated_text)

    print(f"Translation:\n{translated_text}")
else:
    # Keep any previous output file intact rather than overwriting it with nothing
    print("!! Gemini returned no text. Output file was not written.")

Sending content of 'input.txt' to Gemini...
Translation received. Writing to file...
Translation:
DUCK QUACK. THIS PROJECT, WE USE GEMINI API. THIS NOTEBOOK, GOOD ENVIRONMENT PROTOTYPE.


In [None]:
# Folder monitored for changes and folder that receives the translation log.
# NOTE: these are machine-specific absolute paths; adjust them to the local checkout.
WATCH_DIRECTORY = r"C:\Users\evanv\OneDrive\Computer_Science\JuniorYear\ASL-Robot\InputFiles"
OUTPUT_DIRECTORY = r"C:\Users\evanv\OneDrive\Computer_Science\JuniorYear\ASL-Robot\OutputFiles"

# Only modifications to a file with this name trigger a translation
TARGET_FILE_NAME = "input.txt"


def process_text_change(file_path):
    """
    Reads the text content of the modified file and processes it with Gemini.

    The stripped file content is sent to the Gemini model named by ``model_name``
    and the returned gloss is appended to ``gemini_output_log.txt`` inside
    ``OUTPUT_DIRECTORY``.

    Args:
        file_path: Path of the file whose content should be translated.

    Returns:
        None. Empty input and empty model responses are reported and skipped.
        Any exception raised while reading, calling the API or writing is
        caught and printed so the caller keeps running.
    """
    print(f"-> Change detected in: {TARGET_FILE_NAME}. Processing...")

    try:
        # 1. Read the LATEST content of the file
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read().strip()  # .strip() removes leading/trailing whitespace

        if not content:
            print("!! File is empty after modification. Skipping AI call.")
            return

        # 2. Build the prompt
        system_instruction = "You are an expert ASL translator who translates textual and spoken English input into ASL gloss. Do not include any extra commentary, headers, or surrounding text; only provide the translated text."

        prompt = f"Translate the following text into ASL gloss:\n\n---\n{content}\n---"

        # 3. Call the Gemini API
        response = client.models.generate_content(
            model=model_name,
            contents=prompt,
            config={
                "system_instruction": system_instruction
            }
        )

        # response.text can be None when no text part is returned; do not
        # append the literal string "None" (or a blank entry) to the log.
        translated_text = response.text
        if not translated_text or not translated_text.strip():
            print("!! Gemini returned no text. Nothing was logged.")
            return

        # 4. Save the Output to a Log
        # Create the output folder on demand so a missing directory does not
        # discard a translation that was already paid for.
        os.makedirs(OUTPUT_DIRECTORY, exist_ok=True)
        output_filename = os.path.join(OUTPUT_DIRECTORY, "gemini_output_log.txt")

        # Open in append mode ('a') to keep a history of all changes
        with open(output_filename, 'a', encoding='utf-8') as outfile:
            outfile.write(f"\n{translated_text}\n")

        print(f"<- Successfully processed and logged output to: {output_filename}")

    except Exception as e:
        print(f"!! An error occurred while processing {file_path}: {e}")

# ----------------------------------------------------------------------

class TextChangeHandler(FileSystemEventHandler):
    """
    Watchdog event handler that hands modifications of the target file to
    process_text_change.
    """
    def on_modified(self, event):
        # Directory-level notifications are not file modifications; ignore them
        if event.is_directory:
            return

        changed_path = event.src_path
        changed_name = os.path.basename(changed_path).lower()

        # Only the configured target file is of interest (case-insensitive match)
        if changed_name != TARGET_FILE_NAME.lower():
            return

        # Pause briefly (0.5s) so the save operation can finish before the file is read
        time.sleep(0.5)
        process_text_change(changed_path)

# --- Main Execution Loop ---
if __name__ == "__main__":

    # Initial check for the input file. Monitoring still starts without it;
    # the warning only tells the user nothing will happen until it exists.
    input_file_path = os.path.join(WATCH_DIRECTORY, TARGET_FILE_NAME)
    if not os.path.exists(input_file_path):
        print(f"!! WARNING: Input file '{TARGET_FILE_NAME}' not found in '{WATCH_DIRECTORY}'.")
        print("!! Please create the file to start monitoring.")

    event_handler = TextChangeHandler()
    observer = Observer()

    # Schedule the handler to monitor the directory non-recursively
    observer.schedule(event_handler, WATCH_DIRECTORY, recursive=False)

    # Start the observer thread
    observer.start()

    print("\n*** Watchdog is RUNNING ***")
    print(f"   Monitoring directory: {WATCH_DIRECTORY}")
    print(f"   Target file:          {TARGET_FILE_NAME}")

    try:
        # Keep the main thread alive until interrupted
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        print("\nWatchdog stopped by user (Ctrl+C).")
    finally:
        # Always shut the observer thread down, whatever ended the loop, so no
        # background watcher is left running in the kernel.
        observer.stop()
        observer.join()


*** Watchdog is RUNNING ***
   Monitoring directory: C:\Users\Morgan\Downloads\fall_25\Capstone1\ASL-Robot\InputFiles
   Target file:          input.txt
-> Change detected in: input.txt. Processing...
!! An error occurred while processing C:\Users\Morgan\Downloads\fall_25\Capstone1\ASL-Robot\InputFiles\input.txt: name 'MODEL_NAME' is not defined

Watchdog stopped by user (Ctrl+C).


In [None]:
# Base folder of the ASL-Robot project (machine-specific absolute path)
_PROJECT_ROOT = r"C:\Users\Morgan\Downloads\fall_25\Capstone1\ASL-Robot"

# Folder watched for changes, and folder that receives the translation log
WATCH_DIRECTORY = _PROJECT_ROOT + "\\InputFiles"
OUTPUT_DIRECTORY = _PROJECT_ROOT + "\\OutputFiles"

# Name of the only file whose modifications are processed
TARGET_FILE_NAME = "input.txt"

def store_nonexist_tokens(response):
    """
    Stores tokens not in the database

    NOTE(review): this is an unfinished placeholder. No database lookup and no
    file write are implemented yet, so the function currently returns None
    without storing anything.

    Args:
        response: Object whose ``text`` attribute is a string; it is split on
            whitespace to obtain the tokens.

    Returns:
        None.
    """
    tokens = response.text.split()
    
    for token in tokens:
        # test in DB
        inside = False # placeholder: replace False with the real database lookup
        if inside == False:
            # write to file (not implemented yet)
            # NOTE(review): this return leaves the function at the first token,
            # so later tokens are never examined - confirm whether a per-token
            # action was intended here.
            return
        
        
    


def process_text_change(file_path):
    """
    Reads the text content of the modified file and processes it with Gemini.

    The stripped file content is sent to the Gemini model named by ``model_name``
    and the returned gloss is appended to ``gemini_output_log.txt`` inside
    ``OUTPUT_DIRECTORY``.

    Args:
        file_path: Path of the file whose content should be translated.

    Returns:
        None. Empty input and empty model responses are reported and skipped.
        Any exception raised while reading, calling the API or writing is
        caught and printed so the caller keeps running.
    """
    print(f"-> Change detected in: {TARGET_FILE_NAME}. Processing...")

    try:
        # 1. Read the LATEST content of the file
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read().strip()  # .strip() removes leading/trailing whitespace

        if not content:
            print("!! File is empty after modification. Skipping AI call.")
            return

        # 2. Build the prompt
        system_instruction = "You are an expert ASL translator who translates textual and spoken English input into ASL gloss. Do not include any extra commentary, headers, or surrounding text; only provide the translated text."

        prompt = f"Translate the following text into ASL gloss:\n\n---\n{content}\n---"

        # 3. Call the Gemini API
        response = client.models.generate_content(
            model=model_name,
            contents=prompt,
            config={
                "system_instruction": system_instruction
            }
        )

        # response.text can be None when no text part is returned; do not
        # append the literal string "None" (or a blank entry) to the log.
        translated_text = response.text
        if not translated_text or not translated_text.strip():
            print("!! Gemini returned no text. Nothing was logged.")
            return

        # 4. Save the Output to a Log
        # Create the output folder on demand so a missing directory does not
        # discard a translation that was already paid for.
        os.makedirs(OUTPUT_DIRECTORY, exist_ok=True)
        output_filename = os.path.join(OUTPUT_DIRECTORY, "gemini_output_log.txt")

        # Open in append mode ('a') to keep a history of all changes
        with open(output_filename, 'a', encoding='utf-8') as outfile:
            outfile.write(f"\n{translated_text}\n")

        print(f"<- Successfully processed and logged output to: {output_filename}")

    except Exception as e:
        print(f"!! An error occurred while processing {file_path}: {e}")



# ----------------------------------------------------------------------



class TextChangeHandler(FileSystemEventHandler):
    """
    Custom handler to process the modified file.

    Notifications that arrive within DEBOUNCE_WINDOW of the previous completed
    processing run are treated as duplicates of the same save and skipped.
    Trade-off: a genuine second save that is dispatched inside that window is
    skipped as well.
    """
    # Minimum time between the end of one processing run and the next accepted event
    DEBOUNCE_WINDOW = timedelta(seconds=1)

    def __init__(self):
        super().__init__()
        # datetime.min guarantees that the very first notification is processed
        self.last_modified = datetime.min

    def on_modified(self, event):
        """Translate the target file when it is modified, ignoring duplicate events."""
        # We only care about file modification events (not directory changes)
        if event.is_directory:
            return

        file_path = event.src_path

        # CRITICAL: Only process the TARGET file
        if os.path.basename(file_path).lower() != TARGET_FILE_NAME.lower():
            return

        # Skip notifications that follow too closely after the last processed one.
        # (The previous check was inverted: it processed only such follow-up
        # events, so a save that produced a single notification was ignored.)
        if datetime.now() - self.last_modified < self.DEBOUNCE_WINDOW:
            return

        # Introduce a small delay (0.5s) to ensure the file save operation is complete
        time.sleep(0.5)
        try:
            process_text_change(file_path)
        finally:
            # Stamp after processing: duplicate notifications queued while the
            # API call was running are dispatched right afterwards and must
            # fall inside the window.
            self.last_modified = datetime.now()



# --- Main Execution Loop ---

if __name__ == "__main__":

    # Initial check for the input file. Monitoring still starts without it;
    # the warning only tells the user nothing will happen until it exists.
    input_file_path = os.path.join(WATCH_DIRECTORY, TARGET_FILE_NAME)
    if not os.path.exists(input_file_path):
        print(f"!! WARNING: Input file '{TARGET_FILE_NAME}' not found in '{WATCH_DIRECTORY}'.")
        print("!! Please create the file to start monitoring.")

    event_handler = TextChangeHandler()
    observer = Observer()

    # Schedule the handler to monitor the directory non-recursively
    observer.schedule(event_handler, WATCH_DIRECTORY, recursive=False)

    # Start the observer thread
    observer.start()

    print("\n*** Watchdog is RUNNING ***")
    print(f"   Monitoring directory: {WATCH_DIRECTORY}")
    print(f"   Target file:          {TARGET_FILE_NAME}")

    try:
        # Keep the main thread alive until interrupted
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        print("\nWatchdog stopped by user (Ctrl+C).")
    finally:
        # Always shut the observer thread down, whatever ended the loop, so no
        # background watcher is left running in the kernel.
        observer.stop()
        observer.join()


*** Watchdog is RUNNING ***
   Monitoring directory: C:\Users\Morgan\Downloads\fall_25\Capstone1\ASL-Robot\InputFiles
   Target file:          input.txt
-> Change detected in: input.txt. Processing...
<- Successfully processed and logged output to: C:\Users\Morgan\Downloads\fall_25\Capstone1\ASL-Robot\OutputFiles\gemini_output_log.txt
-> Change detected in: input.txt. Processing...
<- Successfully processed and logged output to: C:\Users\Morgan\Downloads\fall_25\Capstone1\ASL-Robot\OutputFiles\gemini_output_log.txt

Watchdog stopped by user (Ctrl+C).


In [None]:
# Python script to read a text file of ASL commands, stream them to the Arduino,
# and then generate/run a C++ program based on the same sequence.

import serial
import time
import subprocess
import os
import sys

# --- Configuration (MUST BE CORRECT) ---
# Serial device name handed to serial.Serial when connecting to the Arduino
SERIAL_PORT = '/dev/ttyACM0' 
# Baud rate handed to serial.Serial - presumably must match the Arduino sketch; verify
BAUD_RATE = 9600 
# Text file the sign commands ('A', 'B', 'C', 'O') are read from
COMMAND_FILE = 'machineinstructions.txt' 
# Pause, in seconds, after each command is written to the serial port
SIGN_DELAY_SECONDS = 1.0 

# --- C++ Configuration ---
# File the generated C++ program is written to and compiled from
CPP_SOURCE_FILE = 'stream_sequence.cpp'
# Output executable name. On Windows, this will become 'stream_sequence.exe'
CPP_EXECUTABLE_NAME = 'stream_sequence_exec' 

def generate_cpp_source(sequence):
    """
    Creates the C++ source file with the ASL sequence embedded.

    The items of ``sequence`` are joined with single spaces and placed inside a
    C++ string literal in a small program written to ``CPP_SOURCE_FILE``.

    Args:
        sequence: Iterable of strings (the ASL commands) to embed.

    Returns:
        True when the source file was written, False when writing failed
        (the error is printed).
    """
    print(f"\n--- 1. Generating C++ Source File ({CPP_SOURCE_FILE}) ---")

    # Format the sequence as a C++ string literal
    sequence_str = ' '.join(sequence)

    # Escape the characters that would end or corrupt the C++ literal.
    # Backslashes go first so the escapes added afterwards are not doubled.
    escaped_sequence = (
        sequence_str
        .replace('\\', '\\\\')
        .replace('"', '\\"')
        .replace('\n', '\\n')
    )

    cpp_code = f"""
#include <iostream>
#include <string>

// This program was generated by Python and runs the final action.
int main() {{
    std::string final_asl_sequence = "{escaped_sequence}";
    
    std::cout << "\\n#####################################################" << std::endl;
    std::cout << "C++ Program Executed Successfully!" << std::endl;
    std::cout << "The ASL sentence streamed was: \\n  -> " << final_asl_sequence << std::endl;
    std::cout << "#####################################################\\n" << std::endl;
    
    // In a real application, you might use this C++ program to trigger a final action
    // on a different peripheral, send a network signal, or write a log file.
    
    return 0;
}}
"""
    try:
        with open(CPP_SOURCE_FILE, 'w') as f:
            f.write(cpp_code)
        print(f"Source file created: {CPP_SOURCE_FILE}")
        return True
    except Exception as e:
        # Best effort: report the failure and let the caller skip compilation
        print(f"Error writing C++ source file: {e}")
        return False

def compile_cpp_program():
    """
    Builds CPP_SOURCE_FILE with g++.

    Returns:
        The name of the produced executable (CPP_EXECUTABLE_NAME, with '.exe'
        appended on Windows), or None when g++ is not available or the
        compilation fails.
    """
    print("\n--- 2. Compiling C++ Program ---")

    # Windows binaries carry an .exe suffix; other platforms use the bare name
    suffix = ".exe" if sys.platform == "win32" else ""
    executable = CPP_EXECUTABLE_NAME + suffix

    # Equivalent shell command: g++ <source_file> -o <output_executable>
    build_command = ['g++', CPP_SOURCE_FILE, '-o', executable]

    try:
        result = subprocess.run(
            build_command,
            check=True,
            capture_output=True,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        # g++ ran but exited with a non-zero status
        print("COMPILATION FAILED.")
        print(f"Stdout: {e.stdout}")
        print(f"Stderr: {e.stderr}")
        return None
    except FileNotFoundError:
        # The g++ binary itself could not be launched
        print("\nFATAL ERROR: 'g++' command not found.")
        print("Please ensure the g++ compiler (part of GCC) is installed and accessible in your system's PATH.")
        return None

    print(f"Compilation successful! Output: {executable}")

    # A successful build can still emit warnings on stderr; show them
    if result.stderr:
        print("Compiler Warnings/Notes:")
        print(result.stderr)

    return executable

def run_cpp_program(executable_path):
    """
    Executes the compiled C++ program.

    Args:
        executable_path: Path of the executable. A bare or relative name is
            resolved against the current working directory; an absolute path
            is used as given.

    Returns:
        None. Any failure to launch or a non-zero exit status is caught and
        printed.
    """
    print(f"\n--- 3. Running Compiled C++ Program ({executable_path}) ---")

    # os.path.abspath resolves a relative name against the working directory
    # (what the former './' prefix did) and leaves absolute paths intact,
    # which the unconditional './' prefix broke.
    program = os.path.abspath(executable_path)

    try:
        # Execute the compiled program
        subprocess.run([program], check=True, text=True)
        print("Execution finished.")
    except Exception as e:
        print(f"ERROR executing C++ program: {e}")


def stream_commands_to_arduino(port, baud, filename):
    """
    Initializes the serial connection and streams commands from the file.

    The file is read, upper-cased, and every character that is one of
    'A', 'B', 'C', 'O' is sent to the serial port as a single ASCII byte,
    with SIGN_DELAY_SECONDS between commands. A final 'O' is sent afterwards
    to reset the hand position.

    Args:
        port: Serial device name passed to serial.Serial.
        baud: Baud rate passed to serial.Serial.
        filename: Path of the text file containing the commands.

    Returns:
        A tuple ``(commands, success)``:
        - ``([], False)`` when the file cannot be read,
        - ``([], True)`` when the file holds no valid command,
        - ``(commands, False)`` when the port cannot be opened or a write fails,
        - ``(commands, True)`` when the whole sequence was sent.
    """

    # 1. Read Commands from File
    try:
        with open(filename, 'r') as f:
            raw_commands = f.read().upper().strip()
    except FileNotFoundError:
        print(f"ERROR: Command file '{filename}' not found.")
        return [], False
    except Exception as e:
        print(f"ERROR reading file: {e}")
        return [], False

    valid_commands = [c for c in raw_commands if c in ('A', 'B', 'C', 'O')]

    if not valid_commands:
        print(f"No valid ASL commands ('A', 'B', 'C', 'O') found in {filename}. Skipping Arduino stream.")
        return [], True # Return sequence, success flag

    print(f"Loaded sequence of {len(valid_commands)} signs: {' '.join(valid_commands)}")

    # 2. Establish Serial Connection
    try:
        ser = serial.Serial(port, baud, timeout=1)
    except serial.SerialException as e:
        print(f"\nFATAL ERROR: Could not open serial port {port}. Skipping Arduino stream.")
        print(f"Details: {e}")
        return valid_commands, False

    stream_ok = True
    try:
        # Wait after opening the port before the first write
        # (presumably for the board to finish resetting - TODO confirm)
        time.sleep(2)
        print(f"Successfully connected to Arduino on {port}.")

        # 3. Stream Commands
        print("\n--- Starting ASL Sequence Stream to Arduino ---")

        total = len(valid_commands)
        for position, command in enumerate(valid_commands, start=1):
            try:
                ser.write(command.encode('ascii'))
                print(f"[{position}/{total}] Sent: {command}. Waiting {SIGN_DELAY_SECONDS}s...")
                time.sleep(SIGN_DELAY_SECONDS)
            except Exception as e:
                print(f"An unexpected error occurred during streaming: {e}")
                stream_ok = False
                break

        print("\n--- Arduino Sequence Complete ---")

        # Send the 'O' (Open Hand) command at the end. Guarded so that a dead
        # link is reported instead of raising past the cleanup below.
        try:
            ser.write(b'O')
            time.sleep(SIGN_DELAY_SECONDS)
            print("Sent 'O' command to reset hand position.")
        except Exception as e:
            print(f"Could not send 'O' reset command: {e}")
            stream_ok = False
    finally:
        # Release the port on every path, including failed writes
        if ser.is_open:
            ser.close()
            print(f"Disconnected from {port}.")

    return valid_commands, stream_ok


if __name__ == "__main__":

    # Stage 0: send the sign sequence to the Arduino and keep the sequence that was read
    asl_sequence, stream_success = stream_commands_to_arduino(SERIAL_PORT, BAUD_RATE, COMMAND_FILE)

    # Nothing to embed in the C++ program when no command was read
    if len(asl_sequence) == 0:
        print("\nCannot proceed to C++ generation as no valid ASL sequence was read from the file.")
        sys.exit(1)

    # --- C++ workflow: runs whether or not the serial stream succeeded ---

    # Stage 1: write the C++ source; Stage 2: compile it only if the source was written
    source_written = generate_cpp_source(asl_sequence)
    executable = compile_cpp_program() if source_written else None

    # Stage 3: run the program only when compilation produced an executable
    if executable:
        run_cpp_program(executable)

        # Optional clean-up of the generated files after execution (left disabled):
        # os.remove(CPP_SOURCE_FILE)
        # os.remove(executable)
        # print(f"\nCleaned up {CPP_SOURCE_FILE} and {executable}")


ERROR: Command file 'asl_sentence.txt' not found.

Cannot proceed to C++ generation as no valid ASL sequence was read from the file.


SystemExit: 1

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
