diff --git a/backend/requirements.txt b/backend/requirements.txt
index 3b7c52fb40d..cc5d68e1437 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -255,3 +255,4 @@ stripe==11.3.0
 typesense==0.21.0
 pycountry==24.6.1
 google-cloud-translate==3.20.2
+langdetect==1.0.9
diff --git a/backend/utils/translation.py b/backend/utils/translation.py
index 97fb64c14d3..124887624bf 100644
--- a/backend/utils/translation.py
+++ b/backend/utils/translation.py
@@ -5,6 +5,8 @@
 from typing import List
 
 from google.cloud import translate_v3
+from langdetect import detect as langdetect_detect, DetectorFactory
+from langdetect.lang_detect_exception import LangDetectException
 
 
 # LRU Cache for language detection
@@ -114,20 +116,114 @@
 _parent = f"projects/{PROJECT_ID}/locations/global"
 _mime_type = "text/plain"
 
+# Initialize langdetect for consistent results (fixed seed)
+DetectorFactory.seed = 0
+
+# Hint languages for which langdetect is consulted (see _detect_with_langdetect)
+LANGDETECT_RELIABLE_LANGUAGES = {
+    'af',
+    'ar',
+    'bg',
+    'bn',
+    'ca',
+    'cs',
+    'cy',
+    'da',
+    'de',
+    'el',
+    'en',
+    'es',
+    'et',
+    'fa',
+    'fi',
+    'fr',
+    'gu',
+    'he',
+    'hi',
+    'hr',
+    'hu',
+    'id',
+    'it',
+    'ja',
+    'kn',
+    'ko',
+    'lt',
+    'lv',
+    'mk',
+    'ml',
+    'mr',
+    'ne',
+    'nl',
+    'no',
+    'pa',
+    'pl',
+    'pt',
+    'ro',
+    'ru',
+    'sk',
+    'sl',
+    'so',
+    'sq',
+    'sv',
+    'sw',
+    'ta',
+    'te',
+    'th',
+    'tl',
+    'tr',
+    'uk',
+    'ur',
+    'vi',
+}
+
+
+def _detect_with_langdetect(text: str, hint_language: str | None = None) -> str | None:
+    """Detect the language of ``text`` locally with langdetect.
+
+    langdetect is only consulted when ``hint_language`` is one of
+    LANGDETECT_RELIABLE_LANGUAGES. Returns the detected code, or None when
+    the hint is missing/unsupported or langdetect raises LangDetectException.
+    """
+    if hint_language not in LANGDETECT_RELIABLE_LANGUAGES:
+        return None
+    try:
+        return langdetect_detect(text)
+    except LangDetectException:
+        return None
 
-def detect_language(text: str, remove_non_lexical: bool = False) -> str | None:
-    """
-    Detects the language of the provided text using Google Cloud Translate API.
-    Uses a cache to avoid redundant detections.
 
-    Args:
-        text: The text to detect language for
-        remove_non_lexical: If True, removes common non-lexical utterances before detection.
+def _detect_with_google_cloud(text: str) -> str | None:
+    """Detect the language of ``text`` with the Google Cloud Translate API.
+
+    Returns the language code of the first candidate whose confidence is
+    >= 1, or None when no candidate qualifies.
+    """
+    response = _client.detect_language(parent=_parent, content=text, mime_type=_mime_type)
+    # An empty candidate list simply yields no iterations, so no length check is needed.
+    for language in response.languages or ():
+        if language.confidence >= 1:
+            return language.language_code
+    return None
 
-    Returns:
-        The language code of the detected language (e.g., 'en', 'vi', 'fr') if confidence >= 1,
-        or None if no language with sufficient confidence is found
-    """
+
+def detect_language(text: str, remove_non_lexical: bool = False, hint_language: str | None = None) -> str | None:
+    """
+    Detects the language of the provided text, caching successful detections.
+
+    Text of at most 5 words is sent to the Google Cloud Translate API first, with
+    langdetect as the fallback; longer text is tried with langdetect first
+    (cost-effective), with the Google Cloud API as the fallback.
+
+    Args:
+        text: The text to detect language for
+        remove_non_lexical: If True, removes common non-lexical utterances before detection.
+        hint_language: Expected language code; langdetect is only used when this is one of
+            LANGDETECT_RELIABLE_LANGUAGES.
+
+    Returns:
+        The detected language code (e.g., 'en', 'vi', 'fr'), or None if no detector
+        produced a result or detection raised an error.
+    """
     text_for_detection = text
     if remove_non_lexical:
         cleaned_text = _non_lexical_utterances_pattern.sub('', text)
@@ -140,25 +236,31 @@ def detect_language(text: str, remove_non_lexical: bool = False) -> str | None:
         detection_cache.move_to_end(text_for_detection)
         return detection_cache[text_for_detection]
 
+    # Count words to determine which detection method to use
+    word_count = len(text_for_detection.split())
+
+    # Short text (<=5 words): Google Cloud API first, langdetect as the fallback.
+    # Longer text: langdetect first (cost-effective), Google Cloud API as the
+    # fallback when langdetect returns nothing.
     try:
-        # Call the Google Cloud Translate API to detect language
-        response = _client.detect_language(parent=_parent, content=text_for_detection, mime_type=_mime_type)
-
-        detected_language = None
-        # Return the language code only if confidence is >= 1
-        if response.languages and len(response.languages) > 0:
-            for language in response.languages:
-                if language.confidence >= 1:
-                    detected_language = language.language_code
-                    break
-
-        if len(detection_cache) >= MAX_DETECTION_CACHE_SIZE:
-            detection_cache.popitem(last=False)
-        detection_cache[text_for_detection] = detected_language
-        return detected_language
+        if word_count <= 5:
+            detected_language = _detect_with_google_cloud(text_for_detection)
+            if not detected_language:
+                detected_language = _detect_with_langdetect(text_for_detection, hint_language)
+        else:
+            detected_language = _detect_with_langdetect(text_for_detection, hint_language)
+            if not detected_language:
+                detected_language = _detect_with_google_cloud(text_for_detection)
     except Exception as e:
         print(f"Language detection error: {e}")
-        return None  # Return None on error
+        return None
+
+    # Only successful detections are cached; a None result is not stored
+    if detected_language:
+        if len(detection_cache) >= MAX_DETECTION_CACHE_SIZE:
+            detection_cache.popitem(last=False)
+        detection_cache[text_for_detection] = detected_language
+    return detected_language
 
 
 def split_into_sentences(text: str) -> List[str]:
diff --git a/backend/utils/translation_cache.py b/backend/utils/translation_cache.py
index 2924dc7dd04..aa6ee74d862 100644
--- a/backend/utils/translation_cache.py
+++ b/backend/utils/translation_cache.py
@@ -6,66 +6,42 @@
 class TranscriptSegmentLanguageCache:
     """
     A class to manage language detection caching for transcript segments.
-
-    This cache stores information about whether a segment's text is in the target language
-    and tracks text changes to optimize language detection by checking sentence by sentence.
+
+    The cache remembers, per segment id, whether the segment was last judged to be in the
+    target language, so a segment already found to need translation is not detected again.
     """
 
     def __init__(self):
         """Initialize an empty language detection cache."""
-        # Cache structure: {segment_id: (text, is_target_language)}
+        # Cache structure: {segment_id: is_target_language}
         # is_target_language can be:
-        # - True: text is in target language
+        # - True: text is in target language (or its language could not be detected)
         # - False: text is not in target language
-        # - None: language has not been detected yet
-        self.cache: Dict[str, Tuple[str, Optional[bool]]] = {}
-
-    @staticmethod
-    def _get_text_difference(new_text: str, old_text: str) -> str:
-        if not old_text:
-            return new_text
-
-        # Simple approach: if new text starts with old text, return the difference
-        if new_text.startswith(old_text):
-            return new_text[len(old_text) :].strip()
-
-        # If not a simple continuation, return the full new text for re-evaluation
-        return new_text
+        # A missing entry means the segment has not been checked yet.
+        self.cache: Dict[str, Optional[bool]] = {}
 
     def is_in_target_language(self, segment_id: str, text: str, target_language: str) -> bool:
         """
         Determines if the segment text is in the target language.
-        It performs sentence-level language detection on new text and caches the result for the segment.
+        It runs language detection on the full segment text and caches the result for the segment.
         Returns True if no translation is needed, False otherwise.
         """
-        cached_text, was_in_target_language = self.cache.get(segment_id, (None, None))
-
         # If we already determined it's not the target language, it remains so.
-        # Update cache with the latest text.
-        if was_in_target_language is False:
-            if text != cached_text:
-                self.cache[segment_id] = (text, False)
+        if self.cache.get(segment_id) is False:
             return False
 
-        diff_text = self._get_text_difference(text, cached_text)
-
-        # If no new text to analyze, rely on the previous state.
-        if not diff_text:
-            return was_in_target_language is not False  # True or None results in True
-
-        sentences = split_into_sentences(diff_text)
-        if not sentences:
-            return was_in_target_language is not False
+        # Nothing to analyze: the cached state is True or unset here, so no translation is needed.
+        if not text:
+            return True
 
-        # Check each new sentence. If any is not in the target language, the whole segment is marked for translation.
-        for sentence in sentences:
-            detected_lang = detect_language(sentence, remove_non_lexical=True)
-            if detected_lang and detected_lang != target_language:
-                self.cache[segment_id] = (text, False)
-                return False
+        # Detect on the full segment text, passing the target language as the langdetect hint
+        detected_lang = detect_language(text, remove_non_lexical=True, hint_language=target_language)
+        if detected_lang and detected_lang != target_language:
+            self.cache[segment_id] = False
+            return False
 
-        # All new sentences are in the target language or undetectable.
-        self.cache[segment_id] = (text, True)
+        # All text is in the target language or undetectable.
+        self.cache[segment_id] = True
         return True
 
     def delete_cache(self, segment_id: str) -> None: