In [None]:
import json
import time
from typing import Dict, List, Optional
from dataclasses import dataclass
from openai import OpenAI
import os
import pandas as pd
from tqdm.notebook import tqdm
tqdm.pandas()


In [None]:
@dataclass
class ExpertAnalysis:
    """One expert persona's assessment of a single tweet."""

    # Name of the persona that produced this analysis
    # (a key of TweetAnalysisPipeline.expert_roles).
    expert_role: str
    # Free-text analysis; holds the raw LLM reply when the reply could not
    # be parsed as JSON.
    analysis: str
    # 0-10 scale as requested in the expert prompt (5 = neutral); the value
    # is not range-checked when the instance is built.
    bullish_score: float
    # 0-1 self-reported confidence as requested in the expert prompt; not
    # range-checked either.
    confidence: float

In [None]:
@dataclass
class TweetAnalysisResult:
    """Combined outcome of running every expert plus the final judgment on one tweet."""

    # The tweet text that was analyzed, unchanged.
    tweet_text: str
    # Company name that was interpolated into the prompts.
    company: str
    # One entry per expert role, in the order the experts were queried.
    expert_analyses: List[ExpertAnalysis]
    # Label taken from the final-judgment reply; the prompt asks for
    # "BULLISH", "BEARISH" or "NEUTRAL" and "NEUTRAL" is the fallback.
    final_judgment: str
    # Overall 0-10 score from the final-judgment reply (5.0 is the fallback).
    final_bullish_score: float
    # Model-provided reasoning, or the raw reply when parsing failed.
    reasoning: str

In [None]:
class TweetAnalysisPipeline:
    """Multi-expert LLM pipeline that rates how bullish a tweet is for a company.

    The tweet is sent to the model once per persona in ``expert_roles``; the
    collected answers are then passed to a final "senior analyst" prompt that
    produces the overall judgment and score.
    """

    def __init__(self, api_key: str, model: str = "gpt-4"):
        """Create the API client and the table of expert personas.

        Args:
            api_key: Key used when the ``IDA_LLM_API_KEY`` environment
                variable is not set. The environment variable keeps
                precedence because it was the only source honored before
                this argument was wired in, so existing setups behave the
                same.
            model: Model name passed to every chat-completion request.

        Raises:
            KeyError: If neither the environment variable nor ``api_key``
                provides a key.
        """
        resolved_key = os.environ.get('IDA_LLM_API_KEY') or api_key
        if not resolved_key:
            raise KeyError("IDA_LLM_API_KEY is not set and no api_key was provided")

        self.client = OpenAI(base_url="http://api.llm.apps.os.dcs.gla.ac.uk/v1", api_key=resolved_key)
        self.model = model

        # Define expert roles and their focus areas
        self.expert_roles = {
            "Technical Analyst": "Focus on chart patterns, technical indicators, and market momentum signals",
            "Fundamental Analyst": "Analyze business fundamentals, financial metrics, and company performance",
            "Sentiment Analyst": "Evaluate public perception, social media sentiment, and market psychology",
            "Risk Analyst": "Assess potential risks, regulatory concerns, and market volatility factors",
            "Industry Expert": "Provide sector-specific insights and competitive landscape analysis"
        }

    @staticmethod
    def _extract_json_object(text: str) -> Dict:
        """Parse the outermost ``{...}`` span found in ``text``.

        Models often wrap the requested JSON in prose or a Markdown code
        fence, so the span from the first ``{`` to the last ``}`` is parsed
        instead of the whole reply.

        Raises:
            ValueError: If no such span exists or it is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        """
        start = text.find('{')
        end = text.rfind('}')
        if start == -1 or end < start:
            raise ValueError("Could not find JSON object in response")
        return json.loads(text[start:end + 1])

    @staticmethod
    def _to_float(value, default: float) -> float:
        """Return ``value`` as a float, or ``default`` when it cannot be converted."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def _call_llm(self, prompt: str, max_tokens: int = 2000) -> str:
        """Send ``prompt`` as a single user message and return the stripped reply.

        This is best-effort: any exception raised by the client is printed
        and an empty string is returned, so callers must treat ``""`` as
        "no answer". No retry or rate limiting is performed here.
        """
        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=max_tokens,
                temperature=0.7
            )
            content = response.choices[0].message.content
        except Exception as e:
            print(f"Error calling LLM: {e}")
            return ""
        # The content field may be None; treat that as an empty reply.
        return (content or "").strip()

    def _get_expert_analysis(self, tweet: str, company: str, expert_role: str, focus_area: str) -> "ExpertAnalysis":
        """Ask one expert persona for its view of the tweet.

        Args:
            tweet: Tweet text to analyze.
            company: Company name interpolated into the prompt.
            expert_role: Persona name (e.g. "Risk Analyst").
            focus_area: Sentence describing what the persona should focus on.

        Returns:
            The parsed analysis. When the reply cannot be parsed, a neutral
            placeholder is returned instead (score 5.0, confidence 0.3) with
            the raw reply stored in ``analysis``.
        """
        prompt = f"""
