In [1]:
! pip install langchain db-sqlite3 chromadb llama_index beautifulsoup4 requests

In [1]:
import requests
from bs4 import BeautifulSoup
import sqlite3
from datetime import datetime

import sqlite3
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from langchain_core.prompts import PromptTemplate
from langchain.chains import LLMChain

import random

In [1]:
import requests
from bs4 import BeautifulSoup
import sqlite3
from datetime import datetime

class ScrapingAgent:
    """Scrape a product page and persist its title and price in SQLite.

    Attributes:
        db_path: Path of the SQLite database file that holds the ``products`` table.
    """

    # Seconds to wait for the product page before aborting the HTTP request.
    REQUEST_TIMEOUT = 10

    def __init__(self, db_path):
        self.db_path = db_path

    def scrape_product(self, product_url):
        """Download ``product_url`` and extract the product title and price.

        Args:
            product_url: URL of the product page.

        Returns:
            dict with the keys ``title``, ``price`` and ``url``.

        Raises:
            requests.exceptions.RequestException: the page could not be fetched
                (network error, timeout or non-2xx status).
            ValueError: the page does not contain the expected title/price elements
                (for example when a CAPTCHA page is served instead of the product).
        """
        headers = {"User-Agent": "Mozilla/5.0"}
        response = requests.get(product_url, headers=headers, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # Extract product details (modify as per Amazon's structure)
        title_tag = soup.find("span", {"id": "productTitle"})
        price_tag = soup.find("span", {"class": "a-price-whole"})
        if title_tag is None or price_tag is None:
            # Fail with a readable message instead of "'NoneType' has no attribute 'get_text'".
            raise ValueError(f"Could not find product title/price on page: {product_url}")

        return {
            "title": title_tag.get_text(strip=True),
            "price": price_tag.get_text(strip=True),
            "url": product_url,
        }

    def save_to_db(self, product_data):
        """Insert ``product_data`` or replace the existing row that has the same URL.

        Args:
            product_data: dict with the keys ``title``, ``price`` and ``url``.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Create table if not exists
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY,
                    title TEXT,
                    price TEXT,
                    url TEXT,
                    last_updated TEXT
                )
            ''')

            # Store the timestamp as explicit text; passing a datetime object relies on
            # sqlite3's implicit adapter. The format matches what that adapter wrote.
            last_updated = datetime.now().isoformat(sep=" ")

            # Insert or update product: the sub-select reuses the id of an existing row
            # for this URL, so OR REPLACE overwrites it instead of adding a duplicate.
            cursor.execute('''
                INSERT OR REPLACE INTO products (id, title, price, url, last_updated)
                VALUES ((SELECT id FROM products WHERE url = ?), ?, ?, ?, ?)
            ''', (
                product_data["url"],
                product_data["title"],
                product_data["price"],
                product_data["url"],
                last_updated,
            ))

            conn.commit()
        finally:
            # Always release the file handle, even when a statement fails.
            conn.close()

    def run(self, product_url):
        """Scrape ``product_url``, store the result and return the scraped dict."""
        product_data = self.scrape_product(product_url)
        self.save_to_db(product_data)
        return product_data


In [4]:
class QueryAgent:
    """Look a product up in the database, scraping it on demand when it is missing.

    Attributes:
        db_path: Path of the SQLite database file.
        scraping_agent: Object exposing ``run(product_url)``; used as the fallback
            when the product is not stored yet.
    """

    def __init__(self, db_path, scraping_agent):
        self.db_path = db_path
        self.scraping_agent = scraping_agent

    def query_product(self, product_url):
        """Return the stored record for ``product_url`` or scrape it when absent.

        Args:
            product_url: URL that identifies the product.

        Returns:
            For a stored product, a dict with ``title``, ``price``, ``url`` and
            ``last_updated``. Otherwise whatever ``scraping_agent.run`` returns.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # On a fresh database the table does not exist yet; treat that as
            # "not stored" so the scraping fallback runs instead of an OperationalError.
            cursor.execute(
                "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'products'"
            )
            if cursor.fetchone() is None:
                product = None
            else:
                # Name the columns explicitly rather than decoding SELECT * by position.
                cursor.execute(
                    "SELECT title, price, url, last_updated FROM products WHERE url = ?",
                    (product_url,),
                )
                product = cursor.fetchone()
        finally:
            conn.close()

        if product:
            title, price, url, last_updated = product
            return {"title": title, "price": price, "url": url, "last_updated": last_updated}

        # Trigger Scraping Agent if product is not in DB
        return self.scraping_agent.run(product_url)

In [5]:
class PredictionAgent:
    """Report a naive price trend from the two most recent stored prices of a product.

    Attributes:
        db_path: Path of the SQLite database file that holds the ``products`` table.
    """

    def __init__(self, db_path):
        self.db_path = db_path

    def predict_price_trend(self, product_url):
        """Compare the latest stored price of ``product_url`` with the one before it.

        Args:
            product_url: URL that identifies the product.

        Returns:
            ``"The price trend is increasing."``, ``"... decreasing."`` or
            ``"... stable."`` when at least two rows exist for the URL, otherwise
            ``"Not enough data for trend prediction."``.

        NOTE(review): ``ScrapingAgent.save_to_db`` replaces the row of a URL rather than
        appending, so with that writer there is never more than one row per URL and
        this method always reports insufficient data. Confirm the intended history model.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            # Order chronologically (rowid breaks ties) so that the last two rows really
            # are the newest; without ORDER BY the row order is unspecified.
            cursor.execute(
                "SELECT price, last_updated FROM products WHERE url = ? "
                "ORDER BY last_updated, rowid",
                (product_url,),
            )
            data = cursor.fetchall()
        finally:
            conn.close()

        # Example: Simple trend analysis (implement ML model here)
        if len(data) < 2:
            return "Not enough data for trend prediction."

        # Prices are stored as text with thousands separators, e.g. "1,299".
        recent_price = float(data[-1][0].replace(",", ""))
        previous_price = float(data[-2][0].replace(",", ""))

        if recent_price > previous_price:
            trend = "increasing"
        elif recent_price < previous_price:
            trend = "decreasing"
        else:
            # An unchanged price is not a decrease.
            trend = "stable"
        return f"The price trend is {trend}."


In [8]:
import requests

class NotificationAgent:
    """Send plain-text notifications to a Telegram chat through the Bot API.

    Attributes:
        bot_token: Telegram bot token used to build the API endpoint.
        chat_id: Identifier of the chat that receives the messages.
    """

    # Seconds to wait for the Telegram API before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self, telegram_bot_token, chat_id):
        self.bot_token = telegram_bot_token
        self.chat_id = chat_id

    def send_notification(self, message):
        """Post ``message`` to the configured chat.

        A failed delivery is reported on stdout instead of raising, so a notification
        problem does not abort the surrounding workflow.

        Args:
            message: Text of the notification.

        Returns:
            True when Telegram accepted the message, False otherwise.
        """
        # The endpoint used to be an empty string, which made requests.post()
        # raise on every call.
        url = f"https://api.telegram.org/bot{self.bot_token}/sendMessage"
        data = {"chat_id": self.chat_id, "text": message}
        try:
            response = requests.post(url, data=data, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as exc:
            print(f"Notification could not be sent: {exc}")
            return False

        if not response.ok:
            print(f"Telegram rejected the notification (HTTP {response.status_code}).")
        return response.ok


In [9]:
from chromadb.utils import embedding_functions
from chromadb import Client

class RecommendationAgent:
    """Placeholder recommender that owns a Chroma client and an embedding function."""

    def __init__(self):
        # Neither object is used by recommend() yet; they are only constructed here.
        chroma_client = Client()
        self.client = chroma_client
        sentence_embedder = embedding_functions.SentenceTransformerEmbeddingFunction()
        self.embedding_fn = sentence_embedder

    def recommend(self, query):
        """Return two fixed placeholder recommendations; ``query`` is currently ignored."""
        # Example: Dummy recommendations (implement real embeddings)
        placeholder_names = ("Recommended Product 1", "Recommended Product 2")
        return list(placeholder_names)


In [None]:
if __name__ == "__main__":
    import os

    DB_PATH = "products.db"
    # Read the credentials from the environment so real secrets never live in the
    # notebook; the previous placeholders remain as the fallback values.
    TELEGRAM_BOT_TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN", "your_bot_token")
    CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "your_chat_id")

    # Initialize agents
    scraper = ScrapingAgent(DB_PATH)
    query = QueryAgent(DB_PATH, scraper)
    predictor = PredictionAgent(DB_PATH)
    notifier = NotificationAgent(TELEGRAM_BOT_TOKEN, CHAT_ID)
    recommender = RecommendationAgent()

    # Example product URL
    # product_url = "https://www.amazon.in/dp/example-product-id"
    product_url = "https://www.amazon.in/boAt-BassHeads-100-Headphones-Black/dp/B071Z8M4KX"

    # Run agents: scrape and store first, then read the stored row back, derive the
    # trend, notify, and finally ask for recommendations based on the title.
    scraper.run(product_url)
    product_data = query.query_product(product_url)
    trend = predictor.predict_price_trend(product_url)
    notifier.send_notification(f"Product: {product_data['title']} | Price: {product_data['price']} | Trend: {trend}")
    recommendations = recommender.recommend(product_data['title'])
    print("Recommendations:", recommendations)


In [None]:
import sqlite3

# Database connection
db_path = "products.db"  # Path to your SQLite database

# SQL command to create the products table
create_table_query = """
CREATE TABLE IF NOT EXISTS products (
    product_id INTEGER PRIMARY KEY , -- Auto-incrementing unique ID
    name TEXT NOT NULL,                          -- Product name
    image_urls TEXT,                             -- List of image URLs (comma-separated)
    video_urls TEXT,                             -- List of video URLs (comma-separated)
    offer_details TEXT,                          -- Offer details as text
    description TEXT                             -- Product description
);
"""

conn = sqlite3.connect(db_path)
try:
    cursor = conn.cursor()

    # Execute the SQL command.
    # NOTE: because of IF NOT EXISTS this is a no-op when a `products` table already
    # exists, even if that table has a different set of columns.
    cursor.execute(create_table_query)
    conn.commit()
finally:
    # Close the connection even when the statement above fails.
    conn.close()

print("Table 'products' created successfully.")


In [None]:
# List every table defined in the database file; the connection is closed afterwards
# so the notebook does not keep a handle on the file.
conn = sqlite3.connect(db_path)
try:
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()
finally:
    conn.close()

In [None]:
tables

In [19]:
import sqlite3
from langchain.llms import OpenAI
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from langchain_core.prompts import PromptTemplate
from langchain.chains import LLMChain

class MarketplaceAgent:
    """LLM-backed helper that scrapes product pages or answers questions via generated SQL.

    Attributes:
        db_path: Path of the SQLite database used by ``query_database``/``generate_sql``.
        llm: The language model. Either an object exposing ``invoke`` (LangChain
            runnables such as chat models) or a plain callable taking the prompt.
    """

    # Seconds to wait for a product page before aborting the HTTP request.
    REQUEST_TIMEOUT = 10

    def __init__(self, db_path, llm):
        self.db_path = db_path
        self.llm = llm

    def llm_call(self, prompt):
        """Call the LLM with a given prompt and return its reply as text.

        Chat models return a message object; its ``content`` is unwrapped so callers
        always receive the text. Any failure is returned as an error string.
        """
        try:
            print(prompt)
            if hasattr(self.llm, "invoke"):
                response = self.llm.invoke(prompt)
            else:
                response = self.llm(prompt)
            # Plain strings have no `content` attribute and pass through unchanged.
            response = getattr(response, "content", response)
            print(response)
            return response
        except Exception as e:
            return f"Error during LLM call: {e}"

    @staticmethod
    def _strip_code_fences(text):
        """Return ``text`` without a surrounding Markdown code fence, if there is one."""
        cleaned = str(text).strip()
        if cleaned.startswith("```"):
            lines = cleaned.splitlines()[1:]  # drop the opening fence and its language tag
            if lines and lines[-1].strip().startswith("```"):
                lines = lines[:-1]
            cleaned = "\n".join(lines).strip()
        return cleaned

    def scrape_product(self, product_url):
        """Fetch product details by scraping and using the LLM for structured output.

        Args:
            product_url: URL of the product page.

        Returns:
            dict with the fields of ``HTMLData`` on success, or ``{"error": ...}`` when
            the request, the LLM call or the parsing of its output fails.
        """
        try:
            # Retrieve the webpage content
            response = requests.get(
                product_url,
                headers={"User-Agent": "Mozilla/5.0"},
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            soup = BeautifulSoup(response.content, "html.parser")

            # Remove only non-content tags. Decomposing *every* tag (the previous
            # behaviour) deleted the whole document and left an empty string.
            for tag in soup.find_all(["script", "style", "noscript"]):
                tag.decompose()

            # Visible page text, one fragment per line.
            html_data = soup.get_text(separator="\n", strip=True)

            class HTMLData(BaseModel):
                product_id: str = Field(description="Unique identifier for the product")
                product_name: str = Field(description="The product's name or title")
                image_urls: list = Field(description="URLs of all images associated with the product")
                video_urls: list = Field(description="URLs of any videos associated with the product")
                offer_details: str = Field(description="Pricing, discounts, or promotions")
                description: str = Field(description="Detailed text describing the product")

            parser = PydanticOutputParser(pydantic_object=HTMLData)
            template = """
            You are a data extraction model. Your job is to analyze the provided HTML content of an Amazon product page and extract the following details. If any field is not available, return an empty value as specified below.

                1. Product ID: The unique identifier for the product. If not found, return an empty string.
                2. Name: The product's name or title. If not found, return an empty string.
                3. Image URLs: A list of URLs to all images associated with the product. If not found, return an empty list `[]`.
                4. Video URLs: A list of URLs to any videos associated with the product. If not found, return an empty list `[]`.
                5. Offer Details: The pricing, discounts, or promotions related to the product. If not found, return an empty string.
                6. Description: A detailed textual description of the product. If not found, return an empty string.

                HTML Content:
                {html_data}

                format_instructions:
                {format_instructions}
            """
            prompt = PromptTemplate(
                template=template,
                input_variables=["html_data"],
                partial_variables={"format_instructions": parser.get_format_instructions()},
            )

            # prompt -> model -> parser; the parser accepts both plain-text and
            # chat-message model outputs and yields an HTMLData instance.
            chain = prompt | self.llm | parser
            parsed = chain.invoke({"html_data": html_data})

            # Return a plain dict. (eval() on the parsed object could never work and
            # must not be used on model output anyway.)
            if hasattr(parsed, "model_dump"):
                return parsed.model_dump()
            return parsed.dict()

        except requests.exceptions.RequestException as http_error:
            return {"error": f"HTTP request failed: {http_error}"}
        except Exception as general_error:
            return {"error": f"An error occurred: {general_error}"}

    def query_database(self, sql_query):
        """Execute the SQL query on the database.

        SECURITY NOTE: ``sql_query`` is executed verbatim and, in this class, comes from
        LLM output driven by user text. Nothing is committed here, but the statement is
        not restricted to SELECT; review before exposing this to untrusted users.

        Returns:
            The list of fetched rows, or ``{"error": message}`` when execution fails.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute(sql_query)
            return cursor.fetchall()
        except Exception as e:
            return {"error": str(e)}
        finally:
            conn.close()

    def generate_sql(self, user_query):
        """Convert user query into an SQL statement using LLM.

        The table/column layout of the database is read and embedded in the prompt.
        Markdown code fences around the model's answer are removed.
        """
        # Extract database schema
        schema = {}
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            table_names = [row[0] for row in cursor.fetchall()]
            for table_name in table_names:
                # Quote the identifier so unusual table names cannot break the PRAGMA.
                quoted = table_name.replace('"', '""')
                cursor.execute(f'PRAGMA table_info("{quoted}");')
                schema[table_name] = [col[1] for col in cursor.fetchall()]
        finally:
            conn.close()

        # Create LLM prompt for SQL generation
        prompt = f"""
        You are an expert SQL assistant. Based on the following database schema, generate a valid SQL query.
        Return only the SQL statement, with no explanation and no Markdown.
        
        Schema: {schema}
        User Query: "{user_query}"
        SQL Query:
        """
        return self._strip_code_fences(self.llm_call(prompt))

    def handle_user_query(self, user_query):
        """Main handler for user queries.

        Input starting with ``http`` is scraped as a product URL; anything else is
        translated to SQL and executed.

        Returns:
            The result of ``scrape_product`` for URLs, otherwise a dict with the keys
            ``sql_query`` and ``result``.
        """
        if user_query.startswith("http"):
            # Treat as a product URL
            print("https:")
            return self.scrape_product(user_query)

        # Convert question to SQL
        sql_query = self.generate_sql(user_query)
        return {"sql_query": sql_query, "result": self.query_database(sql_query)}


In [18]:
from langchain_ollama.llms import OllamaLLM
from langchain_ollama import ChatOllama
from langchain_ollama import OllamaEmbeddings


class RealLLM:
    """Small holder that builds a ChatOllama chat model and exposes it as ``llm``."""

    def __init__(self, model_name):
        # model_name is handed to ChatOllama unchanged.
        chat_model = ChatOllama(model=model_name)
        self.llm = chat_model

In [30]:

def scrape_product(product_url):
    """Fetch product details by scraping and using the LLM for structured output.

    Uses the module-level ``llm`` for the extraction.

    Args:
        product_url: URL of the product page.

    Returns:
        dict with the fields of ``HTMLData`` on success, or ``{"error": ...}`` when the
        request, the LLM call or the parsing of its output fails.
    """
    try:
        USER_AGENTS = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
        ]

        headers = {
            "User-Agent": random.choice(USER_AGENTS),
            "Accept-Language": "en-US,en;q=0.9",
            "Referer": "https://www.google.com/",
            "Accept-Encoding": "gzip, deflate",
            "DNT": "1",
            "Connection": "keep-alive",
        }

        # Retrieve the webpage content; the timeout keeps a stalled server from
        # blocking the notebook forever.
        response = requests.get(product_url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")

        class HTMLData(BaseModel):
            product_id: str = Field(description="Unique identifier for the product")
            product_name: str = Field(description="The product's name or title")
            image_urls: list = Field(description="URLs of all images associated with the product")
            video_urls: list = Field(description="URLs of any videos associated with the product")
            offer_details: str = Field(description="any Pricing, discounts, or promotions")
            description: str = Field(description="Detailed text describing the product")

        parser = PydanticOutputParser(pydantic_object=HTMLData)

        # The model receives the page markup (as before, but now converted explicitly),
        # because image and video URLs only exist in the HTML attributes.
        html_data = str(soup)

        # Literal braces of the JSON example are doubled so the template engine does
        # not mistake them for a variable placeholder.
        template = """
        You are a data extraction model. Your job is to analyze the provided HTML content of an Amazon product page and extract the following details. If any field is not available, return an empty value as specified below.

            1. Product ID: The unique identifier for the product. If not found, return an empty string.
            2. Name: The product's name or title. If not found, return an empty string.
            3. Image URLs: A list of URLs to all images associated with the product. If not found, return an empty list `[]`.
            4. Video URLs: A list of URLs to any videos associated with the product. If not found, return an empty list `[]`.
            5. Offer Details: The pricing, discounts, or promotions related to the product. If not found, return an empty string.
            6. Description: A detailed textual description of the product. If not found, return an empty string.

            **HTML Content**:
            {html_data}

            Examples:
            {{ "product_id": "12345", "product_name": "Product Name", "image_urls": ["url1", "url2"], "video_urls": ["url1", "url2"], "offer_details": "Offer details", "description": "Product description" }}
            
            Return only the required fields in the following JSON format:
            **Required JSON format**:
            format_instructions:
            {format_instructions}
        """
        prompt = PromptTemplate(
            template=template,
            input_variables=["html_data"],
            partial_variables={"format_instructions": parser.get_format_instructions()},
        )

        chain = prompt | llm
        raw_output = chain.invoke({"html_data": html_data})
        print("Raw output: ", raw_output)

        # Chat models return a message object; the parser needs its text.
        raw_text = getattr(raw_output, "content", raw_output)
        llm_response = parser.parse(raw_text)
        print("Response: ", llm_response)

        # Return a plain dict. eval() on the parsed object always failed and must not
        # be applied to model output in any case.
        if hasattr(llm_response, "model_dump"):
            return llm_response.model_dump()
        return llm_response.dict()

    except requests.exceptions.RequestException as http_error:
        return {"error": f"HTTP request failed: {http_error}"}
    except Exception as general_error:
        return {"error": f"An error occurred: {general_error}"}

In [None]:
# model_name = "deepseek-r1:8b"
# Same Ollama tag as the other model set-up cells of this notebook
# ("llama-3.2" does not match the tag they use).
model_name = "llama3.2:latest"
RealLLM_obj = RealLLM(model_name)
llm = RealLLM_obj.llm
# Create an instance of MarketplaceAgent
# agent = MarketplaceAgent(db_path, RealLLM_obj.llm)

# Example 1: Scrape a product URL
product_url = "https://www.amazon.in/Daikin-Inverter-Display-Technology-MTKL50U/dp/B0BK1KS6ZD/?th=1"
product_data = scrape_product(product_url)
print("Product Data:", product_data)

In [None]:
if __name__ == "__main__":
    # Initialize LLM and Database Path
    # llm = OpenAI(model="gpt-3.5-turbo", temperature=0.0)
    db_path = "products.db"  # Path to your SQLite database

    # model_name = "deepseek-r1:8b"
    # Same Ollama tag as the other model set-up cells of this notebook
    # ("llama-3.2" does not match the tag they use).
    model_name = "llama3.2:latest"
    RealLLM_obj = RealLLM(model_name)
    # Create an instance of MarketplaceAgent
    agent = MarketplaceAgent(db_path, RealLLM_obj.llm)

    # Example 1: Scrape a product URL
    product_url = "https://www.amazon.in/boAt-BassHeads-100-Headphones-Black/dp/B071Z8M4KX?th=1"
    product_data = agent.handle_user_query(product_url)
    print("Product Data:", product_data)

    # Example 2: User query for database search
    user_query = "Show all products priced below 1000."
    query_result = agent.handle_user_query(user_query)
    print("Query Result:", query_result)

In [28]:
# Fetch the page in this cell so that it does not depend on a `response` variable
# left behind by some other cell (none of the cells above defines one).
product_url = "https://www.amazon.in/boAt-BassHeads-100-Headphones-Black/dp/B071Z8M4KX?th=1"
response = requests.get(product_url, headers={"User-Agent": "Mozilla/5.0"}, timeout=10)
response.raise_for_status()

soup = BeautifulSoup(response.content, "html.parser")

# # Remove all tags
# for tag in soup.find_all(True):  # True matches all tags
#     tag.decompose()

In [None]:
print(soup.get_text().replace("\n\n", "\n").strip())

In [None]:
import requests
import sqlite3
import json
import random
from bs4 import BeautifulSoup
from langchain_ollama import ChatOllama
from langchain.tools import Tool
from langchain.agents import initialize_agent, AgentType
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel, Field

import logging
logging.basicConfig(filename='scraper_errors.log', level=logging.ERROR)

# Initialize LLM
class RealLLM:
    """Small holder that builds a ChatOllama chat model and exposes it as ``llm``."""

    def __init__(self, model_name):
        # model_name is handed to ChatOllama unchanged.
        chat_model = ChatOllama(model=model_name)
        self.llm = chat_model


# Model handle passed to initialize_agent() further down in this cell.
model_name = "llama3.2:latest"
RealLLM_obj = RealLLM(model_name)
llm = RealLLM_obj.llm

# Scrape Product Details Tool
# @tool
def scrape_product_details(page_url: str):
    """
    Scrapes product details from a product page URL and returns them as a dict.

    Args:
        page_url: URL of the product page. Surrounding whitespace and quotes are
            removed, since an agent may wrap the URL in them.

    Returns:
        dict with the keys ``product_id``, ``product_url``, ``product_name``, ``price``,
        ``offer``, ``rating``, ``brand``, ``description1``, ``product_descripation`` and
        ``image_url`` (each value is None when the element is missing), or
        ``{"error": ...}`` when the page cannot be fetched or parsed.
    """
    try:
        page_url = page_url.strip().strip("'\"")

        USER_AGENTS = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
        ]

        headers = {
            "User-Agent": random.choice(USER_AGENTS),
            "Accept-Language": "en-US,en;q=0.9",
            "Referer": "https://www.google.com/",
            "Accept-Encoding": "gzip, deflate",
            "DNT": "1",
            "Connection": "keep-alive",
        }

        # Retrieve the page named by the *argument*. The previous code read the global
        # `product_url`, so the tool ignored the URL it was actually given.
        response = requests.get(page_url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")

        product_name = soup.find("span", id="productTitle")
        price = soup.find("span", class_="a-price-whole")
        offer = soup.find("span", class_="savingsPercentage")
        rating = soup.find("span", class_="a-size-base a-color-base")
        brand = soup.find("tr", class_="po-brand")
        description_list = soup.select("#feature-bullets ul li span.a-list-item")
        description = soup.find("div", id="productDescription")
        image = soup.find("img", id="landingImage")

        # The product id is the path segment that follows "dp"; the query string is
        # cut off first so that it cannot end up inside the id.
        url_parts = page_url.split("?")[0].split("/")
        product_id = url_parts[url_parts.index("dp") + 1] if "dp" in url_parts[:-1] else None

        # The rating selector is generic and can match non-numeric text; a bad match
        # must not discard the rest of the scraped data.
        try:
            rating_value = float(rating.get_text(strip=True)) if rating else None
        except ValueError:
            rating_value = None

        brand_cells = brand.find_all("td") if brand else []

        data = {
            "product_id": product_id,
            # Key spelled the way save_to_db() reads it (was "prpduct_url").
            "product_url": page_url,
            "product_name": product_name.get_text(strip=True) if product_name else None,
            "price": f"₹{price.get_text(strip=True)}" if price else None,
            "offer": offer.get_text(strip=True) if offer else None,
            "rating": rating_value,
            "brand": brand_cells[1].get_text(strip=True) if len(brand_cells) > 1 else None,
            "description1": ", ".join([desc.get_text(strip=True) for desc in description_list]) if description_list else None,
            # Spelling kept on purpose: save_to_db() looks this key up as written.
            "product_descripation": description.get_text(strip=True) if description else None,
            "image_url": image.get("src") if image else None,
        }

        return data
    except Exception as e:
        logging.error(f"Error: {e}, URL: {page_url}")
        return {"error": f"An error occurred: {e}"}

# JSON Formatter Tool
def format_json(data: dict):
    """
    Formats a given dictionary into a properly formatted JSON string.

    Args:
        data: The dictionary to format. Text that encodes a dict or list, either as
            JSON or as a Python literal, is decoded first so that it is pretty-printed
            instead of being quoted as one long string.

    Returns:
        The JSON text, indented by four spaces.
    """
    if isinstance(data, str):
        import ast

        decoded = None
        try:
            decoded = json.loads(data)
        except json.JSONDecodeError:
            try:
                decoded = ast.literal_eval(data)
            except (ValueError, SyntaxError):
                decoded = None
        # Only containers are unwrapped; any other text is encoded as before.
        if isinstance(decoded, (dict, list)):
            data = decoded
    return json.dumps(data, indent=4)


def create_product_db(db_path: str = "products.db"):
    """Create the ``products`` table in the SQLite file at ``db_path`` if it is missing.

    Args:
        db_path: Database file to initialise; defaults to ``"products.db"``.

    NOTE: because of ``IF NOT EXISTS`` an already existing ``products`` table is left
    untouched, even when its columns differ from the ones listed here.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()

        # Create table if it doesn't exist
        cursor.execute("""
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id TEXT,
            product_url TEXT,
            product_name TEXT,
            price TEXT,
            offer TEXT,
            rating REAL,
            brand TEXT,
            description1 TEXT,
            product_description TEXT,
            image_url TEXT
        )
        """)

        conn.commit()
    finally:
        # Release the file handle even when the statement fails.
        conn.close()

def save_to_db(product_data: dict):
    """
    Saves product details to an SQLite database.

    Args:
        product_data: dict of product fields. Text that encodes such a dict (JSON or a
            Python literal) is decoded first. Both spellings that occur in this
            notebook are accepted for the URL key (``product_url`` / ``prpduct_url``)
            and the description key (``product_description`` / ``product_descripation``).

    Returns:
        ``{"status": ...}`` on success, ``{"error": ...}`` on failure.
    """
    try:
        if isinstance(product_data, str):
            import ast

            try:
                product_data = json.loads(product_data)
            except json.JSONDecodeError:
                product_data = ast.literal_eval(product_data)
        if not isinstance(product_data, dict):
            raise TypeError(
                f"expected a dict of product fields, got {type(product_data).__name__}"
            )

        # Ensure table exists before inserting data
        create_product_db()

        row = (
            product_data.get("product_id"),
            product_data.get("product_url") or product_data.get("prpduct_url"),
            product_data.get("product_name"),
            product_data.get("price"),
            product_data.get("offer"),
            product_data.get("rating"),
            product_data.get("brand"),
            product_data.get("description1"),
            product_data.get("product_description") or product_data.get("product_descripation"),
            product_data.get("image_url"),
        )

        conn = sqlite3.connect("products.db")
        try:
            conn.execute("""
            INSERT INTO products (product_id, product_url, product_name, price, offer, rating, brand, description1, product_description, image_url)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, row)
            conn.commit()
        finally:
            # Release the file handle even when the insert fails.
            conn.close()
        return {"status": "Product details saved successfully!"}
    except Exception as e:
        # Log the URL of the record being saved, not an unrelated global variable.
        failed_url = None
        if isinstance(product_data, dict):
            failed_url = product_data.get("product_url") or product_data.get("prpduct_url")
        logging.error(f"Error: {e}, URL: {failed_url}")
        return {"error": f"Database error: {e}"}


# Define LangChain Tools
# Each Tool exposes one of the plain functions defined above to the agent under a
# name and a natural-language description.
# NOTE(review): the agent presumably hands each tool its action input as text —
# confirm that format_json and save_to_db cope with string input.
tools = [
    Tool(
        name="ProductScraper",
        func=scrape_product_details,
        description="Scrapes product details from a given URL and returns a JSON object."
    ),
    Tool(
        name="JSONFormatter",
        func=format_json,
        description="Formats a given dictionary into a properly formatted JSON string."
    ),
    Tool(
        name="DatabaseSaver",
        func=save_to_db,
        description="Saves product details into a database."
    )
]

# 🔹 Initialize the Agent with AUTO Tool Selection
# The agent is built from the three tools above and the module-level `llm`.
agent = initialize_agent(
    tools,
    llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,  # Ensures stepwise tool selection
    verbose=True
)


# Example Usage
# `product_url` is a module-level name; other cells of this notebook read it as well.
product_url = "https://www.amazon.in/LAKM%C3%89-01-Beige-Coverage-30-Tinted-Moisturizer/dp/B01BBNF6C6"
# Runs the agent on a single instruction that asks for scraping, formatting and saving.
result = agent.run(f"Get details about the product at this URL: {product_url}. Format as JSON and save to the database.")

# Print Final JSON Output
print(result)


In [None]:
result

In [None]:
tools

In [None]:
scrape_product_details(product_url)

In [21]:
# Pick exactly one product page; the alternatives stay commented out so that no
# assignment is silently overwritten by the next line.
# product_url = "https://www.amazon.in/boAt-BassHeads-100-Headphones-Black/dp/B071Z8M4KX"
# product_url = "https://www.amazon.in/RENEE-Lumi-Glow-Highlighting-Pigmentation/dp/B0D22KD4VC/"
product_url = "https://www.amazon.in/LAKM%C3%89-01-Beige-Coverage-30-Tinted-Moisturizer/dp/B01BBNF6C6"

In [22]:
# Browser-like User-Agent strings; one is picked at random for the request below.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
]

headers = {
    "User-Agent": random.choice(USER_AGENTS),
    "Accept-Language": "en-US,en;q=0.9",
    "Referer": "https://www.google.com/",
    "Accept-Encoding": "gzip, deflate",
    "DNT": "1",
    "Connection": "keep-alive",
}

# Retrieve the webpage content; the timeout keeps a stalled server from blocking
# the notebook forever.
response = requests.get(product_url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.content, "html.parser")

In [None]:
headers

In [24]:
# soup

In [None]:
from bs4 import BeautifulSoup
import random
import json

# Extracting product details
# Locate the page elements that carry the product details (`soup` and `product_url`
# come from the fetch cell).
product_name = soup.find("span", id="productTitle")
price = soup.find("span", class_="a-price-whole")
offer = soup.find("span", class_="savingsPercentage")
rating = soup.find("span", class_="a-size-base a-color-base")
brand = soup.find("tr", class_="po-brand")
category = "Face Makeup"  # Assuming category since not explicitly in HTML
description_list = soup.select("#feature-bullets ul li span.a-list-item")
description = soup.find("div", id="productDescription")
image = soup.find("img", id="landingImage")

# The product id is the path segment that follows "dp"; the query string is cut off
# first so that it cannot end up inside the id.
url_parts = product_url.split("?")[0].split("/")
product_id = url_parts[url_parts.index("dp") + 1] if "dp" in url_parts[:-1] else None

# The rating selector is generic and can match non-numeric text.
try:
    rating_value = float(rating.get_text(strip=True)) if rating else None
except ValueError:
    rating_value = None

brand_cells = brand.find_all("td") if brand else []

data = {
    "product_id": product_id,
    "product_url": product_url,  # key was misspelled "prpduct_url"
    "product_name": product_name.get_text(strip=True) if product_name else None,
    "price": f"₹{price.get_text(strip=True)}" if price else None,
    "offer": offer.get_text(strip=True) if offer else None,
    "rating": rating_value,
    # "category": category,
    "brand": brand_cells[1].get_text(strip=True) if len(brand_cells) > 1 else None,
    "description1": ", ".join([desc.get_text(strip=True) for desc in description_list]) if description_list else None,
    "product_descripation" : description.get_text(strip=True) if description else None,
    "image_url": image.get("src") if image else None
}

print(json.dumps(data, indent=4, ensure_ascii=False))

In [None]:
import requests
import sqlite3
import random
from bs4 import BeautifulSoup
from langchain.tools import Tool
from langchain.schema import AgentFinish
from langchain.agents import initialize_agent, AgentType
from langchain.memory import ConversationBufferMemory
from langchain_core.pydantic_v1 import BaseModel, Field
from typing import TypedDict, Annotated, List, Optional
from langgraph.graph import Graph, END
from IPython.display import display, Image as IPImage

import logging
logging.basicConfig(filename='scraper_errors.log', level=logging.ERROR)

# Initialize LLM
class RealLLM:
    """Wrapper whose only job is to construct a ChatOllama model and keep it in ``llm``."""

    def __init__(self, model_name):
        ollama_chat = ChatOllama(model=model_name)
        self.llm = ollama_chat


# Build the model once at module level.
model_name = "llama3.2:latest"
RealLLM_obj = RealLLM(model_name)
llm = RealLLM_obj.llm

# 🟢 Step 1: Define State Management
from dataclasses import dataclass, field


@dataclass
class ScraperState:
    """Mutable state handed from one scraping step to the next.

    This is a dataclass rather than a TypedDict because the step functions read and
    write the fields as attributes (``state.status``, ``state.errors.append(...)``).
    Calling a TypedDict returns a plain dict, which has no such attributes and does
    not apply the defaults.
    """

    homepage_url: str
    # URLs still waiting to be scraped; every instance gets its own list.
    product_urls: Optional[List[str]] = field(default_factory=list)
    # URL of the product that is being processed at the moment, if any.
    current_product: Optional[str] = None
    status: str = "INITIAL"  # Possible states: INITIAL, SCRAPING_URLS, SCRAPING_PRODUCTS, SAVING_TO_DB, DONE
    # Messages of the errors collected so far; every instance gets its own list.
    errors: Optional[List[str]] = field(default_factory=list)

state = ScraperState(homepage_url="https://www.amazon.in/s?k=laptops")

# 🟢 Step 2: User-Agent Headers
# Pool of browser identities; one of them is drawn once, when this cell runs.
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
]

# Start with the randomly chosen identity, then add the fixed request headers.
headers = {"User-Agent": random.choice(USER_AGENTS)}
headers.update(
    {
        "Accept-Language": "en-US,en;q=0.9",
        "Referer": "https://www.google.com/",
        "Accept-Encoding": "gzip, deflate",
        "DNT": "1",
        "Connection": "keep-alive",
    }
)

# 🟢 Step 3: Scrape All Product URLs from Homepage
def scrape_product_urls(state: ScraperState):
    """Collect the product links of ``state.homepage_url`` into ``state.product_urls``.

    Every link whose path contains a ``dp`` segment is normalised to
    ``https://www.amazon.in/dp/<id>/``. Duplicates are dropped while the order of
    first appearance on the page is kept.

    Args:
        state: Scraper state; ``homepage_url`` is read, while ``product_urls``,
            ``status`` and (on failure) ``errors`` are updated in place.

    Returns:
        The same ``state`` object. ``status`` is ``"SCRAPING_PRODUCTS"`` on success and
        ``"DONE"`` after an error.
    """
    try:
        response = requests.get(state.homepage_url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")

        product_links = []
        for a_tag in soup.select("a[href*='/dp/']"):
            # Cut off the query string and fragment so they cannot leak into the id.
            path_parts = a_tag["href"].split("?")[0].split("#")[0].split("/")
            # Skip links that have no path segment after "dp"; they used to produce
            # ".../dp/None/" entries or an IndexError.
            if "dp" not in path_parts[:-1]:
                continue
            product_id = path_parts[path_parts.index("dp") + 1]
            if not product_id:
                continue
            product_links.append(f"https://www.amazon.in/dp/{product_id}/")

        # Remove duplicates; dict keys keep insertion order, a set does not.
        state.product_urls = list(dict.fromkeys(product_links))
        state.status = "SCRAPING_PRODUCTS"
        return state
    except Exception as e:
        state.errors.append(str(e))
        state.status = "DONE"
        return state

# 🟢 Step 4: Scrape Product Details
def scrape_product_details(state: ScraperState):
    """Scrape the next queued product page.

    The first URL is removed from ``state["product_urls"]`` and recorded as
    ``state["current_product"]``. The state is accessed by key because
    ``ScraperState`` is a TypedDict (a plain dict at runtime).

    Args:
        state: Scraper state holding the queue of product URLs.

    Returns:
        A dict of product fields when the page was fetched and parsed
        (``status`` is then ``"SAVING_TO_DB"``). Otherwise the ``state``
        object itself: with ``status == "DONE"`` when the queue is empty, or
        with the exception message appended to ``errors`` when the scrape
        failed (``status`` is left untouched so the caller can move on to the
        next URL).
    """
    pending = state.get("product_urls")
    if not pending:
        state["status"] = "DONE"
        return state

    product_url = pending.pop(0)  # Process one URL at a time
    state["current_product"] = product_url

    try:
        # A timeout keeps a stalled connection from blocking the notebook forever.
        response = requests.get(product_url, headers=headers, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")

        product_name = soup.find("span", id="productTitle")
        price = soup.find("span", class_="a-price-whole")
        offer = soup.find("span", class_="savingsPercentage")
        rating = soup.find("span", class_="a-size-base a-color-base")
        brand = soup.find("tr", class_="po-brand")
        description_list = soup.select("#feature-bullets ul li span.a-list-item")
        description = soup.find("div", id="productDescription")
        image = soup.find("img", id="landingImage")

        product_id = product_url.split("/dp/")[1].split("/")[0]

        # The rating selector is generic, so the matched text is not always a
        # number; an unparsable rating must not cost us the whole product.
        rating_value = None
        if rating:
            try:
                rating_value = float(rating.get_text(strip=True))
            except ValueError:
                rating_value = None

        # The brand value is expected in the second cell of the row.
        brand_cells = brand.find_all("td") if brand else []

        product_data = {
            "product_id": product_id,
            "product_url": product_url,
            "product_name": product_name.get_text(strip=True) if product_name else None,
            "price": f"₹{price.get_text(strip=True)}" if price else None,
            "offer": offer.get_text(strip=True) if offer else None,
            "rating": rating_value,
            "brand": brand_cells[1].get_text(strip=True) if len(brand_cells) > 1 else None,
            "description1": ", ".join([desc.get_text(strip=True) for desc in description_list]) if description_list else None,
            "product_description": description.get_text(strip=True) if description else None,
            "image_url": image.get("src") if image else None,
        }

        state["status"] = "SAVING_TO_DB"
        return product_data
    except Exception as e:
        # Best effort: record the failure on the state instead of raising.
        errors = state.get("errors") or []
        errors.append(str(e))
        state["errors"] = errors
        return state

# 🟢 Step 5: Save Data to SQLite Database
def save_to_db(state: ScraperState, product_data: dict, db_path: str = "products.db"):
    """Insert or update one scraped product in the SQLite ``products`` table.

    The table is created on first use. A row with the same ``product_id`` is
    overwritten (upsert). The state is accessed by key because ``ScraperState``
    is a TypedDict (a plain dict at runtime).

    Args:
        state: Scraper state; ``status`` is advanced after a successful write.
        product_data: Product fields as produced by ``scrape_product_details``;
            missing fields are stored as NULL.
        db_path: SQLite database file to write to.

    Returns:
        A confirmation string on success (``status`` becomes
        ``"SCRAPING_PRODUCTS"`` while URLs remain queued, else ``"DONE"``).
        On failure the exception message is appended to ``errors`` and the
        ``state`` object is returned with ``status`` unchanged.
    """
    columns = (
        "product_id",
        "product_url",
        "product_name",
        "price",
        "offer",
        "rating",
        "brand",
        "description1",
        "product_description",
        "image_url",
    )
    try:
        conn = sqlite3.connect(db_path)
        try:
            # `with conn` commits on success and rolls back on error; the
            # finally clause closes the connection on every path.
            with conn:
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS products (
                        product_id TEXT PRIMARY KEY,
                        product_url TEXT,
                        product_name TEXT,
                        price TEXT,
                        offer TEXT,
                        rating REAL,
                        brand TEXT,
                        description1 TEXT,
                        product_description TEXT,
                        image_url TEXT
                    )
                """)
                conn.execute("""
                    INSERT INTO products (product_id, product_url, product_name, price, offer, rating, brand, description1, product_description, image_url)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ON CONFLICT(product_id) DO UPDATE SET 
                        product_url = excluded.product_url,
                        product_name = excluded.product_name,
                        price = excluded.price,
                        offer = excluded.offer,
                        rating = excluded.rating,
                        brand = excluded.brand,
                        description1 = excluded.description1,
                        product_description = excluded.product_description,
                        image_url = excluded.image_url
                """, tuple(product_data.get(column) for column in columns))
        finally:
            conn.close()

        state["status"] = "SCRAPING_PRODUCTS" if state.get("product_urls") else "DONE"
        return f"Product {product_data['product_id']} saved!"
    except Exception as e:
        # Best effort: record the failure on the state instead of raising.
        errors = state.get("errors") or []
        errors.append(str(e))
        state["errors"] = errors
        return state

# 🟢 Step 6: Define LangChain Tools
# One tool per pipeline step.
# NOTE(review): scrape_product_urls/scrape_product_details are written to take
# the scraper state and save_to_db takes two arguments; confirm that the agent
# can actually supply those inputs through these tools.
scrape_urls_tool = Tool(
    name="Scrape Product URLs",
    func=scrape_product_urls,
    description="Scrape all product URLs from homepage.",
)
scrape_details_tool = Tool(
    name="Scrape Product Details",
    func=scrape_product_details,
    description="Scrape details of a product from a given URL.",
)
save_db_tool = Tool(
    name="Save Product to DB",
    func=save_to_db,
    description="Save product data to SQLite database.",
)

# Step 7: Initialize AI Agent
memory = ConversationBufferMemory(memory_key="chat_history")

agent = initialize_agent(
    llm=llm,
    tools=[scrape_urls_tool, scrape_details_tool, save_db_tool],
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    memory=memory,
    verbose=True,
)

# 🟢 Step 8: Execute the Workflow
print("🔄 Starting Scraping Process...")

# The pipeline steps take the state dict (and, for save_to_db, a product dict).
# Those objects cannot be handed over through agent.invoke(), whose second
# positional argument is the run configuration, so the steps are called
# directly and the shared `state` dict carries the progress.
scrape_product_urls(state)

while state.get("status") == "SCRAPING_PRODUCTS":
    product_data = scrape_product_details(state)

    if product_data is state:
        # No product dict came back: either the queue is empty (status is now
        # DONE) or this product failed and its error was recorded.
        if state.get("status") != "DONE" and state.get("errors"):
            print(state["errors"][-1])
        continue

    save_to_db(state, product_data)
    if state.get("status") == "SAVING_TO_DB":
        # save_to_db did not advance the status, i.e. the write failed;
        # report it and carry on with the next queued product.
        if state.get("errors"):
            print(state["errors"][-1])
        state["status"] = "SCRAPING_PRODUCTS"

print("✅ Scraping Completed!")


In [None]:
# from langchain.workflow import Graph

# Wire the scraping steps into a graph that loops until every queued product
# has been handled.
# NOTE(review): assumes `Graph` and `END` come from `langgraph.graph` (the
# import is not part of this cell) and that every node receives the previous
# node's return value. Confirm against the installed version.


def _scrape_details_node(state):
    """Scrape the next queued product and park the result on the state."""
    result = scrape_product_details(state)
    # scrape_product_details hands back a product dict on success and the state
    # itself when the queue is empty or the scrape failed.
    state["product_data"] = None if result is state else result
    return state


def _save_node(state):
    """Persist the product parked by the details node, then clear it."""
    product_data = state.get("product_data")
    if product_data:
        save_to_db(state, product_data)
        state["product_data"] = None
    return state


def _route_after_urls(state):
    """Stop when collecting the URLs failed, otherwise start scraping."""
    return "END" if state.get("errors") else "scrape_product_details"


def _route_after_details(state):
    """Save a scraped product; else try the next URL or stop when none remain."""
    if state.get("product_data"):
        return "save_to_db"
    return "scrape_product_details" if state.get("product_urls") else "END"


def _route_after_save(state):
    """Loop back while product URLs remain queued."""
    return "scrape_product_details" if state.get("product_urls") else "END"


workflow = Graph()

# Nodes keep the step names so the rendered diagram stays readable.
workflow.add_node("scrape_product_urls", scrape_product_urls)
workflow.add_node("scrape_product_details", _scrape_details_node)
workflow.add_node("save_to_db", _save_node)

workflow.set_entry_point("scrape_product_urls")

# Each node has exactly one outgoing (conditional) edge.
workflow.add_conditional_edges(
    "scrape_product_urls",
    _route_after_urls,
    {
        "END": END,  # End if there are errors
        "scrape_product_details": "scrape_product_details",  # Proceed to next step if no errors
    },
)
workflow.add_conditional_edges(
    "scrape_product_details",
    _route_after_details,
    {
        "save_to_db": "save_to_db",  # A product was scraped: persist it
        "scrape_product_details": "scrape_product_details",  # This product failed: try the next URL
        "END": END,  # Nothing left to scrape
    },
)
workflow.add_conditional_edges(
    "save_to_db",
    _route_after_save,
    {
        "scrape_product_details": "scrape_product_details",  # More products queued
        "END": END,  # Queue drained
    },
)

# Compile the workflow
app = workflow.compile()


In [None]:
# Render the compiled workflow as a Mermaid PNG and show it inline.
display(IPImage(app.get_graph().draw_mermaid_png()))

In [None]:
# Render the compiled graph to PNG bytes.
workflow_diagram = app.get_graph().draw_mermaid_png()

# Write the bytes next to the notebook.
with open("workflow_diagram.png", "wb") as png_file:
    png_file.write(workflow_diagram)

print("Workflow diagram saved as workflow_diagram.png")


In [84]:
async def run_workflow(query: str, recursion_limit: int = 500):
    """Run the compiled workflow ``app`` for one listing page.

    Args:
        query: URL of the listing/search page; stored as ``homepage_url`` in
            the initial state.
        recursion_limit: Passed to ``app.ainvoke`` as the ``recursion_limit``
            config entry, so a workflow that loops over many products is not
            cut short by a low default step limit.

    Returns:
        ``result["formatted_results"]`` when the workflow produced that key,
        otherwise the workflow result itself. ``None`` when the run raised
        (the error is printed).
    """
    initial_state = {
        "homepage_url": query,
        "product_urls": [],
        "current_product": None,
        "status": "INITIAL",
        "errors": [],
    }
    try:
        result = await app.ainvoke(initial_state, config={"recursion_limit": recursion_limit})
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return None

    # Indexing "formatted_results" unconditionally raised KeyError whenever the
    # key was absent, which made every such run look like a failure.
    if isinstance(result, dict) and "formatted_results" in result:
        return result["formatted_results"]
    return result
    

In [None]:
query = "https://www.amazon.in/s?srs=93797680031&bbn=93797680031&rh=p_72%3A1318476031&dc&ds=v1%3AtSB1tQ%2BflV1BQc6XbzK6p1izuWr0xV2adbDC6Zy6FeE&qid=1738386643&rnid=1318475031&ref=sr_nr_p_72_1"
print(await run_workflow(query))

In [68]:
def scrape_product_urls(homepage_url):
    """Collect the product-page URLs linked from a listing page.

    NOTE: this cell redefines ``scrape_product_urls`` with a different
    signature than the state-based function of the same name defined earlier
    in the notebook; whichever cell ran last is the one in effect.

    Args:
        homepage_url: URL of the listing/search page to fetch.

    Returns:
        A ``(message, product_links)`` pair, matching the two-value unpacking
        done by the calling cell. ``product_links`` is the de-duplicated list
        of product URLs in page order; on failure it is empty and ``message``
        describes the error.
    """
    try:
        USER_AGENTS = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36",
        ]

        headers = {
            "User-Agent": random.choice(USER_AGENTS),
            "Accept-Language": "en-US,en;q=0.9",
            "Referer": "https://www.google.com/",
            "Accept-Encoding": "gzip, deflate",
            "DNT": "1",
            "Connection": "keep-alive",
        }

        # A timeout keeps a stalled connection from blocking the notebook forever.
        response = requests.get(homepage_url, headers=headers, timeout=30)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")

        product_links = []
        for a_tag in soup.select("a[href*='/dp/']"):
            # The selector guarantees "/dp/" occurs in the href; the product id
            # is the path segment that follows it.
            segment = a_tag["href"].split("/dp/", 1)[1].split("/", 1)[0]
            # Drop a query string or fragment glued to the id ("B0X?th=1").
            product_id = segment.split("?", 1)[0].split("#", 1)[0]
            if not product_id:
                continue  # nothing usable after "/dp/"
            product_links.append(f"https://www.amazon.in/dp/{product_id}/")

        # dict.fromkeys removes duplicates while keeping the page order.
        product_links = list(dict.fromkeys(product_links))
        return f"Scraped {len(product_links)} product URLs.", product_links
    except Exception as e:
        return f"Error scraping homepage: {e}", []

In [69]:
# Listing page to scrape.
homepage_url = "https://www.amazon.in/s?k=cosmetics+for+women&i=beauty&rh=n%3A1355016031&dc=&crid=ACTZFOKZB8Q6&nsdOptOutParam=true&qid=1738383717&rnid=1741387031&sprefix=cos%2Cbeauty%2C697&ref=sr_nr_p_36_0_0&low-price=&high-price=10000"

# Two values are unpacked from the call: a message and the scraped URLs.
meg, product_url = scrape_product_urls(homepage_url)

In [None]:
# Show the first value unpacked from scrape_product_urls(homepage_url).
meg

In [None]:
# Show the second value unpacked from scrape_product_urls(homepage_url).
product_url

In [None]:
# Fetch every row of the products table and report how many there are.
query = "SELECT * FROM products"

conn = sqlite3.connect("products.db")
cursor = conn.cursor()
result = cursor.execute(query).fetchall()
conn.close()

print(len(result))

In [None]:
# Dump every fetched row, then the row count.
print(result)
print(len(result))

In [148]:
# delete table
# Remove the products table. IF EXISTS makes the cell safe to re-run when the
# table is already gone, and the finally clause closes the connection even if
# the statement fails.
conn = sqlite3.connect("products.db")
try:
    cursor = conn.cursor()
    cursor.execute("DROP TABLE IF EXISTS products")
    conn.commit()
finally:
    conn.close()


In [None]:
import sqlite3
import pandas as pd

# Load the whole products table into a DataFrame.
query = "SELECT * FROM products"
conn = sqlite3.connect("products.db")
df = pd.read_sql_query(query, conn)
conn.close()

# Write it out as an Excel workbook, without the index column.
df.to_excel('products_data.xlsx', index=False)
print("Data has been saved to 'products_data.xlsx'")

In [None]:
from pandasai.llm.local_llm import LocalLLM
import pandas as pd
from pandasai import SmartDataframe

# The dangling `from pandas_ai` line that used to follow the imports was an
# unfinished statement (a SyntaxError that stopped the whole cell) and is gone.

# LLM served from a local endpoint.
# NOTE(review): port 11434 and the "llama3.2:latest" tag look like a local
# Ollama server. Confirm.
model = LocalLLM(
    api_base="http://localhost:11434/v1",
    # model="llama3"
    model="llama3.2:latest"
)

In [11]:
# NOTE(review): the export cell writes 'products_data.xlsx' while this reads
# "products_data 2.xlsx". Confirm that the copy is the intended input.
uploaded_file = "products_data 2.xlsx"
data = pd.read_excel(uploaded_file)

In [None]:
# Peek at the first row of the loaded workbook.
data.head(1)

In [None]:
# df=SmartDataframe(data,config={"llm":model})
# prompt = "iphone best offer list"
# df.chat(query=prompt,output_type=str)

In [21]:
from langchain.agents.agent_types import AgentType
from langchain_experimental.agents.agent_toolkits import create_pandas_dataframe_agent

import pandas as pd
# from langchain_openai import OpenAI

# LLM wrapper for the dataframe agent.
model_name = "llama3.2:latest"
RealLLM_obj = RealLLM(model_name)
llm = RealLLM_obj.llm

# Sample dataset fetched from the pandas documentation repository.
titanic_csv_url = "https://raw.githubusercontent.com/pandas-dev/pandas/main/doc/data/titanic.csv"
df = pd.read_csv(titanic_csv_url)

In [None]:
# Build a dataframe agent over `df`, driven by `llm`, with verbose output.
# NOTE(review): some langchain_experimental releases refuse to build this agent
# unless allow_dangerous_code=True is passed. Confirm against the installed version.
agent = create_pandas_dataframe_agent(llm, df, verbose=True)

In [None]:
from graphviz import Digraph

# Diagram of the multi-agent system: who talks to whom, and why.
dot = Digraph(comment="CrewAI Multi-Agent System")

# (node id, label, shape, fill colour)
agent_nodes = [
    ('U', 'User Agent', 'ellipse', 'lightblue'),
    ('W', 'Web Scraping Agent', 'box', 'lightgreen'),
    ('D', 'Database Agent', 'box', 'lightyellow'),
    ('S', 'Social Media Agent', 'box', 'lightpink'),
]
for node_id, node_label, node_shape, fill in agent_nodes:
    dot.node(node_id, node_label, shape=node_shape, style='filled', fillcolor=fill)

# (from, to, edge label)
agent_links = [
    ('U', 'W', "Scrape website data"),
    ('U', 'D', "Query database"),
    ('U', 'S', "Post to Twitter"),
    ('W', 'D', "Store data in DB"),
    ('D', 'S', "Retrieve data for social media"),
]
for tail, head, link_label in agent_links:
    dot.edge(tail, head, label=link_label)

# Write crewai_agents_flow.png and remove the intermediate source file.
dot.render('crewai_agents_flow', format='png', cleanup=True)

# Last expression of the cell: show the graph inline.
dot


In [None]:
class BaseAgent:
    """Common base for agents: stores a display name and declares the task hook."""

    def __init__(self, name):
        self.name = name

    def perform_task(self):
        """Task hook meant to be overridden; the base implementation does nothing."""
        return None

class SupervisorAgent(BaseAgent):
    """Agent that owns a list of other agents and runs their tasks in order."""

    def __init__(self, name, agents):
        super().__init__(name)
        # The caller's list object is kept as-is (not copied).
        self.agents = agents

    def add_agent(self, agent):
        """Append ``agent`` to the managed list."""
        self.agents.append(agent)

    def remove_agent(self, agent):
        """Remove ``agent`` from the managed list (``list.remove`` semantics)."""
        self.agents.remove(agent)

    def manage_agents(self):
        """Announce each managed agent, then run its task, in list order."""
        for agent in self.agents:
            print(f"Managing agent: {agent.name}")
            agent.perform_task()

class Agent1(BaseAgent):
    """Worker agent that reports task 1."""

    def perform_task(self):
        report = f"{self.name} is performing task 1."
        print(report)


class Agent2(BaseAgent):
    """Worker agent that reports task 2."""

    def perform_task(self):
        report = f"{self.name} is performing task 2."
        print(report)


class Agent3(BaseAgent):
    """Worker agent that reports task 3."""

    def perform_task(self):
        report = f"{self.name} is performing task 3."
        print(report)


class Agent4(BaseAgent):
    """Worker agent that reports task 4."""

    def perform_task(self):
        report = f"{self.name} is performing task 4."
        print(report)

# One worker of each kind.
agent1, agent2, agent3, agent4 = (
    Agent1(name="Agent1"),
    Agent2(name="Agent2"),
    Agent3(name="Agent3"),
    Agent4(name="Agent4"),
)

# The supervisor receives all four workers up front.
supervisor = SupervisorAgent(
    name="SupervisorAgent",
    agents=[agent1, agent2, agent3, agent4],
)

# Run every worker's task through the supervisor.
supervisor.manage_agents()


In [None]:
from graphviz import Digraph

# Diagram: one supervisor fanning out to four agents.
dot = Digraph(comment='CrewAI Agent Structure')

# Supervisor first, then the agents, in the order they are drawn.
node_labels = {
    'S': 'SupervisorAgent',
    'A1': 'Agent1',
    'A2': 'Agent2',
    'A3': 'Agent3',
    'A4': 'Agent4',
}
for node_id, node_label in node_labels.items():
    dot.node(node_id, node_label)

# Supervisor -> every agent.
for agent_id in ('A1', 'A2', 'A3', 'A4'):
    dot.edge('S', agent_id)

# Write agent_structure.png and remove the intermediate source file.
dot.render('agent_structure', format='png', cleanup=True)

# Last expression of the cell: show the graph inline.
dot


In [14]:
class BaseAgent:
    """Agent described by a name, a free-text description and a list of tools.

    NOTE: this redefines the ``BaseAgent`` class declared in an earlier cell,
    with a different constructor signature.
    """

    def __init__(self, name, tools, description):
        self.name = name
        self.tools = tools  # tool objects exposing .name and .description
        self.description = description  # what the agent is responsible for

    def perform_task(self):
        """Task hook meant to be overridden; the base implementation does nothing."""
        return None

    def display_description(self):
        """Print the agent's name followed by its description."""
        print(f"{self.name}: {self.description}")

    def display_tools(self):
        """Print a header line, then one ``- name: description`` line per tool."""
        print(f"{self.name}'s Tools:")
        for tool in self.tools:
            print(f"- {tool.name}: {tool.description}")
            
class Tool:
    """Named tool with a human-readable description.

    NOTE(review): an earlier cell builds ``Tool(name=..., func=..., description=...)``
    objects, presumably LangChain's ``Tool``; once this cell has run, the name
    ``Tool`` refers to this class instead. Confirm the intended cell order.
    """

    def __init__(self, name, description):
        self.name = name
        self.description = description  # what the tool is for

    def perform_tool_task(self):
        """Tool hook meant to be overridden; the base implementation does nothing."""
        return None

    def display_description(self):
        """Print the tool's name followed by its description."""
        print(f"{self.name}: {self.description}")

# Tools available to Agent1.
tool1_A = Tool(name="Tool1_A", description="This tool handles data analysis.")
tool2_A = Tool(name="Tool2_A", description="This tool performs data cleanup.")
tool3_A = Tool(name="Tool3_A", description="This tool visualizes results.")
tool4_A = Tool(name="Tool4_A", description="This tool connects to external APIs.")
tool5_A = Tool(name="Tool5_A", description="This tool performs reporting.")

# Agent1: data analysis.
agent1 = BaseAgent(
    name="Agent1",
    tools=[tool1_A, tool2_A, tool3_A, tool4_A, tool5_A],
    description="Agent1 is responsible for data analysis tasks.",
)

# Tools available to Agent2.
tool1_B = Tool(name="Tool1_B", description="Tool for performance monitoring.")
tool2_B = Tool(name="Tool2_B", description="Tool for error handling.")
tool3_B = Tool(name="Tool3_B", description="Tool for cloud synchronization.")
tool4_B = Tool(name="Tool4_B", description="Tool for system diagnostics.")
tool5_B = Tool(name="Tool5_B", description="Tool for backup management.")

# Agent2: system health and maintenance.
agent2 = BaseAgent(
    name="Agent2",
    tools=[tool1_B, tool2_B, tool3_B, tool4_B, tool5_B],
    description="Agent2 handles system health and maintenance tasks.",
)



In [None]:
from graphviz import Digraph

# Supervisor / agents / tools diagram, with agents and Agent 1's tools grouped
# into clusters.
dot = Digraph('CrewAI_Agent_Structure')

# Fill colours per node kind.
supervisor_color = '#dff0d8'  # Light Green
agent_color = '#ffcc00'  # Yellow
tool_color = '#a2d5f2'  # Light Blue

# Node id -> label, in drawing order.
agent_labels = {
    'A1': 'Agent 1\n(Data Analysis)',
    'A2': 'Agent 2\n(System Maintenance)',
    'A3': 'Agent 3\n(Operations & Automation)',
    'A4': 'Agent 4\n(Monitoring & Alerts)',
}
agent1_tool_labels = {
    'T1_A1': 'Tool 1\n(Data Analysis)',
    'T2_A1': 'Tool 2\n(Data Cleanup)',
    'T3_A1': 'Tool 3\n(Visualization)',
    'T4_A1': 'Tool 4\n(API Connection)',
    'T5_A1': 'Tool 5\n(Reporting)',
}

# Supervisor node.
dot.node('S', 'Supervisor Agent\n(Manages and supervises all agents)', style='filled', fillcolor=supervisor_color, shape='box')

# Cluster holding the four agents.
with dot.subgraph(name='cluster_agents') as agents:
    agents.attr(label='Agents', color='black')
    for agent_id, agent_label in agent_labels.items():
        agents.node(agent_id, agent_label, style='filled', fillcolor=agent_color, shape='box')

# Cluster holding Agent 1's tools (only one agent's tools are shown as an example).
with dot.subgraph(name='cluster_tools_A1') as tools_A1:
    tools_A1.attr(label='Agent 1 Tools', color='blue')
    for tool_id, tool_label in agent1_tool_labels.items():
        tools_A1.node(tool_id, tool_label, style='filled', fillcolor=tool_color, shape='ellipse')

# Supervisor -> every agent.
for agent_id in agent_labels:
    dot.edge('S', agent_id)

# Agent 1 -> each of its tools.
for tool_id in agent1_tool_labels:
    dot.edge('A1', tool_id)

# Write the PDF and open it in the default viewer.
dot.render('crew_ai_agents_with_subgraph', format='pdf', view=True)

# Last expression of the cell: show the graph inline.
dot

In [None]:
from graphviz import Digraph

# Supervisor / agents / tools diagram without clusters.
dot = Digraph('CrewAI_Agent_Structure')

# Fill colours per node kind.
agent_color = '#ffcc00'  # Yellow
tool_color = '#a2d5f2'  # Light Blue
supervisor_color = '#dff0d8'  # Light Green

# Node id -> label, in drawing order.
agent_labels = {
    'A1': 'Agent 1\n(Data Analysis)',
    'A2': 'Agent 2\n(System Maintenance)',
    'A3': 'Agent 3\n(Operations & Automation)',
    'A4': 'Agent 4\n(Monitoring & Alerts)',
}
agent1_tool_labels = {
    'T1_A1': 'Tool 1\n(Data Analysis)',
    'T2_A1': 'Tool 2\n(Data Cleanup)',
    'T3_A1': 'Tool 3\n(Visualization)',
    'T4_A1': 'Tool 4\n(API Connection)',
    'T5_A1': 'Tool 5\n(Reporting)',
}
agent2_tool_labels = {
    'T1_A2': 'Tool 1\n(Monitoring)',
    'T2_A2': 'Tool 2\n(Error Handling)',
    'T3_A2': 'Tool 3\n(Cloud Sync)',
    'T4_A2': 'Tool 4\n(Diagnostics)',
    'T5_A2': 'Tool 5\n(Backup Management)',
}

# Supervisor node.
dot.node('S', 'Supervisor Agent\n(Manages and supervises all agents)', style='filled', fillcolor=supervisor_color, shape='box')

# Agent nodes.
for agent_id, agent_label in agent_labels.items():
    dot.node(agent_id, agent_label, style='filled', fillcolor=agent_color, shape='box')

# Tool nodes: Agent 1's first, then Agent 2's.
for tool_labels in (agent1_tool_labels, agent2_tool_labels):
    for tool_id, tool_label in tool_labels.items():
        dot.node(tool_id, tool_label, style='filled', fillcolor=tool_color, shape='ellipse')

# Supervisor -> every agent.
for agent_id in agent_labels:
    dot.edge('S', agent_id)

# Only Agent 1 is linked to its tools; Agent 2's tool nodes get no edges.
for tool_id in agent1_tool_labels:
    dot.edge('A1', tool_id)

# Write the PDF and open it in the default viewer.
dot.render('crew_ai_agents', format='pdf', view=True)

# Last expression of the cell: show the graph inline.
dot

In [None]:
import graphviz

# Supervisor workflow diagram: four two-node clusters under one supervisor.
dot = graphviz.Digraph("CrewAI_Workflow", format="png")

# Super Agent
dot.node("CrewSupervisor", shape="doubleoctagon", style="filled", color="gold", label="🧠 CrewSupervisor")

# (cluster name, cluster label, fill colour, first node, second node)
agent_clusters = [
    ("cluster_1", "Task Agent", "lightgrey", "TaskPlanner", "TaskAllocator"),
    ("cluster_2", "LLM Agent", "lightblue", "PromptGenerator", "LLMInvoker"),
    ("cluster_3", "Data Agent", "lightgreen", "DatabaseManager", "DataUpdater"),
    ("cluster_4", "Monitoring Agent", "lightcoral", "Logger", "AlertSystem"),
]
for cluster_name, cluster_label, fill, upstream, downstream in agent_clusters:
    with dot.subgraph(name=cluster_name) as cluster:
        cluster.attr(label=cluster_label, style="filled", color=fill)
        cluster.node(upstream, shape="box")
        cluster.node(downstream, shape="box")
        cluster.edge(upstream, downstream)

# The supervisor connects to the first node of every cluster.
supervisor_links = [
    ("TaskPlanner", "Manage Tasks"),
    ("PromptGenerator", "Control LLM"),
    ("DatabaseManager", "Manage Data"),
    ("Logger", "Monitor System"),
]
for target, link_label in supervisor_links:
    dot.edge("CrewSupervisor", target, label=link_label)

# Cross-cluster connection: TaskPlanner -> DataUpdater
dot.edge("TaskPlanner", "DataUpdater", label="Update Data")

# Write the PNG (format set on the Digraph) and open it in the default viewer.
dot.render("crewai_superagent_workflow", view=True)

print("Graph saved as 'crewai_superagent_workflow.png'")


In [None]:
from crewai_tools import ScrapeWebsiteTool

# The unfinished `from langchain.tools import we` line that used to follow was
# removed: it named nothing importable and stopped the cell with an ImportError.

# Without arguments the tool may scrape any website it finds during its execution.
tool = ScrapeWebsiteTool()

# Initialized with a website URL, the tool only scrapes the content of that
# website. This rebinds `tool`, replacing the unrestricted instance above.
tool = ScrapeWebsiteTool(website_url='https://www.amazon.in/dp/B0CWRZDGV1/')

# Extract the text from the site
text = tool.run()
print(text)

In [31]:
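# Chess knowledge base: plain-text notes that load_chess_knowledge() wraps in a Document, splits into chunks and embeds for retrieval.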
chess_knowledge = """
# Opening Principles
1. Essential Development Rules:
   - Don't move the same piece twice in the opening
   - Develop knights before bishops
   - Don't bring queen out too early
   - Castle within the first 7-8 moves
   - Control center with pawns (e4, d4, e5, d5)
   - Only make pawn moves that aid development

2. Common Opening Mistakes to Avoid:
   - Moving edge pawns (a,h) too early
   - Making too many pawn moves
   - Moving queen prematurely
   - Making pointless knight moves
   - Weakening king's position

3. Center Control Strategy:
   - Occupy center with pawns first (e4/d4 or e5/d5)
   - Support center pawns with minor pieces
   - Don't exchange center pawns without clear benefit
   - Maintain tension when advantageous

# Middlegame Strategy
1. King Safety Priority:
   - Complete castling before attacking
   - Maintain pawn shield in front of castled king
   - Watch for diagonal weaknesses
   - Don't advance pawns in front of castled king without purpose

2. Piece Coordination:
   - Connect rooks after castling
   - Place bishops on active diagonals
   - Establish knights on strong outposts
   - Create piece chains protecting each other
   - Coordinate pieces before launching attacks

3. Attack Prerequisites:
   - Ensure king safety first
   - Have more pieces in attacking zone
   - Control key squares around enemy king
   - Create weaknesses in enemy position
   - Don't attack without proper preparation

# Position Evaluation
1. Material Balance:
   - Consider piece values (P=1, N=3, B=3, R=5, Q=9)
   - Bishop pair is worth extra half-pawn
   - Knights strong in closed positions
   - Bishops strong in open positions

2. Positional Factors:
   - Pawn structure health
   - Piece activity and coordination
   - King safety assessment
   - Control of key squares and files
   - Development lead
   - Space advantage

3. Dynamic Elements:
   - Piece mobility
   - Attacking chances
   - Tactical opportunities
   - Pawn breaks
   - Piece coordination potential

# Common Tactical Patterns
1. Basic Tactics:
   - Fork: One piece attacks two
   - Pin: Piece can't move due to exposure
   - Skewer: Similar to pin but higher value piece in front
   - Discovery: Moving one piece reveals attack from another

2. Tactical Motifs:
   - Overloading: Piece defending too many squares
   - Deflection: Forcing piece away from defense
   - Clearance: Removing blocking piece
   - Interference: Blocking defensive piece

# Safety Checks Before Moving
1. Pre-Move Checklist:
   - Check all opponent's captures
   - Look for tactical threats
   - Consider opponent's best reply
   - Evaluate resulting position
   - Verify move aids overall plan

2. Position Maintenance:
   - Keep pieces protected
   - Maintain pawn structure
   - Watch diagonal weaknesses
   - Control key squares
   - Keep king safe
"""

In [32]:
import os
from getpass import getpass

# Never store API tokens in the notebook: anything committed or shared with the
# file is readable by everyone who gets it. The token that used to be written
# here should be treated as leaked and revoked.
# Use the value already present in the environment, or ask for it without
# echoing it to the screen.
if not os.environ.get("HUGGINGFACEHUB_API_TOKEN"):
    os.environ["HUGGINGFACEHUB_API_TOKEN"] = getpass("Hugging Face API token: ")

In [33]:
# Load Chess Knowledge Base (RAG)
def load_chess_knowledge():
    """Build a retriever over the in-notebook ``chess_knowledge`` text (RAG).

    The text is wrapped in a single ``Document``, split into chunks of 500
    characters with 50 characters of overlap, embedded with
    ``HuggingFaceEmbeddings()`` and indexed in a Chroma vector store.

    The previous version also constructed ``TextLoader("chess_knowledge.txt")``
    without ever using it; that dead object is gone, so the function no longer
    needs the ``TextLoader`` name to exist.

    Returns:
        The vector store's retriever (``vectorstore.as_retriever()``).
    """
    docs = [Document(page_content=chess_knowledge, metadata={"source": "chess_knowledge"})]
    # Split the document into chunks
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    chunks = text_splitter.split_documents(docs)
    embeddings = HuggingFaceEmbeddings()
    # NOTE(review): `Chroma` is not imported in the cells visible here (the
    # import cell below brings in FAISS instead). Confirm it is imported earlier.
    vectorstore = Chroma.from_documents(chunks, embeddings)
    return vectorstore.as_retriever()

In [None]:
from crewai import Crew, Agent, Task
from langchain_community.llms import HuggingFaceHub
from langchain_community.tools import Tool
from langchain.chains import RetrievalQA
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
import chess
import chess.engine
from langchain_core.documents import Document

# Load Chess Knowledge Base (RAG)
# def load_chess_knowledge():
#     loader = TextLoader("chess_knowledge.txt")  # A document with chess theories
#     docs = loader.load()
#     text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
#     chunks = text_splitter.split_documents(docs)
#     embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
#     vectorstore = FAISS.from_documents(chunks, embeddings)
#     return vectorstore.as_retriever()

# Retriever over the chess notes (RAG).
retriever = load_chess_knowledge()

# Open-source LLM served through the Hugging Face Hub.
generation_kwargs = {"temperature": 0.7, "max_length": 512}
llm = HuggingFaceHub(
    repo_id="mistralai/Mistral-7B-Instruct-v0.1",
    task="text-generation",
    model_kwargs=generation_kwargs,
)

# Start the "stockfish" UCI engine used by get_best_move.
engine = chess.engine.SimpleEngine.popen_uci("stockfish")

def get_best_move(fen):
    """Let the module-level ``engine`` pick a move for the position ``fen``.

    The engine is given a 0.5 second time limit; the chosen move is returned
    in standard algebraic notation for that position.
    """
    position = chess.Board(fen)
    time_budget = chess.engine.Limit(time=0.5)
    chosen_move = engine.play(position, time_budget).move
    return position.san(chosen_move)

class MoveValidationTool:
    """Checks whether a move written in SAN can be played in a given position."""

    def __init__(self):
        pass

    def validate_move(self, fen, move):
        """Return True when ``move`` parses as a playable SAN move for ``fen``.

        Args:
            fen: Position in FEN notation. A malformed FEN is not swallowed:
                ``chess.Board`` raises before the move is looked at.
            move: Candidate move in standard algebraic notation.

        Returns:
            True if ``board.parse_san(move)`` accepts the move, else False.
        """
        board = chess.Board(fen)
        try:
            # parse_san already rejects malformed, ambiguous and illegal moves,
            # so the former round-trip through UCI added nothing.
            board.parse_san(move)
            return True
        except Exception:
            # `except Exception` rather than a bare `except:` so that
            # KeyboardInterrupt and SystemExit still propagate.
            return False

validate_tool = MoveValidationTool()

# One tool per capability handed to the crew.
# NOTE(review): validate_move takes two arguments (fen, move); confirm that the
# Tool wrapper can supply both.
stockfish_tool = Tool(
    name="Stockfish Engine",
    func=get_best_move,
    description="Suggest best move",
)
retriever_tool = Tool(
    name="Chess RAG Retriever",
    func=retriever.get_relevant_documents,
    description="Retrieve chess strategies",
)
validate_move_tool = Tool(
    name="Move Validator",
    func=validate_tool.validate_move,
    description="Validate move",
)

# The three crew members share the same LLM.

# Picks the move.
move_generator = Agent(
    llm=llm,
    name="Move Generator",
    role="Chess Move Selector",
    goal="Suggests the best possible move for the given chess position.",
    backstory="A chess AI trained on various strategies and tactics, designed to find the optimal move in any given position.",
)

# Explains the move.
explanation_agent = Agent(
    llm=llm,
    name="Move Explainer",
    role="Chess Strategy Analyst",
    goal="Explains why the chosen move is good using chess knowledge.",
    backstory="An expert in chess theories, capable of retrieving and explaining moves using a vast knowledge base of chess strategies.",
)

# Checks the move.
validation_agent = Agent(
    llm=llm,
    name="Move Validator",
    role="Chess Move Validator",
    goal="Checks if the suggested move is valid.",
    backstory="Ensures that every suggested move follows the official rules of chess, preventing illegal or incorrect moves.",
)

# Define Crew Workflow
# kickoff(inputs={"FEN": ...}) fills `{FEN}` placeholders in the task texts.
# The descriptions previously contained no placeholder, so the position never
# reached the agents; the first two tasks now reference it explicitly.
# NOTE(review): the `inputs=[...]` task arguments are kept as they were;
# confirm that the installed crewai `Task` accepts them.
crew = Crew(
    agents=[move_generator, explanation_agent, validation_agent],
    tasks=[
        Task(
            name="Generate Move", 
            agent=move_generator, 
            inputs=["FEN"],
            tools=[stockfish_tool],
            description="Generates the best possible move for the board position given by this FEN: {FEN}",
            expected_output="A valid chess move in standard algebraic notation."
        ),
        Task(
            name="Validate Move", 
            agent=validation_agent, 
            inputs=["FEN", "Move"],
            tools=[validate_move_tool],
            description="Validates if the generated move is legal and possible in the position given by this FEN: {FEN}",
            expected_output="Boolean value indicating whether the move is valid."
        ),
        Task(
            name="Explain Move", 
            agent=explanation_agent, 
            inputs=["Move"],
            tools=[retriever_tool],
            description="Provides an explanation for why the generated move is a strong choice.",
            expected_output="A textual explanation of the move's strategy and impact."
        ),
    ]
)

# Example Execution
fen = "rnbqkb1r/pppppppp/5n2/8/8/5N2/PPPPPPPP/RNBQKB1R w KQkq - 2 2"  # Example board state
print("Current Position (FEN):", fen)

try:
    response = crew.kickoff(inputs={"FEN": fen})
    print(response)
finally:
    # The engine started in this cell is a child process; shut it down so that
    # re-running the cell does not leave engine processes behind.
    engine.quit()

In [12]:
from crewai import Agent, Task
from langchain.llms import Ollama
from langchain_ollama import ChatOllama
from langchain.schema import HumanMessage
import json

class PlannerAgent:
    """Asks an Ollama chat model to draft a multi-agent MLOps deployment plan.

    The prompt requests a JSON description of the agents and tasks; the model's
    reply is returned as-is and is not parsed here.
    """

    def __init__(self, model_name="deepseek-r1:8b"):
        """Create the chat-model client.

        Args:
            model_name: Ollama model tag passed to ``ChatOllama``. Defaults to
                "deepseek-r1:8b", the tag this class previously hard-coded.
        """
        self.llm = ChatOllama(model=model_name)

    def generate_plan(self):
        """Send the planning prompt to the model and return its reply.

        Returns:
            Whatever ``self.llm.invoke`` returns for the single-message
            conversation. The reply text is not validated or decoded as JSON.
        """
        prompt = """
        You are a Senior Technical Architect responsible for automating MLOps auto-deployment using a multi-agent system.
        Generate a JSON list of agents and tasks needed for this project.
        
        Each agent should have:
        - role
        - goal
        - backstory
        - tools
        - memory
        - verbose
        - allow delegation
        - cache
        
        Each task should have:
        - name
        - description
        - agent role
        - tools
        - expected output
        - output JSON
        - output file name

        output format: json
        {agents: [{
        role:,goal:,backstory,tools,memory,verbose,allow delegation,cache}], tasks: [{task1}, {task2}, ...]
        }
        """

        # Use the Runnable `invoke` entry point: calling the chat model object
        # directly (`self.llm([...])`) is the deprecated `__call__` path.
        response = self.llm.invoke([HumanMessage(content=prompt)])
        return response
    
if __name__ == "__main__":
    # Build the planner and request a plan. `plan` is left at module level
    # because later cells display `plan` and `plan.content`.
    planner = PlannerAgent()
    plan = planner.generate_plan()
    # generate_plan() returns the model's response object without JSON-decoding
    # it, so it is printed directly rather than via json.dumps.
    print(plan)

content='<think>\nOkay, so I\'m trying to figure out how to set up an MLOps auto-deployment system using a multi-agent approach. The user wants me to generate a JSON list of agents and tasks needed for this project. Let me break down what each part requires.\n\nFirst, the Senior Technical Architect role is about automating MLOps, so I need to identify the main areas where automation would benefit. That probably includes model deployment, monitoring, logging, data processing, etc. So, maybe I can have agents that handle each of these tasks.\n\nEach agent should have a role, goal, backstory, tools used, memory capacity, verbose level, whether delegation is allowed, and caching capabilities. Let me think about what roles are essential for MLOps auto-deployment:\n\n1. **Model Deployer**: This would be responsible for deploying models into production environments. It needs to handle configurations, maybe scale models based on traffic, and ensure reliability.\n\n2. **Data Mapper**: Ensures d

In [13]:
plan

AIMessage(content='<think>\nOkay, so I\'m trying to figure out how to set up an MLOps auto-deployment system using a multi-agent approach. The user wants me to generate a JSON list of agents and tasks needed for this project. Let me break down what each part requires.\n\nFirst, the Senior Technical Architect role is about automating MLOps, so I need to identify the main areas where automation would benefit. That probably includes model deployment, monitoring, logging, data processing, etc. So, maybe I can have agents that handle each of these tasks.\n\nEach agent should have a role, goal, backstory, tools used, memory capacity, verbose level, whether delegation is allowed, and caching capabilities. Let me think about what roles are essential for MLOps auto-deployment:\n\n1. **Model Deployer**: This would be responsible for deploying models into production environments. It needs to handle configurations, maybe scale models based on traffic, and ensure reliability.\n\n2. **Data Mapper**:

In [14]:
print(plan.content)

<think>
Okay, so I'm trying to figure out how to set up an MLOps auto-deployment system using a multi-agent approach. The user wants me to generate a JSON list of agents and tasks needed for this project. Let me break down what each part requires.

First, the Senior Technical Architect role is about automating MLOps, so I need to identify the main areas where automation would benefit. That probably includes model deployment, monitoring, logging, data processing, etc. So, maybe I can have agents that handle each of these tasks.

Each agent should have a role, goal, backstory, tools used, memory capacity, verbose level, whether delegation is allowed, and caching capabilities. Let me think about what roles are essential for MLOps auto-deployment:

1. **Model Deployer**: This would be responsible for deploying models into production environments. It needs to handle configurations, maybe scale models based on traffic, and ensure reliability.

2. **Data Mapper**: Ensures data is compatible w

In [None]:
print(plan.content)

In [9]:
print(plan.content)

What a fascinating project! As a Senior Technical Architect, I'd be delighted to help you generate the JSON list of agents and tasks needed for automating MLOps auto-deployment using a multi-agent system.

Here's the JSON data:

**Agents**
```json
[
  {
    "role": "Model Trainer",
    "goal": "Train and validate machine learning models",
    "backstory": "Experienced in training and validating various machine learning models for classification, regression, clustering, etc.",
    "tools": ["TensorFlow", "PyTorch"],
    "memory": 16,
    "verbose": true,
    "allow_delegation": false,
    "cache": {
      "model_data": {}
    }
  },
  {
    "role": "Model Deployer",
    "goal": "Deploy trained models to production environments",
    "backstory": "Proficient in deploying models using various frameworks such as TensorFlow Serving, AWS SageMaker, etc.",
    "tools": ["TensorFlow Serving", "AWS SageMaker"],
    "memory": 8,
    "verbose": false,
    "allow_delegation": true,
    "cache": {
