In [1]:
pip install google-api-python-client beautifulsoup4 pinecone openai requests tqdm textblob scikit-learn textstat pandas

Note: you may need to restart the kernel to use updated packages.


In [12]:
from config import (
    PINECONE_API_KEY, 
    PINECONE_INDEX_NAME, 
    PINECONE_ADS_INDEX_NAME
)
# from embeddings.embedding import get_embedding
from pinecone import Pinecone

import os
import re
import time
import requests
import google.generativeai as genai
from config import GEMINI_API_KEY
# from database.pinecone_db import query_pinecone, store_ad_in_pinecone, query_ad_pinecone
from bs4 import BeautifulSoup

In [13]:
# Read the Gemini key from the process environment. This rebinds the GEMINI_API_KEY
# name imported from config earlier in the notebook; the value is None when the
# environment variable is unset.
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")

def clean_text(text, max_length=5000):
    """Normalise whitespace in ``text`` and cap its length ahead of embedding.

    Every run of whitespace is collapsed to a single space, leading and trailing
    whitespace is dropped, and the result is cut to at most ``max_length`` characters.

    Raises:
        ValueError: if ``text`` is not a string, or is empty / whitespace-only.
    """
    usable = isinstance(text, str) and bool(text.strip())
    if not usable:
        raise ValueError("Invalid text input. Must be a non-empty string.")

    collapsed = re.sub(r'\s+', ' ', text)  # any whitespace run -> one space
    trimmed = collapsed.strip()
    return trimmed[:max_length]  # slicing is a no-op when already short enough

def get_embedding(text, max_retries=5):
    """Generate an embedding for ``text`` via the Gemini ``embedContent`` REST endpoint.

    The text is first normalised with ``clean_text``. Failed HTTP requests are retried
    up to ``max_retries`` times with an exponential backoff of ``3 ** attempt`` seconds
    between attempts.

    Parameters:
      text (str): The text to embed.
      max_retries (int): Maximum number of HTTP attempts.

    Returns:
      The value found under ``["embedding"]["values"]`` in the JSON response, or
      ``None`` if the text is invalid, the response has an unexpected shape, or every
      attempt failed.

    Raises:
      ValueError: if ``GEMINI_API_KEY`` is not set.
    """
    if GEMINI_API_KEY is None:
        raise ValueError("GEMINI_API_KEY is not set. Please check your environment variables.")

    # The key is sent as a header instead of a `?key=` query parameter: requests puts
    # the full URL into its exception messages, and those messages are printed below,
    # so a key in the URL would end up in the notebook output.
    url = "https://generativelanguage.googleapis.com/v1beta/models/embedding-001:embedContent"
    headers = {"Content-Type": "application/json", "x-goog-api-key": GEMINI_API_KEY}

    try:
        text = clean_text(text)  # Clean input text
    except ValueError as e:
        print(f"Preprocessing Error: {e}")
        return None

    data = {
        "content": {"parts": [{"text": text}]}
    }

    for attempt in range(max_retries):
        try:
            # A timeout keeps a stalled connection from hanging the cell forever.
            response = requests.post(url, headers=headers, json=data, timeout=30)
            response.raise_for_status()
            return response.json()["embedding"]["values"]  # Extract embedding vector

        except requests.exceptions.RequestException as e:
            if attempt + 1 >= max_retries:
                # Last attempt: report it, but do not sleep before giving up.
                print(f"API error: {e}.")
                break
            wait_time = 3 ** attempt  # Exponential backoff
            print(f"API error: {e}. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)

        except (KeyError, TypeError, ValueError) as e:
            # A 2xx reply without the expected structure will not improve on retry.
            print(f"Unexpected embedding response format: {e!r}")
            return None

    print("Max retries exceeded for embedding API.")
    return None  # Return None on failure


In [14]:
# Initialize the Pinecone client with the API key imported from config.
pc = Pinecone(api_key=PINECONE_API_KEY)

# Module-level index handles shared by the helper functions in this notebook:
#   index     -> written by store_in_pinecone, read by query_pinecone
#   ads_index -> written by store_ad_in_pinecone, read by query_ad_pinecone
index = pc.Index(name=PINECONE_INDEX_NAME)
ads_index = pc.Index(name=PINECONE_ADS_INDEX_NAME)

def store_in_pinecone(text, metadata):
    """Embeds ``text`` and upserts the vector into the main Pinecone index.

    Parameters:
      text (str): Content to embed.
      metadata (dict): Stored alongside the vector. Must contain a "url" key, which
        is used as the vector ID.

    Returns:
      bool: True if the vector was upserted, False if no embedding could be produced
      (in which case nothing is written).
    """
    embedding = get_embedding(text)
    if embedding is None:
        # get_embedding returns None for invalid text or after exhausting its retries;
        # upserting None as the vector values would fail inside the Pinecone client.
        print(f"⚠️ Skipping Pinecone upsert for {metadata['url']}: no embedding available.")
        return False

    index.upsert(vectors=[{"id": metadata["url"], "values": embedding, "metadata": metadata}])
    return True

def query_pinecone(query, top_k=3):
    """Queries the main Pinecone index for the stored content most similar to ``query``.

    Parameters:
      query (str): Text to embed and search with.
      top_k (int): Maximum number of matches to request.

    Returns:
      list: The ``metadata["content"]`` value of each match that has one. Empty when
      the query could not be embedded or nothing matched.
    """
    query_embedding = get_embedding(query)
    if query_embedding is None:
        # No vector to search with (invalid text or embedding API failure).
        return []

    results = index.query(vector=query_embedding, top_k=top_k, include_metadata=True)

    contents = []
    for match in results.get("matches", []):
        try:
            contents.append(match["metadata"]["content"])
        except (KeyError, TypeError):
            # Vector stored without a "content" field (or without metadata): skip it
            # rather than failing the whole query.
            continue
    return contents

def store_ad_in_pinecone(product, ad_text):
    """Embeds ``ad_text`` and upserts it into the separate ads index.

    The vector ID is the product name, so storing a new ad for the same product
    replaces the previous one.

    Parameters:
      product (str): Product name; used as the vector ID and stored in the metadata.
      ad_text (str): The ad copy to embed and store.

    Returns:
      bool: True if the ad was upserted, False if no embedding could be produced
      (in which case nothing is written).
    """
    embedding = get_embedding(ad_text)
    if embedding is None:
        # get_embedding returns None for invalid text or after exhausting its retries;
        # upserting None as the vector values would fail inside the Pinecone client.
        print(f"⚠️ Skipping Pinecone upsert of ad for {product}: no embedding available.")
        return False

    metadata = {"product": product, "ad_text": ad_text}
    ads_index.upsert(vectors=[{"id": product, "values": embedding, "metadata": metadata}])
    return True

def query_ad_pinecone(product, top_k=1):
    """Retrieves stored ads from the ads index by similarity to the product name.

    Note that this is a nearest-neighbour lookup on the embedding of ``product``, not
    an exact ID lookup, so the returned ads may belong to a different product.

    Parameters:
      product (str): Product name to embed and search with.
      top_k (int): Maximum number of matches to request.

    Returns:
      list: The ``metadata["ad_text"]`` value of each match that has one. Empty when
      the product name could not be embedded or nothing matched.
    """
    query_embedding = get_embedding(product)
    if query_embedding is None:
        # No vector to search with (invalid text or embedding API failure).
        return []

    results = ads_index.query(vector=query_embedding, top_k=top_k, include_metadata=True)

    ads = []
    for match in results.get("matches", []):
        try:
            ads.append(match["metadata"]["ad_text"])
        except (KeyError, TypeError):
            # Vector stored without an "ad_text" field (or without metadata): skip it.
            continue
    return ads


In [15]:
def scrape_website(url, max_chars=2000, timeout=5):
    """Scrapes the paragraph text of a web page.

    Parameters:
      url (str): Page to fetch.
      max_chars (int): Maximum number of characters to return.
      timeout (float): Seconds to wait for the HTTP request.

    Returns:
      str: The text of the page's <p> elements joined by single spaces and truncated
      to ``max_chars``; "" if the request fails, the server answers with an HTTP
      error status, or the page has no non-empty paragraphs.
    """
    try:
        headers = {"User-Agent": "Mozilla/5.0"}
        response = requests.get(url, headers=headers, timeout=timeout)
        # Without this check the body of a 403/404/5xx page would be parsed and
        # returned as if it were real page content.
        response.raise_for_status()

        soup = BeautifulSoup(response.text, "html.parser")
        # Drop empty/whitespace-only paragraphs so they neither consume the character
        # budget nor yield a truthy but blank result.
        paragraphs = (p.get_text().strip() for p in soup.find_all("p"))
        return " ".join(text for text in paragraphs if text)[:max_chars]
    except Exception as e:
        # Deliberately best-effort: one bad page must not abort the whole run.
        print(f"Failed to scrape {url}: {e}")
        return ""

In [16]:
# Configure the google.generativeai SDK with the Gemini API key once, at module level,
# before generate_ad creates a genai.GenerativeModel.
genai.configure(api_key=GEMINI_API_KEY)

def generate_ad(product, force_regenerate=False, model_name="gemini-1.5-flash"):
    """
    Generates an ad for a product with a Gemini model, using reviews retrieved from
    Pinecone as context (Retrieval-Augmented Generation).

    Unless ``force_regenerate`` is set, a previously stored ad whose text mentions the
    product name is returned instead of calling the model. A newly generated ad is
    stored in the ads index; failure placeholders are never stored.

    NOTE(review): an earlier version of this docstring asked for "appropriate trust
    signals, urgency and call to action", but the prompt below does not request them.
    Confirm whether the prompt should.

    Parameters:
      product (str): The product name.
      force_regenerate (bool): If True, regenerates the ad even if one exists.
      model_name (str): Name of the Gemini model to use.

    Returns:
      str: The generated (or cached) advertisement, or a "⚠️ ..." placeholder message
      if generation failed.
    """
    product_lc = product.lower()

    # Check if an ad already exists in Pinecone unless we're forcing a regeneration.
    if not force_regenerate:
        existing_ads = query_ad_pinecone(product)
        if existing_ads:
            # The lookup is similarity-based, so only accept an ad that actually
            # mentions the product, and return that ad rather than blindly the first.
            matching_ad = next((ad for ad in existing_ads if product_lc in ad.lower()), None)
            if matching_ad is not None:
                print(f"✅ Retrieved existing ad for {product} from Pinecone.")
                return matching_ad
            print("ℹ️ Existing ad found but does not seem to match the product; regenerating.")

    # Retrieve relevant reviews from the Pinecone vector database
    reviews = query_pinecone(product, top_k=5)

    # Handle cases where no reviews are found
    if not reviews:
        print(f"⚠️ No reviews found for {product}. Using a generic ad prompt.")
        reviews_text = "No reviews available. Write a general, compelling ad for the product."
    else:
        reviews_text = "\n".join(reviews)

    # Construct an enhanced prompt with retrieval-augmented context
    prompt = f"""
You are an expert ad copywriter. Create a compelling advertisement for the product: "{product}".

Base your ad on the following customer reviews and insights:
{reviews_text}

Ensure the ad is engaging, persuasive, and accurately highlights the best aspects of the product.
    """

    try:
        model = genai.GenerativeModel(model_name)
        response = model.generate_content(prompt)
        # Accessing .text may itself raise (e.g. when the response carries no usable
        # text); that is handled by the except clause below.
        ad_text = getattr(response, "text", None)
    except Exception as e:
        print(f"❌ Gemini API Error: {e}")
        return "⚠️ An error occurred while generating the ad."

    if not ad_text:
        # Nothing usable came back; return the placeholder but do NOT store it, so a
        # failed run never replaces a good ad already stored for this product.
        return "⚠️ Failed to generate ad."

    # Warn if the generated ad doesn't reference the product name
    if product_lc not in ad_text.lower():
        print("⚠️ Warning: The generated ad does not appear to mention the product properly.")

    # Store the generated ad in Pinecone for future retrieval. A storage failure is
    # reported separately and must not discard the ad that was just generated.
    try:
        store_ad_in_pinecone(product, ad_text)
    except Exception as e:
        print(f"⚠️ Generated the ad but could not store it in Pinecone: {e}")

    return ad_text


In [17]:
import requests
import os

# Credentials used by google_search, read from the process environment.
# Each one is None when its environment variable is unset.
API_KEY = os.environ.get("GOOGLE_SEARCH_API")
CX_ID = os.environ.get("GOOGLE_CSE_ID")

def google_search(query, num_results=5):
    """Search the Google Custom Search API and return a list of result URLs.

    Parameters:
      query (str): The search query.
      num_results (int): Number of results to request; clamped to the 1-10 range the
        API accepts for a single request.

    Returns:
      list: The "link" of each returned item; [] when credentials are missing, the
      request fails, or the search has no results.
    """
    if not API_KEY or not CX_ID:
        print("⚠️ API Request Error: GOOGLE_SEARCH_API and GOOGLE_CSE_ID must both be set.")
        return []

    url = "https://customsearch.googleapis.com/customsearch/v1"
    params = {
        "q": query,
        "cx": CX_ID,
        # Values outside 1-10 are rejected by the API with an HTTP 400.
        "num": max(1, min(int(num_results), 10)),
        "key": API_KEY,
    }

    try:
        # A timeout keeps a stalled connection from hanging the cell forever.
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()  # Raises an error for HTTP codes 4xx/5xx
        items = response.json().get("items")
    except (requests.exceptions.RequestException, ValueError) as e:
        # requests includes the full request URL (with the `key` parameter) in its
        # error messages, so mask the key before printing.
        print(f"⚠️ API Request Error: {str(e).replace(API_KEY, '***')}")
        return []

    if not items:
        print("❌ No search results found.")
        return []

    return [item["link"] for item in items]


In [18]:
def main():
    """Interactive pipeline: search, scrape, store in Pinecone, then generate an ad.

    Prompts for a product name, searches for "<product> reviews", scrapes each result
    URL, stores the scraped content in Pinecone, and finally prints an ad produced by
    ``generate_ad``. Progress is reported with print statements; nothing is returned.
    """
    product_name = input("Enter product name: ").strip()
    if not product_name:
        print("❌ No product name entered. Exiting...")
        return

    print(f"\n🔍 Searching for relevant websites related to '{product_name}'...")

    urls = google_search(product_name + " reviews")

    if not urls:
        print("❌ No URLs found. Exiting...")
        return

    print(f"✅ Found {len(urls)} URLs. Proceeding to scrape data...\n")

    scraped_data = []  # Content that was scraped and handed to Pinecone

    # `position` (not `index`) so the loop variable does not shadow the module-level
    # Pinecone `index` name.
    for position, url in enumerate(urls, start=1):
        print(f"🕵️ Scraping ({position}/{len(urls)}): {url}")
        content = scrape_website(url)

        if not content:
            print(f"⚠️ Skipping {url} (No content extracted).")
            continue

        print(f"✅ Successfully scraped content from {url}. Storing in Pinecone...\n")
        metadata = {"url": url, "product": product_name, "content": content}
        try:
            stored = store_in_pinecone(content, metadata)
        except Exception as exc:
            # One failed upsert should not abort the remaining URLs.
            print(f"⚠️ Could not store {url} in Pinecone: {exc}")
            continue
        if stored is False:
            # Explicit failure signal; a None result is treated as success.
            continue
        scraped_data.append(content)

    print("\n🚀 Data processing complete! Ready to generate ads.\n")

    # Generate ad only if we have scraped data
    if scraped_data:
        print("\n📝 Generating Ad...")
        # generate_ad stores a newly generated ad itself. Storing its return value
        # again here would also write its failure placeholder over a good stored ad.
        ad_text = generate_ad(product_name)

        print("\n📢 Generated Ad:\n", ad_text)
    else:
        print("⚠️ No scraped data found. Skipping ad generation.")


In [19]:
# Run the interactive pipeline; this prompts for a product name and makes live calls
# to the search, scraping, Pinecone and Gemini helpers defined in this notebook.
main()


🔍 Searching for relevant websites related to 'coca cola'...
✅ Found 5 URLs. Proceeding to scrape data...

🕵️ Scraping (1/5): https://www.indeed.com/cmp/The-Coca--cola-Company/reviews
✅ Successfully scraped content from https://www.indeed.com/cmp/The-Coca--cola-Company/reviews. Storing in Pinecone...

🕵️ Scraping (2/5): https://investors.coca-colacompany.com/filings-reports/annual-reviews
✅ Successfully scraped content from https://investors.coca-colacompany.com/filings-reports/annual-reviews. Storing in Pinecone...

🕵️ Scraping (3/5): https://adage.com/article/agency-news/coca-cola-reviews-us-media-wpp-and-publicis/2604706/
⚠️ Skipping https://adage.com/article/agency-news/coca-cola-reviews-us-media-wpp-and-publicis/2604706/ (No content extracted).
🕵️ Scraping (4/5): https://www.yourcoca-cola.co.uk/coca-cola-original-taste-24-x-330ml-glass-bottles/12657918.reviews
✅ Successfully scraped content from https://www.yourcoca-cola.co.uk/coca-cola-original-taste-24-x-330ml-glass-bottles/1265