In [1]:
from utils.gemini_service import GeminiModel, GeminiJsonEngine, GeminiSimpleChatEngine
from utils.langchain_agent_service import LangchainJSONEngine, LangchainSimpleEngine


from dotenv import load_dotenv
import os
import json
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional

load_dotenv()


For example, replace imports like: `from langchain_core.pydantic_v1 import BaseModel`
with: `from pydantic import BaseModel`
or the v1 compatibility namespace if you are working in a code base that has not been fully upgraded to pydantic 2 yet. 	from pydantic.v1 import BaseModel

  from utils.langchain_agent_service import LangchainJSONEngine, LangchainSimpleEngine


True

In [2]:
def load_json_data(path: str) -> Dict[str, Any]:
    if not os.path.exists(path):
        return None
    with open(path, 'r') as file:
        data = json.load(file)
    return data

def save_json_data(path: str, data: Dict[str, Any]):
    # Create the directory if it doesn't exist
    directory = os.path.dirname(path)
    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
    with open(path, 'w') as file:
        json.dump(data, file, indent=4)

## Langchain LLM Engines

In [3]:
pass

## Gemini Engines

In [4]:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/debasmitroy/Desktop/programming/gemini-agent-assist/key.json"
os.environ["GOOGLE_CLOUD_PROJECT"] = "hackathon0-project"
os.environ["GOOGLE_CLOUD_LOCATION"] = "us-central1"

## Pydantic Basemodels

In [5]:
class Sentiment(BaseModel):
    """
    This tool is used to analyze the sentiment of a given text. The sentiment is analyzed based on the emotions of the text.
    """
    happy: bool = Field(title="Happy",description="The User is happy.")
    sad: bool = Field(title="Sad",description="The User is sad.")

## Sample Usage

In [6]:
# sentiment_engine = LangchainJSONEngine(
#     sampleBaseModel=Sentiment,
#     systemPromptText="You are an AI assistant. You are helping a user with a task. The user is asking you questions and you are answering them.",
#     temperature=0.0
# )

# result = sentiment_engine.run("I am happy")
# print(dict(result))

In [7]:
# gemini_sentiment_engine = GeminiJsonEngine(
#                                     model_name="gemini-2.0-flash-001",
#                                     basemodel=Sentiment,
#                                     temperature=0.5,
#                                     max_output_tokens=256,
#                                     systemInstructions=None,
#                                     max_retries=5,
#                                     wait_time=30
#                                     )

# # Not that good
# gemini_sentiment_engine(
#     [
#         "You are an AI assistant. Your task is to analyze the sentiment of the user's text.",
#         "Now analyze the sentiment of the user's text. Generate sentiment scores for anger, joy, and fear form the user's text. Use `Sentiment` as the base model. Stricly follow the arguments and return the result in the form of a JSON object.",
#         "User: I am happy. I am very happy today."
#     ]
# )

## Trend Anl Util

In [8]:
from pytrends.request import TrendReq
from datetime import datetime, timedelta
import pandas as pd
from pydantic import BaseModel, Field
from collections import defaultdict, Counter


In [9]:
class SearchTokenModel(BaseModel):
    """
    This model extracts search tokens based on the brand's market scope, TAM, etc.
    """
    search_tokens: list[str] = Field(
        title="Search Tokens",
        description="List of search tokens in the form of common nouns or search queries relevant to the brand. These tokens should be meanningful phrase not a single word."
    )

class DummyTrendModel(BaseModel):
    country: str = Field(title="Country", description="Country code (e.g., US, IN, etc.)")
    keyword: str = Field(title="Keyword", description="The keyword for which trends are simulated.")
    trendscore: int = Field(title="Trend Score", description="Peak interest score (0-100).")
    top_months: list[str] = Field(title="Top Months", description="Top 3 months where the trend peaked (YYYY-MM format).")
    top_states: list[str] = Field(title="Top States", description="Top 3 states/regions within the country.")

