# Language Detection Pipeline Preparation

During ETL pipeline preparation, some messages were not in English.
Experiment with CHATGPT to detect the language of each message and translate it to English

Note: CHATGPT is a paid service, and takes a long time to translate.
Just for general experimentation, only a sample of the data will be converted
Completed translations will be stored in json format in `../data/translations/batch_job_results_results.jsonl` and also written to SQLITE database as `message_language`

Finally, messages database will be updated with improved translated messages.

### 1. Import libraries and load datasets

In [1]:
# import libraries
import pandas as pd
from sqlalchemy import create_engine
import os
from openai import OpenAI
import json
import numpy as np
import time
from src import config

import logging
logger = logging.getLogger(__name__)

# environment settings
pd.set_option('display.max_column', 400)
pd.set_option('display.max_colwidth', 400)

In [2]:
import src.config
import importlib
importlib.reload(src.config)
from src import config

In [3]:
# activate logging
logging.basicConfig(filename=config.path_log_translation,
                    format='%(asctime)s %(levelname)-8s %(message)s',
                    filemode='w',
                    level=logging.INFO,
                    datefmt='%Y-%m-%d %H:%M:%S')

In [4]:
# load data from database created during ETL pipeline preparation
engine = create_engine(config.path_database)
conn = engine.connect()
df = pd.read_sql('select * from messages', con=conn, index_col='id')
df.head()

Unnamed: 0_level_0,message,genre,related,request,offer,aid_related,medical_help,medical_products,search_and_rescue,security,military,water,food,shelter,clothing,money,missing_people,refugees,death,other_aid,infrastructure_related,transport,buildings,electricity,tools,hospitals,shops,aid_centers,other_infrastructure,weather_related,floods,storm,fire,earthquake,cold,other_weather,direct_report
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1
2,Weather update - a cold front from Cuba that could pass over Haiti,direct,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
7,Is the Hurricane over or is it not over,direct,1,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,1,0,1,0,0,0,0,0
8,Looking for someone but no name,direct,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
9,UN reports Leogane 80-90 destroyed. Only Hospital St. Croix functioning. Needs supplies desperately.,direct,1,1,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,1,1,0,1,0,0,1,0,0,0,0,0,0,0,0,0,0,0
12,"says: west side of Haiti, rest of the country today and tonight",direct,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [5]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 26179 entries, 2 to 30265
Data columns (total 37 columns):
 #   Column                  Non-Null Count  Dtype 
---  ------                  --------------  ----- 
 0   message                 26179 non-null  object
 1   genre                   26179 non-null  object
 2   related                 26179 non-null  int64 
 3   request                 26179 non-null  int64 
 4   offer                   26179 non-null  int64 
 5   aid_related             26179 non-null  int64 
 6   medical_help            26179 non-null  int64 
 7   medical_products        26179 non-null  int64 
 8   search_and_rescue       26179 non-null  int64 
 9   security                26179 non-null  int64 
 10  military                26179 non-null  int64 
 11  water                   26179 non-null  int64 
 12  food                    26179 non-null  int64 
 13  shelter                 26179 non-null  int64 
 14  clothing                26179 non-null  int64 
 15  money  

In [6]:
# try to read message where language detection was already executed
df_language = pd.DataFrame()
try:
    df_language = pd.read_sql('select * from message_language',
                              con=conn,
                              index_col='id',
                              dtype={'is_english': 'boolean'})
finally:
    pass

df_language.shape

(15400, 2)

In [7]:
df_language.info()

<class 'pandas.core.frame.DataFrame'>
Index: 15400 entries, 2 to 18213
Data columns (total 2 columns):
 #   Column       Non-Null Count  Dtype  
---  ------       --------------  -----  
 0   is_english   15400 non-null  boolean
 1   translation  15400 non-null  object 
dtypes: boolean(1), object(1)
memory usage: 270.7+ KB


In [8]:
df_language[df_language['is_english'] == False][:5]

