Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,4 @@ stripe==11.3.0
typesense==0.21.0
pycountry==24.6.1
google-cloud-translate==3.20.2
langdetect==1.0.9
129 changes: 102 additions & 27 deletions backend/utils/translation.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from typing import List

from google.cloud import translate_v3
from langdetect import detect as langdetect_detect, DetectorFactory
from langdetect.lang_detect_exception import LangDetectException


# LRU Cache for language detection
Expand Down Expand Up @@ -114,20 +116,87 @@
_parent = f"projects/{PROJECT_ID}/locations/global"
_mime_type = "text/plain"

# Seed langdetect's DetectorFactory so repeated detections of the same text give the same result
DetectorFactory.seed = 0

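# Languages for which langdetect is trusted: detection is only attempted when the
# caller's hint language is in this set (the "100% accuracy" figure is unverified - TODO cite source)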
LANGDETECT_RELIABLE_LANGUAGES = {
    # Two-letter language codes, kept in alphabetical order.
    'af', 'ar', 'bg', 'bn', 'ca', 'cs', 'cy', 'da', 'de', 'el',
    'en', 'es', 'et', 'fa', 'fi', 'fr', 'gu', 'he', 'hi', 'hr',
    'hu', 'id', 'it', 'ja', 'kn', 'ko', 'lt', 'lv', 'mk', 'ml',
    'mr', 'ne', 'nl', 'no', 'pa', 'pl', 'pt', 'ro', 'ru', 'sk',
    'sl', 'so', 'sq', 'sv', 'sw', 'ta', 'te', 'th', 'tl', 'tr',
    'uk', 'ur', 'vi',
}


def _detect_with_langdetect(text: str, hint_language: str = None) -> str | None:
    """Detect the language of ``text`` locally with langdetect.

    The detector is only run when ``hint_language`` is a member of
    ``LANGDETECT_RELIABLE_LANGUAGES``. For any other hint, including the
    default ``None``, the function returns ``None`` without calling langdetect.

    Args:
        text: Text to classify.
        hint_language: Language code supplied by the caller as a hint; it
            gates whether langdetect is used at all.

    Returns:
        The language code reported by langdetect, or ``None`` when the hint is
        outside the reliable set or langdetect raises ``LangDetectException``.
    """
    if hint_language in LANGDETECT_RELIABLE_LANGUAGES:
        try:
            detected_code = langdetect_detect(text)
        except LangDetectException:
            # langdetect could not classify this text; report "unknown".
            detected_code = None
        return detected_code
    return None

def detect_language(text: str, remove_non_lexical: bool = False) -> str | None:
"""
Detects the language of the provided text using Google Cloud Translate API.
Uses a cache to avoid redundant detections.

Args:
text: The text to detect language for
remove_non_lexical: If True, removes common non-lexical utterances before detection.
def _detect_with_google_cloud(text: str) -> str | None:
    """Detect the language of ``text`` via the Google Cloud Translate client.

    Args:
        text: Text to send to the detection endpoint.

    Returns:
        The language code of the first candidate in the response whose
        confidence is at least 1, or ``None`` when the response carries no
        candidates or none of them reaches that confidence.
    """
    detection = _client.detect_language(parent=_parent, content=text, mime_type=_mime_type)
    candidates = detection.languages
    if not candidates:
        return None
    # Candidates are scanned in response order; only a confidence >= 1 is accepted.
    return next(
        (candidate.language_code for candidate in candidates if candidate.confidence >= 1),
        None,
    )

Returns:
The language code of the detected language (e.g., 'en', 'vi', 'fr') if confidence >= 1,
or None if no language with sufficient confidence is found
"""

def detect_language(text: str, remove_non_lexical: bool = False, hint_language: str = None) -> str | None:
text_for_detection = text
if remove_non_lexical:
cleaned_text = _non_lexical_utterances_pattern.sub('', text)
Expand All @@ -140,25 +209,31 @@ def detect_language(text: str, remove_non_lexical: bool = False) -> str | None:
detection_cache.move_to_end(text_for_detection)
return detection_cache[text_for_detection]

# Count words to determine which detection method to use
word_count = len(text_for_detection.split())
detected_language = None

# Use Google Cloud API for short text (≤5 words)
# Otherwise, use langdetect for longer text (cost-effective)
# langdetect is also the fallback when the Google Cloud API returns no confident result
try:
# Call the Google Cloud Translate API to detect language
response = _client.detect_language(parent=_parent, content=text_for_detection, mime_type=_mime_type)

detected_language = None
# Return the language code only if confidence is >= 1
if response.languages and len(response.languages) > 0:
for language in response.languages:
if language.confidence >= 1:
detected_language = language.language_code
break

if len(detection_cache) >= MAX_DETECTION_CACHE_SIZE:
detection_cache.popitem(last=False)
detection_cache[text_for_detection] = detected_language
return detected_language
if word_count <= 5:
detected_language = _detect_with_google_cloud(text_for_detection)
if not detected_language:
detected_language = _detect_with_langdetect(text_for_detection, hint_language)

# Cache the result
if detected_language:
if len(detection_cache) >= MAX_DETECTION_CACHE_SIZE:
detection_cache.popitem(last=False)
detection_cache[text_for_detection] = detected_language
return detected_language

except Exception as e:
print(f"Language detection error: {e}")
return None # Return None on error
return None

return detected_language


def split_into_sentences(text: str) -> List[str]:
Expand Down
58 changes: 11 additions & 47 deletions backend/utils/translation_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,66 +6,30 @@
class TranscriptSegmentLanguageCache:
"""
A class to manage language detection caching for transcript segments.

This cache stores information about whether a segment's text is in the target language
and tracks text changes to optimize language detection by checking sentence by sentence.
"""

def __init__(self):
"""Initialize an empty language detection cache."""
# Cache structure: {segment_id: (text, is_target_language)}
# is_target_language can be:
# - True: text is in target language
# - False: text is not in target language
# - None: language has not been detected yet
self.cache: Dict[str, Tuple[str, Optional[bool]]] = {}

@staticmethod
def _get_text_difference(new_text: str, old_text: str) -> str:
if not old_text:
return new_text

# Simple approach: if new text starts with old text, return the difference
if new_text.startswith(old_text):
return new_text[len(old_text) :].strip()

# If not a simple continuation, return the full new text for re-evaluation
return new_text
self.cache: Dict[str, Optional[bool]] = {}

def is_in_target_language(self, segment_id: str, text: str, target_language: str) -> bool:
"""
Determines if the segment text is in the target language.
It performs sentence-level language detection on new text and caches the result for the segment.
Returns True if no translation is needed, False otherwise.
"""
cached_text, was_in_target_language = self.cache.get(segment_id, (None, None))

# If we already determined it's not the target language, it remains so.
# Update cache with the latest text.
was_in_target_language = self.cache.get(segment_id, None)
if was_in_target_language is False:
if text != cached_text:
self.cache[segment_id] = (text, False)
return False

diff_text = self._get_text_difference(text, cached_text)

# If no new text to analyze, rely on the previous state.
if not diff_text:
return was_in_target_language is not False # True or None results in True

sentences = split_into_sentences(diff_text)
if not sentences:
# True or None results in True
if not text:
return was_in_target_language is not False

# Check each new sentence. If any is not in the target language, the whole segment is marked for translation.
for sentence in sentences:
detected_lang = detect_language(sentence, remove_non_lexical=True)
if detected_lang and detected_lang != target_language:
self.cache[segment_id] = (text, False)
return False
# Use full text detection for better accuracy and performance
detected_lang = detect_language(text, remove_non_lexical=True, hint_language=target_language)
if detected_lang and detected_lang != target_language:
self.cache[segment_id] = False
return False

# All new sentences are in the target language or undetectable.
self.cache[segment_id] = (text, True)
# All text is in the target language or undetectable.
self.cache[segment_id] = True
return True

def delete_cache(self, segment_id: str) -> None:
Expand Down