In [None]:
# Cell: Deep dive into universe ranking implementation

import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import boto3
import numpy as np
import pandas as pd
from pydantic import BaseModel, Field

# Step 0: Define our data models
class UniverseRankingRequest(BaseModel):
    """Parameters for ranking stocks in a universe.

    ``start_date`` / ``end_date`` are optional filters; when omitted the whole
    stored history for the session is used. ``indicator_type`` is only needed
    when ``metric == 'technical_indicator'``.
    """
    session_id: str = Field(..., description="Session ID for retrieving stored data")
    # Explicit ``None`` defaults: under Pydantic v2 an ``Optional[...]`` field
    # declared with a bare ``Field(description=...)`` is still *required*.
    start_date: Optional[str] = Field(None, description="Start date in ISO format (YYYY-MM-DD)")
    end_date: Optional[str] = Field(None, description="End date in ISO format (YYYY-MM-DD)")
    metric: str = Field(..., description="Metric to rank by ('performance' or 'technical_indicator')")
    sort: str = Field(..., description="Direction of ranking ('top' or 'bottom')")
    length: int = Field(..., description="Number of results to return")
    indicator_type: Optional[str] = Field(None, description="Type of technical indicator ('SMA', 'golden_cross', etc.)")

class UniverseRankingResponse(BaseModel):
    """Result envelope produced by the universe ranking function.

    Carries a status string, the ``s3://`` location of the persisted JSON
    payload, and an optional free-form summary dictionary.
    """

    status: str = Field(..., description="Processing status (success/error)")
    path: str = Field(..., description="S3 path to the processed data")
    # Free-form mapping; None when no summary is attached.
    summary: Optional[Dict[str, Any]] = Field(None, description="Summary statistics including ranked results and date range")

# Step 1-3: Same as before - get tickers and load returns
# Fresh hex session id; it is passed to create_returns_analysis_agent below,
# which embeds it in the agent instructions.
session_id = uuid.uuid4().hex
nl_query = "List the top 10 performing stocks in the S&P 500 over the past year"

# Step 1: ask the screening agent for the universe, then pull ticker symbols
# out of its reply (screening_query_agent / extract_tickers are defined
# outside this cell).
screen_resp = screening_query_agent.query(nl_query)
tickers = extract_tickers(screen_resp)
print("Step 1: Identified tickers:", tickers)

# Step 2: fetch returns for those tickers. Top-level `await` relies on the
# notebook's running event loop; it is a SyntaxError in a plain .py module.
pricing_req = PricingRequest(mode="default", tickers=tickers)
returns_resp = await get_returns_data(pricing_req)
# NOTE(review): presumably returns_resp["data"] is row records with a "date"
# field plus one column per ticker -- universe_ranking_function requires that
# layout. TODO confirm.
df = pd.DataFrame(returns_resp["data"])
print("Step 2: Fetched returns_data, shape:", df.shape)

# Step 3: persist the frame under this session.
# NOTE(review): universe_ranking_function reads
# sessions/<session_id>/returns_data.parquet from local disk; confirm that
# save_dataframe(df, session_id, "returns_data") writes to exactly that path.
await session_manager.initialize_session(uuid.UUID(session_id))
saved = session_manager.save_dataframe(df, session_id, "returns_data")
print("Step 3: returns_data saved?", saved)

