In [60]:
import psycopg2

import os

# Connection settings come from the standard libpq environment variables when
# they are set, so credentials do not have to live in the notebook. The
# fallbacks are the local development values this notebook was written with.
connection = psycopg2.connect(
    dbname=os.environ.get("PGDATABASE", "SentimentDB"),
    user=os.environ.get("PGUSER", "Kuba"),
    password=os.environ.get("PGPASSWORD", "kuba"),
    host=os.environ.get("PGHOST", "localhost"),  # or the server address
    port=os.environ.get("PGPORT", "5432"),       # default PostgreSQL port
)

# DDL for the table that holds the tweets together with both model predictions.
# NOTE(review): the SQL comment on `tokens` calls it a text array, but the
# column type is plain TEXT.
create_table_query = """
CREATE TABLE tweets (
    id BIGINT PRIMARY KEY,          -- Kolumna ID jako klucz główny
    text TEXT NOT NULL,            -- Kolumna text
    tokens TEXT NOT NULL,        -- Kolumna tokens jako tablica tekstowa
    target INTEGER NOT NULL,       -- Kolumna target
    cnn_pred INTEGER NOT NULL,     -- Kolumna cnn_pred
    mistrall_pred Integer          -- Kolumna mistrall_pred
);
"""

try:
    # The cursor is opened inside the try block so that the connection is
    # closed by `finally` even if creating the cursor itself fails.
    with connection.cursor() as cursor:
        cursor.execute(create_table_query)
    connection.commit()
    print("Tabela została utworzona pomyślnie.")
except Exception as e:
    # Best-effort cell: report the problem (e.g. the table already exists)
    # instead of aborting the notebook run.
    print("Błąd podczas tworzenia tabeli:", e)
finally:
    connection.close()

Tabela została utworzona pomyślnie.


In [None]:
import pandas as pd
from sqlalchemy import create_engine

# Read the CNN predictions and add a `mistrall_pred` column filled with -1,
# the value the processing loop later selects on (WHERE mistrall_pred = -1).
df = pd.read_csv(
    'CNN_tweet_prediction.csv',
    sep=',',
    encoding='latin-1',
).assign(mistrall_pred=-1)

engine = create_engine('postgresql://Kuba:kuba@localhost:5432/SentimentDB')

# Append the rows to the existing `tweets` table; any failure is reported
# rather than raised so the notebook keeps running.
try:
    df.to_sql(name='tweets', con=engine, if_exists='append', index=False)
except Exception as e:
    print(f"Wystąpił błąd podczas ładowania danych: {e}")
else:
    print("Dane zostały załadowane pomyślnie.")

Dane zostały załadowane pomyślnie.


In [124]:
from mistralai import Mistral
import pandas as pd


def process_batch(batch_texts, model, client, log = False):
    """Classify a batch of tweets as positive or negative with a chat model.

    Args:
        batch_texts: Sequence of tweet texts to classify.
        model: Model name forwarded to ``client.chat.complete``.
        client: Object exposing ``chat.complete(model=..., messages=...)`` whose
            result provides ``choices[0].message.content``.
        log: When true, print the prompt message and the raw response.

    Returns:
        A list of ints in the same order as ``batch_texts``:
        1 for "positive" and 0 for "negative". An empty batch yields ``[]``
        without calling the API.

    Raises:
        Exception: If the number of returned labels differs from the number
            of tweets.
        ValueError: If the response is empty, is not text, or contains a label
            other than "positive"/"negative".
    """
    # Nothing to classify: do not spend an API request on an empty prompt.
    if len(batch_texts) == 0:
        return []

    formatted_tweets = "\n".join(
        f"Tweet {number}: \"{tweet}\"" for number, tweet in enumerate(batch_texts, start=1)
    )
    messages = [
        {
            "role": "user",
            "content": f"""Classify the following {len(batch_texts)} tweets to determine if their sentiment is positive or negative. Only respond with the list of exact words 'positive' or 'negative', in the same order as the tweets are provided, separated by commas.
            I am providing you {len(batch_texts)} tweets, so you must return as a answear list which lenght is also {len(batch_texts)}.
            Tweets:
            {formatted_tweets}"""
        },
    ]
    chat_response = client.chat.complete(
        model=model,
        messages=messages
    )
    response = chat_response.choices[0].message.content

    if log:
        print(f"[MIstrall AI] Message content: {messages[0]}")
        print(f"[MIstrall AI] response: {response}")

    if not isinstance(response, str) or not response.strip():
        raise ValueError("[Mistrall AI] empty answear")

    # Accept commas and/or newlines as separators and ignore decoration the
    # model tends to add around a label (quotes, brackets, trailing periods,
    # list bullets). Empty fragments, e.g. from a trailing comma, are dropped.
    decoration = " \t\r\n'\"`[]().*-"
    labels = []
    for fragment in response.replace("\n", ",").split(","):
        label = fragment.strip(decoration).lower()
        if label:
            labels.append(label)

    if len(labels) != len(batch_texts):
        raise Exception("Mismatch between number of tweets and sentiments returned.")

    # Unknown labels are rejected instead of being silently stored as negative.
    label_to_target = {"positive": 1, "negative": 0}
    unknown = sorted({label for label in labels if label not in label_to_target})
    if unknown:
        raise ValueError(f"Unexpected sentiment labels returned: {unknown}")

    return [label_to_target[label] for label in labels]
    


Przetwarzanie danych:

In [125]:
import pandas as pd
from sqlalchemy import create_engine, text
import time

