In [1]:
# Install required packages for the multi-agent system into the kernel's own
# interpreter (sys.executable), one package at a time so a single failure does
# not abort the rest.
# NOTE(review): versions are unpinned, so a fresh environment may resolve
# different releases than the ones this notebook was written against.
import subprocess
import sys

# Core dependencies
packages = [
    "openai",
    "mcp",
    "linkedin-api",
    "requests",
    "python-dotenv",
    "pandas",
    "numpy",
    "pydantic",
    "httpx",
    "aiohttp",
    "networkx",
    "beautifulsoup4",
    "langgraph",
    "langchain-openai",
    "langchain-core",
    "tqdm",
    "structlog",
]

print("Installing multi-agent system dependencies...")
failed_packages = []  # packages whose pip invocation did not succeed
for package in packages:
    try:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package, "-q"])
        print(f" {package}")
    except (subprocess.CalledProcessError, OSError) as e:
        # CalledProcessError: pip exited non-zero; OSError: interpreter could not be spawned.
        failed_packages.append(package)
        print(f" {package}: {e}")

# Only claim readiness when every install actually succeeded.
if failed_packages:
    print(f"\n Multi-Agent LAIE System Dependencies NOT ready; failed: {', '.join(failed_packages)}")
else:
    print("\n Multi-Agent LAIE System Dependencies Ready!")

Installing multi-agent system dependencies...
 openai
 mcp
 linkedin-api
 requests
 python-dotenv
 pandas
 numpy
 pydantic
 httpx
aiohttp
 networkx
 beautifulsoup4
 langgraph
 langchain-openai
 langchain-core
 tqdm
 structlog

 Multi-Agent LAIE System Dependencies Ready!


In [8]:
import os
import json 
import asyncio 
from datetime import datetime , timedelta
from typing import Annotated, Any, Optional, List , Dict , TypedDict, Literal, Union
from dataclasses import dataclass, field, asdict
from pathlib import Path
import hashlib
from collections import defaultdict
from enum import Enum
import logging
import structlog 

#reuse data models 
from pydantic import BaseModel, Field
from enum import Enum

# LangGraph imports
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from langchain_openai import AzureChatOpenAI

#load environment variables 
from dotenv import load_dotenv
load_dotenv(Path("../.env"))







False

In [2]:
# Configure logging: the stdlib root logger is set to INFO, and a module-level
# structlog logger is created; the agent classes in this notebook emit their
# events (with key/value context) through `logger`.
logging.basicConfig(level=logging.INFO)
logger = structlog.get_logger(__name__)

In [3]:


# Azure OpenAI configuration, read from AI_FOUNDRY_* environment variables.
# Endpoint and API key have no fallback and are None when the variable is unset;
# deployment name and API version fall back to the defaults given here.
AZURE_OPENAI_ENDPOINT = os.getenv("AI_FOUNDRY_PROJECT_ENDPOINT")
AZURE_OPENAI_API_KEY = os.getenv("AI_FOUNDRY_API_KEY")
AZURE_OPENAI_DEPLOYMENT = os.getenv("AI_FOUNDRY_DEPLOYMENT_NAME", "gpt-4.1")
AZURE_OPENAI_API_VERSION = os.getenv("AI_FOUNDRY_API_VERSION", "2024-12-01-preview")


# LinkedIn data-source credentials (each is None when its variable is unset).
LINKEDIN_EMAIL = os.getenv("LINKEDIN_EMAIL")
LINKEDIN_PASSWORD = os.getenv("LINKEDIN_PASSWORD")
LINKEDIN_LI_AT = os.getenv("LINKEDIN_LI_AT")
PROXYCURL_API_KEY = os.getenv("PROXYCURL_API_KEY")

# Analysis time window as naive datetimes. The agents treat the start as
# inclusive and the end as exclusive (posts with published_at >= end are dropped).
ANALYSIS_START_DATE = datetime(2025, 1, 1)
ANALYSIS_END_DATE = datetime(2026, 1, 1)