# Step 4: Implement the core ranking functions
def rank_by_performance(df: pd.DataFrame,
                        tickers: List[str],
                        sort: str,
                        length: int) -> pd.DataFrame:
    """Rank tickers by their compounded (cumulative) return over the period.

    Args:
        df: Frame whose ``tickers`` columns hold periodic simple returns
            (e.g. 0.01 for +1%), one row per period.
        tickers: Column names to rank.
        sort: ``'bottom'`` ranks the worst performers first; any other value
            ranks the best performers first.
        length: Maximum number of rows to return.

    Returns:
        DataFrame with columns ``ticker`` and ``total_return`` in rank order,
        with a fresh 0..n-1 index. A ticker with no non-missing returns gets
        ``NaN`` (not 0.0) and is placed last in either direction.
    """
    # Compound: (1 + r1)(1 + r2)...(1 + rn) - 1. NaNs are skipped, but
    # min_count=1 keeps an all-NaN column as NaN instead of the empty
    # product 1.0, which would misreport "no data" as a 0% return.
    cumulative_returns = (1 + df[tickers]).prod(min_count=1) - 1

    result_df = pd.DataFrame({
        'ticker': cumulative_returns.index,
        'total_return': cumulative_returns.values,
    })

    # Descending for 'top', ascending for 'bottom'. A stable sort keeps the
    # input column order for ties; missing values always go last.
    result_df = result_df.sort_values(
        'total_return',
        ascending=(sort == 'bottom'),
        kind='mergesort',
        na_position='last',
    ).head(length)

    return result_df.reset_index(drop=True)

def rank_by_technical_indicator(df: pd.DataFrame,
                                tickers: List[str],
                                sort: str,
                                length: int,
                                indicator_type: str,
                                sma_window: int = 50,
                                sma_long: int = 200) -> Tuple[pd.DataFrame, str]:
    """Rank tickers by a simple-moving-average based indicator.

    Rows of ``df`` are taken in their existing order and only the last row is
    scored, so the frame should be chronologically sorted. The ``tickers``
    columns are used as-is and should therefore be a price (or price-index)
    series.

    Supported ``indicator_type`` values (case-insensitive):
        * ``'sma'``: last value / ``sma_window``-period SMA -> ``price_to_sma``
        * ``'golden_cross'``: ``sma_window``-period SMA / ``sma_long``-period
          SMA -> ``sma_ratio``

    Returns:
        ``(ranked_df, value_col)``. ``ranked_df`` has columns ``ticker`` and
        ``value_col`` in rank order with a fresh 0..n-1 index. A ticker is
        NaN when its window ends on missing data or the frame is shorter than
        the window; NaNs are placed last.

    Raises:
        ValueError: if ``indicator_type`` is missing or unsupported.
    """
    kind = (indicator_type or "").lower()
    values = df[tickers]

    if kind == 'sma':
        sma = values.rolling(window=sma_window).mean()
        indicator_values = values.iloc[-1] / sma.iloc[-1]
        value_col = 'price_to_sma'
    elif kind == 'golden_cross':
        # Distinct names so the ``sma_long`` window-length parameter is not
        # rebound to a DataFrame.
        short_sma = values.rolling(window=sma_window).mean()
        long_sma = values.rolling(window=sma_long).mean()
        indicator_values = short_sma.iloc[-1] / long_sma.iloc[-1]
        value_col = 'sma_ratio'
    else:
        # Without this branch the names used below would be unbound.
        raise ValueError(
            f"Unsupported indicator_type {indicator_type!r}; "
            "expected 'SMA' or 'golden_cross'"
        )

    result_df = pd.DataFrame({
        'ticker': indicator_values.index,
        value_col: indicator_values.values,
    })

    # Descending for 'top', ascending for 'bottom'; stable for ties.
    result_df = result_df.sort_values(
        value_col,
        ascending=(sort == 'bottom'),
        kind='mergesort',
        na_position='last',
    ).head(length)

    return result_df.reset_index(drop=True), value_col

def _validate_ranking_params(session_id: str,
                             metric: str,
                             sort: str,
                             length: int,
                             indicator_type: Optional[str]) -> None:
    """Raise ValueError if any ranking parameter is out of range."""
    # session_id arrives through a model-generated tool call and is joined into
    # both a local path and an S3 key, so it must be one plain path component.
    if not session_id or session_id in (".", "..") or Path(session_id).name != session_id:
        raise ValueError(f"Invalid session_id: {session_id!r}")
    if metric not in ['performance', 'technical_indicator']:
        raise ValueError("Metric must be 'performance' or 'technical_indicator'")
    if sort not in ['top', 'bottom']:
        raise ValueError("Sort must be 'top' or 'bottom'")
    if length <= 0:
        raise ValueError("Length must be positive")
    if metric == 'technical_indicator' and not indicator_type:
        raise ValueError("indicator_type required for technical_indicator metric")


