In [None]:
from dotenv import load_dotenv
import os
import openai

# Load environment variables from the .env file
load_dotenv()

# Retrieve the OpenAI API key from environment variables
openai.api_key = os.getenv("OPENAI_API_KEY")

# Fail fast when no usable key is available. os.getenv returns None for an
# unset variable, but a variable that is set to an empty value comes back as
# "" — so test truthiness instead of comparing against None.
if not openai.api_key:
    raise ValueError("API key not found. Please check the .env file.")

print("API key loaded successfully.")

In [None]:
import os
from openai import OpenAI

# Create the API client; the key is read from the OPENAI_API_KEY environment variable
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# Send a one-message conversation as a connectivity check and keep the whole
# response object for inspection
chat_completion = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Say this is a test"}],
)

# Show the full response object
print(chat_completion)

In [None]:
print(chat_completion.choices[0].message.content)

In [15]:
def _ask_agent(instruction, content, model="gpt-4-turbo"):
    """Send one single-turn prompt to the chat model and return the reply text.

    The user message is ``"<instruction>: <content>"``. Every agent below is
    this same request with a different instruction, so the request shape lives
    in one place.

    Args:
        instruction: The agent-specific analysis instruction (no trailing colon).
        content: The message to analyze.
        model: Chat model name passed to the API.

    Returns:
        The text content of the first choice in the response.
    """
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": f"{instruction}: {content}"}],
    )
    return response.choices[0].message.content


def mood_agent(content):
    """Analyze the mood of ``content``, with attention to speculative language."""
    return _ask_agent(
        "Analyze the mood of this message, focusing on any hypothetical or speculative language that could affect sentiment",
        content,
    )


def institutional_investor_agent(content):
    """Analyze ``content`` from an institutional investor's long-term viewpoint."""
    return _ask_agent(
        "Analyze this message as if you are an institutional investor, focusing on long-term impacts on stability and growth potential",
        content,
    )


def individual_investor_agent(content):
    """Analyze ``content`` from an individual investor's short-term viewpoint."""
    return _ask_agent(
        "Analyze this message as if you are an individual investor, focusing on short-term price impact and immediate gains or losses",
        content,
    )


def rhetoric_agent(content):
    """Analyze the rhetorical style of ``content`` and its effect on sentiment."""
    return _ask_agent(
        "Analyze the rhetorical style of this message, such as sarcasm, exaggeration, or assertive statements, and how these elements affect sentiment",
        content,
    )


def dependency_agent(content):
    """Analyze the speaker's own sentiment in ``content``, ignoring third parties."""
    return _ask_agent(
        "Focus on the speaker’s sentiment in this message, without considering external perspectives or opinions of third parties",
        content,
    )


def aspect_agent(content):
    """Analyze sentiment toward the main entity mentioned in ``content``."""
    return _ask_agent(
        "Analyze the sentiment toward the main entity (e.g., company or stock ticker) in this message, ignoring unrelated information",
        content,
    )


def reference_agent(content):
    """Analyze how time, price, or external references in ``content`` affect sentiment."""
    return _ask_agent(
        "Identify references to time, price points, or external factors in this message, and analyze how they impact the overall sentiment",
        content,
    )


In [None]:
content = "$SBUX this stock has been consolidating and coiling for years. I think many underestimate it. spring how"

# Call every agent on the same message, keyed by agent name (order preserved)
responses = {
    name: run_agent(content)
    for name, run_agent in (
        ("mood_agent", mood_agent),
        ("institutional_investor_agent", institutional_investor_agent),
        ("individual_investor_agent", individual_investor_agent),
        ("rhetoric_agent", rhetoric_agent),
        ("dependency_agent", dependency_agent),
        ("aspect_agent", aspect_agent),
        ("reference_agent", reference_agent),
    )
}

# Show each agent's reply, separated by a blank line
for agent, response in responses.items():
    print(f"{agent}: {response}", end="\n\n")


# Mock Tests

In [21]:
# Sample financial message for sentiment analysis
content = "$SBUX this stock has been consolidating and coiling for years. I think many underestimate it. spring how"

In [22]:
# Stand-ins for the real agents: each ignores its input and returns a fixed,
# natural-language sentiment description so the aggregation can be exercised
# without calling the agents' model requests.
def mood_agent(content):
    """Mock mood agent; returns a canned reply regardless of ``content``."""
    canned_reply = "The mood seems cautiously optimistic about future potential."
    return canned_reply