In [133]:
class TrendAnalyzer:
    def __init__(self):
        # Engine to generate search tokens
        self.token_engine = LangchainJSONEngine(
            sampleBaseModel=SearchTokenModel,
            systemPromptText="""
            You are a marketing strategist helping a brand identify the most effective search terms for trend analysis. 
            Based on the brand description, TAM, and market scope, generate a list of highly relevant search tokens (common noun queries). 
            The tokens should be simple, commonly searched phrases like "electric scooter in India", "eco-friendly scooter", "best commuter scooter", etc. 
            Focus on intent-based keywords that would likely appear in Google Trends.
            """
        )

        # Dummy trend data generator
        self.dummy_engine = LangchainJSONEngine(
            sampleBaseModel=DummyTrendModel,
            systemPromptText="""
            You are simulating trend data for a market research report. 
            Given a country and a search token, provide:
            - a realistic peak trendscore (between 60 to 100),
            - top 3 months in the format "YYYY-MM" in 2024.
            - top 3 states/regions where this search term is popular within the country.
            The data should appear realistic and well-distributed.
            """
        )

        self.countries = ['US', 'IN']


    def _get_countrywise_top_states_per_month(self,trend_data):
        country_month_state_counter = defaultdict(lambda: defaultdict(Counter))

        # Step 1: Aggregate data
        for entry in trend_data:
            for country, info in entry.items():
                months = info['top-months']
                states = info['top-states']
                for month in months:
                    country_month_state_counter[country][month].update(states)

        # Step 2: Pick top state(s) for each country-month
        result = defaultdict(dict)
        for country, month_data in country_month_state_counter.items():
            for month, state_counter in month_data.items():
                max_count = max(state_counter.values())
                top_states = [state for state, count in state_counter.items() if count == max_count]
                month_name = datetime.strptime(month, "%Y-%m").strftime("%B")
                result[country][month_name] = top_states

        return dict(result)


    def run(self, description: str):
        # Step 1: Extract search tokens from brand description
        tokens_result = self.token_engine.run(description)
        search_tokens = tokens_result.search_tokens

        # Step 2: Dummy trend simulation instead of actual API calls
        output = []

        for country in self.countries:
            for keyword in search_tokens:
                # print(f"Country: {country}, Keyword: {keyword}")
                dummy_data = self.dummy_engine.run(f"Country: {country}, Keyword: {keyword}")
                output.append({
                    dummy_data.country: {
                        "keyword": dummy_data.keyword,
                        "trendscore": dummy_data.trendscore,
                        "top-months": dummy_data.top_months,
                        "top-states": dummy_data.top_states
                    }
                })
        
        # Step 3: Post-process the data
        result = self._get_countrywise_top_states_per_month(output)
        return result


In [134]:
trend_analyzer = TrendAnalyzer()



In [135]:
description = """
    Our brand is ElecX, revolutionizing electric scooters. 
    We are passionate about creating fast, eco-friendly, and commuter-friendly scooters. 
    The new ElecX Pro targets urban mobility solutions worldwide.
    """
trend_result = trend_analyzer.run(description)

In [136]:
trend_result

{'US': {'June': ['California', 'Texas', 'New York'],
  'September': ['California', 'Texas', 'New York'],
  'November': ['California', 'Texas', 'New York'],
  'February': ['California', 'New York', 'Texas']},
 'IN': {'March': ['Maharashtra', 'Karnataka'],
  'June': ['Maharashtra', 'Karnataka'],
  'September': ['Maharashtra', 'Karnataka'],
  'November': ['Maharashtra', 'Karnataka', 'Tamil Nadu'],
  'February': ['Maharashtra', 'Karnataka', 'Tamil Nadu']}}

## Sitelog Agent

In [143]:
from utils.sql_engine_service import get_sitelog_inmemory_db,SITELOG_INMEM_DB

In [144]:
SITELOG_INMEM_DB = get_sitelog_inmemory_db(load_json_data("data/sitelog.json"))

In [145]:
class SiteLogQuery(BaseModel):
    text_query: str = Field(title="Text Query", description="The text query to search in the site log.")
    sql_query: str = Field(title="SQL Query", description="Corresponding SQL query to search in the site log.")

class SiteLogQueries(BaseModel):
    queries: List[SiteLogQuery] = Field(title="Site Log Queries", description="List of text queries and their corresponding SQL queries.")