def _load_session_returns(session_path: Path,
                          session_id: str,
                          start_date: Optional[str],
                          end_date: Optional[str]) -> Tuple[pd.DataFrame, List[str]]:
    """Load, validate, date-sort and date-filter the session's returns parquet.

    Returns the filtered frame and the list of ticker columns (every column
    except ``date``).
    """
    returns_data_path = session_path / "returns_data.parquet"
    if not returns_data_path.exists():
        raise ValueError(f"No returns data found for session {session_id}")

    df = pd.read_parquet(returns_data_path)

    if "date" not in df.columns:
        raise ValueError("Date column missing in returns data")

    available_tickers = [col for col in df.columns if col != "date"]
    if not available_tickers:
        raise ValueError("No ticker columns found in returns data")

    if not pd.api.types.is_datetime64_any_dtype(df["date"]):
        df["date"] = pd.to_datetime(df["date"])

    # Rolling-window indicators score the *last* row, so enforce
    # chronological order rather than trusting the stored row order.
    df = df.sort_values("date", kind="mergesort")

    if start_date:
        df = df[df["date"] >= pd.to_datetime(start_date)]
    if end_date:
        df = df[df["date"] <= pd.to_datetime(end_date)]

    if len(df) < 2:
        raise ValueError("Insufficient data after filtering")

    return df, available_tickers


