# Initialization and library setup

In [1]:
!pip install openai transformers torch

Collecting openai
  Downloading openai-1.41.0-py3-none-any.whl.metadata (22 kB)
Collecting httpx<1,>=0.23.0 (from openai)
  Downloading httpx-0.27.0-py3-none-any.whl.metadata (7.2 kB)
Collecting jiter<1,>=0.4.0 (from openai)
  Downloading jiter-0.5.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.6 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch)
  Using cache

In [2]:
import json
import os
import uuid
import random
from datetime import datetime, timedelta
import textwrap
import re
from google.colab import drive
import openai
from openai import OpenAI
from transformers import pipeline

In [16]:
# Mount Google Drive for persistent storage.
# `drive` is google.colab's Drive helper; the mount point is /content/drive.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [17]:
# Initialize the OpenAI client.
# The key is read from the OPENAI_API_KEY environment variable so that a real key
# never has to be pasted into (and saved with) the notebook. The placeholder string
# is kept only as a fallback for when the variable is not set.
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY", "YOUR_OPEN_API_KEY"))

In [18]:
# Build the sentiment-analysis pipeline once. `sentiment_analyzer` starts out as None
# and is only replaced when the model loads; any loading error is reported and the
# name stays None.
sentiment_analyzer = None
try:
    sentiment_analyzer = pipeline("sentiment-analysis", model="distilbert-base-uncased-finetuned-sst-2-english")
except Exception as load_error:
    print(f"Error loading sentiment analysis model: {load_error}")
    print("Falling back to a simpler sentiment analysis method.")

# Setup and helper functions for user profiling

In [19]:
# Set up profile storage in Google Drive.
# exist_ok=True creates the folder only when it is missing, which keeps this cell
# safe to re-run and avoids the gap between a separate existence check and the creation.
PROFILES_FOLDER = '/content/drive/MyDrive/financial_advisor_profiles'
os.makedirs(PROFILES_FOLDER, exist_ok=True)

