In [None]:
import os
import time
import requests
import json

from dotenv import load_dotenv
from crewai_tools import tool

from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from bs4 import BeautifulSoup

from pydantic.v1 import BaseModel, Field

from crewai import Task, Agent, Crew
from textwrap import dedent

# LLM provider integrations (third-party LangChain packages, not local modules):
from langchain_mistralai import ChatMistralAI
from langchain_openai import ChatOpenAI

In [None]:
# Load variables from a local .env file (if one exists) before they are read
# below. By default load_dotenv() does not override variables that are already
# set in the process environment, so an explicit environment still wins.
load_dotenv()


class Config:
    """
    Central place for the credentials and settings this notebook reads from
    the environment.

    Every attribute is looked up once, when this class body executes, and is
    ``None`` when the corresponding environment variable is not set.
    """
    # API keys handed to the LLM clients and exported for the search tool.
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
    MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")
    SERPER_API_KEY = os.getenv("SERPER_API_KEY")
    # LinkedIn login used by the Selenium scraper, and the profile slug to scrape.
    LINKEDIN_EMAIL = os.getenv("LINKEDIN_EMAIL")
    LINKEDIN_PASSWORD = os.getenv("LINKEDIN_PASSWORD")
    LINKEDIN_PROFILE_NAME = os.getenv("LINKEDIN_PROFILE_NAME")
    # Bearer token used by the LinkedIn posting tool.
    ACCESS_TOKEN = os.getenv("ACCESS_TOKEN")

# -----------------------------
### Utilities & Exceptions
# -----------------------------

In [None]:
class LinkedinToolException(Exception):
    """
    Raised when the LinkedIn login credentials are missing from the environment.

    The constructor takes no arguments; the error message is fixed.
    """
    def __init__(self):
        message = "You need to set the LINKEDIN_EMAIL and LINKEDIN_PASSWORD env variables"
        super().__init__(message)


In [None]:
def parse_html_content(page_source: str):
    """
    Extract the post containers from the HTML of a LinkedIn page.

    The page source is parsed with BeautifulSoup's ``lxml`` parser. Every
    ``<div>`` carrying the ``feed-shared-update-v2`` class is a candidate;
    only candidates whose ``data-urn`` attribute contains ``activity`` are kept.

    Returns:
        A list of the matching elements, in the order ``find_all`` yields them.
    """
    soup = BeautifulSoup(page_source.encode("utf-8"), "lxml")
    post_containers = []
    for div in soup.find_all("div", {"class": "feed-shared-update-v2"}):
        # A div without a data-urn attribute falls back to "" and is skipped.
        urn = div.get('data-urn', '')
        if 'activity' in urn:
            post_containers.append(div)
    return post_containers


def get_post_content(container, selector, attributes):
    """
    Return the stripped text of the first element in ``container`` that
    matches ``selector`` and ``attributes``.

    Best-effort: when nothing matches, or when the lookup raises, the error
    (if any) is printed and an empty string is returned instead.
    """
    text = ""
    try:
        match = container.find(selector, attributes)
        if match:
            text = match.text.strip()
    except Exception as err:
        print(f"Error extracting post content: {err}")
    return text


def get_linkedin_posts(page_source: str):
    """
    Return the text of every post found in ``page_source``.

    Post containers are located with parse_html_content; for each one the
    text of its ``update-components-text`` div is read with get_post_content.
    A container without such a div contributes an empty string, so the result
    has one entry per container.
    """
    return [
        get_post_content(post, "div", {"class": "update-components-text"})
        for post in parse_html_content(page_source)
    ]


### -----------------------------
#### LinkedIn Scraper Tool
### -----------------------------

In [None]:
def scrape_linkedin_posts_fn() -> str:
    """
    Log into LinkedIn with Selenium, open a profile's recent activity page and
    return the text of its most recent posts.

    Credentials and the target profile are read from the environment:
    ``LINKEDIN_EMAIL``, ``LINKEDIN_PASSWORD`` and ``LINKEDIN_PROFILE_NAME``.

    Returns:
        The ``str()`` of a list holding at most the first five post texts.

    Raises:
        LinkedinToolException: if LINKEDIN_EMAIL or LINKEDIN_PASSWORD is
            unset or empty.
        ValueError: if LINKEDIN_PROFILE_NAME is unset or empty.
    """
    linkedin_username = os.environ.get("LINKEDIN_EMAIL")
    linkedin_password = os.environ.get("LINKEDIN_PASSWORD")
    linkedin_profile_name = os.environ.get("LINKEDIN_PROFILE_NAME")

    if not (linkedin_username and linkedin_password):
        raise LinkedinToolException()
    # Without this check an unset variable is formatted into the URL as the
    # literal text "None" and the wrong page is scraped silently.
    if not linkedin_profile_name:
        raise ValueError("You need to set the LINKEDIN_PROFILE_NAME env variable")

    # Initialize WebDriver (needs a working Chrome/chromedriver setup).
    browser = webdriver.Chrome()
    try:
        browser.get("https://www.linkedin.com/login")

        # Perform login
        username_input = browser.find_element("id", "username")
        password_input = browser.find_element("id", "password")
        username_input.send_keys(linkedin_username)
        password_input.send_keys(linkedin_password)
        password_input.send_keys(Keys.RETURN)

        # Give the post-login redirect time to finish.
        time.sleep(3)

        # Navigate to the profile's "Recent Activity"
        browser.get(f"https://www.linkedin.com/in/{linkedin_profile_name}/recent-activity/all/")

        # Scroll to the bottom twice so more posts get loaded into the page.
        for _ in range(2):
            browser.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(2)

        # Extract posts
        posts = get_linkedin_posts(browser.page_source)
    finally:
        # Always close the browser, even when login or scraping fails, so no
        # Chrome process is left running.
        browser.quit()
    return str(posts[:5])

In [None]:
@tool("ScrapeLinkedinPosts")
def scrape_linkedin_posts_tool() -> str:
    """
    A tool that can be used to scrape LinkedIn posts.
    """
    # NOTE(review): the @tool decorator presumably exposes the docstring above
    # to the agent as the tool description -- confirm before rewording it.
    # Thin wrapper: the scraping logic lives in the undecorated
    # scrape_linkedin_posts_fn, which can also be called directly.
    return scrape_linkedin_posts_fn()

##### -----------------------------
##### LinkedIn Poster Tool
##### -----------------------------

In [None]:
class LinkedinPersonalFeedPoster:
    """
    Posts text updates to the personal LinkedIn feed of the access token's
    owner through the ``ugcPosts`` REST endpoint.

    Constructing an instance performs one HTTP request (see ``_get_user_id``)
    to resolve the member id used as the author of later posts.

    Args:
        access_token: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds to wait on each HTTP request before giving up, so a
            stalled connection cannot hang the notebook indefinitely.
    """

    def __init__(self, access_token: str, timeout: float = 30.0):
        self.access_token = access_token
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }
        self.user_id = self._get_user_id()

    def _get_user_id(self) -> str:
        """
        Call the LinkedIn ``userinfo`` endpoint and return the ``sub`` field of
        its JSON response, which identifies the token's owner.

        Raises:
            requests.HTTPError: if the endpoint answers with an error status
                (for example an invalid or expired token).
            KeyError: if the response has no ``sub`` field.
        """
        url = "https://api.linkedin.com/v2/userinfo"
        response = requests.get(url, headers=self.headers, timeout=self.timeout)
        # Fail on the HTTP status itself rather than on a missing key later.
        # The response body is deliberately not printed: it carries profile
        # data that should not end up in saved notebook output.
        response.raise_for_status()
        return response.json()["sub"]

    def create_post(self, text: str) -> dict:
        """
        Publish ``text`` as a public, text-only post authored by the token's
        owner.

        Returns:
            The decoded JSON body of the API response.

        Raises:
            requests.HTTPError: if the API answers with an error status.
        """
        url = "https://api.linkedin.com/v2/ugcPosts"

        # A minimal text-only payload
        payload = {
            "author": f"urn:li:person:{self.user_id}",
            "lifecycleState": "PUBLISHED",
            "specificContent": {
                "com.linkedin.ugc.ShareContent": {
                    "shareCommentary": {"text": text},
                    "shareMediaCategory": "NONE"
                }
            },
            "visibility": {
                "com.linkedin.ugc.MemberNetworkVisibility": "PUBLIC"
            }
        }

        # NOTE(review): LinkedIn's ugcPosts documentation also lists an
        # X-Restli-Protocol-Version header -- confirm whether it is needed here.
        resp = requests.post(url, headers=self.headers, json=payload, timeout=self.timeout)
        resp.raise_for_status()
        return resp.json()

In [None]:
# Inputs for a manual check of the poster class: the bearer token from the
# environment (None when ACCESS_TOKEN is unset) and a fixed sample message.
access_token = os.environ.get("ACCESS_TOKEN")
text = "Hello, this is a test post from CrewAI!"

In [None]:
# Manual smoke test of the poster class. WARNING: running this cell publishes
# a real, public post on the token owner's LinkedIn feed.
if not access_token:
    # Fail with a clear message instead of sending "Bearer None" to the API.
    raise ValueError("No ACCESS_TOKEN found in environment.")
poster = LinkedinPersonalFeedPoster(access_token)
result = poster.create_post(text)

In [None]:
@tool("PostToLinkedInPersonalFeed")
def post_to_linkedin_personal_feed_tool(text: str) -> str:
    """
    A tool that posts text to a personal LinkedIn feed. 
    The `access_token` is read from environment variables.
    """
    # The docstring above is left exactly as written: it is presumably what
    # the @tool decorator shows to the agent as the tool description.
    token = Config.ACCESS_TOKEN
    if token:
        # Publish the text and hand the API response back as a plain string.
        api_response = LinkedinPersonalFeedPoster(token).create_post(text)
        return str(api_response)
    raise ValueError("No ACCESS_TOKEN found in environment.")

#### -----------------------------
#### LLM Setup
#### -----------------------------

In [None]:
# Chat-model clients shared by the agents below; API keys come from Config.
# openai_llm drives the scraper, researcher and poster agents, mistral_llm
# drives the post-writing agent.
openai_llm = ChatOpenAI(api_key=Config.OPENAI_API_KEY, model="gpt-3.5-turbo-0125")
mistral_llm = ChatMistralAI(api_key=Config.MISTRAL_API_KEY, model="mistral-large-latest")

# Web tools given to the web researcher agent.
# NOTE(review): SerperDevTool presumably reads SERPER_API_KEY from the
# environment itself, since no key is passed here -- confirm.
from crewai_tools import ScrapeWebsiteTool, SerperDevTool
scrape_website_tool = ScrapeWebsiteTool()
search_tool = SerperDevTool()

#### -----------------------------
#### Agents & Tasks
#### -----------------------------

In [None]:
# Agent 1 of 4: collects existing posts from the configured LinkedIn profile.
# Its only tool is the Selenium-based scraper, and it runs on the OpenAI model.
linkedin_scraper_agent = Agent(
    role="LinkedIn Post Scraper",
    goal="Your goal is to scrape a LinkedIn profile to get a list of posts from the given profile",
    tools=[scrape_linkedin_posts_tool],
    backstory=dedent(
        """
        You are an experienced programmer who excels at web scraping.
        """
    ),
    verbose=True,
    allow_delegation=False,
    llm=openai_llm
)

# Task for the scraper agent. Its output is later given to the post-writing
# task as context, where it serves as the writing-style reference.
scrape_linkedin_task = Task(
    description=dedent("Scrape a LinkedIn profile to get some relevant posts."),
    expected_output=dedent("A list of LinkedIn posts obtained from a LinkedIn profile."),
    agent=linkedin_scraper_agent,
)

In [None]:
# Agent 2 of 4: gathers web material on the post's topic (Llama 2 vs Llama 3)
# using the website scraper and the search tool; runs on the OpenAI model.
web_researcher_agent = Agent(
    role="Web Researcher",
    goal="Your goal is to search for relevant content about the comparison between Llama 2 and Llama 3",
    tools=[scrape_website_tool, search_tool],
    backstory=dedent(
        """
        You are proficient at searching for specific topics on the web, 
        selecting those that provide more value and information.
        """
    ),
    verbose=True,
    allow_delegation=False,
    llm=openai_llm
)

# Task for the researcher agent. Its output is later given to the
# post-writing task as context, where it supplies the factual content.
web_research_task = Task(
    description=dedent("Get valuable and high quality web information about the comparison between Llama 2 and Llama 3."),
    expected_output=dedent("High quality information about Llama 2 vs Llama 3."),
    agent=web_researcher_agent,
)