Unnamed: 0_level_0,is_english,translation
id,Unnamed: 1_level_1,Unnamed: 2_level_1
146,False,"In the Saint Etienne area, the road to Jacmel is blocked, it is very difficult to get to Jacmel"
176,False,"In what field would you need me to speak Creole, French, half English?"
342,False,"I need some food. Thanks for your understanding. (Translator's note: are we getting the full messages? Most of them appear to be bits of sentences, like this one)"
383,False,"I need help in Creole. Contact me quickly, quickly via SMS or call, it's urgent. It is very important because today I'm going to do what I have to do over the barriers, I'm already waiting."
399,False,Don't know what to do about the staff routing part at the end


### 2. Set API key and connect to OPENAI

In [9]:
# Setting the API key to use OPEN AI models
openai_api_key = os.environ.get('OPENAI_API_KEY')
client = OpenAI(api_key=openai_api_key)

### 3. Build the API logic in realtime

Detect if messages are in English, and if not, translate them to English

In [10]:
# Identify messages that was not the checked for language yet
df_remaining = df.merge(df_language, on='id', how='left', indicator=True)
df_remaining = df_remaining[df_remaining['_merge'] == "left_only"]
df_remaining.head()

Unnamed: 0_level_0,message,genre,related,request,offer,aid_related,medical_help,medical_products,search_and_rescue,security,military,water,food,shelter,clothing,money,missing_people,refugees,death,other_aid,infrastructure_related,transport,buildings,electricity,tools,hospitals,shops,aid_centers,other_infrastructure,weather_related,floods,storm,fire,earthquake,cold,other_weather,direct_report,is_english,translation,_merge
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1
18214,Télécoms Sans Frontières is also a working group member of the United Nations emergency telecoms body (WGET) and a member of the International Council of Voluntary Agencies (ICVA).,news,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,left_only
18215,"UNICEF is complementing DFID's contribution with =A31,150,000 from its own resources and voluntary contributions.",news,1,0,0,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,left_only
18216,"The Government of Iran has indicated that it will support a number of significant road construction projects inside Afghanistan, including from Islam Qala to Herat and from Milak (on the Iranian-Nimroz border) to Dilaram city (the latter route will shorten the transportation of goods from Bandar Abbas to Kandahar by some 700km.)",news,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,left_only
18217,"Though the population of these islands is relatively small, children in Samoa and Tonga are at risk of respiratory disease, measles and tetanus, as vaccination rates in many communities are low.",news,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,left_only
18218,"With widespread crop failure and food price hikes, the situation will become extremely serious unless emergency measures are put in place immediately.""",news,1,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,,,left_only


In [11]:
df_remaining.shape

(10779, 40)

# Build OPENAI API request
Testing OPENAI response to see if it's working for a few messages first

In [12]:
system_prompt = """ 
  You will be provided with text about disaster responses.
  Step 1: Detect of the text is in English. Return options 'True' if the sentence is in English or 'False' if the sentence is not in English as isEnglish boolean variable
  Step 2: If sentence is not in English, translate it to English and return as text in json format
  
  Example: 'I need food' // isEnglish: True
  Example2: 'Vandaag is het zonnig'' // isEnglish: False, Translation: 'Today it is Sunny'
  """

In [13]:
# testing OPENAI response to see if it's working for a few messages first
# detect if all texts are in English, and if not translate them
mydict = {}

for idx, text in df_remaining['message'][:2].items():

  response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
      {
        "role": "system",
        "content": system_prompt,
        "temperature":0.1,
        "response_format": {
              "type": "json_object"
          },   
      },
      {
        "role": "user",
        # "content": text,
        "content": 'Vandaag is het zonnig'   
        # "content": 'I love Wilson and Maya' ,
      }
    ],
    temperature=0.1,
    top_p=1
  )

  mydict[idx] = response.choices[0].message.content
  
print(mydict)

