# LLM Sentiment Analysis Proof-of-Concept

## Setup and Function Definitions

### Requirements

#### Modules

In [1]:
pip install openai





[notice] A new release of pip is available: 23.1.2 -> 23.2.1
[notice] To update, run: python.exe -m pip install --upgrade pip


#### Imports

In [2]:
import os
import openai
import time
import re

### OpenAI Library Functions

#### API Key Validation Functions

In [3]:
def get_api_key() -> str:
    '''Reads the OpenAI API key stored in api/openai.txt.

    Returns:
        The file contents with surrounding whitespace removed, or an empty
        string when the file does not exist.
    '''
    key_path = 'api/openai.txt'
    if not os.path.exists(key_path):
        return ''
    with open(key_path) as key_file:
        contents = key_file.read()
    return contents.strip()

def validate_key() -> bool:
    '''Checks whether the stored OpenAI API key is accepted by the API.

    Loads the key with get_api_key(), assigns it to openai.api_key and issues
    an authenticated request that lists the available models. Listing models
    is used instead of a text completion so that validation does not consume
    tokens and does not depend on a specific model being available.

    Returns:
        True when the request succeeds. False when no key is stored or when
        the request raises; the reason is printed in both cases.
    '''
    api_key = get_api_key()
    openai.api_key = api_key

    # An empty key can never authenticate, so skip the network round trip.
    if not api_key:
        print('[ERROR - API KEY]', 'No API key found. Expected it in api/openai.txt.')
        return False

    try:
        openai.Model.list()
        return True
    except Exception as e:
        print('[ERROR - API KEY]', str(e))
        return False

#### Chat Completion Functions

In [4]:
def get_response(messages, max_attempts=3):
    '''Gets the text of a chat completion for messages.

    The API key is validated once up front; an invalid key is not a
    connection problem, so it returns immediately instead of being retried
    and reported as one. The query itself is then attempted up to
    max_attempts times.

    Args:
        messages: list of chat message dicts passed through to query_gpt().
        max_attempts: maximum number of query attempts (default 3).

    Returns:
        The content string of the first choice in the response, or None when
        the key is invalid or every attempt failed.
    '''
    if not validate_key():
        # validate_key() has already printed the reason.
        return None

    response = None
    for _ in range(max_attempts):
        response = query_gpt(messages)
        if response:
            break

    if not response:
        print('[ERROR - GPT 3.5]', 'Max connection attempts reached. Check internet connection.')
        return None

    return response['choices'][0]['message']['content']

def query_gpt(messages):
    '''Makes a single gpt-3.5-turbo chat completion request.

    Returns the response object on success. On any exception the error is
    printed, the function sleeps for one second and returns None; retrying
    is left to the caller.
    '''
    try:
        completion = openai.ChatCompletion.create(model='gpt-3.5-turbo', messages=messages)
    except Exception as err:
        print('[ERROR - GPT 3.5]', str(err))
        print('Retrying in 1 second...')
        time.sleep(1)
        return None
    return completion

#### Message Formatting Functions

In [5]:
def format_question(question):
    '''Wraps question in a one-element list holding a 'user' chat message.'''
    user_message = dict(role='user', content=question)
    return [user_message]

def format_instructions(instructions):
    '''Wraps instructions in a one-element list holding a 'system' chat message.'''
    system_message = dict(role='system', content=instructions)
    return [system_message]

### File System Functions

In [6]:
def read_from_file(filepath):
    '''Returns the entire UTF-8 text content of the file at filepath.'''
    with open(filepath, mode='r', encoding='utf-8') as handle:
        text = handle.read()
    return text

def write_to_file(filepath, content):
    '''Replaces the file at filepath with content, encoded as UTF-8.'''
    with open(filepath, mode='w', encoding='utf-8') as handle:
        handle.write(content)

### Sentiment Analysis Functions

In [14]:
def get_sentiment_scores():
    '''Scores every comment in comments.txt using the prompt in instructions.txt.

    The comments are bracketed, grouped into batches of at most five, and
    each batch is scored in turn.

    Returns:
        A list with the score strings of all batches, in batch order.
    '''
    raw_instructions = read_from_file('instructions.txt')
    # Space-separated word count of the instructions; passed to the batcher.
    word_count = len(raw_instructions.split(' '))
    system_prompt = format_instructions(raw_instructions)
    bracketed = format_comments(read_from_file('comments.txt'))

    all_scores = []
    for chunk in split_into_batches(word_count, bracketed, 5):
        all_scores.extend(get_scores_from_batch(system_prompt, chunk))
    return all_scores
    

def validate_sentiment_scores(score: str):
    '''Returns True if score has the expected line structure.

    Every line must be an optional minus sign, one digit optionally followed
    by a single 0, a comma, and then up to three space-separated runs of
    lowercase letters. Lines are separated by newline characters.

    The pattern is built from raw strings: the previous non-raw literal
    contained the invalid escape sequence '\\-', which triggers a warning on
    recent Python versions. The set of accepted strings is unchanged.
    '''
    line_pattern = r'-?[0-9]0?,[a-z]*( [a-z]*){0,2}'
    valid_pattern = '^' + line_pattern + r'(\n' + line_pattern + ')*$'
    return bool(re.search(valid_pattern, score))

