In [1]:
!pip install pinecone-client==3.1.0 \
  pinecone-datasets==0.7.0 \
  sentence-transformers==2.2.2 \
  pinecone-notebooks==0.1.1 \
  datasets \
  bert_score



In [2]:
import requests
from pinecone import Pinecone, ServerlessSpec
from transformers import AutoTokenizer, AutoModel
import torch

# Pinecone Initialization
pc = Pinecone(api_key="9214c316-cd0b-461d-8851-618d9ccfb69a")
index = pc.Index("arabic-dialect-translation")


# Load embedding model
tokenizer = AutoTokenizer.from_pretrained("intfloat/e5-base")
model = AutoModel.from_pretrained("intfloat/e5-base")

def generate_embedding(text):
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():
        embeddings = model(**inputs).last_hidden_state.mean(dim=1)
    return embeddings.squeeze().tolist()

def query_pinecone(query_text, top_k=20):
    query_embedding = generate_embedding(query_text)
    results = index.query(vector=query_embedding, top_k=top_k, include_metadata=True)
    return results

def generate_prompt(query_text, pinecone_results):
  """
    Generates a prompt that includes retrieved examples and few-shot examples.

    Args:
    - query_text (str): The input query in dialectal Arabic.
    - pinecone_results (list): Retrieved relevant examples from Pinecone.

    Returns:
    - str: A formatted prompt to guide the model's response.
  """
  # few_shot_examples = """
  #   - Sentence: "وينك؟ ما شفناك من زمان"
  #     Dialect: [Gulf]
  #     Translation: "أين أنت؟ لم نرك منذ فترة طويلة"

  #   - Sentence: "شو عم تعمل؟ ما شفتك من زمان"
  #     Dialect: [Levantine]
  #     Translation: "ماذا تفعل؟ لم أرك منذ فترة طويلة"

  #   - Sentence: "واش راك؟ ما شفتكش من مدة "
  #     Dialect: [North Africa]
  #     Translation: "كيف حالك؟ لم أرك منذ فترة "

  #   - Sentence: "مّا وبّا ... شحال من عام و هوما يحّجرو ( يْلَّمو ) ف الدراهم باش ينجمو يشرو ديك الماشينة ، و ف النهار اللول اللي كانو غادي يخدمو بيها ، حتى حّنّا ( الجدة ) كانت معروضة باش تجي تشوف الماشينة .  . "
  #     Dialect: [North Africa]
  #     Translation: "كان والدي ووالدتي يدخرون المال لهذا اليوم منذ سنوات لكي يستطيعوا شراء هذه الغسالة وفي اول يوم لاستخدامها دعيت جدتي لحضور ذلك الحدث لكي ترى هذه الآلة  "

  #   - Sentence: "إزيّك؟ ما شفتكش بقالك كتير"
  #     Dialect: [Egyptian]
  #     Translation: "كيف حالك؟ لم أرك منذ فترة طويلة"

  #   - Sentence: "كيف حالك؟ لم أرك منذ فترة طويلة"
  #     Dialect: [Stand arabic]
  #     Translation: "كيف حالك؟ لم أرك منذ فترة طويلة"

  #   """


  retrieved_examples = "\n".join(
      [f"{i+1}. \"{match['metadata']['text']}\" - Dialect: [{match['metadata']['label']}], Translation: \"{match['metadata']['translation']}\""
      for i, match in enumerate(pinecone_results['matches'])]
      )

  #The following are examples of Arabic dialect sentences with their dialect name and their translations in standard Arabic:

  # Your are dialect translator assistant you have the following examples of Arabic dialect sentences and their translations in standard Arabic:
  # {few_shot_examples}

  prompt = f"""
  Your are dialect translator assistant Based on the following retrieved examples:
  {retrieved_examples}

  Identify the dialect and provide the translation for the following sentence to only and only standard Arabic with giving its dialect:

  Sentence: "{query_text}"
  """

  return prompt.strip()

def parse_response(response):
    """
    Parses the model's response and returns a well-organized string with non-empty components.

    Args:
    - response (str): The model's response.

    Returns:
    - str: An organized string with only the non-empty components (Sentence, Dialect, Translation).
    """
    # Initialize components as empty strings
    sentence, dialect, translation = "", "", ""

    # Split response into lines
    lines = response.strip().split("\n")

    # Iterate through each line to find specific components
    for line in lines:
        if "Sentence:" in line:
            sentence = line.split(":", 1)[1].strip('" ').strip()
        elif "Dialect:" in line:
            dialect = line.split(":", 1)[1].strip("[] ").strip()
        elif "Translation:" in line:
            translation = line.split(":", 1)[1].strip('" ').strip()

    # Build the organized response string with non-empty components
    response = ""
    if sentence:
        response += f"Sentence: \"{sentence}\"\n"
    if dialect:
        response += f"Dialect: [{dialect}]\n"
    if translation:
        response += f"Translation: \"{translation}\""

    return response.strip()


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [3]:
API_KEY="xK7NstzjTVVcAmjsI4OomLGRkt9a7vZUO9sChIVBWXLs"
ENDPOINT = "https://eu-de.ml.cloud.ibm.com/ml/v1/text/generation?version=2023-05-29"

In [4]:
import sys
import requests
from requests.exceptions import RequestException

def get_access_token():
    token_url = "https://iam.cloud.ibm.com/identity/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {
            "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
            "apikey": API_KEY
    }
    response = requests.post(token_url, headers=headers, data=data)

    if response.status_code==200:
        return response.json()["access_token"]

    else:
        print(f"Error obtaining access token: {response.text}")

ACCESS_TOKEN = get_access_token()
print(ACCESS_TOKEN)