You are a {expert_role} analyzing a tweet about {company}. {focus_area}.

Tweet: "{tweet}"

Please provide your analysis in the following JSON format:
{{
    "analysis": "Your detailed analysis as a {expert_role}",
    "bullish_score": X.X,
    "confidence": X.X,
    "key_factors": ["factor1", "factor2", "factor3"]
}}

Bullish score: 0-10 scale where:
- 0-2: Very bearish (strong negative impact on stock)
- 3-4: Bearish (moderate negative impact)
- 5: Neutral (no significant impact)
- 6-7: Bullish (moderate positive impact)
- 8-10: Very bullish (strong positive impact)

Confidence: 0-1 scale representing how confident you are in your assessment. Ensure that your response conforms to the JSON format.

Focus on insights specific to your role as a {expert_role}.
"""

        response = self._call_llm(prompt, max_tokens=600)

        try:
            # Same tolerant extraction as the final judgment, so a reply
            # wrapped in a code fence is still understood.
            parsed = self._extract_json_object(response)
            return ExpertAnalysis(
                expert_role=expert_role,
                analysis=parsed.get("analysis", ""),
                bullish_score=float(parsed.get("bullish_score", 5.0)),
                confidence=float(parsed.get("confidence", 0.5))
            )
        except (ValueError, TypeError) as e:
            # TypeError covers a JSON null score (float(None)); without it a
            # single bad reply would abort the whole run.
            print(f"Error parsing expert analysis: {e}")

            return ExpertAnalysis(
                expert_role=expert_role,
                analysis=response,
                bullish_score=5.0,
                confidence=0.3
            )

    def _get_final_judgment(self, tweet: str, company: str, expert_analyses: "List[ExpertAnalysis]") -> Dict:
        """Combine the expert analyses into one overall judgment.

        Returns:
            The dict parsed from the model's reply, with
            ``final_bullish_score`` coerced to ``float`` (5.0 when missing or
            not numeric). When the reply contains no parseable JSON object a
            neutral fallback dict is returned with the raw reply stored under
            ``reasoning``.
        """
        expert_text = "\n\n".join(
            f"{analysis.expert_role}: {analysis.analysis} "
            f"(Bullish Score: {analysis.bullish_score}/10, "
            f"Confidence: {analysis.confidence:.2f})"
            for analysis in expert_analyses
        )

        prompt = f"""
You are a senior investment analyst tasked with making a final judgment on a tweet about {company}.

Original Tweet: "{tweet}"

Expert Analyses:
{expert_text}

Based on these expert perspectives, provide your final judgment in JSON format:
{{
    "final_judgment": "BULLISH" | "BEARISH" | "NEUTRAL",
    "final_bullish_score": X.X,
    "reasoning": "Your reasoning for the final judgment",
    "key_consensus_points": ["point1", "point2"],
    "key_disagreement_points": ["point1", "point2"],
    "time_horizon": "SHORT_TERM" | "MEDIUM_TERM" | "LONG_TERM"
}}

Final bullish score: 0-10 scale (same as individual experts)
Final judgment should be:
- BULLISH if score > 6.5
- BEARISH if score < 4.5  
- NEUTRAL if score 4.5-6.5