def results_invalid_length(batch: list, results: list):
    '''Returns True when batch and results hold a different number of items.'''
    return len(batch) != len(results)

def format_comments(comments: str):
    '''Splits newline-separated comments and wraps each one in square brackets.

    Blank and whitespace-only lines are skipped, so empty input gives an
    empty list rather than a single '[]' entry, and blank lines between
    comments do not become empty comments.

    Args:
        comments: text with one comment per line.

    Returns:
        A list of '[comment]' strings in their original order.
    '''
    lines = comments.strip().split('\n')
    return ['[' + comment + ']' for comment in lines if comment.strip()]

def split_into_batches(inst_length: int, comments: list, size: int, max_words: int = 10000):
    '''Groups comments into newline-joined batches.

    A batch is closed once it holds size comments, or once its running word
    count (starting from inst_length, counting space-separated words of each
    comment) has passed max_words. The next comment then starts a new batch.

    Args:
        inst_length: word count of the instructions, used as the starting
            word count of every batch.
        comments: list of comment strings.
        size: maximum number of comments per batch; expected to be positive.
        max_words: word-count threshold after which a batch is closed.

    Returns:
        A list of batch strings. Empty batches are never produced, so empty
        comments gives an empty list.
    '''
    batches = []
    batch = []
    batch_length = inst_length
    for c in comments:
        # Only close a batch that holds at least one comment; otherwise
        # instructions longer than max_words would emit an empty batch.
        if batch and (batch_length > max_words or len(batch) == size):
            batches.append('\n'.join(batch))
            batch = []
            batch_length = inst_length
        batch_length += len(c.split(' '))
        batch.append(c)
    if batch:
        batches.append('\n'.join(batch))
    return batches

def get_scores_from_batch(instructions: list, batch: str, max_attempts=None):
    '''Requests sentiment scores for one batch until a usable reply arrives.

    A reply is usable when it passes validate_sentiment_scores() and contains
    exactly one line per line of batch. Each reply is printed as received.

    Args:
        instructions: list of system message dicts placed before the batch.
        batch: newline-joined comments to score.
        max_attempts: optional cap on the number of requests. None (the
            default) keeps retrying until a usable reply arrives.

    Returns:
        The list of score lines, with ', ' normalised to ','.

    Raises:
        RuntimeError: if get_response() returns None (invalid key or no
            connection), or if max_attempts requests produced no usable reply.
    '''
    prompt = instructions + format_question(batch)
    batch_length = len(batch.split('\n'))
    attempts = 0

    while max_attempts is None or attempts < max_attempts:
        attempts += 1
        response = get_response(prompt)
        if response is None:
            # get_response() has already retried and printed the cause;
            # calling .strip() on None would only hide it behind an AttributeError.
            raise RuntimeError('No response received from the API.')
        response = response.strip()
        print(response)
        if not validate_sentiment_scores(response):
            print('invalid sentiment scores')
            continue
        scores = response.replace(', ', ',').split('\n')
        response_length = len(scores)
        if response_length == batch_length:
            return scores
        print('Scores Count:', response_length)
        print('Batch Length:', batch_length)

    raise RuntimeError('No valid sentiment scores after ' + str(max_attempts) + ' attempts.')

## Main Function and Analysis

In [15]:
def main():
    '''Runs the sentiment scoring, provided the API key validates.

    The scores are collected but not saved; the call that would write them
    to a file is disabled.
    '''
    if validate_key():
        sentiment = get_sentiment_scores()
        # Disabled: write_to_file('sentiment.txt', '\n'.join(sentiment))

# Run the pipeline. The text printed by this cell is each batch's reply,
# echoed by get_scores_from_batch() as it is received.
main()

7,none
-7,product quality
0,product value
10,none
-6,product durability
7,service
-5,product quality
-5,price value
0,none
-8,product quality
7,none
-8,price quality
-10,service
9,none
-8,product diameter
8,none
-9,product quality
6,none
-7,product value
9,none
-7,product color
7,installation ease
-8,product transportation
-10,product value
6,not the best
-5,product performance
8,price value
-7,product damage
8,product performance
-5,product quality
8,delivery performance
0,none
-10,product quality
9,none
-10,product design
7,none
-10,none
5,none
-8,product quality
10,none
-10,price quality
-5,product quality
-3,none
8,product durability
-10,none
-5,none
9,product durability
-7,purchase regret
6,product functionality
-10,customer refund
10,none
-8,frustrating
0,neutral
-10,product quality
10,none
-8,product quality
8,product quality
-9,none
7,no issues
-10,product value
10,satisfied
-10,craftsmanship
8,none
-8,product quality
10,customer service
-10,product value
5,none
-10,price value

In [27]:
# Tally how often each item occurs, then order the tally from most to least
# frequent. The sort is stable, so ties keep their first-seen order.
arr = ['fish', 'cat', 'dog', 'fish', 'fish', 'dog', 'elephant', 'human']
counts = {}
for a in arr:
    counts[a] = counts.get(a, 0) + 1
counts = dict(sorted(counts.items(), key=lambda pair: -pair[1]))
print(counts)

{'fish': 3, 'dog': 2, 'cat': 1, 'elephant': 1, 'human': 1}