def institutional_investor_agent(content):
    """Mock institutional-investor agent; returns a canned reply."""
    canned_reply = "This message suggests a long-term positive outlook due to consolidation."
    return canned_reply

def individual_investor_agent(content):
    """Mock individual-investor agent; returns a canned reply."""
    canned_reply = "Indicates potential for short-term gains but remains uncertain."
    return canned_reply

def rhetoric_agent(content):
    """Mock rhetoric agent; returns a canned reply."""
    canned_reply = "The language implies underestimation by others, hinting at overlooked growth."
    return canned_reply

def dependency_agent(content):
    """Mock dependency agent; returns a canned reply."""
    canned_reply = "The speaker seems to hold a positive perspective independently."
    return canned_reply

def aspect_agent(content):
    """Mock aspect agent; returns a canned reply."""
    canned_reply = "Focuses specifically on $SBUX, suggesting stability and possible growth."
    return canned_reply

def reference_agent(content):
    """Mock reference agent; returns a canned reply."""
    canned_reply = "No explicit timeframes, but consolidation indicates a buildup for potential movement."
    return canned_reply


In [23]:
# Gather each agent's first-pass reply into a name -> reply mapping
responses = {
    name: run_agent(content)
    for name, run_agent in (
        ("mood_agent", mood_agent),
        ("institutional_investor_agent", institutional_investor_agent),
        ("individual_investor_agent", individual_investor_agent),
        ("rhetoric_agent", rhetoric_agent),
        ("dependency_agent", dependency_agent),
        ("aspect_agent", aspect_agent),
        ("reference_agent", reference_agent),
    )
}

In [24]:
def _extract_sentiment_label(text):
    """Map a free-form model reply to one of the four sentiment labels.

    Matching is case-insensitive and on whole words. "Mixed" wins whenever it
    appears, because a mixed verdict typically also names the polarities it is
    mixing (e.g. "Mixed: some positive, some negative"). Otherwise the label
    mentioned first in the reply is used.

    Returns:
        "Positive", "Negative", "Neutral", "Mixed", or None when the reply
        names none of them.
    """
    import re  # local import: this cell does not rely on a notebook-wide import

    mentioned = re.findall(r"\b(positive|negative|neutral|mixed)\b", (text or "").lower())
    if not mentioned:
        return None
    if "mixed" in mentioned:
        return "Mixed"
    return mentioned[0].capitalize()


def summative_agent(responses, max_rounds=2):
    """Aggregate per-agent sentiment descriptions into one final sentiment.

    Each round asks the model for an overall verdict over the current agent
    replies. A Positive/Negative/Neutral verdict ends the process. A Mixed or
    unrecognised verdict triggers a refinement round in which every agent's
    original reply is re-examined against the combined replies. If no clear
    verdict emerges within ``max_rounds`` rounds, the decision is made from
    the two investor agents alone.

    Args:
        responses: Mapping of agent name -> that agent's reply text. It is not
            modified.
        max_rounds: Maximum number of refinement rounds before deferring to
            the high-priority agents.

    Returns:
        A string: "Positive", "Negative", "Neutral" or "Mixed" when a label can
        be recognised in the model's verdict; otherwise the fallback reply
        text, stripped and capitalized.
    """
    high_priority_agents = ["institutional_investor_agent", "individual_investor_agent"]

    def ask(prompt):
        # Single-turn request; returns the reply text of the first choice.
        reply = client.chat.completions.create(
            model="gpt-4-turbo",
            messages=[{"role": "user", "content": prompt}],
        )
        return reply.choices[0].message.content

    # Work on a copy so the caller's mapping keeps the original replies.
    sentiment_summary = dict(responses)

    round_count = 0
    while round_count < max_rounds:
        combined_responses = "\n".join(
            f"{agent}: {response}" for agent, response in sentiment_summary.items()
        )

        # Ask for the overall sentiment: Positive, Negative, Neutral, or Mixed
        verdict = ask(
            f"Based on the following responses from various agents, summarize the overall sentiment as Positive, Negative, Neutral, or Mixed if no clear conclusion can be derived:\n\n{combined_responses}"
        )
        label = _extract_sentiment_label(verdict)
        if label in ("Positive", "Negative", "Neutral"):
            return label

        # Mixed or unrecognised verdict: run one refinement round. Each agent
        # is shown the combined replies together with its own original reply.
        round_count += 1
        for agent, response in responses.items():
            sentiment_summary[agent] = ask(
                f"Refine your sentiment analysis by reviewing these responses:\n{combined_responses}\nOriginal response: {response}"
            )

    # No consensus after max rounds: defer to the high-priority agents.
    high_priority_responses = "\n".join(
        f"{agent}: {sentiment_summary[agent]}"
        for agent in high_priority_agents
        if agent in sentiment_summary
    )
    final_reply = ask(
        f"Summarize the final sentiment based on high-priority agents alone, as Positive, Negative, Neutral, or Mixed if no clear conclusion can be drawn:\n{high_priority_responses}"
    ).strip()

    # Prefer a clean label so both exit paths return the same kind of value;
    # fall back to the raw reply when no label can be recognised.
    return _extract_sentiment_label(final_reply) or final_reply.capitalize()