Consider the confidence levels of each expert when weighing their opinions. Ensure that your response conforms to the JSON format.
"""

        response = self._call_llm(prompt, max_tokens=2000)
        print(response)
        try:
            judgment = self._extract_json_object(response)
        except ValueError as e:
            print(f"Error parsing final judgment: {e}")
            return {
                "final_judgment": "NEUTRAL",
                "final_bullish_score": 5.0,
                "reasoning": response,
                "key_consensus_points": [],
                "key_disagreement_points": [],
                "time_horizon": "SHORT_TERM"
            }

        # The model may return the score as a string or null; normalize it so
        # downstream consumers always receive a float.
        judgment["final_bullish_score"] = self._to_float(
            judgment.get("final_bullish_score"), 5.0
        )
        return judgment

    def analyze_tweet(self, tweet: str, company: str) -> "TweetAnalysisResult":
        """Run every expert on the tweet, then synthesize the final judgment.

        Progress is reported with ``print``. One LLM call is made per expert
        role plus one for the final judgment.
        """
        print(f"Analyzing tweet about {company}...")

        expert_analyses = []
        for expert_role, focus_area in self.expert_roles.items():
            print(f"Getting {expert_role} analysis...")
            analysis = self._get_expert_analysis(tweet, company, expert_role, focus_area)
            expert_analyses.append(analysis)

            # Short pause between consecutive requests.
            time.sleep(0.01)

        print("Synthesizing final judgment...")
        final_judgment_data = self._get_final_judgment(tweet, company, expert_analyses)

        result = TweetAnalysisResult(
            tweet_text=tweet,
            company=company,
            expert_analyses=expert_analyses,
            final_judgment=final_judgment_data.get("final_judgment", "NEUTRAL"),
            final_bullish_score=final_judgment_data.get("final_bullish_score", 5.0),
            reasoning=final_judgment_data.get("reasoning", "")
        )

        return result

    def print_analysis(self, result: "TweetAnalysisResult"):
        """Print a human-readable report of ``result`` to stdout."""
        print("\n" + "="*80)
        print(f"TWEET ANALYSIS RESULTS - {result.company}")
        print("="*80)
        print(f"Tweet: {result.tweet_text}")
        print(f"\nFinal Judgment: {result.final_judgment}")
        print(f"Bullish Score: {result.final_bullish_score}/10")
        print(f"Reasoning: {result.reasoning}")

        print("\nEXPERT ANALYSES:")
        print("-"*40)
        for analysis in result.expert_analyses:
            print(f"\n{analysis.expert_role}:")
            print(f"Score: {analysis.bullish_score}/10 (Confidence: {analysis.confidence:.2f})")
            print(f"Analysis: {analysis.analysis}")


In [None]:
# Load the tweet table from Parquet (the path is given without a file
# extension, exactly as stored on disk).
tweets_df = pd.read_parquet("stock_tweets_withsentiment_withemotion_nomerge")
# Bare expression: shown as the notebook cell's output.
tweets_df


In [None]:
# Load the company lookup table that is later joined onto the tweets.
companies_df = pd.read_parquet("stock_table.parquet")
# Bare expression: shown as the notebook cell's output.
companies_df

In [None]:
# Rename 'Symbol' to 'ticker' so the key column has the same name used for
# the merge with tweets_df.
companies_df = companies_df.rename(columns={'Symbol':'ticker'})

In [None]:
# Attach the company columns to each tweet by ticker. merge() defaults to an
# inner join, so tweets whose ticker is absent from companies_df are dropped.
tweets_df = tweets_df.merge(companies_df, left_on='ticker', right_on='ticker')

In [None]:
# Inspect the merged table (displayed as the cell output).
tweets_df

In [None]:

def get_stance(tweet, company):
    """Return ``(final_judgment, final_bullish_score)`` for one tweet.

    Args:
        tweet: Tweet text to analyze.
        company: Company name the tweet is about.

    Raises:
        KeyError: If the ``IDA_LLM_API_KEY`` environment variable is not set.
    """
    # Build the pipeline once and reuse it: it holds only the API client,
    # the model name and the expert table, so constructing a new client for
    # every row is pure overhead.
    pipeline = getattr(get_stance, "_pipeline", None)
    if pipeline is None:
        # The key is read from the environment rather than embedded in the
        # notebook. NOTE(review): the key that used to be hard-coded here has
        # been exposed in source and should be rotated.
        pipeline = TweetAnalysisPipeline(
            api_key=os.environ["IDA_LLM_API_KEY"],
            model="llama-3.3-70b-instruct"
        )
        get_stance._pipeline = pipeline

    result = pipeline.analyze_tweet(tweet, company)

    return result.final_judgment, result.final_bullish_score

In [None]:
# Independent copy of the first 10 rows, as a small sample for trial runs.
# NOTE(review): the stance cell further down operates on the full tweets_df,
# not on this sample - confirm which one is intended.
tweets_test = tweets_df.head(10).copy()

In [None]:
# Inspect the 10-row sample (displayed as the cell output).
tweets_test

In [None]:

# Run the multi-expert pipeline on every row and store the label and score in
# two new columns. Each row triggers several LLM calls, so the tqdm-backed
# progress_apply (registered by tqdm.pandas() at the top of the notebook) is
# used to show progress during the long run.
tweets_df[['stance', 'stance_score']] = tweets_df.progress_apply(
    lambda row: pd.Series(get_stance(row['text'], row['Company'])),
    axis=1,
)

In [None]:
# Inspect the table with the new 'stance' and 'stance_score' columns.
tweets_df

In [None]:
# Persist the enriched table to a new Parquet path (again without a file
# extension); index=False leaves the DataFrame index out of the file.
tweets_df.to_parquet("stock_tweets_withsentiment_withemotion_withstance_nomerge", index=False)