{18214: '```json\n{\n  "isEnglish": False,\n  "Translation": "Today it is sunny"\n}\n```', 18215: '```json\n{\n  "isEnglish": False,\n  "Translation": "Today it is sunny"\n}\n```'}


### 4. Once we are happy with API response, let's kick off translation in BATCHES
Create multiple batch jobs on open ai platform, which will complete within 24hours. 
Due to limitations on openai, only 90000 tokens can be processed in a single batch
Running API requests are significantly cheaper but takes longer

**Important:**
The message id will be become the index, and is the main identifier of the translation

In [14]:
# Creating an array of json tasks for the next 2000 texts messages
start = 0
end = 2000
interval = 400
next = start + interval

while next <= end:
    
    # create an array of json tasks for each batch job
    tasks = []
    for index, text in df_remaining['message'][start:next].items():
    
        task = {
            "custom_id": f"task-{index}",
            "method":"POST",
            "url":"/v1/chat/completions",
            "body": {
              # This is what you would have in your Chat Completions API call
              "model":"gpt-4-turbo",
              "temperature":0.1,
              "response_format": {
                  "type": "json_object"
              },
              "messages": [
                {
                  "role": "system",
                  "content": system_prompt,
                },
                {
                  "role": "user",
                  "content": text,
                },
              ]
            }
          }
        
        tasks.append(task)
        
    # create json file and save it locally
    with open(config.path_translation_json_batchjob, 'w') as file:
        for obj in tasks:
            file.write(json.dumps(obj) + '\n') 
            
    # Uploading json file to openai platform
    batch_file = client.files.create(
        file=open(config.path_translation_json_batchjob, 'rb'),
        purpose='batch'
    )    
    
    # Creating the batch job on openai
    batch_job = client.batches.create(
        input_file_id=batch_file.id,
        endpoint="/v1/chat/completions",
        completion_window="24h"
    )    
    
    print('Batch submitted {} for records {}-{}'.format(batch_job.id, start, next))
    logger.info('Batch submitted {} for records {}-{}'.format(batch_job.id, start, next))
    
    # Check status of batch job running on openai platform
    print('Waiting for batch to start, go to sleep 5 minutes')
    time.sleep(300)
    batch_job = client.batches.retrieve(batch_job.id)
    print('Batch {}" status {}'.format(batch_job.id, batch_job.status))
    logger.info('Batch {}" status {}'.format(batch_job.id, batch_job.status))
    
    # wait for batch to complete before starting the next batch job. (CHATGPT does not allow multiple batches to run in parallel)
    while batch_job.status in ['in_progress', 'validating', 'finalizing']:
        print('Batch {} still running - going to sleep for 5 minutes'.format(batch_job.id))
        logger.info('Batch {} still running - going to sleep for 5 minutes'.format(batch_job.id))
        time.sleep(300)
        batch_job = client.batches.retrieve(batch_job.id)
    
    # when batch is completed, set counters to kick off the next batch job of 400 requests   
    if not batch_job.status == 'failed':
        start = start + interval
        next = next + interval

Batch submitted batch_9KmEZ7TTk0dUIORNsdQoI26d for records 0-400
Waiting for batch to start, go to sleep 5 minutes


KeyboardInterrupt: 

### 5. Load and Analyze API results

In [None]:
# get all batch jobs that were submitted from openai
# batch_jobs = []
batch_jobs = client.batches.list(limit=100)
print('Number of batch jobs retrieved: {}'.format(len(batch_jobs.data)))

In [None]:
# select batches to process
batches = []
for batch in batch_jobs.data:
   if ((batch.status == 'completed') & 
        (batch.request_counts.failed == 0) & 
        (batch.request_counts.total > 10)):
            batches.append(batch.id)
            print(batch.id, batch.status, batch.request_counts.completed, batch.request_counts.failed) 

