In [39]:
import requests
import json
import datetime


class ModelOperatorOllama():
    def __init__(self):
        self.url = 'http://127.0.0.1:11435/api/'
        self.model_list = self.list_models()
        
        # db_configs
        
        
        

    def generate_response(self,model,system_prompt,prompt,format=None):
        """
        Sends a request to the LLM API and returns the response.
        """
        
        if model in self.model_list:
            self.model = model
        else:
            raise ValueError(f"Model '{model}' not provided or not found in available models: {self.model_list}")
        
        url="http://127.0.0.1:11435/api/chat"
        payload = {
            "model": self.model,
            "keep_alive": 0,
            "messages": [
                {'role':'system',
                'content':system_prompt
                },
                {'role':'user',
                'content':prompt}
                ],
            "stream": False
        }
        
        if format:
            payload.update(format)

        headers = {"Content-Type": "application/json"}
        
        response = requests.post(url, data=json.dumps(payload), headers=headers)
        
        # get generation timestamp
        timestamp = datetime.datetime.now().isoformat()
        response_json = response.json()
        response_json.update({'timestamp':timestamp})
        

        return response_json
        
    def list_models(self):
        
        response = requests.get(f'{self.url}tags')
        model_dict = {
            model['name']: {'model_name': model['name'],
                            'param_size': model['details']['parameter_size'],
                            'quant_level': model['details']['quantization_level']} for model in response.json()['models']
    }
        return model_dict

In [27]:
llmp = ModelOperatorOllama()

In [28]:
response = llmp.generate_response('llama3.2:latest','','what is my name?')

In [29]:
response

{'model': 'llama3.2:latest',
 'created_at': '2025-02-01T15:21:52.1530349Z',
 'message': {'role': 'assistant',
  'content': "I don't have any information about you, so I'm not sure what your name is. We just started our conversation, and I don't have any prior knowledge or data about you. Would you like to tell me your name, though?"},
 'done_reason': 'stop',
 'done': True,
 'total_duration': 2904456100,
 'load_duration': 1591682300,
 'prompt_eval_count': 30,
 'prompt_eval_duration': 151000000,
 'eval_count': 50,
 'eval_duration': 698000000,
 'timestamp': '2025-02-01T15:21:52.153541'}

In [None]:
response_json['load_duration'] = round(response_json['load_duration']/(10**9),2)
response_json['prompt_eval_duration'] = round(response_json['prompt_eval_duration']/(10**9),2)
response_json['eval_duration'] = round(response_json['eval_duration']/(10**9),2)

In [38]:
round(2904456100*(10**-9),2)

2.9

In [16]:
import psycopg2 as psql

# connects to db
connection = psql.connect(user = 'llmp',
                        password = 'Akechi21234',
                        host = '127.0.0.1',
                        port = '5432',
                        database = 'llmp_db')

# cursor to run queries and operations on db
# cursor = connection.cursor()

# query example
# cursor.execute("SELECT 1 FROM llmp_db")

# get results from query
# data = cursor.fetchmany()
# print(data)



In [18]:
drop_table_query = "DROP TABLE IF EXISTS generation_history;"
connection.autocommit = True
cursor = connection.cursor()
cursor.execute(drop_table_query)

In [54]:
connection.autocommit = True
cursor = connection.cursor()

create_table_query = """
        CREATE TABLE IF NOT EXISTS generation_history (
            id SERIAL PRIMARY KEY,
            model TEXT,
            system_prompt TEXT,
            prompt TEXT,
            generated_text TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            test TEXT
        );
        """
cursor.execute(create_table_query)

In [45]:
temp_conn = psql.connect(user = 'llmp',
                        password = 'Akechi21234',
                        host = '127.0.0.1',
                        port = '5432',
                        database = 'llmp_db')

temp_conn.autocommit = True
temp_cur = temp_conn.cursor()
database = 'llmp_db'
 # Check if database exists
temp_cur.execute(f"SELECT 1 FROM pg_database WHERE datname = '{database}'")
exists = temp_cur.fetchone()
if exists:
    print(1)
else:
    print(0)

1


In [19]:
import requests
import json
import datetime
import psycopg2 as psql
# with db connection