In [None]:
# Run the summative agent to determine final sentiment
final_output = summative_agent(responses)
print("Final Aggregated Sentiment:", final_output)


# Financial Data Source

In [24]:
# Search keywords grouped by priority: each inner list is one rank, listed
# from Rank 1 downwards.
# NOTE(review): "Something" in the Rank 3 list looks like a placeholder —
# confirm the intended keyword.
RANKED_KEYWORDS = [
    ["Hong Kong", "HSI", "Hang Seng"],  # Rank 1
    ["China", "Asia", "Singapore"],      # Rank 2
    ["US", "Global", "Something"]        # Rank 3
]

### Financial News: GDELT

In [None]:
import requests
import pandas as pd
from IPython.display import display

url = "https://api.gdeltproject.org/api/v2/doc/doc"
params = {"query": "(economy OR finance)", "mode": "ArtList", "format": "JSON"}
headers = {"User-Agent": "Mozilla/5.0"}

# Request an article list from GDELT
response = requests.get(url, params=params, headers=headers)

# Handle the failure cases first, then decode and show the article titles
if response.status_code != 200:
    print("Error: ", response.status_code, "\nResponse Text: ", response.text)
elif not response.text.strip():
    print("Received an empty response.")
else:
    try:
        data = response.json()
        articles = pd.DataFrame(data.get("articles", []))
        if "title" not in articles.columns:
            print("No title information found in the response.")
        else:
            # Only the first few titles are shown
            display(articles["title"].head())
    except ValueError:
        print("Error decoding JSON. Response text:", response.text)


In [None]:
def fetch_gdelt_data_with_ranking(max_records=10):
    """Return English articles from credible sources for the best-ranked keywords.

    Keyword groups in ``RANKED_KEYWORDS`` are tried in order. For each group an
    OR-query is sent through ``fetch_gdelt_data`` and the results are filtered
    to English-language articles whose URL contains one of the credible source
    domains. The first rank that yields any such article wins.

    Args:
        max_records: Maximum number of records requested per query.

    Returns:
        A list of dicts with the keys ``title``, ``url``, ``language``,
        ``sourcecountry`` and ``domain``; an empty list if no rank produced a
        matching article.
    """
    credible_sources = ['reuters.com', 'yahoo.com', 'cnbc.com']
    for rank, keywords in enumerate(RANKED_KEYWORDS, start=1):
        query = "(" + " OR ".join(keywords) + ")"
        print(f"Generated GDELT query: {query}")
        articles = fetch_gdelt_data(query, max_records)
        if not articles:
            continue

        filtered_articles = []
        for article in articles:
            # A field may be absent or explicitly null; treat both as empty
            # so the filter skips the article instead of raising.
            article_url = article.get("url") or ""
            language = (article.get("language") or "").lower()
            if language != "english":
                continue
            if not any(source in article_url for source in credible_sources):
                continue
            filtered_articles.append({
                "title": article.get("title"),
                "url": article.get("url"),
                "language": article.get("language"),
                "sourcecountry": article.get("sourcecountry"),
                "domain": article.get("domain"),
            })

        if filtered_articles:
            print(f"Found articles with Rank {rank} keywords from credible sources.")
            return filtered_articles

    print("No articles found for any of the ranked keywords from credible sources.")
    return []