In [None]:
# Download batch content from OPENAI and consolidate all api results locally into a json file
# OPENAI will delete files from batches after 30 days, so we might not be able to retrieve old content
# file batch_job_resuts will still contain all those old details so don't delete it, just append

# first clear the file if it exists
# open(config.path_translation_json_batchjob_result, 'w').close()

# append contents of all batches to local json results file
for batch in batches:
    batch_job = client.batches.retrieve(batch) 
    try:        
        result = client.files.content(batch_job.output_file_id).content
        with open(config.path_translation_json_batchjob_result, 'ab') as file:
            file.write(result)   
    except:
        # print('Batch file {} output already deleted on openai'.format(batch))
        logger.info('Batch file {} output already deleted on openai'.format(batch))
        

In [None]:
# Loading all json api data from locally saved json file
results = []
with open(config.path_translation_json_batchjob_result, 'r') as file:
    for line in file:
        # Parsing the JSON string into a dict and appending to the list of results
        json_object = json.loads(line.replace('\n', '').strip())
        results.append(json_object)

In [None]:
# Reading only the first results as a test if API worked
for res in results[100:106]:
    task_id = res['custom_id']
    # Getting index from task id
    index = task_id.split('-')[-1]
    result = res['response']['body']['choices'][0]['message']['content']
    movie = df.loc[int(index)]
    description = movie['message']
    # title = movie['Series_Title']
    print(f"\nMESSAGE: {index}-{description}\n\nRESULT: {result}")
    print("\n----------------------------\n")

### 6. Load API results into dataframe
The API return 2 responses we are interested in:
1) A Boolean variable isEnglish to indicate of the message are in English or not
2) A translation in English if the message was in another language

Save both these responses in a separate dictionary and create a dataframe with 2 columns:
- is_english
- translation

In [None]:
# Load all responses into a dictionaries
isEnglishs = {}
translatedTexts = {}

for res in results:
    task_id = res['custom_id']
    # Get unique message id from task id
    index = int(task_id.split('-')[-1])
    # get response content and strip of new line indicators
    result = res['response']['body']['choices'][0]['message']['content']
    result = result.replace('\n', '').strip()
    result = result.replace('\t', '').strip()
    # get original message
    df_tmp = df.loc[index]
    description = df_tmp['message']
    translation = ''
    isEnglish = ''

    try:
        dict_object = json.loads(result)
        # isEnglish = bool(dict_object['isEnglish'])
        isEnglish = dict_object['isEnglish']
    except:
        pass

    try:
        translation = dict_object['Translation']       
    except:
        pass
    
    isEnglishs[index] = isEnglish
    translatedTexts[index] = translation
    

In [None]:
results[0]

## Clean translations and save to database

In [None]:
# create dataframe
data = {'is_english': isEnglishs,
        'translation': translatedTexts}

df_translation = pd.DataFrame.from_dict(data,
                                         orient='columns',
                                         )
df_translation.index.name = 'id'
df_translation.head()

In [None]:
df_translation.shape

In [None]:
# Add message for analysis
df_translated_enhanced = df_translation.merge(df[['message']], on='id', how='inner')
df_translated_enhanced.head()

In [None]:
df_translated_enhanced.shape

In [None]:
df_translated_enhanced.is_english.value_counts()

In [None]:
df_translated_enhanced.info()

In [None]:
# remove spaces
df_translated_enhanced['is_english'] = df_translated_enhanced['is_english'].map(lambda x: str(x).replace(' ', '').capitalize())
df_translated_enhanced.is_english.value_counts()

In [None]:
df_translated_enhanced.info()

In [None]:
# convert to boolean
df_translated_enhanced['is_english'] = df_translated_enhanced['is_english'].map(lambda x: True if str(x) == 'True' else False)
    
# df_translated_enhanced['is_english'] = np.where(df_translated_enhanced['is_english'].str.contains('True'), True, False)

In [None]:
df_translated_enhanced.is_english.value_counts()

In [None]:
df_translated_enhanced.info()