class ModelOperatorOllama():
    def __init__(self):
        
        # Ollama configs
        self.url = 'http://127.0.0.1:11435/api/'
        self.model_list = self.list_models()
        
        # db configs
        self.db_user = 'llmp'
        self.db_password = 'Akechi21234'
        self.db_host = '127.0.0.1'
        self.db_port = '5432'
        self.db_database = 'llmp_db'
        
        # db connection
        self.connection = self.connect_to_db()
        self.cursor = self.connection.cursor()
        # create table if not exists
        self.create_gen_hist_table()
        
        
    def connect_to_db(self):
        """
        Verify if database exists, if not, creates it.
        Connects to the database and returns the connection object.
        """
        try:
            
            # verify if db exists
            tmp_connection = psql.connect(user = self.db_user,
                            password = self.db_password,
                            host = self.db_host,
                            port = self.db_port,
                            database = self.db_database)
            
            tmp_connection.autocommit = True
            tmp_cursor = tmp_connection.cursor()
            tmp_cursor.execute(f"SELECT 1 FROM pg_database WHERE datname = '{self.db_database}'")
            exists = tmp_cursor.fetchone()
            
            # if doesn't exist, create it
            if not exists:
                print(f"Database '{self.db_database}' not found. Creating it...")
                tmp_cursor.execute(f"CREATE DATABASE {self.db_database}")
                
            tmp_cursor.close()
            tmp_connection.close()
            
            # conenct to db
            connection = psql.connect(
                dbname=self.db_database,
                user=self.db_user,
                password=self.db_password,
                host=self.db_host,
                port=self.db_port
            )
            print(f"Connected to database: {self.db_database}")
            return connection
        
        except Exception as e:
            print(f"Database connection failed: {e}")
            raise   
        
    def create_gen_hist_table(self):
        """
        Checks if gen_hist table exists, if not creates it.
        """
        table_creation_query = """
        CREATE TABLE IF NOT EXISTS generation_history (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            gen_id TEXT,
            gen_timestamp TEXT,
            model TEXT,
            system_prompt TEXT,
            prompt TEXT,
            gent_text TEXT,
            prompt_eval_count INT,
            eval_count INT,
            load_duration FLOAT,
            prompt_eval_duration FLOAT,
            eval_duration FLOAT
        );
        """
        self.cursor.execute(table_creation_query)
        
        
    def save_to_db(self, generation_data):
        """
        Saves the generated response to PostgreSQL.
        
        Parameters:
        - generation_data (dict): JSON object containing generation details.
        """
        try:
            # Map JSON keys to table columns
            db_columns = {
                "gen_id": generation_data.get("gen_id"),
                "gen_timestamp": generation_data.get("timestamp"),
                "model": generation_data.get("model"),
                "system_prompt": generation_data.get("system_prompt"),
                "prompt": generation_data.get("prompt"),
                "gent_text": generation_data.get("message", {}).get("content"),
                "prompt_eval_count": generation_data.get("prompt_eval_count"),
                "eval_count": generation_data.get("eval_count"),
                "load_duration": generation_data.get("load_duration"),
                "prompt_eval_duration": generation_data.get("prompt_eval_duration"),
                "eval_duration": generation_data.get("eval_duration"),
            }

            # Generate dynamic SQL query
            columns = ", ".join(db_columns.keys())
            placeholders = ", ".join(["%s"] * len(db_columns))
            values = tuple(db_columns.values())

            insert_query = f"""
            INSERT INTO generation_history ({columns})
            VALUES ({placeholders});
            """

            # Execute and commit the query
            self.cursor.execute(insert_query, values)
            self.connection.commit()
            print("Generation saved to database.")

        except Exception as e:
            self.connection.rollback()
            print(f"Failed to save generation to database: {e}")
        

    def generate_response(self,model,system_prompt,prompt,format=None):
        """
        Sends a request to the LLM API and returns the response.
        """
        timestamp = datetime.datetime.now().isoformat()
        timestamp = str(timestamp)
        if model in self.model_list:
            self.model = model
        else:
            raise ValueError(f"Model '{model}' not provided or not found in available models: {self.model_list}")
        
        url="http://127.0.0.1:11435/api/chat"
        payload = {
            "model": self.model,
            "keep_alive": 0,
            "messages": [
                {'role':'system',
                'content':system_prompt
                },
                {'role':'user',
                'content':prompt}
                ],
            "stream": False
        }
        
        if format:
            payload.update(format)

        headers = {"Content-Type": "application/json"}
        
        response = requests.post(url, data=json.dumps(payload), headers=headers)
        
        # get generation timestamp
        
        response_json = response.json()
        
        response_json.update({'system_prompt':system_prompt})
        response_json.update({'prompt':prompt})
        response_json.update({'timestamp':timestamp})
        response_json['load_duration'] = round(response_json['load_duration']/(10**9),2)
        response_json['prompt_eval_duration'] = round(response_json['prompt_eval_duration']/(10**9),2)
        response_json['eval_duration'] = round(response_json['eval_duration']/(10**9),2)
        response_json['gen_id'] = f'{response_json['model']}_{response_json['timestamp']}'
        
        print('saving data')
        self.save_to_db(response_json)

        return response_json
        
    def list_models(self):
        
        response = requests.get(f'{self.url}tags')
        model_dict = {
            model['name']: {'model_name': model['name'],
                            'param_size': model['details']['parameter_size'],
                            'quant_level': model['details']['quantization_level']} for model in response.json()['models']
    }
        return model_dict

