In [1]:
from pytube import YouTube
import re
import os
import cv2
import numpy as np
import tensorflow as tf
from tensorflow import keras
import time
from twilio.rest import Client
from datetime import datetime
from keras import backend as K


In [None]:
def recall_m(y_true, y_pred):
  true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
  possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
  recall = true_positives / (possible_positives + K.epsilon())
  return recall

def precision_m(y_true, y_pred):
  true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
  predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
  precision = true_positives / (predicted_positives + K.epsilon())
  return precision

def f1_m(y_true, y_pred):
  precision = precision_m(y_true, y_pred)
  recall = recall_m(y_true, y_pred)
  return 2*((precision*recall)/(precision+recall+K.epsilon()))

In [None]:
# Twilio credentials


def verify_twilio_credentials():
  """Verify Twilio credentials are working"""
  try:
      client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
      # Try to access account info to verify credentials
      account = client.api.accounts(TWILIO_ACCOUNT_SID).fetch()
      print("Twilio credentials verified successfully")
      return True
  except Exception as e:
      print(f"Twilio credential verification failed: {str(e)}")
      return False


In [None]:
def send_emergency_sms(source_type="webcam"):
  """Send an emergency SMS using Twilio with enhanced error handling"""
  try:
      # First verify credentials
      if not verify_twilio_credentials():
          print("Skipping SMS send due to invalid credentials")
          return False

      # Initialize Twilio client
      client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
      
      # Prepare message
      message_body = f"⚠️ ALERT: Fight detected in {source_type} footage! Please check immediately."
      
      # Send message with detailed error handling
      try:
          message = client.messages.create(
              body=message_body,
              from_=TWILIO_PHONE_NUMBER,
              to=EMERGENCY_NUMBER
          )
          print(f"Emergency SMS sent successfully - SID: {message.sid}")
          return True
          
      except Exception as msg_error:
          if 'unauthorized' in str(msg_error).lower():
              print("Error: Unauthorized. Please check your Twilio credentials.")
          elif 'not a valid phone number' in str(msg_error).lower():
              print(f"Error: Invalid phone number format: {EMERGENCY_NUMBER}")
          elif 'not a valid twilio phone number' in str(msg_error).lower():
              print(f"Error: Invalid Twilio phone number: {TWILIO_PHONE_NUMBER}")
          else:
              print(f"SMS sending failed: {str(msg_error)}")
          return False
          
  except Exception as e:
      print(f"Failed to initialize Twilio client: {str(e)}")
      return False


In [None]:
def make_emergency_call():
  """Make an emergency call using Twilio with enhanced error handling"""
  try:
      # First verify credentials
      if not verify_twilio_credentials():
          print("Skipping call due to invalid credentials")
          return False

      # Initialize Twilio client
      client = Client(TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN)
      
      try:
          call = client.calls.create(
              url='http://demo.twilio.com/docs/voice.xml',
              to=EMERGENCY_NUMBER,
              from_=TWILIO_PHONE_NUMBER
          )
          print(f"Emergency call initiated successfully - SID: {call.sid}")
          return True
          
      except Exception as call_error:
          if 'unauthorized' in str(call_error).lower():
              print("Error: Unauthorized. Please check your Twilio credentials.")
          elif 'not a valid phone number' in str(call_error).lower():
              print(f"Error: Invalid phone number format: {EMERGENCY_NUMBER}")
          elif 'not a valid twilio phone number' in str(call_error).lower():
              print(f"Error: Invalid Twilio phone number: {TWILIO_PHONE_NUMBER}")
          else:
              print(f"Call initiation failed: {str(call_error)}")
          return False
          
  except Exception as e:
      print(f"Failed to initialize Twilio client: {str(e)}")
      return False


In [None]:
def process_video_feed(frame, model, frames_buffer, segment_frames=42):
  """Process video frame and make prediction"""
  resized_frame = cv2.resize(frame, (128, 128))
  frames_buffer.append(resized_frame)
  
  if len(frames_buffer) > segment_frames:
      frames_buffer.pop(0)
  
  if len(frames_buffer) == segment_frames:
      video_segment = np.array(frames_buffer)
      video_segment = video_segment.astype('float32') / 255.0
      video_segment = np.expand_dims(video_segment, axis=0)
      
      prediction = model.predict(video_segment, verbose=0)
      predicted_class = np.argmax(prediction[0])
      confidence = prediction[0][predicted_class]
      
      return predicted_class, confidence
  
  return None, None

