In [1]:
import whisper
import queue,threading,time
import numpy as np
from scipy.io.wavfile import write as wv
import tempfile,os
import sounddevice as sd
import pyttsx3
from datetime import datetime 
import pickle 
import face_recognition
import cv2
from pathlib import Path 
import google.generativeai as genai
from dotenv import load_dotenv
import requests,json

  from pkg_resources import resource_filename


In [2]:
# Load the Whisper "base.en" speech-to-text model for use in this session.
model = whisper.load_model("base.en")


In [5]:
# Start a daemon thread that runs `mic_stream` in the background.
# NOTE(review): no module-level `mic_stream` is defined in the visible cells (only the
# AIGuardAgent.mic_stream method exists) -- this cell presumably relied on a definition
# from a since-removed cell and would raise NameError on a fresh kernel; confirm and
# remove it if it is stale.
threading.Thread(target=mic_stream, daemon=True).start()


In [6]:
# Load environment variables from a .env file, if one can be found.
# NOTE(review): the `False` displayed under this cell is load_dotenv()'s return value --
# presumably no .env file was loaded; confirm before relying on any key from it.
load_dotenv()

False

In [7]:
class AIGuardAgent:
    """Voice-controlled room guard combining Whisper speech commands and webcam face recognition.

    The agent listens to the microphone in fixed-length chunks, transcribes each
    chunk with Whisper and reacts to spoken commands (activate / deactivate guard
    mode, enroll a face, report the number of trusted faces). While guard mode is
    active a background thread watches the webcam, greets trusted faces and
    announces unknown ones.
    """

    # Largest face distance that still counts as a match (lower is stricter).
    MATCH_TOLERANCE = 0.5

    # Shared webcam handle; None whenever no capture is held. Declared at class
    # level so every method can test it even before a capture was ever opened.
    camera = None

    def __init__(self, face_db_path="FACE_DB_PATH"):
        """Load the speech model, the TTS engine and the trusted-face database.

        Args:
            face_db_path: Path of the pickle file holding trusted face encodings.
                The default is the original placeholder value.
        """
        self.model = whisper.load_model("base.en")
        self.SAMPLE_RATE = 16000
        self.CHUNK_SECONDS = 2
        self.audio_queue = queue.Queue()

        self.tts_engine = pyttsx3.init()
        self.tts_engine.setProperty('rate', 150)
        # speak() is called from both the command loop and the face-monitoring
        # thread; the lock keeps the two from driving the engine at once.
        self._tts_lock = threading.Lock()

        self.guard_mode = False
        self.listening = False
        self.stop_flag = False

        self.camera = None
        self.face_db_path = face_db_path
        self.load_trusted_faces()

        self.activation_phrases = [
            "guard my room",
            "protect my room",
            "secure my room",
            "start guard mode",
            "activate guard"
        ]
        self.deactivation_phrases = [
            "stop guard mode",
            "deactivate guard",
            "stand down",
            "stop monitoring",
            "goodbye guard"
        ]
        self.enrollment_phrases = [
            "enroll face",
            "register face",
            "add trusted person"
        ]

    def speak(self, text):
        """Say `text` aloud and block until the utterance has finished."""
        with self._tts_lock:
            self.tts_engine.say(text)
            self.tts_engine.runAndWait()

    def load_trusted_faces(self):
        """Populate known_face_encodings / known_face_names from the database file.

        Starts with empty lists when the database file does not exist.
        """
        if os.path.exists(self.face_db_path):
            # SECURITY: pickle.load executes arbitrary code embedded in the file.
            # Only point face_db_path at a file this program wrote itself.
            with open(self.face_db_path, 'rb') as f:
                data = pickle.load(f)
            self.known_face_encodings = data['encodings']
            self.known_face_names = data['names']
            print(f"Loaded {len(self.known_face_names)} trusted faces.")
        else:
            print("No existing faces database found. Starting fresh.")
            self.known_face_encodings = []
            self.known_face_names = []

    def save_trusted_faces(self):
        """Write the current trusted encodings and names to the database file."""
        with open(self.face_db_path, 'wb') as f:
            pickle.dump(
                {'encodings': self.known_face_encodings, 'names': self.known_face_names},
                f,
            )
        print("Saved face to db")

    def enroll_using_webcam(self, name="unknown"):
        """Capture face encodings from the webcam and store their mean as a trusted face.

        Args:
            name: Label stored alongside the averaged encoding.

        Returns:
            True when at least one frame with exactly one face was captured and
            the face was saved, False otherwise.
        """
        # Reuse the capture held by the monitoring loop when there is one; a
        # second VideoCapture on the same device would otherwise be opened.
        reuse_existing = self.camera is not None and self.camera.isOpened()
        camera = self.camera if reuse_existing else cv2.VideoCapture(0)
        if not camera.isOpened():
            camera.release()
            print("Error: Could not open webcam.")
            self.speak("Failed to capture face. Please try again.")
            return False

        self.speak(f"Please look at the camera for face enrollment as {name}")

        enrollment_frames = []
        max_frames = 30
        max_failed_reads = 10  # consecutive read failures tolerated before giving up
        failed_reads = 0

        try:
            while len(enrollment_frames) < max_frames:
                ret, frame = camera.read()
                if not ret:
                    print(" Failed to capture frame")
                    failed_reads += 1
                    if failed_reads >= max_failed_reads:
                        # A dead camera would otherwise spin here forever.
                        break
                    time.sleep(0.1)
                    continue
                failed_reads = 0

                # face_recognition expects RGB, OpenCV delivers BGR.
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                face_locations = face_recognition.face_locations(rgb_frame)
                face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)

                # Only frames showing exactly one face are used.
                if len(face_encodings) == 1:
                    enrollment_frames.append(face_encodings[0])
                    captured = len(enrollment_frames)
                    print(f" Captured face frame {captured}/{max_frames}")
                    cv2.putText(frame, f"Enrolling: {captured}/{max_frames}",
                                (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                cv2.imshow("Face Enrollment - Press 'q' to cancel", frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

                time.sleep(0.5)  # Wait between captures
        finally:
            cv2.destroyAllWindows()
            if not reuse_existing:
                # Release what was opened here so the device is not left held.
                camera.release()

        if enrollment_frames:
            # Average the encodings for better accuracy
            avg_encoding = np.mean(enrollment_frames, axis=0)
            self.known_face_encodings.append(avg_encoding)
            self.known_face_names.append(name)
            self.save_trusted_faces()
            self.speak(f"Successfully enrolled {name} as a trusted person")
            return True

        self.speak("Failed to capture face. Please try again.")
        return False

    def mic_stream(self):
        """Capture microphone audio in chunks and push them onto audio_queue.

        Runs until stop_flag is set; intended to be the target of a background thread.
        """
        # The third callback argument is named time_info so that it does not
        # shadow the `time` module.
        def callback(indata, frames, time_info, status):
            if status:
                print(f"Audio status: {status}")
            self.audio_queue.put(indata.copy())

        with sd.InputStream(
            samplerate=self.SAMPLE_RATE,
            channels=1,
            dtype='int16',
            callback=callback,
            blocksize=int(self.SAMPLE_RATE * self.CHUNK_SECONDS)
        ):
            while not self.stop_flag:
                time.sleep(0.1)

    def check_activation_command(self, text):
        """Check if text contains any activation phrase"""
        return any(phrase in text for phrase in self.activation_phrases)

    def check_deactivation_command(self, text):
        """Check if text contains any deactivation phrase"""
        return any(phrase in text for phrase in self.deactivation_phrases)

    def check_enrollment_command(self, text):
        """Check if text contains face enrollment phrase"""
        return any(phrase in text for phrase in self.enrollment_phrases)

    def activate_guard_mode(self):
        """Activate guard mode with voice confirmation"""
        self.guard_mode = True
        self.speak("Guard mode activated! Starting face monitoring.")
        print(f"Guard mode ACTIVATED at {datetime.now().strftime('%H:%M:%S')}")

        # Start face monitoring in a separate thread
        face_thread = threading.Thread(target=self.face_monitoring_loop, daemon=True)
        face_thread.start()

    def deactivate_guard_mode(self):
        """Deactivate guard mode with voice confirmation"""
        self.guard_mode = False
        self.speak("Guard mode deactivated. Goodbye!")
        print(f"Guard mode DEACTIVATED at {datetime.now().strftime('%H:%M:%S')}")

    def process_audio_chunk(self, chunk):
        """Transcribe one audio chunk with Whisper and act on any recognised command.

        Args:
            chunk: Audio samples as delivered by the microphone callback.
        """
        # Only the path is needed; the file itself is written by wv() below.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            path = tmp.name

        try:
            wv(path, self.SAMPLE_RATE, chunk)
            result = self.model.transcribe(
                path,
                language="en",
                fp16=False,
                condition_on_previous_text=False
            )

            text = (result.get("text") or "").strip().lower()

            if text:
                print(f"Heard: {text}")

                # Check for various commands
                if self.check_activation_command(text) and not self.guard_mode:
                    self.activate_guard_mode()
                elif self.check_deactivation_command(text) and self.guard_mode:
                    self.deactivate_guard_mode()
                elif self.check_enrollment_command(text):
                    self.speak("Starting face enrollment process.")
                    self.enroll_using_webcam()
                elif "how many trusted" in text or "list trusted" in text:
                    count = len(self.known_face_names)
                    if count == 0:
                        self.speak("No trusted faces enrolled yet.")
                    else:
                        self.speak(f"I have {count} trusted faces enrolled.")
                        print(f"Trusted faces: {', '.join(self.known_face_names)}")
                elif self.guard_mode:
                    print(f" In guard mode, heard: {text}")

        except Exception as e:
            print(f"Error processing audio: {e}")
        finally:
            if os.path.exists(path):
                os.remove(path)

    def _detect_faces(self, frame):
        """Locate and classify every face in a BGR frame in a single detection pass.

        Returns:
            (locations, names, statuses): three index-aligned lists. A face is
            "trusted" when its closest known encoding lies within
            MATCH_TOLERANCE; every other face is "unknown", including all faces
            when no trusted face has been enrolled yet.
        """
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        face_locations = face_recognition.face_locations(rgb_frame)
        face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)

        names = []
        statuses = []
        for encoding in face_encodings:
            name = "unknown"
            status = "unknown"
            if self.known_face_encodings:
                face_distances = face_recognition.face_distance(
                    self.known_face_encodings, encoding
                )
                best_match_index = int(np.argmin(face_distances))
                # Same criterion compare_faces applies: distance <= tolerance.
                if face_distances[best_match_index] <= self.MATCH_TOLERANCE:
                    name = self.known_face_names[best_match_index]
                    status = "trusted"
            names.append(name)
            statuses.append(status)

        return face_locations, names, statuses

    def recognize(self, frame):
        """Classify the faces visible in a BGR frame.

        Returns:
            (names, statuses): two index-aligned lists with one entry per
            detected face. With an empty trusted-face database every detected
            face is reported as "unknown" so that guard mode can still alert.
        """
        _, names, statuses = self._detect_faces(frame)
        return names, statuses

    def face_monitoring_loop(self):
        """Watch the webcam while guard mode is on, greeting trusted and flagging unknown faces.

        Intended to run in a background thread. Ends when guard mode is turned
        off, stop_flag is set, or 'q' is pressed in the preview window.
        """
        if self.camera is None or not self.camera.isOpened():
            self.camera = cv2.VideoCapture(0)

        if not self.camera.isOpened():
            self.speak("Error accessing the webcam for face monitoring.")
            print("Error: Could not open webcam.")
            self.guard_mode = False
            return

        self.speak("Face monitoring started. Scanning for trusted individuals.")

        last_announcement = {}  # name -> time of the last spoken announcement
        announcement_cd = 30    # seconds between repeated announcements per name

        while self.guard_mode and not self.stop_flag:
            camera = self.camera
            if camera is None:
                # The capture was released by stop_listening(); nothing left to watch.
                break
            ret, frame = camera.read()

            if not ret:
                print("Failed to capture frame from webcam")
                # sleep a bit to not overload
                time.sleep(1)
                continue

            locations, names, stati = self._detect_faces(frame)

            curr_t = time.time()
            for name, status in zip(names, stati):
                if status == "trusted":
                    if name not in last_announcement or (curr_t - last_announcement[name]) > announcement_cd:
                        self.speak(f"Hello {name}, welcome back!")
                        print(f" Recognized trusted person: {name}")
                        last_announcement[name] = curr_t
                else:
                    if "unknown" not in last_announcement or (curr_t - last_announcement["unknown"]) > announcement_cd:
                        self.speak("Alert! Unknown person detected!")
                        print(" Alert! Unknown person detected!")
                        last_announcement["unknown"] = curr_t

            # Boxes come from the same detection pass as the labels, so they
            # always line up and the frame is not analysed a second time.
            for (top, right, bottom, left), name, status in zip(locations, names, stati):
                color = (0, 255, 0) if status == "trusted" else (0, 0, 255)
                cv2.rectangle(frame, (left, top), (right, bottom), color, 2)
                cv2.putText(frame, f"{name} ({status})", (left, top - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

            cv2.imshow("AI Guard - Face Monitoring (Press 'q' to stop)", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                # Leave guard mode as well, otherwise the activation phrase
                # would be ignored and monitoring could not be restarted.
                self.guard_mode = False
                break

            #time.sleep(0.1) uncomment if your cpu dies. We run a ppt then

        cv2.destroyAllWindows()  # stop
        print("Face monitoring stopped")

    def start_listening(self):
        """Start the continuous listening loop.

        Blocks the calling thread until listening is stopped or the user
        interrupts with Ctrl-C.
        """
        self.listening = True
        self.stop_flag = False

        # Start microphone stream in background thread
        audio_thread = threading.Thread(target=self.mic_stream, daemon=True)
        audio_thread.start()

        self.speak("AI Guard system ready. Say 'Guard my room' to activate or 'Enroll face' to add trusted persons.")
        print(" Listening for commands...")
        print(f" {len(self.known_face_names)} trusted faces loaded")

        try:
            while self.listening and not self.stop_flag:
                try:
                    # Block briefly instead of polling empty() and sleeping.
                    chunk = self.audio_queue.get(timeout=0.1)
                except queue.Empty:
                    continue
                self.process_audio_chunk(chunk)

        except KeyboardInterrupt:
            print("\nShutting down AI Guard system...")
            # Also releases the webcam and closes any preview window.
            self.stop_listening()

    def stop_listening(self):
        """Stop the listening loop and release the webcam and preview windows."""
        self.listening = False
        self.stop_flag = True
        if self.camera is not None:
            self.camera.release()
            self.camera = None
        cv2.destroyAllWindows()
    

In [10]:
# Build the agent, then enter its blocking voice-command loop (Ctrl-C / kernel
# interrupt ends it).
guard = AIGuardAgent()
guard.start_listening()

No existing faces database found. Starting fresh.
 Listening for commands...
 0 trusted faces loaded
Heard: register face.
Heard: okay, i'm an open gift.
Heard: well, you will know that
Heard: and
Heard: i have to take it. it works.
Heard: a consequence.
Heard: but it may or may not be there into. okay? how about cass' tail? oh, that's great.
Heard: find my room. bled in check.
Heard: find myut paul glensity

Shutting down AI Guard system...
