# Optimized Pipeline with Rich Formatting

An enhanced version of the pipeline that uses the `rich` library (tables, panels, status spinners, and progress bars) to make each stage's console output easier to read.

In [None]:
import json
import os
import re
import urllib.parse
from os import getenv

import langchain
import requests
from dotenv import load_dotenv
from langchain.chains import ConversationChain
from langchain.memory import ConversationBufferMemory
from langchain.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate, ChatPromptTemplate
from langchain.schema import SystemMessage, HumanMessage
from langchain_openai import ChatOpenAI
from rich import print
from rich.console import Console
from rich.markup import escape
from rich.panel import Panel
from rich.progress import track
from rich.syntax import Syntax
from rich.table import Table

# Initialize Rich console
# A single shared Console instance; every cell below prints through it.
console = Console()

# json5 is an optional dependency. When the import fails, `json5` is bound to
# None so later code can test for it, and a warning is shown once.
try:
    import json5
except ImportError:
    json5 = None
    console.print("[yellow]Warning: json5 not installed. Strict JSON parsing will be used.[/yellow]")

# Load variables from a local .env file (if present) into the environment so
# that the getenv() lookup below can see them.
load_dotenv()

# Initialize OpenAI model with rich status display
# The spinner is shown while the client object is constructed; the API key is
# taken from the OPENAI_API_KEY environment variable.
with console.status("[bold green]Initializing OpenAI model..."):
    llm = ChatOpenAI(
        api_key=getenv("OPENAI_API_KEY"),
        model_name="gpt-4",
        verbose=True,
        temperature=0
    )
    console.print("[bold green]✓[/bold green] Model initialized successfully")

In [None]:
# API Configuration with Rich formatting
lunarcrush_base_url = "https://lunarcrush.com/api4"
mobula_base_url = "https://production-api.mobula.io/api/1"

# Display API configuration
api_table = Table(title="API Configuration")
api_table.add_column("Service", style="cyan")
api_table.add_column("Base URL", style="green")
api_table.add_row("LunarCrush", lunarcrush_base_url)
api_table.add_row("Mobula", mobula_base_url)
console.print(api_table)

# Headers configuration
# Credentials are read from the environment (populated from .env by
# load_dotenv) instead of being committed to source, mirroring how
# OPENAI_API_KEY is handled.
# NOTE: the keys that used to be hard-coded here have been exposed in the
# file's history and should be rotated with the providers.
lunarcrush_api_key = getenv("LUNARCRUSH_API_KEY", "")
mobula_api_key = getenv("MOBULA_API_KEY", "")

# Warn early rather than failing later with an opaque 401 from the provider.
if not lunarcrush_api_key:
    console.print("[yellow]Warning: LUNARCRUSH_API_KEY is not set; LunarCrush requests will be unauthorized.[/yellow]")
if not mobula_api_key:
    console.print("[yellow]Warning: MOBULA_API_KEY is not set; Mobula requests will be unauthorized.[/yellow]")

lunarcrush_headers = {'Authorization': f'Bearer {lunarcrush_api_key}'}
mobula_headers = {"Authorization": mobula_api_key}

# File configuration
mobula_coins_file = "mobula-coins.json"
lunarcrush_coins_file = "lunarcrush-coins.json"