In [4]:
# Chat-model client built from the AZURE_OPENAI_* settings, with a fixed
# sampling temperature and a 2000-token cap per completion.
# NOTE(review): the deployment name is passed as `model`; confirm against the
# langchain-openai documentation whether `azure_deployment` should be set instead.
llm = AzureChatOpenAI(
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    api_key=AZURE_OPENAI_API_KEY,
    api_version=AZURE_OPENAI_API_VERSION,
    model=AZURE_OPENAI_DEPLOYMENT,
    temperature=0.3,
    max_tokens=2000
)

In [5]:
#define the state schema for langGraph 
class LAIEState(TypedDict):
    """State schema for the LAIE multi-agent system.

    One instance of this mapping is handed to each agent's ``process`` method.
    Each key is declared exactly once; the earlier version re-declared
    ``content_performance`` and ``temporal_patterns`` under the AI-content
    section, which added nothing at runtime and obscured which stage owns them.
    """

    # Input parameters
    public_id: str
    data_sources: Dict[str, Any]

    # Data collection results
    raw_profile: Optional[Dict[str, Any]]
    raw_posts: Optional[List[Dict[str, Any]]]
    data_quality_score: float

    # Analytics results
    monthly_analytics: Optional[List[Dict[str, Any]]]
    content_performance: Optional[Dict[str, Any]]
    temporal_patterns: Optional[Dict[str, Any]]

    # AI-generated content
    monthly_notes: Optional[List[Dict[str, Any]]]

    # Agent communication
    messages: List[BaseMessage]
    current_agent: str
    next_agent: Optional[str]

    # Error handling
    errors: List[str]
    retry_count: int

    # Final output
    final_report: Optional[Dict[str, Any]]
    audit_trail: List[Dict[str, Any]]



In [6]:
# define agent response types 
class AgentResponse(TypedDict):
    """Uniform result envelope returned by every agent's ``process`` method.

    ``message`` is declared here because all agents in this notebook already
    pass ``message=...`` when building a response; without the declaration the
    schema and its callers disagreed.
    """

    success: bool              # True when the agent finished without raising
    data: Any                  # agent-specific payload; None on failure
    message: str               # human-readable outcome summary
    next_agent: Optional[str]  # name of the agent to run next; None to stop
    errors: List[str]          # error descriptions; empty on success
    

In [7]:
#define monthly note structure 
class MonthlyNote(TypedDict):
    """Shape of a single per-month note (return annotation of the monthly-note generator)."""

    month: str
    activity_summary: str
    key_achievements: List[str]
    content_performance: Dict[str, Any]
    engagement_highlights: List[str]
    recommendations: List[str]
    ai_insights: str
    


In [9]:
#Enums 
class ContentType(Enum):
    """Kinds of post content; the string value is what analytics uses as a dict key."""
    TEXT = "text"
    IMAGE = "image"
    VIDEO = "video"
    ARTICLE = "article"
    CAROUSEL = "carousel"
    POLL = "poll"
    DOCUMENT = "document"


class EngagementType(Enum):
    """Kinds of engagement signal a post can receive."""
    # NOTE(review): no visible code in this part of the notebook references this enum.
    LIKE = "like"
    COMMENT = "comment"
    REPOST = "repost"
    IMPRESSION = "impression"

#profile models
class LinkedInProfile(BaseModel):
    """Validated profile record; only the identity fields are required."""

    # Required identity fields
    user_id: str
    full_name: str
    headline: str

    # Audience size, zero when the source does not report it
    followers_count: int = 0
    connections_count: int = 0

    # Optional descriptive fields
    industry: Optional[str] = None
    location: Optional[str] = None
    about: Optional[str] = None

class LinkedInPost(BaseModel):
    """Validated record of one post together with its engagement counters."""

    # Required identity and content fields
    post_id: str
    user_id: str
    content: str
    content_type: ContentType
    published_at: datetime

    # Engagement counters, zero when the source does not report them
    likes_count: int = 0
    comments_count: int = 0
    reposts_count: int = 0
    impressions: int = 0

class MonthlyActivity(BaseModel):
    """Per-month aggregate of a user's posting activity."""

    user_id: str
    month: str  # AnalyticsAgent fills this with a "%Y-%m" key

    # Aggregated counters, zero by default
    posts_count: int = 0
    total_impressions: int = 0
    total_likes: int = 0
    engagement_rate: float = 0.0

    # Number of posts per content-type value; a fresh dict for every instance
    content_types: Dict[str, int] = Field(default_factory=dict)

In [10]:
class IngestionAgent:
    """Agent responsible for collecting LinkedIn data from various sources.

    Sources are tried in priority order (GDPR export, then Proxycurl, then
    linkedin-api); the first one configured in ``state["data_sources"]`` is
    used. Completed actions are appended to ``self.audit_log``.
    """

    def __init__(self):
        # Chronological record of completed actions, filled by _log_action.
        self.audit_log = []
        logger.info("IngestionAgent initialized")

    def process(self, state: LAIEState) -> AgentResponse:
        """Process data ingestion for the given LinkedIn profile.

        Args:
            state: Graph state; ``public_id`` and ``data_sources`` are read.

        Returns:
            An AgentResponse. On success ``data`` holds ``profile`` (dict),
            ``posts`` (list of dicts) and ``quality_score`` and ``next_agent``
            is ``"analytics"``. Any exception raised during collection is
            converted into a failure response with ``next_agent=None``.

        Raises:
            KeyError: if ``public_id`` or ``data_sources`` is missing from state
                (these lookups happen before the guarded section).
        """
        public_id = state["public_id"]
        data_sources = state["data_sources"]

        logger.info("IngestionAgent processing", public_id=public_id)

        try:
            # Attempt data collection
            profile, posts = self._collect_data(public_id, data_sources)

            # Validate data quality
            quality_score = self._assess_data_quality(profile, posts)

            # Convert to dict format for state
            profile_dict = profile.dict() if hasattr(profile, 'dict') else profile
            posts_list = [post.dict() if hasattr(post, 'dict') else post for post in posts]

            response = AgentResponse(
                success=True,
                data={
                    "profile": profile_dict,
                    "posts": posts_list,
                    "quality_score": quality_score
                },
                message=f"Successfully collected data for {public_id}",
                next_agent="analytics",
                errors=[]
            )

            self._log_action(f"Data ingestion completed for {public_id}")

        except Exception as e:
            logger.error("IngestionAgent failed", error=str(e))
            response = AgentResponse(
                success=False,
                data=None,
                message=f"Data ingestion failed: {str(e)}",
                next_agent=None,
                errors=[str(e)]
            )

        return response

    def _collect_data(self, public_id: str, data_sources: Dict[str, Any]) -> tuple:
        """Collect data from the highest-priority configured source.

        Returns:
            ``(profile, posts)`` as produced by the selected source.

        Raises:
            ValueError: if none of the known source keys is set to a truthy value.
        """
        # Priority: GDPR export > Proxycurl > linkedin-api

        if data_sources.get("gdpr_export"):
            return self._parse_gdpr_export(data_sources["gdpr_export"], public_id)

        if data_sources.get("proxycurl_api_key"):
            return self._fetch_proxycurl_data(public_id, data_sources["proxycurl_api_key"])

        if data_sources.get("linkedin_credentials"):
            return self._fetch_linkedin_api_data(public_id, data_sources["linkedin_credentials"])

        raise ValueError("No valid data source provided")

    @staticmethod
    def _stable_hash(text: str) -> int:
        """Return a non-negative integer derived from ``text`` that is stable across processes.

        The built-in ``hash()`` of a str is salted per interpreter process, so it
        cannot be used for reproducible mock data.
        """
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big")

    @staticmethod
    def _field(record: Any, name: str) -> Any:
        """Read ``name`` from either a mapping or an attribute-style object (None if absent)."""
        if isinstance(record, dict):
            return record.get(name)
        return getattr(record, name, None)

    def _assess_data_quality(self, profile: Any, posts: List[Any]) -> float:
        """Return a heuristic completeness score between 0.0 and 1.0.

        Half of the score is the fraction of descriptive profile fields
        (full_name, headline, industry, location, about) that are non-empty;
        the other half is the fraction of posts with non-empty content
        (0.0 when there are no posts). The result is rounded to two decimals.
        """
        profile_fields = ("full_name", "headline", "industry", "location", "about")
        filled = sum(1 for name in profile_fields if self._field(profile, name))
        profile_score = filled / len(profile_fields)

        if posts:
            with_content = sum(1 for post in posts if self._field(post, "content"))
            posts_score = with_content / len(posts)
        else:
            posts_score = 0.0

        return round(0.5 * profile_score + 0.5 * posts_score, 2)

    def _parse_gdpr_export(self, zip_path: str, public_id: str) -> tuple:
        """Parse GDPR export (simplified implementation).

        ``zip_path`` is currently unused: deterministic mock data covering the
        analysis window is generated instead of reading the archive.
        """
        # In production, this would parse actual GDPR export
        # For demo, return mock data
        profile = LinkedInProfile(
            user_id=public_id,
            full_name="Demo User",
            headline="Professional Title",
            followers_count=1000,
            connections_count=500
        )

        # Generate mock posts for the analysis period
        posts = []
        current_date = ANALYSIS_START_DATE
        post_id = 0

        while current_date < ANALYSIS_END_DATE:
            # Add 2-5 posts per month (fewer if weekly spacing runs out of month)
            month_key = current_date.strftime("%Y-%m")
            num_posts = 2 + (self._stable_hash(public_id + month_key) % 4)

            for i in range(num_posts):
                post_date = current_date + timedelta(days=i * 7)  # Spread posts
                # Stop once the weekly spacing leaves this month (e.g. a fifth
                # post in February) or the analysis window.
                if post_date.month != current_date.month or post_date >= ANALYSIS_END_DATE:
                    break

                posts.append(LinkedInPost(
                    post_id=f"post_{post_id}",
                    user_id=public_id,
                    content=f"Sample LinkedIn post content for {post_date.strftime('%B %Y')}",
                    content_type=ContentType.TEXT,
                    published_at=post_date,
                    likes_count=10 + (self._stable_hash(str(post_id)) % 90),
                    comments_count=1 + (self._stable_hash(str(post_id + 1)) % 9),
                    reposts_count=0 + (self._stable_hash(str(post_id + 2)) % 5),
                    impressions=100 + (self._stable_hash(str(post_id + 3)) % 900)
                ))
                post_id += 1

            # Move to next month (safe with replace(): the day is always the window's start day)
            if current_date.month == 12:
                current_date = current_date.replace(year=current_date.year + 1, month=1)
            else:
                current_date = current_date.replace(month=current_date.month + 1)

        return profile, posts

    def _fetch_proxycurl_data(self, public_id: str, api_key: str) -> tuple:
        """Fetch profile data from the Proxycurl API.

        Returns:
            ``(profile, [])`` - no posts are collected on this path.

        Raises:
            ValueError: on any HTTP/network error or response-processing error.
        """
        import requests

        try:
            # Proxycurl API endpoint for profile data
            url = "https://nubela.co/proxycurl/api/v2/linkedin"
            headers = {
                'Authorization': f'Bearer {api_key}'
            }
            params = {
                'url': f'https://www.linkedin.com/in/{public_id}',
                'fallback_to_cache': 'on-error'
            }

            logger.info("Fetching profile data from Proxycurl", public_id=public_id)
            response = requests.get(url, headers=headers, params=params, timeout=30)
            response.raise_for_status()

            data = response.json()

            # `city` may arrive as a plain string or as a mapping with a `full`
            # entry; accept both instead of failing on `.get` of a str.
            city = data.get('city')
            location = city.get('full') if isinstance(city, dict) else (city or None)

            # Extract profile information
            profile = LinkedInProfile(
                user_id=public_id,
                full_name=data.get('full_name', f'User {public_id}'),
                headline=data.get('headline', data.get('occupation', 'Professional')),
                followers_count=data.get('follower_count', 0),
                connections_count=data.get('connections', 0),
                industry=data.get('industry'),
                location=location,
                about=data.get('summary')
            )

            # Proxycurl doesn't provide posts data, return empty list
            posts = []
            self._log_action(f"Successfully fetched Proxycurl data for {public_id}")

            return profile, posts

        except requests.exceptions.RequestException as e:
            logger.error("Proxycurl API error", error=str(e))
            raise ValueError(f"Failed to fetch Proxycurl data: {str(e)}") from e
        except Exception as e:
            logger.error("Proxycurl data processing error", error=str(e))
            raise ValueError(f"Failed to process Proxycurl data: {str(e)}") from e

    def _fetch_linkedin_api_data(self, public_id: str, credentials: Dict) -> tuple:
        """Fetch profile and posts using the linkedin-api package.

        Credentials are taken from ``credentials`` first and from the
        LINKEDIN_* environment variables second; an ``li_at`` cookie is
        preferred over email/password.

        Returns:
            ``(profile, posts)`` where posts are limited to the analysis window.

        Raises:
            ValueError: if the package is missing, no credentials are available,
                or any error occurs while fetching or parsing.
        """
        try:
            from linkedin_api import Linkedin

            # Initialize LinkedIn client
            email = credentials.get("email") or os.getenv("LINKEDIN_EMAIL")
            password = credentials.get("password") or os.getenv("LINKEDIN_PASSWORD")
            li_at = credentials.get("li_at") or os.getenv("LINKEDIN_LI_AT")

            if li_at:
                # Use li_at cookie for authentication
                # NOTE(review): confirm that the installed linkedin-api version
                # accepts a plain dict (and only li_at) for `cookies`.
                api = Linkedin("", "", cookies={"li_at": li_at})
            elif email and password:
                # Use email/password authentication
                api = Linkedin(email, password)
            else:
                raise ValueError("No valid LinkedIn credentials provided")

            logger.info("Fetching profile data from LinkedIn API", public_id=public_id)

            # Get profile data
            profile_data = api.get_profile(public_id)

            # Extract profile information
            profile = LinkedInProfile(
                user_id=public_id,
                full_name=profile_data.get("firstName", "") + " " + profile_data.get("lastName", "") if profile_data.get("firstName") else f"User {public_id}",
                headline=profile_data.get("headline", "Professional"),
                followers_count=profile_data.get("followerCount", 0),
                connections_count=profile_data.get("connectionsCount", 0),
                industry=profile_data.get("industryName"),
                location=profile_data.get("locationName"),
                about=profile_data.get("summary")
            )

            # Get posts/activity data
            logger.info("Fetching posts data from LinkedIn API", public_id=public_id)
            urn_id = profile_data.get("public_id") or public_id

            # Get up to 50 recent posts; the window filter below does the date cut.
            posts_data = api.get_profile_posts(urn_id, post_count=50)
            # The result may be a plain list of updates or an {"elements": [...]}
            # mapping depending on the library version; accept both.
            if isinstance(posts_data, dict):
                post_items = posts_data.get("elements", [])
            else:
                post_items = posts_data or []

            posts = []

            for post_item in post_items:
                # NOTE(review): the `update.share` payload shape is assumed; verify
                # against real responses from the installed linkedin-api version.
                post = post_item.get("update", {}).get("share", {})
                if not post:
                    continue

                # Extract post information
                post_id = str(post.get("urn", "").split(":")[-1])
                content = post.get("text", {}).get("text", "")
                published_at_ms = post.get("created", {}).get("time")

                # Convert millisecond timestamp to a naive local datetime;
                # unparsable or missing values fall back to the window start.
                if published_at_ms:
                    try:
                        published_at = datetime.fromtimestamp(int(published_at_ms) / 1000)
                    except (TypeError, ValueError, OverflowError, OSError):
                        published_at = ANALYSIS_START_DATE
                else:
                    published_at = ANALYSIS_START_DATE

                # Skip posts outside analysis window
                if published_at < ANALYSIS_START_DATE or published_at >= ANALYSIS_END_DATE:
                    continue

                # Get engagement metrics
                social_counts = post.get("socialDetail", {}).get("totalSocialActivityCounts", {})
                likes_count = social_counts.get("numLikes", 0)
                comments_count = social_counts.get("numComments", 0)
                reposts_count = social_counts.get("numShares", 0)
                impressions = social_counts.get("numImpressions", 0)

                # Determine content type
                content_type = ContentType.TEXT
                if post.get("content", {}).get("images"):
                    content_type = ContentType.IMAGE
                elif post.get("content", {}).get("videos"):
                    content_type = ContentType.VIDEO

                posts.append(LinkedInPost(
                    post_id=post_id,
                    user_id=public_id,
                    content=content,
                    content_type=content_type,
                    published_at=published_at,
                    likes_count=likes_count,
                    comments_count=comments_count,
                    reposts_count=reposts_count,
                    impressions=impressions
                ))

            self._log_action(f"Successfully fetched LinkedIn API data for {public_id}: {len(posts)} posts")

            return profile, posts

        except ImportError as e:
            logger.error("linkedin-api package not installed", error=str(e))
            raise ValueError("linkedin-api package required for LinkedIn API data fetching") from e
        except Exception as e:
            logger.error("LinkedIn API error", error=str(e))
            raise ValueError(f"Failed to fetch LinkedIn API data: {str(e)}") from e

    def _log_action(self, action: str):
        """Append a timestamped entry to the audit log and echo it to stdout."""
        timestamp = datetime.utcnow().isoformat()
        self.audit_log.append({"timestamp": timestamp, "action": action, "agent": "ingestion"})
        print(f" IngestionAgent: {action}")
    

In [12]:
class AnalyticsAgent:
    """Agent that derives deterministic statistics from previously collected LinkedIn data."""

    def __init__(self):
        # Entries are appended by _log_action.
        self.audit_log = []
        logger.info("AnalyticsAgent initialized")

    def process(self, state: LAIEState) -> AgentResponse:
        """Run all analytics over ``raw_profile`` / ``raw_posts`` from the state.

        Returns a success response routed to ``"monthly_analysis"``, or a
        failure response (``next_agent=None``) when data is missing or any
        computation raises.
        """
        logger.info("AnalyticsAgent processing")

        try:
            # Pull the raw inputs out of the state
            raw_profile = state.get("raw_profile")
            raw_posts = state.get("raw_posts", [])

            if not (raw_profile and raw_posts):
                raise ValueError("Insufficient data for analytics")

            # Rehydrate the validated models
            profile = LinkedInProfile(**raw_profile)
            posts = [LinkedInPost(**raw_post) for raw_post in raw_posts]

            # Run the three analyses in their original order
            monthly = self._compute_monthly_analytics(profile, posts)
            performance = self._compute_content_performance(posts)
            patterns = self._compute_temporal_patterns(posts)

            payload = {
                "monthly_analytics": [
                    entry.dict() if hasattr(entry, 'dict') else entry
                    for entry in monthly
                ],
                "content_performance": performance,
                "temporal_patterns": patterns,
            }
            result = AgentResponse(
                success=True,
                data=payload,
                message="Analytics computation completed successfully",
                next_agent="monthly_analysis",
                errors=[],
            )

            self._log_action("Analytics computation completed")
            return result

        except Exception as exc:
            reason = str(exc)
            logger.error("AnalyticsAgent failed", error=reason)
            return AgentResponse(
                success=False,
                data=None,
                message=f"Analytics computation failed: {reason}",
                next_agent=None,
                errors=[reason],
            )

    def _compute_monthly_analytics(self, profile: LinkedInProfile, posts: List[LinkedInPost]) -> List[MonthlyActivity]:
        """Aggregate posts per ``%Y-%m`` month and return one MonthlyActivity per month, sorted by month."""

        def empty_bucket():
            # Fresh accumulator for a month seen for the first time.
            return {
                "posts_count": 0,
                "total_impressions": 0,
                "total_likes": 0,
                "total_comments": 0,
                "total_reposts": 0,
                "content_types": defaultdict(int),
            }

        buckets = defaultdict(empty_bucket)

        for item in posts:
            bucket = buckets[item.published_at.strftime("%Y-%m")]
            bucket["posts_count"] += 1
            bucket["total_impressions"] += item.impressions
            bucket["total_likes"] += item.likes_count
            bucket["total_comments"] += item.comments_count
            bucket["total_reposts"] += item.reposts_count
            bucket["content_types"][item.content_type.value] += 1

        # Turn each accumulator into a MonthlyActivity model
        activities = []
        for month in sorted(buckets):
            bucket = buckets[month]
            impressions = bucket["total_impressions"]
            interactions = bucket["total_likes"] + bucket["total_comments"] + bucket["total_reposts"]
            if impressions > 0:
                rate = interactions / impressions
            else:
                rate = 0

            activities.append(
                MonthlyActivity(
                    user_id=profile.user_id,
                    month=month,
                    posts_count=bucket["posts_count"],
                    total_impressions=impressions,
                    total_likes=bucket["total_likes"],
                    engagement_rate=rate,
                    content_types=dict(bucket["content_types"]),
                )
            )

        return activities

    def _compute_content_performance(self, posts: List[LinkedInPost]) -> Dict[str, Any]:
        """Summarise impressions and engagements per content type; ``{}`` when there are no posts."""
        if not posts:
            return {}

        stat_keys = (
            "count",
            "total_impressions",
            "total_engagements",
            "avg_impressions",
            "avg_engagements",
            "engagement_rate",
        )
        per_type = defaultdict(lambda: dict.fromkeys(stat_keys, 0))

        # Accumulate raw totals
        for item in posts:
            stats = per_type[item.content_type.value]
            stats["count"] += 1
            stats["total_impressions"] += item.impressions
            stats["total_engagements"] += item.likes_count + item.comments_count + item.reposts_count

        # Derive averages and rates from the totals
        for stats in per_type.values():
            seen = stats["count"]
            if seen <= 0:
                continue
            stats["avg_impressions"] = stats["total_impressions"] / seen
            stats["avg_engagements"] = stats["total_engagements"] / seen
            if stats["total_impressions"] > 0:
                stats["engagement_rate"] = stats["total_engagements"] / stats["total_impressions"]
            else:
                stats["engagement_rate"] = 0

        # Best type = highest average impressions (first one wins on ties)
        top_type = None
        if per_type:
            top_type = max(per_type, key=lambda name: per_type[name]["avg_impressions"])

        return {
            "content_stats": dict(per_type),
            "best_performing_type": top_type,
            "total_posts_analyzed": len(posts),
        }

    def _compute_temporal_patterns(self, posts: List[LinkedInPost]) -> Dict[str, Any]:
        """Describe when posts were published relative to the analysis window; ``{}`` when there are no posts."""
        if not posts:
            return {}

        weekday_counts = defaultdict(int)
        hour_counts = defaultdict(int)
        month_counts = defaultdict(int)
        distinct_dates = set()

        for item in posts:
            stamp = item.published_at
            weekday_counts[stamp.weekday()] += 1
            hour_counts[stamp.hour] += 1
            month_counts[stamp.strftime("%Y-%m")] += 1
            distinct_dates.add(stamp.date())

        # Consistency = share of window days with at least one post
        window_days = (ANALYSIS_END_DATE - ANALYSIS_START_DATE).days
        active = len(distinct_dates)
        has_window = window_days > 0

        # Most frequent weekday / hour (first one wins on ties)
        top_weekday = max(weekday_counts, key=weekday_counts.get) if weekday_counts else None
        top_hour = max(hour_counts, key=hour_counts.get) if hour_counts else None

        names = ("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")
        top_weekday_name = None if top_weekday is None else names[top_weekday]

        return {
            "posting_consistency": active / window_days if has_window else 0,
            "active_days": active,
            "total_days": window_days,
            "best_posting_weekday": top_weekday_name,
            "best_posting_hour": top_hour,
            "posts_by_month": dict(month_counts),
            "avg_posts_per_day": len(posts) / window_days if has_window else 0,
        }

    def _log_action(self, action: str):
        """Record ``action`` in the audit log with a UTC timestamp and echo it to stdout."""
        entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "action": action,
            "agent": "analytics",
        }
        self.audit_log.append(entry)
        print(f" AnalyticsAgent: {action}")



