In [None]:
import logging
import os
import re
import warnings
from datetime import datetime, timezone
from urllib.parse import quote

import emoji
import requests
import urllib3
from tqdm import tqdm
from transformers import pipeline

logging.getLogger("transformers.modeling_utils").setLevel(logging.ERROR)
warnings.filterwarnings("ignore", category=UserWarning, module="tqdm")
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

In [4]:
# Elasticsearch connection settings. Each value can be overridden with an
# environment variable so real credentials never have to be committed in the
# notebook; the defaults reproduce the original local-development setup.
# NOTE(review): the "elastic"/"elastic" fallback is only suitable for a local
# dev cluster — set ES_USER / ES_PASSWORD anywhere else.
ES_HOST = os.environ.get("ES_HOST", "https://127.0.0.1:9200")
SOURCE_INDEX = os.environ.get("ES_SOURCE_INDEX", "moutputdata")
DEST_INDEX = os.environ.get("ES_DEST_INDEX", "sentiment_status")
auth = (os.environ.get("ES_USER", "elastic"), os.environ.get("ES_PASSWORD", "elastic"))
headers = {'Content-Type': 'application/json'}

In [5]:
# Candidate stance labels for zero-shot classification. The pipeline result's
# top-ranked label (index 0) is what gets stored as a document's sentiment.
labels = ["support tariff", "oppose tariff", "neutral"]

# Multilingual XNLI model driven through the zero-shot pipeline, pinned to CPU.
classifier = pipeline(
    task="zero-shot-classification",
    model="joeddav/xlm-roberta-large-xnli",
    device="cpu",
)

Device set to use cpu


In [6]:
def text_preprocessing(text):
    """Normalise raw post text before zero-shot classification.

    Strips content that carries no stance signal (HTML tags, Markdown images
    and links, bare URLs, @mentions), keeps hashtag words without the "#",
    converts emojis to text names via ``emoji.demojize`` and collapses all
    whitespace into single spaces.

    Args:
        text: Raw text. Non-string values (e.g. ``None`` for a missing field)
            are treated as empty.

    Returns:
        The cleaned, stripped string; ``""`` when nothing is left.
    """
    if not isinstance(text, str):
        return ""
    # Structured markup must go BEFORE bare URLs: the URL pattern is greedy over
    # non-whitespace, so running it first swallows the ")" / ">" that close a
    # Markdown link or HTML tag and leaves fragments like "[text](" behind.
    text = re.sub(r'<.*?>', '', text)               # HTML tags
    text = re.sub(r'!\[.*?\]\(.*?\)', '', text)     # Markdown images
    text = re.sub(r'\[.*?\]\(.*?\)', '', text)      # Markdown links
    text = re.sub(r'http\S+|www\S+', '', text)      # bare URLs
    # keep hashtag words, remove the "#" symbol
    text = re.sub(r'#(\w+)', r'\1', text)
    # remove @someone; the look-behind leaves e-mail addresses (a@b.com) intact
    text = re.sub(r'(?<!\w)@\w+', '', text)
    # convert emojis to text descriptions
    text = emoji.demojize(text)
    # \s also matches \n and \r, so this normalises line breaks as well
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

In [7]:
def get_incremental_data(last_time, now, page_size=1000, scroll="2m", timeout=60):
    """Fetch every SOURCE_INDEX document whose ``metadata.created_time`` is in ``[last_time, now)``.

    Pages through the results with the Elasticsearch scroll API and returns
    all raw hits in one list. The scroll context is cleared when finished,
    also when a request fails part-way, instead of being left to time out.

    Args:
        last_time: Inclusive lower bound for ``metadata.created_time``.
        now: Exclusive upper bound for ``metadata.created_time``.
        page_size: Number of hits requested per scroll page.
        scroll: Keep-alive for the scroll context between pages.
        timeout: Per-request timeout in seconds passed to ``requests``.

    Returns:
        List of hit dicts as returned under ``hits.hits``; empty when nothing matched.

    Raises:
        requests.HTTPError: if Elasticsearch answers a search/scroll request
            with an error status.
    """
    query = {
        "query": {
            "range": {
                "metadata.created_time": {
                    "gte": last_time,
                    "lt": now
                }
            }
        },
        "size": page_size,
        "sort": ["_doc"]
    }

    def _post_json(url, payload):
        # POST a JSON body and return the decoded response, raising on HTTP
        # errors instead of failing later with KeyError on a missing "hits".
        resp = requests.post(url, headers=headers, auth=auth, json=payload,
                             verify=False, timeout=timeout)
        resp.raise_for_status()
        return resp.json()

    page = _post_json(f"{ES_HOST}/{SOURCE_INDEX}/_search?scroll={scroll}", query)
    scroll_id = page.get("_scroll_id")
    all_docs = []
    try:
        hits = page["hits"]["hits"]
        while hits:
            all_docs.extend(hits)
            if not scroll_id:
                break
            page = _post_json(f"{ES_HOST}/_search/scroll",
                              {"scroll": scroll, "scroll_id": scroll_id})
            # Always continue with the most recently returned scroll id.
            scroll_id = page.get("_scroll_id", scroll_id)
            hits = page["hits"]["hits"]
    finally:
        if scroll_id:
            try:
                requests.delete(f"{ES_HOST}/_search/scroll", headers=headers, auth=auth,
                                json={"scroll_id": [scroll_id]}, verify=False,
                                timeout=timeout)
            except requests.RequestException:
                # Best effort only: the context still expires after `scroll`.
                pass

    return all_docs