In [None]:
# Agent 3 of 4: writes the new post in the style of the scraped ones.
# It has no tools of its own and is the only agent on the Mistral model.
doppelganger_agent = Agent(
    role="LinkedIn Post Creator",
    goal="You will create a LinkedIn post comparing Llama 2 and Llama 3 following the writing style observed in the LinkedIn posts scraped by the LinkedIn Post Scraper.",
    backstory=dedent(
        """
        You are an expert in writing LinkedIn posts replicating influencer style.
        """
    ),
    verbose=True,
    allow_delegation=False,
    llm=mistral_llm
)

# Task for the writer agent. The scraped posts and the web research are
# attached to it as context in a later cell, not here.
create_linkedin_post_task = Task(
    description=dedent(
        "Create a LinkedIn post comparing Llama 2 and Llama 3 following the style found in the scraped LinkedIn posts."
    ),
    expected_output=dedent(
        "A high-quality and engaging LinkedIn post comparing Llama 2 and Llama 3 "
        "in the style of the scraped posts."
    ),
    agent=doppelganger_agent,
)

In [None]:
# Agent 4 of 4: publishes the finished post through the LinkedIn posting
# tool; runs on the OpenAI model.
linkedin_poster_agent = Agent(
    role="LinkedIn Poster",
    # The goal deliberately contains no {text} placeholder: the post text does
    # not exist yet when the agent is defined and arrives via task context.
    goal="Your goal is to take the final post text created by the Doppelganger agent and post it to LinkedIn.",
    # Pass the tool object itself; the agent decides when to call it.
    tools=[post_to_linkedin_personal_feed_tool],
    backstory=dedent(
        """
        You are an intern who posts on behalf of the profile owner.
        """
    ),
    verbose=True,
    allow_delegation=False,
    llm=openai_llm
)

# Task for the poster agent. The writer task's output is attached to it as
# context in a later cell. The tool name in the description matches the name
# the posting tool is registered under.
post_linkedin_task = Task(
    description=dedent(
        """
        Take the final post from the Doppelganger agent and post it to LinkedIn 
        using the 'PostToLinkedInPersonalFeed' tool.
        """
    ),
    expected_output=dedent("A confirmation that the LinkedIn post was successfully made."),
    agent=linkedin_poster_agent,
)

In [None]:
# Wire the tasks together: the writer task receives the outputs of both the
# scraping task (style reference) and the research task (content).
create_linkedin_post_task.context = [scrape_linkedin_task, web_research_task]
# The posting task receives the writer task's output, i.e. the final post text.
post_linkedin_task.context = [create_linkedin_post_task]

#### -----------------------------
#### Crew and Pipeline
#### -----------------------------

In [None]:
from crewai.telemetry import Telemetry


def noop(*args, **kwargs):
    """Stand-in for a Telemetry method: report the call and do nothing else."""
    print("Telemetry method called and noop'd\n")


# Disable CrewAI telemetry by replacing every callable attribute of the
# Telemetry class, except dunder methods, with the no-op above.
for attr in dir(Telemetry):
    if callable(getattr(Telemetry, attr)) and not attr.startswith("__"):
        # This line must be indented under the `if`; at the same level as the
        # `if` the cell fails with an IndentationError.
        setattr(Telemetry, attr, noop)
    


In [None]:
# NOTE(review): os, Agent, Task, Crew and SerperDevTool are already imported
# in earlier cells, and Process is not used in the cells visible here; these
# imports look redundant -- confirm before removing them.
import os
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool

# Switch off the OpenTelemetry SDK before the crew is built.
os.environ["OTEL_SDK_DISABLED"] = "true"

# Assemble the pipeline. Tasks are listed in the order their outputs are
# needed: scrape and research first, then write, then post.
# NOTE(review): no `process` argument is given; presumably the crew runs the
# tasks sequentially in list order by default -- confirm.
crew = Crew(
    agents=[
        linkedin_scraper_agent,
        web_researcher_agent,
        doppelganger_agent,
        linkedin_poster_agent
    ],
    tasks=[
        scrape_linkedin_task,
        web_research_task,
        create_linkedin_post_task,
        post_linkedin_task
    ]
)

In [None]:
# Run the full crew. This launches the browser scraper, performs web searches
# and ends by publishing a post through the LinkedIn posting tool.
result = crew.kickoff()

# Heading on one line, the crew's final output on the next.
print("Pipeline Result:", result, sep="\n")