def monitor_video_file(video_path, model_path, confidence_threshold=0.75, cooldown_period=60):
  """Monitor video file for fight detection (headless version)"""
  if not os.path.exists(video_path):
      print(f"Error: Video file not found at {video_path}")
      return
  
  print(f"Starting video file surveillance: {video_path}")
  print(f"Emergency contact number: {EMERGENCY_NUMBER}")
  
  # Create output directory for detected frames
  output_dir = 'detected_fights'
  os.makedirs(output_dir, exist_ok=True)
  
  try:
      model = keras.models.load_model(model_path, 
                                  custom_objects={'recall_m': recall_m, 
                                                'precision_m': precision_m, 
                                                'f1_m': f1_m})
  except Exception as e:
      print(f"Error loading model: {str(e)}")
      return
  
  cap = cv2.VideoCapture(video_path)
  if not cap.isOpened():
      print("Error: Could not open video file")
      return

  frames_buffer = []
  last_alert_time = 0
  frame_count = 0
  
  print("Processing video... Press Ctrl+C to stop")
  
  try:
      while True:
          ret, frame = cap.read()
          if not ret:
              break
          
          frame_count += 1
          if frame_count % 100 == 0:  # Progress update every 100 frames
              print(f"Processed {frame_count} frames")
          
          predicted_class, confidence = process_video_feed(frame, model, frames_buffer)
          
          if predicted_class is not None:
              current_time = time.time()
              if predicted_class == 1 and confidence >= confidence_threshold:
                  if current_time - last_alert_time >= cooldown_period:
                      print(f"\nFight detected in video!")
                      print(f"Frame: {frame_count}")
                      print(f"Confidence: {confidence:.2%}")
                      print(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
                      
                      # Send alerts
                      alert_sent = False
                      if make_emergency_call():
                          if send_emergency_sms("video file"):
                              alert_sent = True
                              last_alert_time = current_time
                      
                      # Save detected frame
                      timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                      output_path = os.path.join(output_dir, 
                                             f'fight_frame{frame_count}_{timestamp}.jpg')
                      cv2.imwrite(output_path, frame)
                      print(f"Saved detection frame to: {output_path}")
  
  except KeyboardInterrupt:
      print("\nProcessing interrupted by user")
  except Exception as e:
      print(f"\nError during video processing: {str(e)}")
  finally:
      cap.release()
      print(f"\nProcessing complete. Total frames processed: {frame_count}")
      print(f"Detected frames saved in: {output_dir}")


In [None]:
import yt_dlp
import os

def download_video(url, download_dir='youtube_downloads'):
  """Download YouTube video and return the path to downloaded file"""
  try:
      # Create downloads directory if it doesn't exist
      os.makedirs(download_dir, exist_ok=True)
      
      # Configure options
      ydl_opts = {
          'format': 'best[ext=mp4]',  # Best quality MP4
          'outtmpl': os.path.join(download_dir, '%(title)s.%(ext)s'),
          'quiet': False
      }
      
      print("Downloading video...")
      with yt_dlp.YoutubeDL(ydl_opts) as ydl:
          ydl.download([url])
          
          # Get the downloaded file path
          info = ydl.extract_info(url, download=False)
          video_title = info['title']
          safe_title = ''.join(c for c in video_title if c.isalnum() or c in (' ', '-', '_')).rstrip()
          video_path = os.path.join(download_dir, f'{safe_title}.mp4')
          
          return video_path
          
  except Exception as e:
      print(f"Error downloading video: {str(e)}")
      return None


In [None]:
# Modified main() function
def main():
  # Verify Twilio setup before starting
  print("Verifying Twilio setup...")
  if not verify_twilio_credentials():
      print("Warning: Twilio setup is not correct. Alerts will not be sent.")
      proceed = input("Do you want to continue anyway? (y/n): ")
      if proceed.lower() != 'y':
          return

  model_path = 'vivit_model'
  
  while True:
      print("\nFight Detection System")
      print("1. Process YouTube Video")
      print("2. Process Local Video File")
      print("3. Test SMS Alert")
      print("4. Exit")
      
      choice = input("Enter your choice (1-4): ")
      
      if choice == '1':
          youtube_url = input("Enter YouTube video URL: ")
          print("Attempting to download video...")
          video_path = download_video(youtube_url)
          
          if video_path and os.path.exists(video_path):
              print(f"Processing video: {video_path}")
              monitor_video_file(video_path, model_path)
          else:
              print("Failed to download video. Please try again.")
              
      elif choice == '2':
          video_path = input("Enter the path to video file: ")
          monitor_video_file(video_path, model_path)
      elif choice == '3':
          print("Testing SMS alert system...")
          if send_emergency_sms("test"):
              print("SMS test successful")
          else:
              print("SMS test failed")
      elif choice == '4':
          print("Exiting...")
          break
      else:
          print("Invalid choice. Please try again.")

# Add cleanup function
def cleanup_downloads():
  """Clean up downloaded videos to free space"""
  download_dir = 'youtube_downloads'
  if os.path.exists(download_dir):
      try:
          for file in os.listdir(download_dir):
              file_path = os.path.join(download_dir, file)
              if os.path.isfile(file_path):
                  os.remove(file_path)
          print("Cleaned up downloaded videos")
      except Exception as e:
          print(f"Error cleaning up downloads: {str(e)}")

if __name__ == "__main__":
  try:
      main()
  finally:
      # Clean up downloads when program exits
      cleanup_downloads()