def doc_exists(index, doc_id, timeout=60):
    """Return whether document ``doc_id`` exists in ``index``.

    Sends a HEAD request to the document endpoint: 200 means present and
    404 means absent.

    Args:
        index: Elasticsearch index name.
        doc_id: Document id; URL-encoded so ids containing "/", "?" or "#"
            address the right document.
        timeout: Request timeout in seconds passed to ``requests``.

    Returns:
        ``True`` on 200, ``False`` on 404 or any other non-error status.

    Raises:
        requests.HTTPError: on 4xx/5xx statuses other than 404 (e.g. 401).
            Treating those as "absent" would make the pipeline re-classify
            every document and then fail on the insert as well.
    """
    url = f"{ES_HOST}/{index}/_doc/{quote(str(doc_id), safe='')}"
    resp = requests.head(url, auth=auth, headers=headers, verify=False, timeout=timeout)
    if resp.status_code == 200:
        return True
    if resp.status_code == 404:
        return False
    resp.raise_for_status()
    return False

In [8]:
def _read_checkpoint(path, default="1900-01-01T00:00:00Z"):
    """Return the timestamp stored at ``path``, or ``default`` if the file is missing or empty."""
    try:
        with open(path, "r") as f:
            value = f.read().strip()
    except FileNotFoundError:
        return default
    # An empty file would otherwise send "gte": "" to Elasticsearch.
    return value or default


def _write_checkpoint(path, value):
    """Replace the checkpoint file at ``path`` with ``value`` via a temp file + rename."""
    tmp_path = f"{path}.tmp"
    with open(tmp_path, "w") as f:
        f.write(value)
    # os.replace swaps the file in one step, so a crash mid-write cannot
    # leave a truncated checkpoint behind.
    os.replace(tmp_path, path)


def _classify(text):
    """Return the ``{"label", "score"}`` sentiment dict for one raw text."""
    cleaned = text_preprocessing(text)
    if not cleaned:
        return {"label": None, "score": None}
    result = classifier(cleaned, candidate_labels=labels)
    return {
        "label": result["labels"][0],
        "score": round(result["scores"][0], 6),
    }


def run_sentiment_pipeline(checkpoint_path="last_time.txt"):
    """Classify documents created since the last run and store the results.

    Reads the previous run time from ``checkpoint_path`` and fetches source
    documents created in ``[last_time, now)``. Documents already present in
    DEST_INDEX are skipped. Each remaining one is classified and written to
    DEST_INDEX under the same id.

    The checkpoint is advanced to ``now`` only when every insert succeeded.
    Otherwise the failed documents would fall outside the next run's window
    and never be retried. Re-running after a failure is safe because
    already-stored documents are skipped by ``doc_exists``.

    Args:
        checkpoint_path: File holding the last successful run timestamp.
    """
    last_time = _read_checkpoint(checkpoint_path)
    now = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
    docs = get_incremental_data(last_time, now)

    failed = 0
    for item in tqdm(docs):
        _id = item["_id"]
        if doc_exists(DEST_INDEX, _id):
            continue

        # `or {}` also covers fields that are present but explicitly null.
        source = item.get("_source") or {}
        text = (source.get("content") or {}).get("text", "")
        body = {
            "sentiment": _classify(text),
            "processed_at": now
        }

        insert_url = f"{ES_HOST}/{DEST_INDEX}/_doc/{quote(str(_id), safe='')}"
        resp = requests.put(insert_url, json=body, auth=auth, headers=headers, verify=False)
        if resp.status_code not in (200, 201):
            failed += 1
            print(f"Insert failed for id {_id}: Status code {resp.status_code}, Response: {resp.text}")

    if failed:
        print(f"{failed} insert(s) failed; checkpoint kept at {last_time} so they are retried next run.")
        return
    _write_checkpoint(checkpoint_path, now)

In [9]:
# Entry point: run one incremental pass. Executing this cell in the notebook
# kernel triggers it; importing the code as a module does not.
if __name__ == "__main__":
    run_sentiment_pipeline()

0it [00:00, ?it/s]