In [1]:
from src.model_operator.model_operator import ModelOperatorOllama
llmp = ModelOperatorOllama()
response = llmp.generate_response('llama3.2:latest','','what is my name?')

Connected to database: llmp_db
saving data
Generation saved to database.


In [22]:
response

{'model': 'llama3.2:latest',
 'created_at': '2025-02-01T17:10:24.4045472Z',
 'message': {'role': 'assistant',
  'content': "I don't have any information about your name. I'm a large language model, I don't have the ability to store or recall personal details about individual users. Each time you interact with me, it's a new conversation and I start from a blank slate. Would you like to tell me your name?"},
 'done_reason': 'stop',
 'done': True,
 'total_duration': 3146740200,
 'load_duration': 1.61,
 'prompt_eval_count': 30,
 'prompt_eval_duration': 0.18,
 'eval_count': 63,
 'eval_duration': 0.89,
 'system_prompt': '',
 'prompt': 'what is my name?',
 'timestamp': '2025-02-01T17:10:21.256455',
 'gen_id': 'llama3.2:latest_2025-02-01T17:10:21.256455'}

In [12]:
response.get("eval_count")

43

In [None]:
def connect_to_db(self):
        """Connects to PostgreSQL and creates the database if it doesn't exist."""
        try:
            # Connect to default 'postgres' database to check if target DB exists
            temp_conn = psycopg2.connect(
                dbname="postgres",
                user=self.db_user,
                password=self.db_password,
                host=self.db_host,
                port=self.db_port
            )
            temp_conn.autocommit = True
            temp_cur = temp_conn.cursor()

            # Check if database exists
            temp_cur.execute(f"SELECT 1 FROM pg_database WHERE datname = '{self.db_name}'")
            exists = temp_cur.fetchone()

            if not exists:
                print(f"Database '{self.db_name}' not found. Creating it...")
                temp_cur.execute(f"CREATE DATABASE {self.db_name}")

            temp_cur.close()
            temp_conn.close()

            # Now connect to the actual database
            conn = psycopg2.connect(
                dbname=self.db_name,
                user=self.db_user,
                password=self.db_password,
                host=self.db_host,
                port=self.db_port
            )
            print(f"Connected to database: {self.db_name}")
            return conn

        except Exception as e:
            print(f"Database connection failed: {e}")
            raise
        
        