In [None]:
def form_parent_calls_func(api_list):
    """Turn API definitions into concrete request descriptors.

    Args:
        api_list: Iterable of API definition dicts. Each dict is expected to
            carry a "provider" key ("lunarcrush" or "mobula") and may carry a
            "description". Non-dict entries are skipped. ``None`` or an empty
            collection yields an empty result.

    Returns:
        A list of dicts with the keys "provider", "url" and "description",
        one per definition that could be resolved. Definitions with an
        unknown provider, or for which URL construction raises, are reported
        on the console and left out.
    """
    base_urls = {
        "lunarcrush": lunarcrush_base_url,
        "mobula": mobula_base_url
    }
    api_calls = []

    # Nothing to process; also avoids handing None to track().
    if not api_list:
        return api_calls

    # track() already renders a live progress bar. It is deliberately not
    # wrapped in console.status(): older Rich releases allow only one live
    # display at a time and raise LiveError when a second one is started.
    for api_def in track(api_list, description="Processing API definitions"):
        if not isinstance(api_def, dict):
            continue

        provider = str(api_def.get("provider", "")).lower().strip()
        base_url = base_urls.get(provider)

        if not base_url:
            # The provider string comes from model output; escape it so stray
            # square brackets are not interpreted as console markup.
            console.print(f"[red]Unknown provider '{escape(provider)}' – skipping definition.[/red]")
            continue

        try:
            url = form_parent_call_func(api_def, base_url)
        except Exception as e:
            # Best-effort: one malformed definition must not abort the batch.
            console.print(f"[red]Error forming API call for {escape(provider)}: {escape(str(e))}[/red]")
            continue

        api_calls.append({
            "provider": provider,
            "url": url,
            "description": api_def.get("description", "")
        })

    return api_calls

In [None]:
def execute_parent_calls_func(api_calls):
    """Execute the prepared GET requests and collect their JSON payloads.

    Args:
        api_calls: Iterable of dicts with the keys "provider", "url" and
            "description" (the shape produced by ``form_parent_calls_func``).
            ``None`` or an empty collection yields an empty result.

    Returns:
        A list with one entry per call. A successful call contributes
        ``{"url", "response"}`` where "response" is the decoded JSON body; a
        failed call or an unknown provider contributes
        ``{"url", "description", "error"}``.
    """
    header_map = {
        "lunarcrush": lunarcrush_headers,
        "mobula": mobula_headers
    }
    results = []

    # Nothing to execute; also avoids handing None to track().
    if not api_calls:
        return results

    # track() already renders a live progress bar. It is deliberately not
    # wrapped in console.status(): older Rich releases allow only one live
    # display at a time and raise LiveError when a second one is started.
    for call in track(api_calls, description="Processing API calls"):
        provider = str(call.get("provider", "")).lower().strip()
        url = call.get("url", "")
        description = call.get("description", "")
        headers = header_map.get(provider)

        if headers is None:
            # escape() keeps stray square brackets from being read as markup.
            console.print(f"[red]Unknown provider '{escape(provider)}'[/red]")
            results.append({
                "url": url,
                "description": description,
                "error": f"Unknown provider '{provider}'"
            })
            continue

        try:
            response = requests.get(url, headers=headers, timeout=10)
            response.raise_for_status()
            payload = response.json()
        except Exception as e:
            # Deliberate best-effort boundary: network errors, HTTP error
            # statuses and undecodable bodies are recorded, not raised, so the
            # remaining calls still run.
            console.print(f"[red]Error executing API call: {escape(str(e))}[/red]")
            results.append({
                "url": url,
                "description": description,
                "error": str(e)
            })
            continue

        results.append({
            "url": url,
            "response": payload
        })
        console.print(f"[green]Successfully executed API call for {provider}[/green]")

    return results

In [None]:
def aggregate_tokens_simple_func(obj, avoid_tokens, collected=None):
    """Collect every dict that carries a "symbol" key from a nested structure.

    The structure is walked depth-first in document order (a dict is examined
    before its values, list items left to right). Symbols are normalised with
    ``str(...).upper().strip()``; the first dict seen for a symbol wins.

    Args:
        obj: Arbitrarily nested combination of dicts and lists; any other
            value is ignored.
        avoid_tokens: Container of normalised symbols to leave out.
        collected: Optional dict to accumulate into; it is mutated and
            returned. A new dict is created when omitted.

    Returns:
        Mapping of normalised symbol to the dict in which it was first found.
    """
    found = {} if collected is None else collected

    # Explicit stack instead of recursion. Children are pushed in reverse so
    # that popping visits them in their original order.
    pending = [obj]
    while pending:
        node = pending.pop()

        if isinstance(node, dict):
            if "symbol" in node:
                key = str(node.get("symbol", "")).upper().strip()
                if key and key not in avoid_tokens and key not in found:
                    found[key] = node
            pending.extend(reversed(list(node.values())))
        elif isinstance(node, list):
            pending.extend(reversed(node))

    return found