def _persist_ranking_result(result_json: dict, s3_path: str, local_path: Path) -> None:
    """Upload the ranking payload to S3 and write the same JSON to ``local_path``."""
    body = json.dumps(result_json, default=str)

    s3_client = boto3.client('s3',
                             region_name='us-east-1',
                             aws_access_key_id=AWS_ACCESS_KEY_ID,
                             aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
    s3_client.put_object(
        Bucket=AWS_S3_BUCKET,
        Key=s3_path,
        Body=body,
        ContentType='application/json'
    )

    local_path.parent.mkdir(parents=True, exist_ok=True)
    local_path.write_text(body, encoding="utf-8")


async def universe_ranking_function(request: UniverseRankingRequest) -> UniverseRankingResponse:
    """Rank the session's stored universe by performance or a technical indicator.

    Reads ``sessions/<session_id>/returns_data.parquet`` (a ``date`` column
    plus one periodic-return column per ticker). It optionally restricts the
    data to ``[start_date, end_date]`` and ranks every ticker column. The
    ranked records are written as JSON to S3 under
    ``users/<session_id>/processed/`` and to the local session directory.

    Args:
        request: Ranking parameters (see ``UniverseRankingRequest``).

    Returns:
        ``UniverseRankingResponse`` with ``status="success"``, the ``s3://``
        URI of the uploaded JSON, and a summary. The summary holds the date
        range actually covered, an echo of the request, and the ranked
        records (NaN mapped to None).

    Raises:
        ValueError: invalid parameters, missing or malformed returns data,
            or fewer than two rows left after date filtering.
    """
    session_id = request.session_id
    metric = request.metric
    sort = request.sort
    length = request.length
    indicator_type = request.indicator_type

    _validate_ranking_params(session_id, metric, sort, length, indicator_type)

    session_path = Path("sessions") / session_id
    df, tickers = _load_session_returns(
        session_path, session_id, request.start_date, request.end_date)

    if metric == 'performance':
        result_df = rank_by_performance(df, tickers, sort, length)
    else:  # technical_indicator
        # The stored frame holds periodic returns (rank_by_performance
        # compounds them as such). SMA-style indicators are defined on price
        # levels, so rebuild a growth-of-1 price index before scoring.
        price_index = (1 + df[tickers]).cumprod()
        result_df, _ = rank_by_technical_indicator(
            price_index, tickers, sort, length, indicator_type)

    # NaN is not valid JSON: map it to None once and reuse the records for
    # both the returned summary and the persisted payload.
    records = result_df.replace({np.nan: None}).to_dict(orient="records")
    reported_indicator = indicator_type if metric == 'technical_indicator' else None

    summary = {
        "start_date": df["date"].min().strftime('%Y-%m-%d'),
        "end_date": df["date"].max().strftime('%Y-%m-%d'),
        "metric": metric,
        "indicator_type": reported_indicator,
        "sort": sort,
        "length": length,
        "results": records,
    }

    # One clock reading so the filename and metadata timestamps agree.
    now = datetime.now()
    timestamp = now.strftime("%Y%m%d_%H%M%S")
    result_filename = f"universe_ranking_{metric}_{reported_indicator or ''}_{sort}_{length}_{timestamp}.json"
    s3_path = f"users/{session_id}/processed/{result_filename}"

    result_json = {
        "data": records,
        "metadata": {
            "metric": metric,
            "indicator_type": reported_indicator,
            "sort": sort,
            "length": length,
            "start_date": request.start_date,
            "end_date": request.end_date,
            "generated_at": now.isoformat(),
            "row_count": len(result_df),
            "session_id": session_id,
        },
    }

    _persist_ranking_result(result_json, s3_path, session_path / result_filename)

    return UniverseRankingResponse(
        status="success",
        path=f"s3://{AWS_S3_BUCKET}/{s3_path}",
        summary=summary
    )

# Step 5: Create the returns analysis agent that knows about this function
def create_returns_analysis_agent(session_id: str):
    """Build the returns-analysis agent bound to a single session.

    The agent receives ``universe_ranking_function`` and
    ``universe_exposure_function`` as tools and ``ReturnsAgentResponse`` as its
    structured output type. Its instructions embed today's date and
    ``session_id`` so that tool calls target this session's stored data.

    NOTE(review): if ``Agent`` is the OpenAI Agents SDK class, plain Python
    callables in ``tools`` normally need wrapping with ``function_tool``.
    The prompt also never tells the model how to choose ``length``. Confirm
    both.
    """
    current_day = datetime.now().strftime('%Y-%m-%d')
    prompt = f"""You analyze returns data and rank instruments by performance or analyze their exposure to benchmarks.

You have two main functions:
1. universe_ranking_function - ranks by performance or technical indicators
2. universe_exposure_function - analyzes correlation with benchmarks

Today is {current_day}. Always use session_id: {session_id}

For RANKING queries:
1. Determine if about performance or technical indicators
2. For performance:
   - metric="performance"
   - Calculate date range from query
   - sort="top" for best performers, "bottom" for worst
3. For technical indicators:
   - metric="technical_indicator"
   - Set indicator_type to "SMA" or "golden_cross"
4. Call universe_ranking_function with UniverseRankingRequest
5. Map response to ReturnsAgentResponse format

Example ranking query:
"What are the top 10 performing stocks YTD?"
→ ReturnsAgentResponse(
     status="success",
     path="s3://...",
     universe_rankings={{
       "AAPL": 0.234,  # performance value
       "MSFT": 0.187,
       ...
     }})

Always include:
- status: "success" or "error"
- path: S3 path from function
- universe_rankings: dict mapping tickers to values"""
    toolset = [universe_ranking_function, universe_exposure_function]
    return Agent(
        name="returns_analysis_agent",
        tools=toolset,
        output_type=ReturnsAgentResponse,
        model="gpt-4o-mini",
        instructions=prompt,
    )

# Step 6: Run the agent with our query
agent = create_returns_analysis_agent(session_id)
result = await Runner.run(agent, nl_query)

# Step 7: Display results
func_out = result.function_results[0]
print("\nResults:")
print("S3 Path:", func_out.path)
print("Analysis Period:", func_out.start_date, "→", func_out.end_date)
print("\nTop Performers:")
rankings_df = pd.DataFrame([
    {"ticker": k, "return": v} 
    for k, v in func_out.universe_rankings.items()
]).sort_values("return", ascending=False)
print(rankings_df)