def fetch_gdelt_data(query, max_records=10):
    """Fetch a list of articles from GDELT for ``query``.

    Args:
        query: GDELT query string, e.g. ``"(A OR B)"``.
        max_records: Maximum number of records to request.

    Returns:
        The list under the ``articles`` key of the JSON response, or an empty
        list when the request fails, the response is not JSON, or the key is
        absent. Problems are reported with ``print`` rather than raised.
    """
    url = "https://api.gdeltproject.org/api/v2/doc/doc"
    # Passing the query through `params` lets requests URL-encode spaces,
    # parentheses and quotes instead of splicing them into the URL by hand.
    params = {
        "query": query,
        "mode": "artlist",
        "maxrecords": max_records,
        "format": "json",
    }
    try:
        # The timeout keeps a stalled connection from hanging the notebook;
        # a timeout is a RequestException and is reported below.
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print("Error fetching GDELT data:", e)
        return []

    # The header may carry parameters such as "; charset=utf-8", so test for
    # the media type rather than comparing the whole header value.
    content_type = response.headers.get('Content-Type') or ''
    if 'application/json' not in content_type.lower():
        print("Unexpected content type from GDELT response:", response.headers.get('Content-Type'))
        return []

    try:
        data = response.json()
    except ValueError as e:
        # JSON decoding errors from both `json` and `requests` derive from
        # ValueError, so no extra import is needed to catch them.
        print("Error decoding JSON response from GDELT:", e)
        return []
    return data.get('articles', [])
    

fetch_gdelt_data_with_ranking(5)

In [None]:
import requests

# Define the GDELT API endpoint
url = "https://api.gdeltproject.org/api/v2/doc/doc"

# Query parameters for the article-list request sent to the endpoint above.
# The query requires one keyword from each rank, restricts results to three
# news domains, and keeps English-language sources only.
params = {
    "query": '(Hong Kong OR HSI OR "Hang Seng") AND (China OR Asia OR Singapore) AND (US OR Global OR Something) ' \
             'AND (domain:reuters.com OR domain:yahoo.com OR domain:cnbc.com) AND sourcelang:english',
    "mode": "ArtList",          # To get a list of articles
    "format": "json",           # To retrieve data in JSON format
    "maxrecords": "10",         # Limit to 10 records for testing
    "timespan": "7d",           # Only articles from the past week
    "sort": "DateDesc"          # Sort by most recent articles first
}

# Make the request
response = requests.get(url, params=params)

# Print the response to inspect or troubleshoot
print(response.text)

# Process the JSON response if successful
if response.status_code == 200:
    try:
        data = response.json()
    except ValueError:
        # A 200 status does not guarantee a JSON body; the raw text has
        # already been printed above for troubleshooting.
        print("Error: response body is not valid JSON.")
    else:
        articles = data.get('articles', [])
        for article in articles:
            print(f"Title: {article.get('title')}, URL: {article.get('url')}")
else:
    print("Error:", response.text)


In [None]:
def fetch_gdelt_data_with_ranking(max_records=10):
    """Query GDELT rank by rank and return the first non-empty result.

    For each keyword group in ``RANKED_KEYWORDS`` (in order) an OR-query
    restricted to reuters.com and English sources is sent through
    ``fetch_gdelt_data``.

    Args:
        max_records: Maximum number of records requested per query.

    Returns:
        A DataFrame built from the first non-empty article list, or an empty
        DataFrame when every rank comes back empty.
    """
    for position, keyword_group in enumerate(RANKED_KEYWORDS):
        rank = position + 1
        # Keywords are joined as-is, without quoting individual terms
        or_clause = " OR ".join(keyword_group)
        query = f"({or_clause}) AND domain:reuters.com AND sourcelang:english"
        print(f"Trying Rank {rank} keywords: {query}")

        found = fetch_gdelt_data(query, max_records)
        if not found:
            continue

        print(f"Found articles with Rank {rank} keywords.")
        return pd.DataFrame(found)

    print("No articles found.")
    return pd.DataFrame()

def fetch_gdelt_data(query, max_records=10):
    """Helper function to retrieve GDELT data for a specific query.

    Args:
        query: GDELT query string.
        max_records: Maximum number of records to request.

    Returns:
        The list of article dicts from the response, newest first as
        requested via ``sort``. Every article is guaranteed a ``content``
        key: an existing value is kept, otherwise it is filled from
        ``excerpt`` or with the text "Content not available". An empty list
        is returned for a non-200 status or a body that is not JSON.
    """
    url = "https://api.gdeltproject.org/api/v2/doc/doc"
    params = {
        "query": query,
        "mode": "ArtList",
        "format": "json",
        "maxrecords": max_records,
        "sort": "DateDesc"
    }
    response = requests.get(url, params=params)
    if response.status_code != 200:
        print("Error fetching GDELT data:", response.status_code)
        return []

    try:
        data = response.json()
    except ValueError:
        # A 200 status does not guarantee a JSON body.
        print("Error fetching GDELT data: response body is not valid JSON")
        return []

    articles = data.get('articles', [])
    for article in articles:
        # Only fill in 'content' when it is missing, so an existing value is
        # never overwritten by the excerpt or the placeholder text.
        if "content" not in article:
            article["content"] = article.get("excerpt", "Content not available")

    return articles

    