In [None]:
def get_top_tokens_func(parent_api_results, avoid_tokens, top_n=5):
    """Pick the first ``top_n`` known tokens found in the API results.

    Tokens are gathered from ``parent_api_results`` with
    ``aggregate_tokens_simple_func`` (in order of first appearance), filtered
    to symbols present in the valid-coin lists loaded from the configured
    files, and printed as a table.

    Args:
        parent_api_results: Nested dict/list structure of API responses.
        avoid_tokens: Iterable of symbols to exclude (case-insensitive);
            ``None`` is treated as empty.
        top_n: Maximum number of tokens to return. Zero or a negative value
            yields an empty list.

    Returns:
        List of the selected token dicts, at most ``top_n`` long.
    """
    # No console.status() here: this helper may be called while the caller
    # already has a live display active, and older Rich releases raise
    # LiveError when a second one is started.
    valid_coins = load_valid_coins_func(mobula_coins_file, lunarcrush_coins_file)
    avoid_set = {token.upper() for token in (avoid_tokens or ())}
    valid_set = {coin.upper() for coin in valid_coins}

    aggregated = aggregate_tokens_simple_func(parent_api_results, avoid_set)

    # Slice after filtering so that top_n <= 0 really returns nothing (the
    # previous append-then-check loop could still return one token).
    limit = max(top_n, 0)
    filtered_tokens = [
        token for symbol, token in aggregated.items() if symbol in valid_set
    ][:limit]

    # Display results
    token_table = Table(title="Top Tokens")
    token_table.add_column("Symbol", style="cyan")
    token_table.add_column("Name", style="green")

    for token in filtered_tokens:
        # API payloads are not guaranteed to hold strings here; coerce so a
        # numeric or null field cannot break table rendering, and escape so
        # square brackets are not read as markup.
        token_table.add_row(
            escape(str(token.get("symbol") or "")),
            escape(str(token.get("name") or ""))
        )

    console.print(token_table)
    return filtered_tokens

In [None]:
import fewshots
import variables

query = "Trending projects in NFTs and DeFi"

# Display query in a panel
# escape() keeps square brackets in the query from being parsed as markup.
console.print(Panel(f"[bold cyan]Query:[/bold cyan] {escape(query)}", title="Analysis Request"))

system_template = fewshots.system_template
master_system_prompt = SystemMessagePromptTemplate.from_template(system_template).format(sectors=variables.sectors)

human_template = fewshots.human_template
# HumanMessagePromptTemplate.format() returns a HumanMessage. The previous
# ChatPromptTemplate.format() returned a flattened string ("Human: ..."),
# which leaked the role prefix into the message content sent to the model.
master_human_prompt = HumanMessagePromptTemplate.from_template(human_template).format(input=query, master_fewshot=fewshots.master_llm_fewshot)

with console.status("[bold blue]Generating master output..."):
    master_output = llm.invoke([master_system_prompt, master_human_prompt]).content

# Display master output
# Model text is arbitrary; escape it so bracketed fragments are shown verbatim
# instead of being swallowed (or rejected) by the markup parser.
console.print(Panel(escape(master_output), title="Master LLM Output", border_style="green"))

In [None]:
# Build the data-agent prompts: the system prompt is filled with the sector
# list, the endpoint catalogues, the expected output format and few-shot
# examples; the human prompt carries the query and the master LLM's output.
data_system_template = fewshots.data_system_template
data_system_prompt = SystemMessagePromptTemplate.from_template(data_system_template).format(
    sectors=variables.sectors,
    lunarcrush_endpoints=variables.lunarcrush_endpoints,
    mobula_endpoints=variables.mobula_endpoints,
    data_output_format=variables.data_output_format,
    data_fewshot=fewshots.data_fewshot
)