eyJraWQiOiIyMDI0MTEwMTA4NDIiLCJhbGciOiJSUzI1NiJ9.eyJpYW1faWQiOiJJQk1pZC02OTUwMDBLNFdLIiwiaWQiOiJJQk1pZC02OTUwMDBLNFdLIiwicmVhbG1pZCI6IklCTWlkIiwianRpIjoiN2NjNjk0MTQtOWY4MC00MzkzLWEwMDAtYThkYjk0NzMyMDdhIiwiaWRlbnRpZmllciI6IjY5NTAwMEs0V0siLCJnaXZlbl9uYW1lIjoiU2FuYSIsImZhbWlseV9uYW1lIjoiQk9VQ0hFVEEiLCJuYW1lIjoiU2FuYSBCT1VDSEVUQSIsImVtYWlsIjoic2FuYS5ib3VjaGV0YUBnbWFpbC5jb20iLCJzdWIiOiJzYW5hLmJvdWNoZXRhQGdtYWlsLmNvbSIsImF1dGhuIjp7InN1YiI6InNhbmEuYm91Y2hldGFAZ21haWwuY29tIiwiaWFtX2lkIjoiSUJNaWQtNjk1MDAwSzRXSyIsIm5hbWUiOiJTYW5hIEJPVUNIRVRBIiwiZ2l2ZW5fbmFtZSI6IlNhbmEiLCJmYW1pbHlfbmFtZSI6IkJPVUNIRVRBIiwiZW1haWwiOiJzYW5hLmJvdWNoZXRhQGdtYWlsLmNvbSJ9LCJhY2NvdW50Ijp7InZhbGlkIjp0cnVlLCJic3MiOiJmNDYzN2YzYjcwNzc0MDhmYjFlMTM1ZTQxYzY3OTY0MyIsImltc191c2VyX2lkIjoiMTI2Nzc5MjUiLCJmcm96ZW4iOnRydWUsImltcyI6IjI3NDY5ODYifSwiaWF0IjoxNzMxMTM4NDI2LCJleHAiOjE3MzExNDIwMjYsImlzcyI6Imh0dHBzOi8vaWFtLmNsb3VkLmlibS5jb20vaWRlbnRpdHkiLCJncmFudF90eXBlIjoidXJuOmlibTpwYXJhbXM6b2F1dGg6Z3JhbnQtdHlwZTphcGlrZXkiLCJzY29wZSI6ImlibSB

In [5]:
def query_watsonx(prompt):
    headers = {
        "Authorization": f"Bearer {ACCESS_TOKEN}",
        "Content-Type": "application/json"
    }
    body = {
            "input": prompt,
            "parameters": {
                "decoding_method": "greedy",
                "max_new_tokens": 900,
                "repetition_penalty": 1.05,
            },
            "model_id": "sdaia/allam-1-13b-instruct",
            "project_id": "4d029c5c-71f8-459d-aa75-19f113997547"
        }

    response = requests.post(ENDPOINT, headers=headers, json=body)
    if response.json().get("results"):
        return response.json().get("results")[0].get("generated_text")
    else:
        return "No response available."


In [6]:
def process_query(query_text):
    pinecone_results = query_pinecone(query_text)
    prompt = generate_prompt(query_text, pinecone_results)
    response = query_watsonx(prompt)
    return  parse_response(response)


In [7]:
response = process_query("عمري ما سامع بهيج عنوان بهل المنطقة")
print(response)


Dialect: [Levantine]
Translation: "لم أسمع بهذا العنوان في هذه المنطقة من قبل"


## **Telegram Integration**

In [8]:
!pip install python-telegram-bot --upgrade




In [16]:
from telegram import Update, Bot
from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, ContextTypes, filters
import re


# Translation function using your model
def translate_message(message):
    query_text = message
    response = process_query(query_text)
    return response


# Command handler: Start
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
      await update.message.reply_text("Hello! Send me a message privately or mention me in a group to translate sentences.")

# Message handler for private chat
async def handle_private_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    print("HERE private")
    user_message = update.message.text
    translation = translate_message(user_message)
    await update.message.reply_text(f"\n{translation}")


# Message handler for group mentions
async def handle_group_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    print("HERE 1")
    message_text = update.message.text

    # Extract message text after bot mention
    mention_pattern = f"@{context.bot.username} (.+)"
    match = re.search(mention_pattern, message_text)

    if match:
        original_message = match.group(1)
        translation = translate_message(original_message)
        await update.message.reply_text(f"Translation:\n{translation}")
    else:
        await update.message.reply_text("Please mention me with a valid message like: @AllamDialectTranslatorBot <message> to translate.")





# Main function to set up the bot
async def main():
    bot_token = '7888741463:AAFYT1ItE5dHlSxgPoO00la9bzzjRKpOD8E'

    app = ApplicationBuilder().token(bot_token).build()

    # Command handlers
    app.add_handler(CommandHandler("start", start))

    # Message handler for private messages
    app.add_handler(MessageHandler(filters.TEXT & filters.ChatType.PRIVATE, handle_private_message))

    # Message handler for mentions in group
    app.add_handler(MessageHandler(filters.TEXT & filters.ChatType.GROUP, handle_group_message))

    app.add_handler(MessageHandler(filters.TEXT, log_all_messages))

    # Start the bot
    await app.run_polling()

async def log_all_messages(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    print(f"Received message: {update.message.text}")



if __name__ == "__main__":
    import nest_asyncio
    import asyncio

    # Apply the patch to allow nested event loops
    nest_asyncio.apply()

    asyncio.run(main())



Received message: @AllamDialectTranslatorBot
Received message: كيراكي راني
Received message: هلا بيك كيف حوالك
Received message: @AllamDialectTranslatorBot


RuntimeError: Cannot close a running event loop