print("\nTesting GDELT data retrieval...")
gdelt_data = fetch_gdelt_data_with_ranking()
display(gdelt_data.head())

### SNS Financial Data: Twitter

In [None]:
import tweepy
import pandas as pd
from IPython.display import display

# Credentials for the Twitter API (placeholders — substitute your own values)
consumer_key = "YOUR_CONSUMER_KEY"
consumer_secret = "YOUR_CONSUMER_SECRET"
access_token = "YOUR_ACCESS_TOKEN"
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"

# Build an authenticated API handle
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# Collect up to 10 English tweets matching "economy", keeping the timestamp,
# the author's screen name and the untruncated text
tweets = [
    {'created_at': tweet.created_at, 'user': tweet.user.screen_name, 'text': tweet.full_text}
    for tweet in tweepy.Cursor(
        api.search_tweets, q="economy", lang="en", tweet_mode="extended"
    ).items(10)
]

# One row per tweet
tweets_df = pd.DataFrame(tweets)

# Show the collected tweets
display(tweets_df[['created_at', 'user', 'text']])


### Financial Forum: Reddit API

In [54]:
import praw
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

# Check every required setting up front so a missing one produces a clear
# message instead of an AttributeError from calling .strip() on None.
# NOTE(review): USERNAME is a name many operating systems and shells already
# define, and load_dotenv() does not override existing variables by default —
# confirm the value picked up here is the Reddit account name.
_reddit_env_names = ["CLIENT_ID", "CLIENT_SECRET", "PASSWORD", "USER_AGENT", "USERNAME"]
_missing_env_names = [name for name in _reddit_env_names if not os.getenv(name)]
if _missing_env_names:
    raise ValueError(
        "Missing Reddit settings: " + ", ".join(_missing_env_names) + ". Please check the .env file."
    )

reddit = praw.Reddit(
    client_id=os.getenv("CLIENT_ID"),
    client_secret=os.getenv("CLIENT_SECRET"),
    password=os.getenv("PASSWORD"),  # Reddit account password
    user_agent=os.getenv("USER_AGENT"),
    username=os.getenv("USERNAME").strip()  # drop stray whitespace around the name
)

# Verify Reddit instance is connected
print(f"Connected as: {reddit.user.me()}")


Connected as: Nnfts


In [None]:
# Fetch posts from a subreddit as a test
subreddit = reddit.subreddit("financialindependence")
for submission in subreddit.hot(limit=5):
    print(submission.title)


In [57]:
# List of subreddits to search
SUBREDDITS = ["HongKong", "stocks", "investing", "finance"]

def fetch_reddit_data(limit=5):
    """Fetches posts from specified subreddits based on ranked keywords.

    Keyword groups in ``RANKED_KEYWORDS`` are tried in order. For one rank,
    every keyword is searched in every subreddit in ``SUBREDDITS`` and all
    hits are collected; lower ranks are only searched when the current rank
    produced nothing.

    Args:
        limit: Maximum number of results requested per (subreddit, keyword)
            search.

    Returns:
        A DataFrame with the columns ``title``, ``score``, ``url``,
        ``created_utc``, ``num_comments`` and ``subreddit`` for the first
        rank that produced posts, or an empty DataFrame when no rank did.
    """
    for rank, keywords in enumerate(RANKED_KEYWORDS, start=1):
        print(f"Searching Rank {rank} keywords: {keywords}")
        posts = []

        for subreddit_name in SUBREDDITS:
            subreddit = reddit.subreddit(subreddit_name)
            for keyword in keywords:
                for submission in subreddit.search(keyword, limit=limit):
                    posts.append({
                        "title": submission.title,
                        "score": submission.score,
                        "url": submission.url,
                        "created_utc": submission.created_utc,
                        "num_comments": submission.num_comments,
                        "subreddit": subreddit_name
                    })

        # Stop searching lower ranks if we find posts. This check runs only
        # after the whole rank (all subreddits and keywords) has been
        # searched, so the result is not cut off at the first hit.
        if posts:
            return pd.DataFrame(posts)

    # If no posts found in any rank
    print("No Reddit posts found for any of the ranked keywords.")
    return pd.DataFrame()

fetch_reddit_data()

Searching Rank 1 keywords: ['Hong Kong', 'HSI', 'Hang Seng']


Redirect: Redirect to /subreddits/search