def create_table_if_not_exists(self):
        """Ensures the generation history table exists in the database."""
        create_table_query = """
        CREATE TABLE IF NOT EXISTS generation_history (
            id SERIAL PRIMARY KEY,
            model TEXT,
            system_prompt TEXT,
            prompt TEXT,
            generated_text TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
        self.cur.execute(create_table_query)
        self.conn.commit()
        print("Table 'generation_history' is ready.")
        

def save_to_db(self, model, system_prompt, prompt, generated_text):
        """Saves the generated response to PostgreSQL."""
        insert_query = """
        INSERT INTO generation_history (model, system_prompt, prompt, generated_text)
        VALUES (%s, %s, %s, %s);
        """
        self.cur.execute(insert_query, (model, system_prompt, prompt, generated_text))
        self.conn.commit()
        print("Generation saved to database.")

In [8]:
connection

<connection object at 0x000001FC25B99250; dsn: 'user=llmp password=xxx dbname=llmp_db host=127.0.0.1 port=5432', closed: 0>

In [None]:
import requests
import json
import psycopg2

class ModelOperatorOllama():
    def __init__(self):
        # PostgreSQL connection settings
        self.db_name = "llmp_db"
        self.db_user = "llmp"
        self.db_password = "Akechi21234"  # Replace with actual password
        self.db_host = "127.0.0.1"
        self.db_port = "5432"

        # Connect to PostgreSQL and create database if it doesn't exist
        self.conn = self.connect_to_db()
        self.cur = self.conn.cursor()
        self.create_table_if_not_exists()  # Ensure table exists

        # Ollama API setup
        self.url = 'http://127.0.0.1:11435/api/'
        self.model_list = self.list_models()

    def connect_to_db(self):
        """Connects to PostgreSQL and creates the database if it doesn't exist."""
        try:
            # Connect to default 'postgres' database to check if target DB exists
            temp_conn = psycopg2.connect(
                dbname="postgres",
                user=self.db_user,
                password=self.db_password,
                host=self.db_host,
                port=self.db_port
            )
            temp_conn.autocommit = True
            temp_cur = temp_conn.cursor()

            # Check if database exists
            temp_cur.execute(f"SELECT 1 FROM pg_database WHERE datname = '{self.db_name}'")
            exists = temp_cur.fetchone()

            if not exists:
                print(f"Database '{self.db_name}' not found. Creating it...")
                temp_cur.execute(f"CREATE DATABASE {self.db_name}")

            temp_cur.close()
            temp_conn.close()

            # Now connect to the actual database
            conn = psycopg2.connect(
                dbname=self.db_name,
                user=self.db_user,
                password=self.db_password,
                host=self.db_host,
                port=self.db_port
            )
            print(f"Connected to database: {self.db_name}")
            return conn

        except Exception as e:
            print(f"Database connection failed: {e}")
            raise

    def create_table_if_not_exists(self):
        """Ensures the generation history table exists in the database."""
        create_table_query = """
        CREATE TABLE IF NOT EXISTS generation_history (
            id SERIAL PRIMARY KEY,
            model TEXT,
            system_prompt TEXT,
            prompt TEXT,
            generated_text TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
        """
        self.cur.execute(create_table_query)
        self.conn.commit()
        print("Table 'generation_history' is ready.")

    def generate_response(self, model, system_prompt, prompt, format=None):
        """
        Sends a request to the LLM API and returns the response.
        """
        if model in self.model_list:
            self.model = model
        else:
            raise ValueError(f"Model '{model}' not provided or not found in available models: {self.model_list}")

        url = "http://127.0.0.1:11435/api/chat"
        payload = {
            "model": self.model,
            "keep_alive": 0,
            "messages": [
                {'role': 'system', 'content': system_prompt},
                {'role': 'user', 'content': prompt}
            ],
            "stream": False
        }

        if format:
            payload.update(format)

        headers = {"Content-Type": "application/json"}
        response = requests.post(url, data=json.dumps(payload), headers=headers)

        if response.status_code == 200:
            response_data = response.json()
            generated_text = response_data.get("message", {}).get("content", "")

            # Store the generation in the database
            self.save_to_db(model, system_prompt, prompt, generated_text)

            return response_data  # Return the JSON response
        else:
            return {"error": f"Request failed with status {response.status_code}", "details": response.text}

    def save_to_db(self, model, system_prompt, prompt, generated_text):
        """Saves the generated response to PostgreSQL."""
        insert_query = """
        INSERT INTO generation_history (model, system_prompt, prompt, generated_text)
        VALUES (%s, %s, %s, %s);
        """
        self.cur.execute(insert_query, (model, system_prompt, prompt, generated_text))
        self.conn.commit()
        print("Generation saved to database.")

    def list_models(self):
        """Retrieves the list of available models from Ollama API."""
        response = requests.get(f'{self.url}tags')
        model_dict = {
            model['name']: {
                'model_name': model['name'],
                'param_size': model['details']['parameter_size'],
                'quant_level': model['details']['quantization_level']
            }
            for model in response.json()['models']
        }
        return model_dict