def process_and_update_data(engine, model, client, batch_size = 100, log = False,
                            delay = 1.5, max_delay = 60.0, max_consecutive_errors = None):
    """Label every tweet that still has ``mistrall_pred = -1`` and store the result.

    Rows are fetched in batches ordered by id, classified with
    ``process_batch`` and written back in one transaction per batch. A failed
    batch is rolled back and picked up again by the next iteration, because
    its rows still have ``mistrall_pred = -1``.

    Args:
        engine: SQLAlchemy engine connected to the database with the ``tweets`` table.
        model: Model name forwarded to ``process_batch``.
        client: API client forwarded to ``process_batch``.
        batch_size: Maximum number of tweets fetched and classified per request.
        log: When true, print every record update (and enable logging in
            ``process_batch``).
        delay: Seconds to sleep after a successful batch; also the base of the
            retry backoff.
        max_delay: Upper bound, in seconds, for the retry backoff.
        max_consecutive_errors: If set, re-raise the last error once this many
            batches have failed in a row. ``None`` retries indefinitely.

    Raises:
        Exception: The last batch error, only when ``max_consecutive_errors``
            is set and reached. Errors from reading the batch itself propagate
            unchanged.
    """
    # Only the columns that are actually used are fetched.
    select_query = text("""
            SELECT id, text FROM tweets
            WHERE mistrall_pred = -1
            ORDER BY id ASC
            LIMIT :batch_size;
        """)

    update_query = text("""
        UPDATE tweets
        SET mistrall_pred = :mistrall_pred
        WHERE id = :id;
    """)

    consecutive_errors = 0

    with engine.connect() as conn:
        while True:
            batch = pd.read_sql(select_query, conn, params={"batch_size": batch_size})

            if batch.empty:
                print("[DONE] ALL RECORD PROCESSED")
                break

            # .tolist() yields plain Python ints, which the DB driver can bind.
            ids = batch['id'].tolist()
            min_id, max_id = min(ids), max(ids)
            batch_texts = batch['text'].tolist()

            try:
                sentiments = process_batch(batch_texts, model, client, log)

                if sentiments is None:
                    raise Exception("[Mistrall AI] empty answear")
                if len(sentiments) != len(ids):
                    raise Exception("Mismatch between number of tweets and sentiments returned.")

                updates = [
                    {"mistrall_pred": int(prediction), "id": int(record_id)}
                    for record_id, prediction in zip(ids, sentiments)
                ]

                if log:
                    for update in updates:
                        print(f"Zapis do rekordu ID={update['id']}, mistrall_pred= {update['mistrall_pred']}")

                # A list of parameter dicts is sent as a single executemany call.
                conn.execute(update_query, updates)
                conn.commit()

                print(f"[DB-COMMIT] Processed and updated {len(batch)} records. IDs: [{min_id},{max_id}]")
                consecutive_errors = 0
                time.sleep(delay)

            except Exception as e:
                # End the current transaction; after a failed statement the
                # connection would otherwise reject every following query.
                conn.rollback()
                consecutive_errors += 1
                print(f"[ERROR]: {e} Records: [{min_id},{max_id}] will be processed in next iteration")

                if max_consecutive_errors is not None and consecutive_errors >= max_consecutive_errors:
                    raise

                # Back off exponentially so repeated rate-limit errors (HTTP 429)
                # are not answered with an immediate identical retry.
                time.sleep(min(delay * (2 ** consecutive_errors), max_delay))

In [126]:

import os
from getpass import getpass

# Database URL: override with SENTIMENT_DB_URL; the fallback is the local
# development database this notebook was written against.
DB_URL = os.environ.get(
    "SENTIMENT_DB_URL",
    'postgresql://Kuba:kuba@localhost:5432/SentimentDB',
)
engine = create_engine(DB_URL)

# Secrets must not be stored in the notebook: the API key is read from the
# MISTRAL_API_KEY environment variable, or prompted for without echo.
# Any key that was ever committed in a notebook should be revoked.
api_key = os.environ.get("MISTRAL_API_KEY") or getpass("Mistral API key: ")
model = "open-mistral-nemo"
client = Mistral(api_key=api_key)

# Number of tweets sent to the model per request.
BATCH_SIZE = 10

process_and_update_data(
    engine=engine,
    model=model,
    client=client,
    batch_size=BATCH_SIZE,
    log=False)

[DB-COMMIT] Processed and updated 10 records. IDs: [692,701]
[DB-COMMIT] Processed and updated 10 records. IDs: [702,711]
[DB-COMMIT] Processed and updated 10 records. IDs: [712,721]
[DB-COMMIT] Processed and updated 10 records. IDs: [722,731]
[DB-COMMIT] Processed and updated 10 records. IDs: [732,741]
[ERROR]: API error occurred: Status 429
{"message":"Requests rate limit exceeded"} Records: [742,751] will be processed in next iteration
[DB-COMMIT] Processed and updated 10 records. IDs: [742,751]
[DB-COMMIT] Processed and updated 10 records. IDs: [752,761]
[DB-COMMIT] Processed and updated 10 records. IDs: [762,771]
[ERROR]: API error occurred: Status 429
{"message":"Requests rate limit exceeded"} Records: [772,781] will be processed in next iteration
[DB-COMMIT] Processed and updated 10 records. IDs: [772,781]
[DB-COMMIT] Processed and updated 10 records. IDs: [782,791]
[ERROR]: API error occurred: Status 429
{"message":"Requests rate limit exceeded"} Records: [792,801] will be proc

KeyboardInterrupt: 