# Machine-translate quasi-sentences in the position-coded subset of the PimPo dataset

| Authors | Last update |
|:------ |:----------- |
| Hauke Licht (https://github.com/haukelicht) | 2023-12-07 |

<br>

<a target="_blank" href="https://colab.research.google.com/github/fabiennelind/Going-Cross-Lingual_Course/blob/main/code/example_translate_pimpo_position_data.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

We use the [PimPo](https://manifesto-project.wzb.eu/information/documents/pimpo) dataset that records party manifesto quasi-sentences coded for positions on immigration or integration.
We have already subset this data to quasi-sentences mentioning the issues of immigration or integration.

## Setup

In [None]:
# Detect whether the notebook runs on Google Colab: the check simply tries to
# import the `google.colab` package.
try:
    import google.colab  # noqa: F401  (imported only to probe availability)
    COLAB = True
except ImportError:
    # Only a failed import means "not on Colab"; other errors (e.g. a
    # keyboard interrupt) should not be silently turned into COLAB=False.
    COLAB = False
print('on colab:', COLAB)

In [None]:
# need to install libraries if on Colab
%%capture
if COLAB:
    !pip install iso639==0.1.4 easynmt==2.0.0 deepl==1.16.1 google-cloud-translate==3.12.1

In [None]:
import os
import pandas as pd

import iso639

import torch

import easynmt
import deepl
from google.oauth2 import service_account
from google.cloud import translate_v2 as gt

# Paths are relative to the notebook: the project root is the parent
# directory and the data lives in its `data` sub-folder.
base_path = os.pardir
data_path = os.path.join(base_path, 'data')

In [None]:
# Pick the torch backend in order of preference: CUDA, then Apple MPS,
# and CPU as the fallback.
if torch.cuda.is_available():
    device = 'cuda'
elif torch.backends.mps.is_available():
    device = 'mps'
else:
    device = 'cpu'

## Load data

In [None]:
# Read the tab-separated quasi-sentence data straight from the course repository.
# (local alternative: fp = os.path.join(data_path, 'lehmann+zobel_2018_pimpo_positions_translated.tsv'))
fp = (
    'https://raw.githubusercontent.com/fabiennelind/Going-Cross-Lingual_Course/'
    'main/data/lehmann%2Bzobel_2018_pimpo_positions.tsv'
)
df = pd.read_table(fp, encoding='utf-8')

In [None]:
# Show how many quasi-sentences carry each value of the `position` column.
df['position'].value_counts()

In [None]:
# Total amount of text in millions of characters, and a rough cost estimate
# using the factor of 20 EUR per million characters.
n_chars_mio = df.text.apply(len).sum() / 1_000_000
print(f'# characters =~ {n_chars_mio:.03f} mio')
print(f'approximate costs =~ {n_chars_mio*20:.02f} EUR')

## Defining the translation functions

In [None]:
def chunk(lst, size):
    """
    Lazily split a sequence into consecutive slices of at most `size` items.

    Parameters:
        lst: A sliceable sequence (e.g., a list of sentences).
        size (int): The maximum number of items per slice.

    Yields:
        Slices of `lst`; the last one may be shorter than `size`.
    """
    yield from (lst[start:start + size] for start in range(0, len(lst), size))

In [None]:
import torch
import gc

def clean_memory(device):
    """
    Release cached accelerator memory (if any) and run the garbage collector.

    Parameters:
        device: Device name or object; only its string representation is
            inspected for the substrings 'cuda' and 'mps'.
    """
    device_name = str(device)
    if 'cuda' in device_name:
        # free cached CUDA blocks and collect CUDA IPC memory
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
    elif 'mps' in device_name:
        torch.mps.empty_cache()
    # no accelerator-specific clean-up for any other device
    gc.collect()

In [None]:
import torch
from typing import Callable, Union

def translate_batch_safely(texts: list, translation_fun: Callable, device: Union[str, torch.device], **kwargs) -> list:
    """
    Translates a batch of texts, falling back to one-by-one translation if the
    batch fails with an "out of memory" error.

    Parameters:
        texts (list): A list of texts to be translated.
        translation_fun (Callable): The translation function to be used. It is called
            as `translation_fun(texts, **kwargs)` and, in the fallback, as
            `translation_fun(text, batch_size=1, **kwargs)` for each single text.
        device (Union[str, torch.device]): The device whose memory is cleaned
            (via `clean_memory`) before the one-by-one fallback.
        **kwargs: Additional keyword arguments to be passed to `translation_fun`.

    Returns:
        list: A list of translated texts. In the fallback, entries of texts that
            could not be translated are `None`.

    Raises:
        Exception: Any error raised by `translation_fun` for the whole batch whose
            message does not contain 'out of memory' is re-raised unchanged.
    """
    try:
        # Attempt to translate the whole batch at once
        return translation_fun(texts, **kwargs)
    except Exception as e:
        # If the exception is _not_ related to running out of memory, re-raise it
        if 'out of memory' not in str(e):
            raise

    # The batch ran out of memory: free what we can and translate text by text
    clean_memory(device)
    # force batch_size=1 even if the caller supplied a `batch_size` keyword
    # (passing it twice would raise a TypeError for every single text)
    single_kwargs = {**kwargs, 'batch_size': 1}
    res = [None] * len(texts)
    for i, text in enumerate(texts):
        try:
            res[i] = translation_fun(text, **single_kwargs)
        except Exception:
            # best effort: keep `None` for this text and carry on, but let
            # KeyboardInterrupt/SystemExit propagate so the user can abort
            print(f'WARNING: couldn\'t translate text "{text}")')
    return res

In [None]:
from tqdm.auto import tqdm

def translate_in_batches(texts: list, batch_size, verbose=False, pbar_desc=None, **kwargs) -> list:
    """
    Translates a list of texts in batches using the `translate_batch_safely` function.

    Parameters:
        texts (list): A list of texts to be translated.
        batch_size (int): The size of each translation batch.
        verbose (bool): Whether to show a progress bar (one tick per batch).
        pbar_desc (str): The description of the progress bar.
        **kwargs: Additional keyword arguments to be passed to the `translate_batch_safely` function.

    Returns:
        list: A list of translated texts.
    """
    translations = []
    # ceiling division: a trailing partial batch counts as a batch, too,
    # otherwise the progress bar total would be one short
    n_batches = (len(texts) + batch_size - 1) // batch_size
    pbar = tqdm(total=n_batches, desc=pbar_desc) if verbose else None
    # Iterate over the batches of texts
    for batch in chunk(texts, batch_size):
        # Translate the batch and append its results in order
        translations += translate_batch_safely(batch, **kwargs)
        if pbar is not None:
            pbar.update(1)
    if pbar is not None:
        pbar.close()
    return translations

In [None]:
import numpy as np
from typing import Callable, Union, List

# helpers
def is_string_series(s: pd.Series):
    """
    Test if pandas series is a string series/series of strings

    A series qualifies if it has pandas' dedicated string dtype, or if it has
    object dtype and every value is either a string or a missing value
    (`None`, `pd.NA` or a float NaN).

    source: https://stackoverflow.com/a/67001213

    Parameters:
        s (pd.Series): The series to test.

    Returns:
        bool: True if the series only holds strings and missing values.
    """
    if isinstance(s.dtype, pd.StringDtype):
        # The series was explicitly created as a string series (Pandas>=1.0.0)
        return True
    elif s.dtype == 'object':
        def _is_missing(v):
            # only call np.isnan on floats: it raises a TypeError for
            # arbitrary objects (lists, dicts, ...) stored in an object series
            return v is None or v is pd.NA or (isinstance(v, (float, np.floating)) and bool(np.isnan(v)))

        # Object series, check each value
        return all(isinstance(v, str) or _is_missing(v) for v in s)
    else:
        return False

def is_nonempty_string(s: pd.Series):
    """
    Flag, element by element, which values of a series are non-empty strings.

    Parameters:
        s (pd.Series): The series to inspect.

    Returns:
        np.ndarray: A boolean array with one entry per value of `s`.
    """
    checks = (isinstance(value, str) and len(value) > 0 for value in s)
    return np.fromiter(checks, dtype=bool)

# main function
def translate_df(
        df: pd.DataFrame, 
        translation_function: Callable,
        supported_languages: List[str],
        text_col: str = 'text', 
        lang_col: str = 'lang',
        target_language: str = 'en',
        target_col: str = 'translation',
        device: Union[str, torch.device] = 'cpu',
        batch_size: int = 16,
        verbose: bool = False,
        **kwargs
    ):
    """
    Translates the texts in a data frame from the source languages specified in a column to a target language and adds the translations to the data frame.

    Note that the column `target_col` is added to `df` itself (the input is modified in place and returned).

    Parameters:
        df (pd.DataFrame): The DataFrame containing the texts to be translated.
        translation_function (Callable): The translation function to be used.
        supported_languages (List[str]): A non-empty list of language codes (ISO 639-1 or ISO 639-2) supported by the translation model.
        text_col (str): The name of the column in the DataFrame that contains the texts to be translated. Default is 'text'.
        lang_col (str): The name of the column in the DataFrame that contains the language codes. Default is 'lang'.
        target_language (str): The target language to translate the texts to. Can be either an ISO 639-1 or ISO 639-2 language code. Default is 'en'.
        target_col (str): The name of the column in the DataFrame to store the translations. Must not exist yet. Default is 'translation'.
        device (Union[str, torch.device]): The device to use for translation. Default is 'cpu' but should be compatible with device used by translation model.
        batch_size (int): The size of each translation batch. Default is 16.
        verbose (bool): Whether to show a progress bar per source language. Default is False.
        **kwargs: Additional keyword arguments to be passed to the `translate_in_batches` function which, in turn, passes them to the translation function.

    Returns:
        pd.DataFrame: The DataFrame with the translated texts in column `target_col` in the target language `target_language`.
            Texts in unsupported languages and empty/non-string texts keep the value `None`;
            texts that are already in the target language are copied as they are.
    """
    # validate the inputs
    assert text_col in df.columns, f'Column "{text_col}" not found in data frame.'
    assert is_string_series(df[text_col]), f'Column "{text_col}" is not a series of string values.'
    assert lang_col in df.columns, f'Column "{lang_col}" not found in data frame.'
    assert is_string_series(df[lang_col]), f'Column "{lang_col}" is not a series of string values.'
    assert target_language is not None, 'Target language must be specified.'
    assert target_col not in df.columns, f'Column "{target_col}" already exists in data frame.'
    assert translation_function is not None, 'Translation function must be specified.'
    assert batch_size > 0, 'Batch size must be greater than 0.'
    assert supported_languages is not None, 'Supported languages must be specified.'
    assert isinstance(supported_languages, list), 'Supported languages must be a list.'
    assert len(supported_languages) > 0, 'Supported languages must not be empty.'
    assert all(isinstance(l, str) for l in supported_languages), 'Supported languages must be a list of strings.'
    
    # language indicators occurring in the data
    # (read from `lang_col`, so that a non-default column name is honoured)
    langs = df[lang_col].unique().tolist()
    # try to get the ISO 639-1 language code for each language in the data frame
    langs_map = {
        l: l if iso639.is_valid639_1(l) else iso639.to_iso639_1(l) if iso639.is_valid639_2(l) else None 
        for l in langs
    }
    # check whether there are unsupported languages: neither the original indicator
    # nor its ISO code is supported, and it is not the target language either
    not_supported = [
        l 
        for l, c in langs_map.items() 
        if l not in supported_languages and c not in supported_languages and l != target_language and c != target_language
    ]
    # print warning message if there are unsupported languages
    if len(not_supported) > 0:
        print(
            f'WARNING: values {not_supported} in column "{lang_col}" are not supported by NMT model.',
            'Texts with these values will not be translated.'
        )
    # now update language mapping with "correct" language codes (use ISO code if available, otherwise use original indicator from the data frame)
    langs_map = {
        l: c if c in supported_languages else l if l in supported_languages else None 
        for l, c in langs_map.items()
    }

    # create new column for translation
    df[target_col] = [None]*len(df)

    # iterate over languages
    for l, d in df.groupby(lang_col):
        lang_code = langs_map[l]
        # just copy texts if source language is the target language
        if lang_code == target_language or l == target_language:
            df.loc[d.index, target_col] = d[text_col].tolist()
            continue
        # skip unsupported languages
        if l in not_supported or lang_code is None:
            continue
        # test for each text value if non-empty string
        flag = is_nonempty_string(d[text_col])
        if any(~flag):
            print(f'WARNING: {sum(~flag)} empty or non-string text(s) in "{l}"')
        df.loc[d.index[flag], target_col] = translate_in_batches(
            texts=d[text_col][flag].tolist(), # <== only translate non-empty texts
            translation_fun=translation_function,
            device=device,
            batch_size=batch_size, 
            source_lang=lang_code, 
            target_lang=target_language,
            verbose=verbose, 
            pbar_desc=f'translating {len(d)} text(s) from "{l}"',
            **kwargs
        )
    
    return df

In [None]:
from types import SimpleNamespace

def translate_df_with_easynmt(df, **kwargs):
    """
    Translates a DataFrame using an EasyNMT model.

    Args:
        df (pandas.DataFrame): The DataFrame to be translated. It is not modified; a deep copy is translated.
        **kwargs: Settings of the translation process:
            model_name (str): name of the EasyNMT model to load (required).
            text_col (str): column with the texts to translate (required).
            lang_col (str): column with the source language codes (required).
            target_language (str): target language code (required).
            target_col (str): column for the translations. Default is '<text_col>_mt_<model_name>'.
            batch_size (int): translation batch size. Default is 16.
            beam_size (int): forwarded to `model.translate()`. Default is 5.
            perform_sentence_splitting (bool): forwarded to `model.translate()`. Default is False.
            show_progress_bar (bool): forwarded to `model.translate()`. Default is False.
            verbose (bool): whether to show a progress bar per language. Default is False.

    Returns:
        pandas.DataFrame: The copy of the DataFrame with the translation column.
            If an error occurs during translation, a warning is printed and the
            copy is returned with the translations obtained so far.

    Raises:
        ValueError: If the target language is not supported by the model or is ambiguous.
    """
    args = SimpleNamespace(**kwargs)

    try:
        device = 'cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'
        model = easynmt.EasyNMT(args.model_name, device=device)
        print(f'Using device "{model.device}"')
    except Exception:
        print(f'WARNING: could not load model "{args.model_name}"')
        raise
    
    # find the model's language code(s) matching the requested target language
    wanted = args.target_language.lower()
    tgt_lang = [l.lower() for l in model.get_languages() if wanted in l.lower()]
    if wanted in tgt_lang:
        # an exact match wins over codes that merely contain the requested one
        tgt_lang = [wanted]
    if len(tgt_lang) == 0:
        raise ValueError(f'Target language "{args.target_language}" not supported by EasyNMT model "{args.model_name}".')
    if len(tgt_lang) > 1:
        raise ValueError(f'Target language "{args.target_language}" ambiguous. Please specify one of {tgt_lang}.')
    tgt_lang = tgt_lang[0]
    src_langs = model.get_languages(target_lang = tgt_lang)

    # work on a copy so that the caller's data frame stays untouched
    df = df.copy(deep=True)
    
    try:
        df = translate_df(
            df=df, 
            # data frame arguments
            text_col=args.text_col,
            lang_col=args.lang_col,
            target_language=tgt_lang, 
            target_col=args.target_col if hasattr(args, 'target_col') else f'{args.text_col}_mt_{args.model_name.lower()}',
            # translation model arguments
            translation_function=model.translate,
            supported_languages=src_langs,
            batch_size=getattr(args, 'batch_size', 16),
            device=model.device,
            # arguments forwarded to model.translate() (caller's values take precedence over the defaults)
            beam_size=getattr(args, 'beam_size', 5),
            perform_sentence_splitting=getattr(args, 'perform_sentence_splitting', False),
            show_progress_bar=getattr(args, 'show_progress_bar', False), 
            # print progress bar
            verbose=getattr(args, 'verbose', False),
        )
    except Exception as e:
        # best effort: `translate_df` fills `df` in place, so partial results survive
        print(f'WARNING: Error during translation "{str(e)}". Returning data frame with translations so far.')
    
    return df

In [None]:
import deepl

def translate_df_with_deepl(df, **kwargs):
    """
    Translates a DataFrame using the DeepL API.

    Args:
        df (pandas.DataFrame): The DataFrame to be translated. It is not modified; a deep copy is translated.
        **kwargs: Settings of the translation process:
            api_key_file (str): path of a text file containing the DeepL API key (required).
            text_col (str): column with the texts to translate (required).
            lang_col (str): column with the source language codes (required).
            target_language (str): target language code (required).
            target_col (str): column for the translations. Default is '<text_col>_mt_deepl'.
            batch_size (int): number of texts per request. Default is 128.
            split_sentences (bool): whether DeepL should split sentences. Default is off.
            verbose (bool): whether to show a progress bar per language. Default is False.

    Returns:
        pandas.DataFrame: The copy of the DataFrame with the translation column.
            If an error occurs during translation or post-processing, a warning
            is printed and the copy is returned with the translations so far.

    Raises:
        ValueError: If the API key file cannot be read, the translator cannot be
            created, or the target language is unsupported or ambiguous.
    """
    args = SimpleNamespace(**kwargs)

    # get API key
    try:
        with open(args.api_key_file) as f:
            api_key = f.read().strip()
    except Exception as e:
        raise ValueError(f'Could not load API key file "{args.api_key_file}". Reason: {str(e)}') from e
        
    # initialize a `Translator` instance
    try:
        translator = deepl.Translator(api_key)
    except Exception as e:
        raise ValueError(f'Could not connect to DeepL API. Reason: {str(e)}') from e

    # get source and target languages
    src_langs = [l.code.lower() for l in translator.get_source_languages()]
    wanted = args.target_language.lower()
    tgt_lang = [l.code.lower() for l in translator.get_target_languages() if wanted in l.code.lower()]
    if wanted in tgt_lang:
        # an exact match wins over codes that merely contain the requested one
        tgt_lang = [wanted]
    if len(tgt_lang) == 0:
        raise ValueError(f'Target language "{args.target_language}" not supported by DeepL.')
    if len(tgt_lang) > 1:
        raise ValueError(f'Target language "{args.target_language}" ambiguous. Please specify one of {tgt_lang}.')
    tgt_lang = tgt_lang[0]
    
    # work on a copy so that the caller's data frame stays untouched
    df = df.copy(deep=True)
    # resolve the output column once so that translation and post-processing
    # refer to the same column, also when a custom `target_col` is given
    tgt_col = args.target_col if hasattr(args, 'target_col') else f'{args.text_col}_mt_deepl'
    
    # translate
    try:
        df = translate_df(
            df=df, 
            # data frame arguments
            text_col=args.text_col,
            lang_col=args.lang_col,
            target_language=tgt_lang,
            target_col=tgt_col,
            # translation model arguments
            translation_function=translator.translate_text,
            supported_languages=src_langs,
            batch_size=getattr(args, 'batch_size', 128),
            # arguments forwarded to translator.translate_text()
            split_sentences='on' if getattr(args, 'split_sentences', False) else 'off',
            # print progress bar
            verbose=getattr(args, 'verbose', False),
        )
    except Exception as e:
        # best effort: `translate_df` fills `df` in place, so partial results survive
        print(f'WARNING: Error during translation "{str(e)}". Returning data frame with translations so far.')
    
    try:
        # post-process translation result: strings (copied texts) and None are
        # kept, any other value is replaced by its `.text` attribute
        df[tgt_col] = df[tgt_col].apply(lambda x: x if isinstance(x, str) else x.text if x is not None else None)
    except Exception as e:
        print(f'WARNING: Error during post-processing "{str(e)}". Returning data frame with translations so far.')
    
    return df

In [None]:
from google.oauth2 import service_account
from google.cloud import translate_v2 as gt

def translate_df_with_google(df, **kwargs):
    """
    Translates a DataFrame using the Google Cloud Translation API (v2 client).

    Args:
        df (pandas.DataFrame): The DataFrame to be translated. It is not modified; a deep copy is translated.
        **kwargs: Settings of the translation process:
            api_key_file (str): path of the service account credentials file (required).
            text_col (str): column with the texts to translate (required).
            lang_col (str): column with the source language codes (required).
            target_language (str): target language code (required).
            target_col (str): column for the translations. Default is '<text_col>_mt_google'.
            batch_size (int): number of texts per request. Default is 128.
            verbose (bool): whether to show a progress bar per language. Default is False.

    Returns:
        pandas.DataFrame: The copy of the DataFrame with the translation column.
            If an error occurs during translation or post-processing, a warning
            is printed and the copy is returned with the translations so far.

    Raises:
        ValueError: If the credentials cannot be loaded, the client cannot be
            created, or the target language is unsupported or ambiguous.
    """
    args = SimpleNamespace(**kwargs)

    # get API key
    try:
        credentials = service_account.Credentials.from_service_account_file(args.api_key_file)
    except Exception as e:
        raise ValueError(f'Could not load API key file "{args.api_key_file}". Reason: {str(e)}') from e
    
    # initialize a `translator` instance
    try:
        translator = gt.Client(credentials=credentials)
    except Exception as e:
        raise ValueError(f'Could not connect to Google Cloud Translation API. Reason: {str(e)}') from e
    
    # get source and target languages
    src_langs = [l['language']  for l in translator.get_languages()]
    wanted = args.target_language.lower()
    tgt_lang = [l.lower() for l in src_langs if wanted in l.lower()]
    if wanted in tgt_lang:
        # an exact match wins over codes that merely contain the requested one
        tgt_lang = [wanted]
    if len(tgt_lang) == 0:
        raise ValueError(f'Target language "{args.target_language}" not supported by Google Cloud Trsanslation API.')
    if len(tgt_lang) > 1:
        raise ValueError(f'Target language "{args.target_language}" ambiguous. Please specify one of {tgt_lang}.')
    tgt_lang = tgt_lang[0]
    
    # work on a copy so that the caller's data frame stays untouched
    df = df.copy(deep=True)
    # resolve the output column once so that translation and post-processing
    # refer to the same column, also when a custom `target_col` is given
    tgt_col = args.target_col if hasattr(args, 'target_col') else f'{args.text_col}_mt_google'
    
    def translate_util(values, target_lang, source_lang, **kwargs):
        # adapter: maps the generic `source_lang`/`target_lang` keywords to the
        # parameter names of the client's `translate()` method
        # NOTE(review): the inputs are plain text, so ask for format 'text';
        # per the client docs the API otherwise treats input as HTML and
        # returns HTML-escaped output (e.g. "&#39;") — confirm against docs
        kwargs.setdefault('format_', 'text')
        return translator.translate(values=values, target_language=target_lang, source_language=source_lang, **kwargs)

    # translate
    try:
        df = translate_df(
            df=df, 
            # data frame arguments
            text_col=args.text_col,
            lang_col=args.lang_col,
            target_language=tgt_lang,
            target_col=tgt_col,
            # translation model arguments
            translation_function=translate_util,
            supported_languages=src_langs,
            batch_size=getattr(args, 'batch_size', 128),
            # print progress bar
            verbose=getattr(args, 'verbose', False),
        )
    except Exception as e:
        # best effort: `translate_df` fills `df` in place, so partial results survive
        print(f'WARNING: Error during translation "{str(e)}". Returning data frame with translations so far.')
    
    try:
        # post-process translation result: strings (copied texts) and None are
        # kept, any other value is replaced by its 'translatedText' entry
        df[tgt_col] = df[tgt_col].apply(lambda x: x if isinstance(x, str) else x['translatedText'] if x is not None else None)
    except Exception as e:
        print(f'WARNING: Error during post-processing "{str(e)}". Returning data frame with translations so far.')
    
    return df

## Translate

### Commercial services

**WARNING:** If you run this code, you'll spend ~40 Dollars!

In [None]:
# Translate all texts into English with the Google Cloud Translation API.
# The credentials file is looked up in the folder named by the SPATH
# environment variable.
google_settings = dict(
    api_key_file=os.path.join(os.environ['SPATH'], 'multilingual-gesis-translate.json'),
    # which columns to read from and write to
    text_col='text',
    lang_col='lang',
    target_language='en',
    target_col='text_mt_google',
    batch_size=128,
    # show a progress bar
    verbose=True,
)
out = translate_df_with_google(df, **google_settings)

In [None]:
# Add DeepL translations (British English) to the data frame.
# The API key file is looked up in the folder named by the SPATH
# environment variable.
deepl_settings = dict(
    api_key_file=os.path.join(os.environ['SPATH'], 'deepl'),
    # which columns to read from and write to
    text_col='text',
    lang_col='lang',
    target_language='en-gb',
    target_col='text_mt_deepl',
    # show a progress bar
    verbose=True,
)
out = translate_df_with_deepl(out, **deepl_settings)

And now, let's save some money but spend some time!

In [None]:
# Add translations from the open-source M2M-100 (418M) model via EasyNMT.
m2m_settings = dict(
    model_name='m2m_100_418M',
    # which columns to read from and write to
    text_col='text',
    lang_col='lang',
    target_language='en',
    target_col='text_mt_m2m',
    batch_size=8,  # <== use small batch size for illustration
    # decoding options (the wrapper applies these same values when calling model.translate())
    beam_size=5,
    perform_sentence_splitting=False,
    show_progress_bar=False,
    # show a progress bar
    verbose=True,
)
out = translate_df_with_easynmt(out, **m2m_settings)

In [None]:
# Add translations from the open-source OPUS-MT models via EasyNMT.
opus_settings = dict(
    model_name='opus-mt',
    # which columns to read from and write to
    text_col='text',
    lang_col='lang',
    target_language='en',
    target_col='text_mt_opus',
    batch_size=8,  # <== use small batch size for illustration
    # decoding options (the wrapper applies these same values when calling model.translate())
    beam_size=5,
    perform_sentence_splitting=False,
    show_progress_bar=False,
    # show a progress bar
    verbose=True,
)
out = translate_df_with_easynmt(out, **opus_settings)

In [29]:
# out.to_csv(fp.replace('.tsv', '_translated.tsv'), sep='\t', index=False)
# the resulting file is at https://raw.githubusercontent.com/fabiennelind/Going-Cross-Lingual_Course/main/data/lehmann%2Bzobel_2018_pimpo_positions_translated.tsv