<a href="https://colab.research.google.com/github/Ishittaaaa/transcription/blob/main/Transcription.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install openai==0.28 watchdog



In [None]:
import os
import time
import json
import logging
import tempfile
import subprocess
import requests
import sys
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path
import openai

# Every log record goes to two places: a persistent log file in the current
# working directory and the console stream.
_log_handlers = [
    logging.FileHandler("transcription_service.log"),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)

# Extensions (lower-case, with leading dot) the application will pick up
ALL_SUPPORTED_FORMATS = [
    ".mp3", ".wav", ".mp4", ".mkv",
    ".mov", ".flv", ".aac", ".m4a",
]

# Extensions the OpenAI Whisper API accepts directly
WHISPER_API_SUPPORTED_FORMATS = [
    ".m4a", ".mp3", ".mp4", ".mpeg",
    ".mpga", ".wav", ".webm",
]

# Name of the JSON file that remembers which files were already processed
SESSION_FILE = "transcription_session.json"

class TranscriptionService:
    """Watch a directory tree and transcribe media files with the OpenAI Whisper API.

    For every media file found (or created later) under ``watch_directory`` the
    service writes a ``.txt`` transcription next to the source file. Paths of
    files that were handled are remembered in a JSON session file stored inside
    the watched directory, so they are skipped on later runs.

    Args:
        watch_directory: Directory to scan and monitor (made absolute).
        api_key: OpenAI API key. Must be non-empty and not the placeholder
            ``"your_api_key_here"``.
        model: Model name passed to the transcription endpoint.
        ffmpeg_path: Name or path of the ffmpeg executable used for conversion.

    Raises:
        ValueError: If ``api_key`` is missing or is the placeholder value.
    """

    # Largest file the service will upload; larger files are rejected before
    # the API is called.
    MAX_UPLOAD_BYTES = 25 * 1024 * 1024
    # Seconds to wait for the connectivity probe so start-up cannot hang forever.
    API_TEST_TIMEOUT = 15

    def __init__(self, watch_directory, api_key, model="whisper-1", ffmpeg_path="ffmpeg"):
        self.watch_directory = os.path.abspath(watch_directory)  # Get absolute path
        logger.info(f"Initializing with directory: {self.watch_directory}")

        self.model_name = model
        self.ffmpeg_path = ffmpeg_path
        self.processed_files = set()

        # Validate the key before allocating any resources
        if not api_key or api_key == "your_api_key_here":
            error_msg = "API key is required for OpenAI Whisper API. Please set a valid key."
            logger.error(error_msg)
            print(f"ERROR: {error_msg}")
            raise ValueError(error_msg)

        # Only a plausibility check: warn, but keep going
        if not (api_key.startswith("sk-") and len(api_key) > 30):
            logger.warning("API key format looks incorrect. Keys typically start with 'sk-' and are longer")
            print("WARNING: API key format looks incorrect")

        # Created after validation so a rejected key does not leave a stray
        # temporary directory behind.
        self.temp_dir = tempfile.mkdtemp(prefix="whisper_conversion_")

        # The openai module itself acts as the client once the key is set
        openai.api_key = api_key
        self.client = openai
        logger.info(f"Using OpenAI Whisper API with model: {model}")

        self.test_api_connection()
        self.load_session()

    def test_api_connection(self):
        """Probe the OpenAI models endpoint and report the outcome.

        Failures are logged and printed but never raised, so a failed probe
        does not prevent the service from starting.
        """
        try:
            logger.info("Testing OpenAI API connection...")
            response = requests.get(
                "https://api.openai.com/v1/models",
                headers={"Authorization": f"Bearer {self.client.api_key}"},
                # Without a timeout requests waits indefinitely on a dead link
                timeout=self.API_TEST_TIMEOUT,
            )
            if response.status_code == 200:
                logger.info("OpenAI API connection successful")
                print("OpenAI API connection successful")
            else:
                error_msg = f"OpenAI API test failed with status code: {response.status_code}, message: {response.text}"
                logger.error(error_msg)
                print(f"ERROR: {error_msg}")
        except Exception as e:
            error_msg = f"Failed to connect to OpenAI API: {str(e)}"
            logger.error(error_msg)
            print(f"ERROR: {error_msg}")

    def load_session(self):
        """Load the set of previously processed files from the session file.

        A missing session file leaves the current set untouched; an unreadable
        one is reported and the set is reset to empty.
        """
        session_path = os.path.join(self.watch_directory, SESSION_FILE)
        if not os.path.exists(session_path):
            return
        try:
            with open(session_path, 'r', encoding="utf-8") as f:
                data = json.load(f)
            self.processed_files = set(data.get('processed_files', []))
            logger.info(f"Loaded {len(self.processed_files)} processed files from session")
        except Exception as e:
            logger.error(f"Failed to load session: {str(e)}")
            print(f"ERROR loading session: {str(e)}")
            self.processed_files = set()

    def save_session(self):
        """Write the set of processed files to the session file (best effort)."""
        session_path = os.path.join(self.watch_directory, SESSION_FILE)
        try:
            with open(session_path, 'w', encoding="utf-8") as f:
                json.dump({'processed_files': list(self.processed_files)}, f)
            logger.info(f"Saved {len(self.processed_files)} processed files to session")
        except Exception as e:
            logger.error(f"Failed to save session: {str(e)}")
            print(f"ERROR saving session: {str(e)}")

    def scan_directory(self):
        """Return the paths of all supported media files under the watch directory.

        Returns:
            list: File paths whose lower-cased extension is in
            ``ALL_SUPPORTED_FORMATS``; an empty list if scanning fails.
        """
        media_files = []
        try:
            logger.info(f"Scanning directory: {self.watch_directory}")
            # Listing the root is purely diagnostic; a failure here is reported
            # but does not stop the recursive walk below.
            try:
                root_files = os.listdir(self.watch_directory)
                logger.info(f"Files in root directory: {root_files}")
                print(f"Files found: {root_files}")
            except Exception as e:
                logger.error(f"Error listing directory contents: {str(e)}")
                print(f"ERROR listing directory: {str(e)}")

            for root, _dirs, files in os.walk(self.watch_directory):
                logger.debug(f"Scanning: {root}, found {len(files)} files")
                for file in files:
                    file_path = os.path.join(root, file)
                    file_ext = os.path.splitext(file_path.lower())[1]
                    logger.debug(f"Checking file: {file} with extension: {file_ext}")
                    if file_ext not in ALL_SUPPORTED_FORMATS:
                        continue
                    media_files.append(file_path)
                    logger.info(f"Found supported media file: {file_path}")
                    try:
                        file_size = os.path.getsize(file_path)
                        logger.info(f"File size: {file_size / 1024:.2f} KB")
                        print(f"Found file: {file_path} ({file_size / 1024:.2f} KB)")
                    except Exception as e:
                        logger.error(f"Error getting file size: {str(e)}")

            logger.info(f"Total media files found: {len(media_files)}")
            return media_files
        except Exception as e:
            logger.error(f"Error scanning directory: {str(e)}")
            print(f"ERROR scanning directory: {str(e)}")
            return []

    def convert_to_supported_format(self, file_path):
        """Convert a file to MP3 with ffmpeg when the API cannot take it as-is.

        A file whose extension is in ``WHISPER_API_SUPPORTED_FORMATS`` and whose
        size does not exceed ``MAX_UPLOAD_BYTES`` is returned unchanged. Every
        other file is re-encoded (audio only) into the service's temporary
        directory.

        Returns:
            tuple: ``(path, None)`` on success, where ``path`` is either
            ``file_path`` itself or the temporary MP3; ``(None, error_msg)``
            on failure.
        """
        file_ext = os.path.splitext(file_path.lower())[1]
        try:
            # Skip the re-encode when the file can be uploaded directly.
            # Oversized files still go through ffmpeg, since dropping video
            # and compressing to MP3 may bring them under the limit.
            if (file_ext in WHISPER_API_SUPPORTED_FORMATS
                    and os.path.getsize(file_path) <= self.MAX_UPLOAD_BYTES):
                logger.info(f"No conversion needed for {file_path}")
                return file_path, None

            basename = os.path.basename(file_path)
            temp_output = os.path.join(self.temp_dir, f"{os.path.splitext(basename)[0]}.mp3")
            logger.info(f"Converting {file_path} to {temp_output}")
            print("Converting file to MP3 format...")
            cmd = [
                self.ffmpeg_path,
                "-i", file_path,
                "-vn",            # drop any video stream
                "-ar", "44100",   # sample rate
                "-ac", "2",       # stereo
                "-b:a", "128k",   # audio bitrate
                "-y",             # overwrite an existing output file
                temp_output,
            ]
            process = subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            if process.returncode != 0:
                error_msg = f"Conversion failed: {process.stderr}"
                logger.error(error_msg)
                print(f"ERROR: {error_msg}")
                return None, error_msg
            if not os.path.exists(temp_output) or os.path.getsize(temp_output) == 0:
                error_msg = "Conversion produced empty or non-existent file"
                logger.error(error_msg)
                print(f"ERROR: {error_msg}")
                return None, error_msg
            logger.info(f"Successfully converted {file_path} to {temp_output}")
            print(f"Conversion successful. File size: {os.path.getsize(temp_output) / 1024:.2f} KB")
            return temp_output, None
        except Exception as e:
            error_msg = f"Error converting file {file_path}: {str(e)}"
            logger.error(error_msg)
            print(f"ERROR: {error_msg}")
            return None, error_msg

    def transcribe_with_api(self, file_path):
        """Send one audio file to the transcription endpoint and return its text.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
            ValueError: If the file is empty or larger than ``MAX_UPLOAD_BYTES``.
            Exception: Any error raised by the API client is logged once and
                re-raised unchanged.
        """
        try:
            if not os.path.exists(file_path):
                raise FileNotFoundError(f"File not found: {file_path}")
            file_size = os.path.getsize(file_path)
            if file_size == 0:
                raise ValueError(f"File is empty: {file_path}")
            if file_size > self.MAX_UPLOAD_BYTES:
                raise ValueError(f"File exceeds OpenAI's 25MB limit: {file_size / (1024 * 1024):.2f} MB")
            logger.info(f"Sending file to OpenAI API: {file_path} (Size: {file_size / 1024:.2f} KB)")
            print(f"Sending file to OpenAI API... (Size: {file_size / 1024:.2f} KB)")
            start_time = time.time()
            with open(file_path, "rb") as audio_file:
                # NOTE(review): confirm that the pinned openai client treats
                # `timeout` as a request timeout for Audio.transcribe.
                response = self.client.Audio.transcribe(
                    self.model_name,
                    audio_file,
                    timeout=60
                )
            elapsed_time = time.time() - start_time
            logger.info(f"API request completed in {elapsed_time:.2f} seconds")
            print(f"Transcription received in {elapsed_time:.2f} seconds")
            return response["text"]
        except Exception as e:
            # Single reporting point: log once here and let the caller decide
            error_msg = f"Failed to transcribe: {str(e)}"
            logger.error(error_msg)
            print(f"ERROR: {error_msg}")
            raise

    def _remove_temp_file(self, converted_path, original_path):
        """Delete a temporary converted file; never touches the original."""
        if converted_path == original_path or not os.path.exists(converted_path):
            return
        try:
            os.remove(converted_path)
        except OSError as e:
            logger.warning(f"Failed to remove temporary file {converted_path}: {str(e)}")

    def transcribe_file(self, file_path):
        """Transcribe one media file and write the text next to it as ``.txt``.

        The file is skipped when it was already processed, no longer exists,
        has an unsupported extension, or already has a ``.txt`` sibling. All
        errors are logged and printed; nothing is raised to the caller.
        """
        if file_path in self.processed_files:
            logger.info(f"Skipping already processed file: {file_path}")
            print(f"Skipping already processed file: {os.path.basename(file_path)}")
            return
        if not os.path.exists(file_path):
            logger.warning(f"File does not exist: {file_path}")
            print(f"Warning: File does not exist: {file_path}")
            return
        file_ext = os.path.splitext(file_path.lower())[1]
        if file_ext not in ALL_SUPPORTED_FORMATS:
            logger.warning(f"Unsupported file format: {file_path}")
            print(f"Warning: Unsupported file format: {file_path}")
            return
        output_path = os.path.splitext(file_path)[0] + ".txt"
        if os.path.exists(output_path):
            # An existing transcription counts as done; remember it
            logger.info(f"Transcription already exists: {output_path}")
            print(f"Transcription already exists: {os.path.basename(output_path)}")
            self.processed_files.add(file_path)
            self.save_session()
            return
        try:
            print(f"Processing file: {os.path.basename(file_path)}")
            converted_path, error = self.convert_to_supported_format(file_path)
            if error:
                logger.error(f"Cannot transcribe {file_path}: {error}")
                print(f"Cannot transcribe: {error}")
                return
            logger.info(f"Transcribing: {file_path}")
            try:
                transcription = self.transcribe_with_api(converted_path)
                with open(output_path, "w", encoding="utf-8") as f:
                    f.write(transcription)
                logger.info(f"Transcription saved: {output_path}")
                print(f"SUCCESS: Transcription saved to {os.path.basename(output_path)}")
                self.processed_files.add(file_path)
                self.save_session()
            except Exception as e:
                logger.error(f"API transcription failed: {str(e)}")
                print(f"API ERROR: {str(e)}")
            finally:
                # Runs on success and on failure, so a failed API call does
                # not leave converted files piling up in the temp directory.
                self._remove_temp_file(converted_path, file_path)
        except Exception as e:
            logger.error(f"Transcription failed for {file_path}: {str(e)}")
            print(f"ERROR: Transcription failed: {str(e)}")

    def process_existing_files(self):
        """Transcribe every not-yet-processed media file already in the directory."""
        logger.info("Checking for existing files to process...")
        print("Checking for existing files to process...")
        files = self.scan_directory()
        if not files:
            logger.warning("No media files found in the directory or its subdirectories.")
            print("No media files found. Make sure you have files with these extensions:", ALL_SUPPORTED_FORMATS)
            return
        logger.info(f"Found {len(files)} media files to process")
        print(f"Found {len(files)} media files to process")
        for file_path in files:
            if file_path not in self.processed_files:
                self.transcribe_file(file_path)

    def _cleanup_temp_dir(self):
        """Remove the files in the temporary directory, then the directory itself."""
        try:
            for file in os.listdir(self.temp_dir):
                file_path = os.path.join(self.temp_dir, file)
                if os.path.isfile(file_path):
                    os.remove(file_path)
            os.rmdir(self.temp_dir)
            logger.info(f"Cleaned up temporary directory: {self.temp_dir}")
        except Exception as e:
            logger.warning(f"Failed to clean up temporary directory: {str(e)}")

    def start_monitoring(self):
        """Process existing files, then watch the directory until interrupted.

        Blocks until ``KeyboardInterrupt`` or an unexpected error. On the way
        out the observer is stopped (if it was started), the session is saved
        and the temporary directory is removed.
        """
        class MediaFileHandler(FileSystemEventHandler):
            """Forward created/modified media files to the owning service."""

            def __init__(self, service):
                super().__init__()
                self.service = service

            def on_created(self, event):
                if event.is_directory:
                    return
                file_path = event.src_path
                logger.info(f"New file detected: {file_path}")
                file_ext = os.path.splitext(file_path.lower())[1]
                if file_ext in ALL_SUPPORTED_FORMATS:
                    logger.info(f"New media file detected: {file_path}")
                    print(f"New media file detected: {os.path.basename(file_path)}")
                    # Short pause before reading the newly created file
                    time.sleep(1)
                    self.service.transcribe_file(file_path)
                else:
                    logger.info(f"Ignoring non-media file: {file_path} with extension {file_ext}")

            def on_modified(self, event):
                if event.is_directory:
                    return
                file_path = event.src_path
                file_ext = os.path.splitext(file_path.lower())[1]
                if file_ext in ALL_SUPPORTED_FORMATS and file_path not in self.service.processed_files:
                    logger.info(f"Modified media file detected: {file_path}")
                    print(f"Modified media file detected: {os.path.basename(file_path)}")
                    time.sleep(2)
                    self.service.transcribe_file(file_path)

        # Validate first: there is nothing to scan or watch without the directory
        if not os.path.isdir(self.watch_directory):
            logger.error(f"Watch directory does not exist: {self.watch_directory}")
            print(f"Error: Directory {self.watch_directory} does not exist.")
            self._cleanup_temp_dir()
            return

        observer = Observer()
        try:
            self.process_existing_files()
            observer.schedule(MediaFileHandler(self), self.watch_directory, recursive=True)
            observer.start()
            logger.info(f"Started monitoring directory: {self.watch_directory}")
            print(f"Monitoring directory: {self.watch_directory}")
            print(f"Watching for these file types: {ALL_SUPPORTED_FORMATS}")
            print("Press Ctrl+C to stop monitoring")
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Monitoring stopped by user")
            print("Monitoring stopped by user")
        except Exception as e:
            logger.error(f"Error during monitoring: {str(e)}")
            print(f"Error during monitoring: {str(e)}")
        finally:
            # Joining a thread that never started raises RuntimeError, so only
            # stop/join an observer that is actually running.
            if observer.is_alive():
                observer.stop()
                observer.join()
            self.save_session()
            self._cleanup_temp_dir()
            logger.info("Monitoring stopped")
            print("Monitoring stopped")

def check_ffmpeg(ffmpeg_path="ffmpeg"):
    """Report whether an ffmpeg executable can be run.

    Args:
        ffmpeg_path: Name or path of the executable to probe. Defaults to
            ``"ffmpeg"``, resolved through ``PATH``.

    Returns:
        bool: ``True`` if ``<ffmpeg_path> -version`` exits with status 0,
        ``False`` if it exits non-zero or cannot be launched. The outcome is
        also printed.
    """
    try:
        # Only the exit status matters, so the version banner is discarded
        process = subprocess.run(
            [ffmpeg_path, "-version"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # Typically FileNotFoundError when the binary is not installed
        print(f"FFmpeg check failed: {str(e)}")
        return False

    if process.returncode == 0:
        print("FFmpeg detected successfully")
        return True
    print("FFmpeg check failed: ffmpeg command returned non-zero exit code")
    return False

if __name__ == "__main__":
    # Script entry point.
    # Usage: python app.py [directory_to_watch] [api_key]
    # The API key is taken from OPENAI_API_KEY, then from the second
    # command-line argument, and finally from a hidden interactive prompt.
    import getpass  # needed only by the interactive prompt below

    print("\n===== Whisper Transcription Service =====\n")
    directory_to_watch = "/content/Test"
    if len(sys.argv) > 1:
        directory_to_watch = sys.argv[1]
    directory_to_watch = os.path.abspath(directory_to_watch)

    api_key = os.environ.get("OPENAI_API_KEY")
    if not api_key:
        print("OpenAI API key not found in environment variables.")
        if len(sys.argv) > 2:
            api_key = sys.argv[2]
            print("Using API key from command line argument")
        else:
            # getpass does not echo the input, so the secret does not end up
            # in terminal scrollback or saved notebook output.
            api_key = getpass.getpass("Please enter your OpenAI API key: ").strip()

    print("Starting Whisper Transcription Service")
    print(f"Directory to monitor: {directory_to_watch}")
    print(f"Supported file formats: {ALL_SUPPORTED_FORMATS}")

    # makedirs(exist_ok=True) either guarantees the directory exists or raises
    try:
        os.makedirs(directory_to_watch, exist_ok=True)
        print(f"Ensured directory exists: {directory_to_watch}")
    except OSError as e:
        print(f"Error creating directory: {str(e)}")
        sys.exit(1)

    try:
        dir_contents = os.listdir(directory_to_watch)
        print(f"Current directory contents: {dir_contents}")
    except OSError as e:
        print(f"Error listing directory contents: {str(e)}")
        sys.exit(1)

    if not check_ffmpeg():
        print("Error: FFmpeg is required for format conversion. Please install FFmpeg and make sure it's in your PATH.")
        sys.exit(1)

    try:
        service = TranscriptionService(
            watch_directory=directory_to_watch,
            api_key=api_key,
            model="whisper-1"
        )
        service.start_monitoring()
    except KeyboardInterrupt:
        print("\nProgram terminated by user")
        logger.info("Program terminated by user")
    except Exception as e:
        error_msg = f"Unhandled exception: {str(e)}"
        logger.error(error_msg)
        print(f"\nERROR: {error_msg}")
        # Signal the failure to the calling shell instead of exiting with 0
        sys.exit(1)


In [None]:
!python app.py


===== Whisper Transcription Service =====

OpenAI API key not found in environment variables.
Please enter your OpenAI API key: [REDACTED - a secret key was captured in this notebook output; revoke it and clear the cell output before sharing]
Starting Whisper Transcription Service
Directory to monitor: /content/Test
Supported file formats: ['.mp3', '.wav', '.mp4', '.mkv', '.mov', '.flv', '.aac', '.m4a']
Ensured directory exists: /content/Test
Current directory contents: ['.ipynb_checkpoints']
FFmpeg detected successfully
2025-03-01 12:45:02,759 - INFO - Initializing with directory: /content/Test
2025-03-01 12:45:02,760 - INFO - Using OpenAI Whisper API with model: whisper-1
2025-03-01 12:45:02,760 - INFO - Testing OpenAI API connection...
2025-03-01 12:45:03,562 - INFO - OpenAI API connection successful
OpenAI API connection successful
2025-03-01 12:45:03,563 - INFO - Checking for existing files to process...
Checking for existing file