In [14]:
# Initialize the analytics agent (construction only logs "AnalyticsAgent initialized"
# and creates an empty audit log; no data is processed here).
analytics_agent = AnalyticsAgent()


2026-01-20 23:27:40 [info     ] AnalyticsAgent initialized    


In [None]:
#monthly analysis agent 
class MonthlyAnalysisAgent:
    """Agent responsible for creating detailed month-wise activity analysis using AI """
    def __init__(self):
        """Create the agent with an empty audit log and log its initialization."""
        self.audit_log = []
        logger.info("MonthlyAnalysisAgent initialized")

    def process(self, state: LAIEState) -> AgentResponse:
        """Generate one monthly note per entry in ``state["monthly_analytics"]``.

        Returns:
            A success response whose ``data`` is the list of notes and whose
            ``next_agent`` is ``"summary"``; or a failure response
            (``next_agent=None``) when no monthly analytics are available or
            note generation raises.
        """
        logger.info("MonthlyAnalysisAgent processing")

        try:
            # `or` also covers keys that are present but explicitly None,
            # which a .get() default alone would not.
            monthly_analytics = state.get("monthly_analytics") or []
            profile_data = state.get("raw_profile") or {}

            if not monthly_analytics:
                raise ValueError("No monthly analytics data available")

            # NOTE: the helper is defined in this class as `_generate_montly_note`
            # (sic). Calling `_generate_monthly_note` raised AttributeError on every
            # run; keep this call in sync if the helper is ever renamed.
            monthly_notes = [
                self._generate_montly_note(month_data, profile_data)
                for month_data in monthly_analytics
            ]

            response = AgentResponse(
                success=True,
                data=monthly_notes,
                message=f"Generated {len(monthly_notes)} monthly activity notes",
                next_agent="summary",
                errors=[],  # same envelope as the other agents' success responses
            )
            # NOTE(review): assumes this class defines `_log_action`; the
            # definition is not visible in this part of the notebook.
            self._log_action("Monthly analysis completed")

        except Exception as e:
            logger.error("MonthlyAnalysisAgent failed", error=str(e))
            response = AgentResponse(
                success=False,
                data=None,
                message=f"Monthly analysis failed: {str(e)}",
                next_agent=None,
                errors=[str(e)]
            )

        return response

    def _generate_montly_note(self, month_data : Dict[str, Any], profile_data : Dict[str,Any]) -> MonthlyNote:
        """Generate a detailed monthly note using AI."""
        month = month_data.get("month")
        posts_count = month_data.get("posts_count")
        impressions = month_data.get("total_impressions",0)
        likes = month_data.get("total_likes",0)
        engagement_rate = month_data.get("engagement_rate",0)
        content_types  = month_data.get("content_types",{})
        profile_name = profile_data.get("full_name","")

        