In [146]:
class SiteLogAgent:
    def __init__(self, target_product_id: str):
        global SITELOG_INMEM_DB
        SITELOG_INMEM_DB_COLS, SITELOG_INMEM_DB_HEAD = SITELOG_INMEM_DB.query_data("SELECT * FROM sitelog LIMIT 5")
        self.PD_SITELOG_INMEM_DB_HEAD = pd.DataFrame(SITELOG_INMEM_DB_HEAD, columns=SITELOG_INMEM_DB_COLS)

        self.engine = GeminiJsonEngine(
                                    model_name="gemini-2.0-flash-001",
                                    basemodel=SiteLogQueries,
                                    temperature=0.5,
                                    max_output_tokens=1024,
                                    systemInstructions=None,
                                    max_retries=5,
                                    wait_time=30
                                    )
        
    def run(self,target_product_id):

        # Validate the product ID
        PRODUCT_ID_COUNT = SITELOG_INMEM_DB.query_data(f"SELECT COUNT(*) FROM sitelog WHERE product_id = '{target_product_id}'")
        if PRODUCT_ID_COUNT[1][0][0] == 0:
            raise ValueError(f"Product ID '{target_product_id}' not found in the site log.")

        queries_result = self.engine(
            [
                "You are a SQL expert. Your task is to write a SQL script to query data from the given table. Note: you are generating a SQL script for SQLLite's python library. You must be careful while writing complex queries as it is very sensitive.",
                f"Here are the first few rows of the table sitelog: {self.PD_SITELOG_INMEM_DB_HEAD}.",
                f"Now generate a list of text queries and their corresponding SQL queries to search in the site log to fetch some useful information and groupings.",
                f"Example: From which regions are most of the users purchasing the product with product_id = {target_product_id}? Give me the percentage of users from each region.",
                f"SQL Query: SELECT region, COUNT(*) * 100.0 / (SELECT COUNT(*) as PERCENT FROM sitelog WHERE product_id = '{target_product_id}') as percentage FROM sitelog WHERE product_id = '{target_product_id}' GROUP BY region ORDER BY percentage DESC;",
                f"You are allowed to write multiple queries. Make sure to provide the text query and the corresponding SQL query in the response.",
                f"Always extract percentage not count. Also, make sure to order the results in both TOP and BOTTOM order with limit 3.",
                f"Use different groupings based on Age, Demography, Gender, Month etc."
            ]
        )

        final_result = []
        succes = 0
        for query in queries_result[0]['queries']:
            try:
                result = SITELOG_INMEM_DB.query_data(query['sql_query'])
                succes += 1
            except Exception as e:
                pass
            final_result.append({
                "text_query": query['text_query'],
                "sql_query": query['sql_query'],
                "result": result
            })
        
        print(f"Successful queries: {succes} out of {len(queries_result[0]['queries'])}")
        return final_result

In [147]:
# Initialize the SiteLogAgent
site_log_agent=SiteLogAgent(target_product_id="P001")