def generate_unique_id():
    """Return a newly generated random UUID (version 4) as a string."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)

def save_profile(profile):
    """Save a user profile to a JSON file in Google Drive.

    The file is named ``<profile['id']>.json`` inside ``PROFILES_FOLDER`` and is
    overwritten if it already exists.

    Args:
        profile: JSON-serializable dict that contains at least an ``'id'`` key.

    Raises:
        KeyError: If ``profile`` has no ``'id'`` entry.
        TypeError: If ``profile`` holds a value ``json`` cannot serialize.
        OSError: If the file cannot be opened or written.
    """
    filename = os.path.join(PROFILES_FOLDER, f"{profile['id']}.json")
    # Serialize before opening the file: mode 'w' truncates on open, so a
    # serialization error has to surface while the previous copy is still intact.
    serialized = json.dumps(profile)
    with open(filename, 'w') as f:
        f.write(serialized)

def load_profile(profile_id):
    """Read the profile stored for ``profile_id`` in Google Drive.

    Returns the parsed content of ``<profile_id>.json`` from ``PROFILES_FOLDER``,
    or None when that file does not exist.
    """
    profile_path = os.path.join(PROFILES_FOLDER, f"{profile_id}.json")
    if not os.path.exists(profile_path):
        return None
    with open(profile_path, 'r') as handle:
        return json.load(handle)

def find_profile_by_name(name):
    """Return every stored profile whose name equals ``name``, ignoring case.

    Only ``*.json`` entries of ``PROFILES_FOLDER`` are considered; entries that
    ``load_profile`` cannot return a profile for are skipped.
    """
    candidates = (
        load_profile(entry[:-5])  # drop the '.json' suffix to obtain the profile id
        for entry in os.listdir(PROFILES_FOLDER)
        if entry.endswith('.json')
    )
    return [
        candidate
        for candidate in candidates
        if candidate and candidate['name'].lower() == name.lower()
    ]

def create_new_profile(name):
    """Build, save and return a blank profile for ``name``.

    The profile gets a newly generated id, None for every financial field, empty
    goal and conversation lists, and synthetic spending data.
    """
    unset_fields = ("age", "income", "savings", "debts", "investments")

    profile = {"id": generate_unique_id(), "name": name}
    profile.update(dict.fromkeys(unset_fields))  # every financial field starts as None
    profile["financial_goals"] = []
    profile["conversation_history"] = []
    profile["spending_data"] = generate_synthetic_spending_data()

    save_profile(profile)
    return profile

def update_profile(profile, key, value):
    """Store ``value`` under ``key`` in ``profile`` and save the profile.

    The dict passed in is modified in place; nothing is returned.
    """
    profile[key] = value  # in-place change, visible to the caller
    save_profile(profile)

def get_profile_summary(profile):
    """Describe the filled-in fields of ``profile`` as text.

    A heading naming the user is followed by one "- Your <field>: <value>" entry
    for each field that is set. The id, name, conversation history and spending
    data are never listed, and neither are fields that are None or an empty list.
    The text is passed through ``wrap_text`` before it is returned.
    """
    hidden_fields = ("id", "name", "conversation_history", "spending_data")
    parts = [f"Based on your profile, {profile['name']}:"]
    for field, content in profile.items():
        if field in hidden_fields or content is None or content == []:
            continue
        parts.append(f"- Your {field.replace('_', ' ')}: {content}")
    return wrap_text("\n".join(parts).strip())

# Analyze the sentiment of the user's text and determine its financial context

In [20]:
def analyze_sentiment(text):
    """Analyze the sentiment of the given text using the sentiment analyzer or a fallback method.

    Args:
        text: The message to classify.

    Returns:
        A ``(label, score)`` tuple. When ``sentiment_analyzer`` is available and
        succeeds, these are the label and score of its first result. Otherwise a
        keyword count decides: ``('POSITIVE', 0.7)``, ``('NEGATIVE', 0.7)`` or,
        on a tie, ``('NEUTRAL', 0.5)``.
    """
    if sentiment_analyzer:
        try:
            result = sentiment_analyzer(text)[0]
            return result['label'], result['score']
        except Exception as e:
            # Best effort: report the problem and fall through to the keyword count.
            print(f"Error in sentiment analysis: {e}")

    # Fallback simple sentiment analysis
    positive_words = {'good', 'great', 'happy', 'positive', 'optimistic'}
    negative_words = {'bad', 'terrible', 'sad', 'negative', 'worried', 'concerned'}

    # Extract runs of word characters instead of splitting on whitespace, so that
    # attached punctuation ("worried.", "great!") does not hide a keyword.
    words = re.findall(r'\w+', text.lower())
    positive_count = sum(word in positive_words for word in words)
    negative_count = sum(word in negative_words for word in words)

    if positive_count > negative_count:
        return 'POSITIVE', 0.7
    elif negative_count > positive_count:
        return 'NEGATIVE', 0.7
    else:
        return 'NEUTRAL', 0.5

In [21]:
def get_financial_context(text):
    """Determine the financial context of the given text using regex patterns.

    Each context is recognised by case-insensitive whole-word matches of its
    keywords, including common inflections (for example "spending", "savings",
    "investments", "loans"), so that the context names themselves are detected.

    Args:
        text: The message to inspect.

    Returns:
        The names of all matching contexts, in the order spending, saving,
        investment, debt, income; ``['general']`` when nothing matches.
    """
    patterns = {
        'spending': r'\b(buy(s|ing)?|bought|purchas(e|es|ed|ing)|spen(d|ds|ding|t))\b',
        'saving': r'\b(sav(e|es|ed|ing|ings)|deposit(s|ed|ing)?)\b',
        'investment': r'\b(invest(s|ed|ing|ment|ments)?|stocks?|bonds?)\b',
        'debt': r'\b(debts?|loans?|borrow(s|ed|ing)?)\b',
        'income': r'\b(income|salary|salaries|pensions?)\b'
    }

    contexts = [context for context, pattern in patterns.items() if re.search(pattern, text, re.IGNORECASE)]
    return contexts if contexts else ['general']

# Generate an AI response for the user

In [22]:
def generate_response(profile, user_input):
    """Generate an AI response based on the user's input, profile, and spending data.

    The user's message and a newly built system prompt (sentiment, financial
    context, profile summary and spending insights) are sent to the chat model
    after the stored conversation history. The history is extended, and the
    profile saved, only once the model has answered, so a failed call leaves
    the stored conversation unchanged.

    Args:
        profile: Profile dict; 'name', 'age', 'spending_data' and
            'conversation_history' are read.
        user_input: The message typed by the user.

    Returns:
        The model's reply with surrounding whitespace stripped, or a fixed
        apology string when the input is blank or an error occurs.
    """
    if not user_input.strip():
        return "I'm sorry, but I didn't receive any input. Could you please provide some information or ask a question?"

    try:
        sentiment, confidence = analyze_sentiment(user_input)
        contexts = get_financial_context(user_input)

        profile_summary = get_profile_summary(profile)

        # Analyze spending data if available
        spending_insights = ""
        if profile.get('spending_data'):
            insights = analyze_spending_data(profile['spending_data'])
            spending_insights = "\n\nBased on your spending data for the last 5 years:\n"
            spending_insights += f"- Total spending: ${insights['total_spending']:.2f}\n"
            spending_insights += f"- Average monthly spending: ${insights['average_monthly_spending']:.2f}\n"
            spending_insights += "- Spending breakdown by category:\n"
            for category, percentage in insights['category_breakdown'].items():
                spending_insights += f"  - {category.capitalize()}: {percentage:.2%}\n"

        # A profile may carry an 'age' key whose value is None; show that as
        # 'Unknown' instead of the literal text "None".
        age = profile.get('age')
        age_text = 'Unknown' if age is None else age

        prompt = f"The elderly client (Name: {profile['name']}, Age: {age_text}) said: '{user_input}'. Their sentiment is {sentiment.lower()} (confidence: {confidence:.2f}). "
        prompt += f"The financial context is related to {', '.join(contexts)}. "
        prompt += f"\n\nUser's financial profile:\n{profile_summary}\n"
        prompt += spending_insights
        prompt += "\nProvide an empathetic response and appropriate financial advice, considering their emotional state, financial context, profile information, and spending habits if available."

        history = profile["conversation_history"]
        new_messages = [
            {"role": "user", "content": user_input},
            {"role": "system", "content": prompt},
        ]

        # Send a combined copy; the stored history is only touched after success so
        # a failed request cannot leave unanswered messages behind.
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=history + new_messages,
            max_tokens=200,
            n=1,
            temperature=0.7,
        )

        ai_response = response.choices[0].message.content.strip()
        history.extend(new_messages)
        history.append({"role": "assistant", "content": ai_response})
        save_profile(profile)

        return ai_response
    except openai.APIError as e:
        print(f"OpenAI API error: {e}")
        return "I apologize, but I'm having trouble connecting to my knowledge base. Could we try again in a moment?"
    except Exception as e:
        print(f"Unexpected error: {e}")
        return "I'm sorry, but I encountered an unexpected error. Could you please try rephrasing your question?"


# Start a conversation, sanitize the user's input, and update the profile from it

In [23]:
def _first_integer(text):
    """Return the first whole number found in ``text`` as an int, or None if there is none."""
    # The first alternative accepts comma-grouped thousands ("2,000", "1,250,000");
    # the lookahead rejects malformed groupings such as "1,2345".
    match = re.search(r'\d{1,3}(?:,\d{3})+(?!\d)|\d+', text)
    if match is None:
        return None
    return int(match.group().replace(',', ''))


def update_profile_from_input(profile, user_input):
    """Update the user profile based on the input provided.

    The first keyword found in ``user_input`` (checked case-insensitively, in the
    order income, savings, debt, invest, goal) decides what is updated; later
    keywords are ignored:

    - income / savings / debt: the first whole number in the text is stored in
      'income', 'savings' or 'debts'. Comma-grouped thousands such as "2,000"
      are read as a single number. Without a number nothing is changed.
    - invest: the whole text is stored in 'investments'.
    - goal: the whole text is appended to 'financial_goals' and the profile saved.

    Args:
        profile: Profile dict to update in place.
        user_input: The message typed by the user.
    """
    lowered = user_input.lower()

    numeric_fields = (("income", "income"), ("savings", "savings"), ("debt", "debts"))
    for keyword, field in numeric_fields:
        if keyword in lowered:
            amount = _first_integer(user_input)
            if amount is not None:
                update_profile(profile, field, amount)
            # The first matching keyword wins even when no number was found.
            return

    if "invest" in lowered:
        update_profile(profile, "investments", user_input)
    elif "goal" in lowered:
        profile["financial_goals"].append(user_input)
        save_profile(profile)

def sanitize_input(user_input):
    """Clean up text typed by the user.

    Every character that is not a word character, whitespace or one of
    ``. , ! ?`` is dropped, and the result is cut to at most 500 characters.
    """
    disallowed = re.compile(r'[^\w\s.,!?]')
    max_length = 500
    cleaned = disallowed.sub('', user_input)
    return cleaned[:max_length]


def start_new_conversation():
    """Start a new conversation by getting or creating a user profile.

    The user is asked for a name. No stored profile with that name means a new
    one is created, exactly one is reused, and with several the user picks one
    from a numbered list. The question is repeated until a listed number is
    entered. If the chosen profile has no age yet, the user is asked for it.
    Finally the profile summary is printed.

    Returns:
        The profile dict that was selected or created.
    """
    name = input("Please enter your name: ")
    matching_profiles = find_profile_by_name(name)

    if not matching_profiles:
        profile = create_new_profile(name)
        print(wrap_text(f"Welcome, {profile['name']}! Let's create your profile."))
    elif len(matching_profiles) == 1:
        profile = matching_profiles[0]
        print(wrap_text(f"Welcome back, {profile['name']}!"))
    else:
        print(wrap_text(f"Found multiple profiles with the name {name}."))
        for i, p in enumerate(matching_profiles):
            print(wrap_text(f"{i+1}. Age: {p.get('age', 'Unknown')}, Income: {p.get('income', 'Unknown')}"))

        # Ask again on anything that is not a listed number. Unchecked, text raises
        # ValueError, a too-large number raises IndexError and "0" would silently
        # select the last profile through index -1.
        profile_count = len(matching_profiles)
        while True:
            answer = input("Please choose the correct profile number: ")
            try:
                choice = int(answer)
            except ValueError:
                choice = 0
            if 1 <= choice <= profile_count:
                break
            print(wrap_text(f"Please enter a number between 1 and {profile_count}."))
        profile = matching_profiles[choice - 1]

    # .get() mirrors the listing above and tolerates a profile without an 'age' key.
    if profile.get('age') is None:
        age = input("Please enter your age: ")
        update_profile(profile, 'age', age)

    print(get_profile_summary(profile))
    return profile


# Generate synthetic spending-habit data for users

In [24]:
def generate_synthetic_spending_data(years=5):
    """Create random spending records covering the ``years`` years up to now.

    Dates start ``years * 365`` days before the current time and advance in
    30-day steps while they do not pass it. For every date one record per
    category is produced with a random whole amount from 50 to 500 inclusive.

    Returns:
        A list of dicts with the keys 'date' (``YYYY-MM-DD``), 'category' and
        'amount'.
    """
    categories = ['groceries', 'utilities', 'entertainment', 'healthcare', 'transportation']
    now = datetime.now()
    step = timedelta(days=30)  # one step stands for one month

    records = []
    cursor = now - timedelta(days=years*365)
    while cursor <= now:
        stamp = cursor.strftime('%Y-%m-%d')
        records.extend(
            {'date': stamp, 'category': label, 'amount': random.randint(50, 500)}
            for label in categories
        )
        cursor += step

    return records

def analyze_spending_data(spending_data):
    """Analyze spending data and return insights.

    Args:
        spending_data: List of entries, each a dict with at least 'category'
            and 'amount'. May be empty.

    Returns:
        A dict with:
        - 'total_spending': sum of all amounts,
        - 'average_monthly_spending': total divided by the number of months,
          where five entries count as one month,
        - 'category_breakdown': each category's share of the total (0..1).
        For empty data, or a total of zero, the average and the shares are 0.0
        instead of raising ZeroDivisionError.
    """
    total_spending = 0
    category_totals = {}
    for item in spending_data:
        category = item['category']
        amount = item['amount']
        total_spending += amount
        category_totals[category] = category_totals.get(category, 0) + amount

    # Assuming 5 categories per month
    months = len(spending_data) / 5
    average_monthly_spending = total_spending / months if months else 0.0

    insights = {
        'total_spending': total_spending,
        'average_monthly_spending': average_monthly_spending,
        'category_breakdown': {
            cat: (amount / total_spending if total_spending else 0.0)
            for cat, amount in category_totals.items()
        }
    }
    return insights

def update_spending_data(profile, category, amount):
    """Append a spending entry dated today to ``profile`` and save the profile.

    A 'spending_data' list is created first when the profile has none.
    """
    entries = profile.setdefault('spending_data', [])
    today = datetime.now().strftime('%Y-%m-%d')
    entries.append({'date': today, 'category': category, 'amount': amount})
    save_profile(profile)

def wrap_text(text, width=80):
    """Wrap text to fit within a specified width, keeping its line breaks.

    Each line of ``text`` is wrapped on its own. Wrapping the whole text at once
    would turn the newlines into spaces and run bullet lists and paragraphs
    together.

    Args:
        text: The text to wrap; may contain newlines.
        width: Maximum line length.

    Returns:
        The wrapped text. Blank lines are kept; empty input gives an empty string.
    """
    wrapped_lines = []
    for line in text.splitlines():
        # textwrap.wrap returns [] for a blank line; keep it as an empty line.
        wrapped_lines.extend(textwrap.wrap(line, width=width) or [''])
    return "\n".join(wrapped_lines)


# Main program

In [25]:
# Define a separator function
def print_separator(char='=', length=80):
    """Print ``char`` repeated ``length`` times on a line of its own."""
    rule = char * length
    print(rule)

# Main program loop.
# The outer loop waits for 'start' or 'quit'. The inner loop runs one conversation
# until the user says goodbye or interrupts, then returns to the outer prompt.
print(wrap_text("Financial Advisor Bot: Hello! I'm here to help you with your financial questions and concerns."))
print_separator()

while True:
    # strip() so that stray whitespace around the command is not reported as invalid.
    action = input("Enter 'start' to begin a new conversation, or 'quit' to exit: ").strip().lower()

    if action == 'quit':
        print_separator()
        print(wrap_text("Thank you for using the Financial Advisor Bot. Goodbye!"))
        print_separator()
        break
    elif action == 'start':
        profile = start_new_conversation()
        print_separator()

        while True:
            try:
                user_input = input("You: ")
                user_input = sanitize_input(user_input)

                if user_input.strip().lower() in ['exit', 'quit', 'bye']:
                    print_separator()
                    print(wrap_text("Financial Advisor Bot: Thank you for chatting with me. Take care and have a great day!"))
                    print_separator()
                    save_profile(profile)
                    break  # This breaks the inner loop

                print_separator()
                update_profile_from_input(profile, user_input)
                response = generate_response(profile, user_input)
                print(wrap_text("Financial Advisor Bot: " + response))
                print_separator()
            except (KeyboardInterrupt, EOFError):
                # EOFError means the input stream is closed. It has to end the
                # conversation here: the generic handler below would catch it on every
                # following input() call and the loop would never terminate.
                print_separator()
                print(wrap_text("\nFinancial Advisor Bot: It seems you want to end our conversation. Take care and have a great day!"))
                print_separator()
                save_profile(profile)
                break  # This breaks the inner loop
            except Exception as e:
                print_separator()
                print(f"An unexpected error occurred: {e}")
                print(wrap_text("Financial Advisor Bot: I'm having some technical difficulties. Let's start our conversation again."))
                print_separator()
                # NOTE(review): this keeps the first stored message rather than clearing
                # the history completely; confirm that this is the intended reset.
                profile["conversation_history"] = profile["conversation_history"][:1]  # Reset conversation history
                save_profile(profile)
    else:
        print_separator()
        print(wrap_text("Invalid input. Please enter 'start' or 'quit'."))
        print_separator()


Financial Advisor Bot: Hello! I'm here to help you with your financial questions
and concerns.
Enter 'start' to begin a new conversation, or 'quit' to exit: start
Please enter your name: Casper
Welcome, Casper! Let's create your profile.
Please enter your age: 85
Based on your profile, Casper: - Your age: 85
You: I have a retirement income for 2000 EUR per month. Based on my spending habit, how much I can save per month? 
Financial Advisor Bot: I understand that managing your finances during
retirement can be challenging, especially when trying to strike a balance
between enjoying your golden years and saving for the future. With a retirement
income of 2000 EUR per month and an average monthly spending of 1379.75 EUR, it
seems like you have some room to save each month.  Based on your current
numbers, you could potentially save around 620.25 EUR per month. It's great that
you have the discipline to track your spending habits and think about saving for
the future.   If you're looking to