human_template = fewshots.data_human_template
# HumanMessagePromptTemplate.format() returns a HumanMessage. The previous
# ChatPromptTemplate.format() returned a flattened string ("Human: ..."),
# which leaked the role prefix into the message content sent to the model.
data_human_prompt = HumanMessagePromptTemplate.from_template(human_template).format(input=query, master_output=master_output)

with console.status("[bold blue]Generating data output..."):
    data_output = llm.invoke([data_system_prompt, data_human_prompt]).content

# Display data output
# Model text is arbitrary; escape it so bracketed fragments are shown verbatim
# instead of being swallowed (or rejected) by the markup parser.
console.print(Panel(escape(data_output), title="Data Agent Output", border_style="blue"))

In [None]:
# Parse the data agent's text output into the list of API definitions.
data_output = extract_api_json(data_output)

# No console.status() wrapper around these calls: the helpers render their own
# live progress displays via track(), and older Rich releases raise LiveError
# when a second live display is started while one is active.
parent_result = execute_parent_calls_func(form_parent_calls_func(data_output))
# NOTE(review): valid_coins is not used in this cell and get_top_tokens_func
# loads the same files itself; kept in case a later cell references it.
valid_coins = load_valid_coins_func(mobula_coins_file, lunarcrush_coins_file)
top_tokens = get_top_tokens_func(parent_result, variables.avoid_tokens, top_n=5)

# Display results
console.print(Panel(
    Syntax(json.dumps(parent_result, indent=2), "json"),
    title="API Results",
    border_style="cyan"
))

In [None]:
# Rewrite the API definitions for the selected tokens, run the follow-up
# (nested) calls, and bundle both result sets as input for the analysis prompt.
data_output = update_api_calls_func(data_output, top_tokens)
nested_response = execute_nested_calls_func(data_output)
analysis_data = [top_tokens, nested_response]

with console.status("[bold blue]Generating analysis..."):
    # NOTE(review): analysis_system_template is not defined in the visible
    # part of this file — confirm it is provided elsewhere (e.g. by fewshots).
    analysis_system_prompt = SystemMessagePromptTemplate.from_template(analysis_system_template).format()
    # NOTE(review): human_template was last bound to the data agent's human
    # template, which is formatted with `master_output` rather than
    # `analysis_data` in its own cell — verify this is the intended template.
    human_message_prompt = HumanMessagePromptTemplate.from_template(human_template).format(input=query, analysis_data=analysis_data)
    analysis = llm.invoke([analysis_system_prompt, human_message_prompt]).content

# Display analysis
# Model text is arbitrary; escape it so bracketed fragments are shown verbatim
# instead of being swallowed (or rejected) by the markup parser.
console.print(Panel(escape(analysis), title="Analysis Results", border_style="magenta"))

In [None]:
# Generate tweet and response
# NOTE(review): tweet_template and response_template are not defined in the
# visible part of this file — confirm they are provided elsewhere.
with console.status("[bold blue]Generating tweet and response..."):
    # ChatPromptTemplate.format() yields a plain string, which llm.invoke()
    # accepts as a single human message.
    tweet_prompt = ChatPromptTemplate.from_template(tweet_template).format(analysis=analysis, input=query)
    tweet = llm.invoke(tweet_prompt).content

    response_prompt = ChatPromptTemplate.from_template(response_template).format(analysis=analysis, input=query)
    response = llm.invoke(response_prompt).content

# Display results
# Model text is arbitrary; escape it so bracketed fragments are shown verbatim
# instead of being swallowed (or rejected) by the markup parser.
console.print(Panel(escape(tweet), title="Generated Tweet", border_style="yellow"))
console.print(Panel(escape(response), title="Generated Response", border_style="red"))