In [None]:
# how many texts were not in English ?
print('Messages not in English:', len(df_translated_enhanced[df_translated_enhanced.is_english == False]))
df_translated_enhanced[df_translated_enhanced.is_english == False].sample(10)

In [None]:
funny_texts = [
    'The text provided does not contain any recognizable language or meaningful content.',
    'This SMS is from another language.',
    'It is not necessary to translate this message because this message is sentimental.',
    'NOTES: I cannot translate this message because it is not complete.',
    'NOTES: this person wants to translate two words',
    'I already translated this one.',
    'NOTES: I already translated the message is not important to translate.'
]

df_translated_enhanced[df_translated_enhanced['translation'].isin(funny_texts)]

In [None]:
# if text is not marked as English ,but no translation was given, set the language back to English
error_index  = df_translated_enhanced[(df_translated_enhanced['is_english'] == False) & 
                                      (df_translated_enhanced['translation'].map(lambda x: len(x) == 0))].index
len(error_index)

In [None]:
df_translated_enhanced.loc[error_index]

In [None]:
# do we have duplicates ?
df_translated_enhanced.duplicated().sum()

In [None]:
# drop duplicated translations if it exists
df_translated_enhanced =  df_translated_enhanced[~df_translated_enhanced.index.duplicated(keep='first')]
df_translated_enhanced.shape

In [None]:
df_translated_enhanced = df_translated_enhanced.sort_index()
df_translated_enhanced.head()

### 8. Save the dataset to sqlite database.

In [None]:
df_translated_enhanced.info()

In [None]:
# make sure is_english contain only values True and False
print(df_translated_enhanced.is_english.unique())

In [None]:
# save to csv file

df_translation.to_csv(config.path_log_translation)

In [None]:
# Drop tmp column before we save it
df_translated_enhanced.drop(['message'], inplace=True, axis=1)
df_translated_enhanced.info()

In [None]:
# add to existing sqlite database
df_translated_enhanced.to_sql('message_language', engine, index=True, if_exists='replace')

### 9. Update messages dataset with translations and update SQLITE database

In [None]:
# load cleaned message data from database
engine = create_engine(config.path_database)
conn = engine.connect()
df = pd.read_sql('select * from messages', con=conn, index_col='id')
df.head()

In [None]:
df.info()

In [None]:
# read message language detection and translation to English, if it exists
df_language = pd.DataFrame()
try:
    df_language = pd.read_sql('select * from message_language',
                              con=conn,
                              index_col='id',
                              dtype={'is_english': 'boolean'}
                              )
finally:
    pass

df_language.info()

In [None]:
df_language.sort_index().head()

In [None]:
# merge dataframes
if len(df_language) > 0:
    df = df.merge(df_language, on='id', how='left' )
    
df.head()

In [None]:
df['original_message'] = df['message']

In [None]:
df.is_english.value_counts(dropna=False)

In [None]:
# For messages with no translation or language detection, set is_english = True
df['is_english'] = np.where(df['is_english'].isna(), True, df['is_english'])
df.is_english.value_counts(dropna=False)

In [None]:
# display some translations
df[df['is_english'] == False][['original_message', 'is_english', 'translation']].head(5)

In [None]:
# replace message with translation, if message is flagged as not being in English
df['message'] =  np.where((df['is_english'] == False) & (~df['translation'].isnull()),
                           df['translation'],
                           df['original_message'])

In [None]:
# make sure messages already in English are untouched
cols = ['original_message', 'is_english', 'translation', 'message']
df[df['is_english'] == True][cols].sample(5)

In [None]:
# check that messages not in English were replaced
df[df['is_english'] == False][cols].sample(10)

In [None]:
# finally lets drop the columns we no longer need
df = df.drop(['original_message', 'is_english', 'translation'], axis=1)

In [None]:
df.info()

In [None]:
df.to_sql('messages', engine, index=True, if_exists='replace')