[1;36m2025-03-16 23:41:40,131 - DEBUG ==> Initialized GeminiModel with model gemini-2.0-flash-001 , project hackathon0-project, location us-central1[0m


In [148]:
site_log_reuslt = site_log_agent.run(target_product_id="P001")

Successful queries: 8 out of 8


In [149]:
site_log_reuslt

[{'text_query': 'What are the top 3 and bottom 3 regions by percentage of total transactions?',
  'sql_query': 'SELECT region, COUNT(*) * 100.0 / (SELECT COUNT(*) FROM sitelog) AS percentage FROM sitelog GROUP BY region ORDER BY percentage DESC LIMIT 3;',
  'result': (['region', 'percentage'],
   [['Maharashtra', 18.181818181818183],
    ['Texas', 15.151515151515152],
    ['Karnataka', 15.151515151515152]])},
 {'text_query': 'What are the bottom 3 regions by percentage of total transactions?',
  'sql_query': 'SELECT region, COUNT(*) * 100.0 / (SELECT COUNT(*) FROM sitelog) AS percentage FROM sitelog GROUP BY region ORDER BY percentage ASC LIMIT 3;',
  'result': (['region', 'percentage'],
   [['', 3.0303030303030303],
    ['Berlin', 3.0303030303030303],
    ['Sao Paulo', 3.0303030303030303]])},
 {'text_query': 'What are the top 3 age groups by percentage of total transactions?',
  'sql_query': 'SELECT age, COUNT(*) * 100.0 / (SELECT COUNT(*) FROM sitelog) AS percentage FROM sitelog GROU

## SiteLogRefinerAgent

In [150]:
class RegionDemography(BaseModel):
    state: str = Field(title="State", description="The state or region name.")
    country: str = Field(title="Country", description="The country name.")

class SiteLogRefinedResult(BaseModel):
    """
    This model represents the refined results of the site
    """
    top2_months: List[str] = Field(title="Top 2 Months", description="Top 2 months with the highest number of user interactions. Not year specific. Connvert month number to month name.")
    top_age_group: str = Field(title="Top Age Group", description="Top age group with the highest number of user interactions. Example: 18-24, 25-34, etc.")
    top2_regions: List[RegionDemography] = Field(title="Top 2 Regions", description="Top 2 regions with the highest number of user interactions.")

    bottom2_months: List[str] = Field(title="Bottom 2 Months", description="Bottom 2 months with the lowest number of user interactions. Not year specific. Connvert month number to month name.")
    bottom_age_group: str = Field(title="Bottom Age Group", description="Bottom age group with the lowest number of user interactions. Example: 18-24, 25-34, etc.")
    bottom2_regions: List[RegionDemography] = Field(title="Bottom 2 Regions", description="Bottom 2 regions with the lowest number of user interactions.")

In [160]:
class SiteLogRefinerAgent:
    def __init__(self):
        self.engine = LangchainJSONEngine(
            sampleBaseModel=SiteLogRefinedResult,
            systemPromptText="""
            You are a data analyst. Your task is to analyze the site log data to extract meaningful insights.
            """,
            temperature=0.2
        )

    def run(self, site_log_result):
        parsed_result = "\n".join([f"{res['text_query']}\nResult: {res['result']}" for res in site_log_result])
        result = self.engine.run(f"""These are the results of the SQL queries on the site log data: {parsed_result}
        Analyze the data and provide the following insights:
        - Top/Bottom 2 months with the highest number of user interactions.
        - Top/Bottom 2 regions with the highest number of user interactions.
        - Top/Bottom age group with the highest number of user interactions.
        - Top/Bottom region with the highest number of user interactions.
        """)

        result_dict = dict(result)
        # Convert List(top2_region) to List[Dict]
        top2_regions = [dict(region) for region in result_dict['top2_regions']]
        result_dict['top2_regions'] = top2_regions

        # Convert List(bottom2_region) to List[Dict]
        bottom2_regions = [dict(region) for region in result_dict['bottom2_regions']]
        result_dict['bottom2_regions'] = bottom2_regions

        # Top Vs Bottom Results 
        top_bottom_results = {
            "top": {
                "months": result_dict['top2_months'],
                "age_group": result_dict['top_age_group'],
                "regions": result_dict['top2_regions']
            },
            "bottom": {
                "months": result_dict['bottom2_months'],
                "age_group": result_dict['bottom_age_group'],
                "regions": result_dict['bottom2_regions']
            }
        }

        return top_bottom_results

In [161]:
site_log_refiner_agent = SiteLogRefinerAgent()



In [163]:
site_log_refiner_result = site_log_refiner_agent.run(site_log_reuslt)

In [165]:
site_log_refiner_result

{'top': {'months': ['February', 'March'],
  'age_group': '22',
  'regions': [{'state': 'Maharashtra', 'country': ''},
   {'state': 'Texas', 'country': ''}]},
 'bottom': {'months': ['December', 'January'],
  'age_group': '18',
  'regions': [{'state': '', 'country': ''},
   {'state': 'Berlin', 'country': ''}]}}

## Now We Have Two Result

In [157]:
trend_result

{'US': {'June': ['California', 'Texas', 'New York'],
  'September': ['California', 'Texas', 'New York'],
  'November': ['California', 'Texas', 'New York'],
  'February': ['California', 'New York', 'Texas']},
 'IN': {'March': ['Maharashtra', 'Karnataka'],
  'June': ['Maharashtra', 'Karnataka'],
  'September': ['Maharashtra', 'Karnataka'],
  'November': ['Maharashtra', 'Karnataka', 'Tamil Nadu'],
  'February': ['Maharashtra', 'Karnataka', 'Tamil Nadu']}}

In [166]:
site_log_refiner_result

{'top': {'months': ['February', 'March'],
  'age_group': '22',
  'regions': [{'state': 'Maharashtra', 'country': ''},
   {'state': 'Texas', 'country': ''}]},
 'bottom': {'months': ['December', 'January'],
  'age_group': '18',
  'regions': [{'state': '', 'country': ''},
   {'state': 'Berlin', 'country': ''}]}}

## Script Writing Agent

In [214]:
class TextScript(BaseModel):
    title: str = Field(title="Text", description="A very flashy title for the advertisement post.")
    body: str = Field(title="Body", description="The body of the advertisement post.")
    month: str = Field(title="Month", description="The month for which the advertisement is being created.")
    age_group: str = Field(title="Age Group", description="The target age group for the advertisement.")
    region: str = Field(title="Region", description="The target region for the advertisement. It should be a full state and Country name. The format should be 'State, Country'.")
    hashtags: List[str] = Field(title="Hashtags", description="List of hashtags to be used in the advertisement post. The hashtags should be separated by commas and should contain the '#' symbol.")

class TextScripts(BaseModel):
    scripts: List[TextScript] = Field(title="Text Scripts", description="List of advertisement text scripts.")

In [215]:
class ScriptWriterAgent:
    def __init__(self):
        self.engine = LangchainJSONEngine(
            sampleBaseModel=TextScripts,
            systemPromptText="""
            You are a content writer. Your task is to create an advertisement script for a new product.
            """,
            temperature=0.2
        )

    def run(self, product_description, trend_result, site_log_refiner_result):
        prompt0 = f""" This is the product description: {product_description}.
Here are few insights from the trend analysis and site log data. This Trend Data is not Product specific, it is based on the overall market trends of that category.
Trend Data: {trend_result}.
Here are the insights from log data for the product with particualr product_id with top performing regions, months and age groups.
Site Log Data: {site_log_refiner_result['top']}.
Now, generate a list of advertisement text scripts for the product based on the insights for each month, age group, and region. 
Note: The Title and Body of the advertisement should have synergy with other demographics such as age group and region.
Note: Here your intent should be bring more and more focus on this demograohy as they are the top performers.
        """
        prompt1 = f"""This is the product description: {product_description}.
Here are few insights from the trend analysis and site log data. This Trend Data is not Product specific, it is based on the overall market trends of that category.
Trend Data: {trend_result}.
Here are the insights from log data for the product with particualr product_id with least performing regions, months and age groups.
Site Log Data: {site_log_refiner_result['bottom']}.
Now, generate a list of advertisement text scripts for the product based on the insights for each month, age group, and region.
Note: The Title and Body of the advertisement should have synergy with other demographics such as age group and region.
Note: Here your intent should be capture the audience from these demograohy as they are the least performers.
        """
        result0 = self.engine.run(prompt0)
        result1 = self.engine.run(prompt1)
        
        result = {
            "top": [dict(script) for script in result0.scripts],
            "bottom": [dict(script) for script in result1.scripts]
        }

        result["bucket_id"] = "-1" # Default bucket ID, with no image 
        return result

In [216]:
script_writer_agent = ScriptWriterAgent()



In [217]:
script_result = script_writer_agent.run(
    product_description=description,
    trend_result=trend_result,
    site_log_refiner_result=site_log_refiner_result
)

In [218]:
script_result

{'top': [{'title': 'Ride the Future with ElecX Pro in Maharashtra, India',
   'body': 'Experience the thrill of urban mobility with ElecX Pro, the ultimate electric scooter designed for speed and eco-friendliness. Join the trendsetters in Maharashtra and embrace a sustainable commute. #ElecXPro #UrbanMobility #Maharashtra #India #GreenCommute',
   'month': 'February',
   'age_group': '22',
   'region': 'Maharashtra, India',
   'hashtags': ['#ElecXPro',
    '#UrbanMobility',
    '#Maharashtra',
    '#India',
    '#GreenCommute']},
  {'title': 'Unleash Your Speed in Texas, USA with ElecX Pro',
   'body': 'Get ready to conquer the streets of Texas with ElecX Pro, the cutting-edge electric scooter that combines style and performance. Join the urban revolution and ride with power. #ElecXPro #Texas #USA #ElectricRevolution #Speedsters',
   'month': 'February',
   'age_group': '22',
   'region': 'Texas, USA',
   'hashtags': ['#ElecXPro',
    '#Texas',
    '#USA',
    '#ElectricRevolution',
  

## Image Generator Agent

In [219]:
import vertexai
from vertexai.preview.vision_models import ImageGenerationModel
import random
import os
import time

In [220]:
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/debasmitroy/Desktop/programming/gemini-agent-assist/key.json"
os.environ["GOOGLE_CLOUD_PROJECT"] = "hackathon0-project"
os.environ["GOOGLE_CLOUD_LOCATION"] = "us-central1"

In [None]:
class ImageGenerator:
    def __init__(self):
        vertexai.init(project=os.environ["GOOGLE_CLOUD_PROJECT"], location=os.environ["GOOGLE_CLOUD_LOCATION"])
        self.model = ImageGenerationModel.from_pretrained("imagen-3.0-generate-002")

    def _generate(self,script_result, flag, randomly_genrated_local_bucket_id):
        for i,script in enumerate(script_result[flag][:2]):
            prompt = f"""You are image genrator AI Assistant. You are assigned to generate an image for a advertisement company.
            Here is the ad details:
            Title: {script['title']}
            Body: {script['body']}
            Month: {script['month']}
            Age Group: {script['age_group']}
            Region: {script['region']}

            Now, generate an image for the advertisement post. The image should be relevant to the product and the target demographics.
            """
            images = self.model.generate_images(
                prompt=prompt,
                # Optional parameters
                number_of_images=1,
                language="auto",
                # You can't use a seed value and watermark at the same time.
                # add_watermark=False,
                # seed=100,
                aspect_ratio="1:1",
                safety_filter_level="block_some",
                person_generation="allow_adult",
            )

            image_name = f"{script['month']}_{script['age_group']}_{script['region']}_{flag}_{i}.png"
            image_path = f"assets/{randomly_genrated_local_bucket_id}/{flag}/{image_name}"

            try:
                images[0].save(image_path)

                print(f"Image generated for {image_name}. Now delaying for 30 sec to avoid rate limit.")
                time.sleep(30)
            except Exception as e:
                print(f"Error while saving image: {e} for {image_name}")

    def run(self,script_result):
        randomly_genrated_local_bucket_id = str(random.randint(1000,9999))
        os.makedirs(f"assets/{randomly_genrated_local_bucket_id}/top", exist_ok=True)
        os.makedirs(f"assets/{randomly_genrated_local_bucket_id}/bottom", exist_ok=True)

        self._generate(script_result, "top", randomly_genrated_local_bucket_id)
        self._generate(script_result, "bottom", randomly_genrated_local_bucket_id)

        script_result["bucket_id"] = randomly_genrated_local_bucket_id
        return script_result

In [222]:
image_generator = ImageGenerator()

In [223]:
script_result = image_generator.run(script_result)

Image generated for February_22_Maharashtra, India_top_0.png. Now delaying for 30 sec to avoid rate limit.
Image generated for February_22_Texas, USA_top_1.png. Now delaying for 30 sec to avoid rate limit.
Error while saving image: list index out of range for December_18_Berlin, Germany_bottom_0.png
Image generated for January_18_Berlin, Germany_bottom_1.png. Now delaying for 30 sec to avoid rate limit.


## Audio Agent

In [238]:
from google.cloud import texttospeech

class AudioAgent:
    def __init__(self):
        self.client = texttospeech.TextToSpeechClient()

    def emotional_tts(self, text, speaking_rate=1.0, pitch=0.0, path="output.wav"):
        input_text = texttospeech.SynthesisInput(text=text)

        voice = texttospeech.VoiceSelectionParams(
            language_code="en-US",
            name="en-US-Wavenet-F",  # More natural voice
            ssml_gender=texttospeech.SsmlVoiceGender.FEMALE,
        )

        audio_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.LINEAR16,  # WAV format
            speaking_rate=speaking_rate,
            pitch=pitch
        )

        response = self.client.synthesize_speech(
            input=input_text, voice=voice, audio_config=audio_config
        )

        # Ensure .wav file extension
        if not path.endswith('.wav'):
            path += '.wav'

        with open(path, "wb") as out:
            out.write(response.audio_content)
            print(f'🎧 Audio content written to {path}')

    def run(self, script_result):
        for i, script in enumerate(script_result['top'][:2]):
            text = f"{script['title']}. {script['body']}."
            fname = f"{script['month']}_{script['age_group']}_{script['region']}_top_{i}"
            path = f"assets/{script_result['bucket_id']}/top/{fname}.wav"
            # Only if the the image exists 
            if os.path.exists(f"assets/{script_result['bucket_id']}/top/{fname}.png"):
                self.emotional_tts(text, path=path)

        for i, script in enumerate(script_result['bottom'][:2]):
            text = f"{script['title']}. {script['body']}."
            fname = f"{script['month']}_{script['age_group']}_{script['region']}_bottom_{i}"
            path = f"assets/{script_result['bucket_id']}/bottom/{fname}.wav"
            if os.path.exists(f"assets/{script_result['bucket_id']}/bottom/{fname}.png"):
                self.emotional_tts(text, path=path)

        return script_result

In [239]:
audio_agent = AudioAgent()

In [240]:
script_result = audio_agent.run(script_result)

🎧 Audio content written to assets/5180/top/February_22_Maharashtra, India_top_0.wav
🎧 Audio content written to assets/5180/top/February_22_Texas, USA_top_1.wav
🎧 Audio content written to assets/5180/bottom/January_18_Berlin, Germany_bottom_1.wav


## Email Agent

In [224]:
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.audio import MIMEAudio
from email.mime.image import MIMEImage
from email.mime.base import MIMEBase
from email import encoders
import os

In [242]:
class EmailAgent:
    def __init__(self):
        self.SENDER_MAIL = os.environ.get("SENDER_MAIL")
        self.SENDER_PASSWORD = os.environ.get("SENDER_PASSWORD")

    def send_email_with_attachments(self,sender_email, sender_password, receiver_email, subject, body_text, image_path, audio_path):
        # Create the base message
        msg = MIMEMultipart()
        msg['From'] = sender_email
        msg['To'] = receiver_email
        msg['Subject'] = subject

        # Add text body
        msg.attach(MIMEText(body_text, 'plain'))

        # Attach image
        with open(image_path, 'rb') as img_file:
            img = MIMEImage(img_file.read())
            img.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(image_path)}"')
            msg.attach(img)

        # Attach audio
        with open(audio_path, 'rb') as audio_file:
            audio = MIMEAudio(audio_file.read())
            audio.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(audio_path)}"')
            msg.attach(audio)

        # Connect to Gmail SMTP and send
        try:
            server = smtplib.SMTP('smtp.gmail.com', 587)
            server.starttls()
            server.login(sender_email, sender_password)
            server.send_message(msg)
            server.quit()
            print("✅ Email sent successfully!")
        except Exception as e:
            print(f"❌ Failed to send email: {e}")

    
    def run(self, script_result):

        for i, script in enumerate(script_result['top'][:2]):
            image_path = f"assets/{script_result['bucket_id']}/top/{script['month']}_{script['age_group']}_{script['region']}_top_{i}.png"
            audio_path = f"assets/{script_result['bucket_id']}/top/{script['month']}_{script['age_group']}_{script['region']}_top_{i}.wav"

            if os.path.exists(image_path) and os.path.exists(audio_path):
                self.send_email_with_attachments(
                    sender_email=self.SENDER_MAIL,
                    sender_password=self.SENDER_PASSWORD,
                    receiver_email="kabirrajsingh10@gmail.com",
                    subject=f"Advertisement for {script['month']} - {script['age_group']} - {script['region']}",
                    body_text=f"{script['title']}. {script['body']}.",
                    image_path=image_path,
                    audio_path=audio_path
                )
        

        for i, script in enumerate(script_result['bottom'][:2]):
            image_path = f"assets/{script_result['bucket_id']}/bottom/{script['month']}_{script['age_group']}_{script['region']}_bottom_{i}.png"
            audio_path = f"assets/{script_result['bucket_id']}/bottom/{script['month']}_{script['age_group']}_{script['region']}_bottom_{i}.wav"

            if os.path.exists(image_path) and os.path.exists(audio_path):
                self.send_email_with_attachments(
                    sender_email=self.SENDER_MAIL,
                    sender_password=self.SENDER_PASSWORD,
                    receiver_email="kabirrajsingh10@gmail.com",
                    subject=f"Advertisement for {script['month']} - {script['age_group']} - {script['region']}",
                    body_text=f"{script['title']}. {script['body']}.",
                    image_path=image_path,
                    audio_path=audio_path
                )
        script_result['email_sent'] = True
        return script_result

In [243]:
email_agent = EmailAgent()

In [244]:
script_result = email_agent.run(script_result)

✅ Email sent successfully!
✅ Email sent successfully!
✅ Email sent successfully!


## Agent Structure 

In [None]:
{
    "ProductDescription": description, # Product Description + ProdcutID + Add should be like this
}