Install Libraries

In [None]:
!pip install -q transformers faiss-cpu sentence-transformers requests gradio google-genai python-dotenv

Imports

In [None]:
import json
import logging
import os
from datetime import datetime, timedelta, timezone

import faiss
import gradio as gr
import numpy as np
import requests
from dotenv import load_dotenv
from google import genai
from sentence_transformers import SentenceTransformer


API Keys and Configuration

In [None]:
# Load variables from a local .env file (when one is present) into the process
# environment before Config reads its API keys with os.getenv.
load_dotenv()

class Config:
    """Central settings for the notebook: credentials, endpoints and tuning values."""

    # --- Credentials -------------------------------------------------------
    # Each key comes from the environment; if the variable is unset, the
    # literal placeholder string on the right is used instead.
    WEATHER_API_KEY = os.environ.get('WEATHER_API_KEY', 'default_weather_key')
    GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY', 'default_gemini_key')
    GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY', 'default_google_key')

    # --- Remote endpoints --------------------------------------------------
    WEATHER_BASE_URL = "https://api.openweathermap.org/data/2.5/forecast"
    GOOGLE_GEOCODE_URL = "https://maps.googleapis.com/maps/api/geocode/json"

    # --- Retrieval / generation settings -----------------------------------
    EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2'  # passed to SentenceTransformer
    TOP_K_CONTEXT = 3                          # neighbours requested per FAISS search
    FORECAST_DAYS = 5                          # forecast horizon used in prompts and date checks
    GEMINI_MODEL_NAME = "gemini-2.5-flash"

# Root logging setup: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by every function in this notebook.
logger = logging.getLogger(__name__)

# Initial text of the "Data Status" box, shown until a location is loaded.
INITIAL_PROMPT = (
    f"Enter location or use 'Use My Location' to load the {Config.FORECAST_DAYS}-day forecast."
)


Assistant Class and Prompt Builder

In [None]:
class WeatherRAGAssistant:
    """Fetch a forecast, index per-day summaries in FAISS and query Gemini.

    Workflow: ``fetch_and_index_weather`` downloads the forecast and rebuilds
    the index, ``search_index`` retrieves the most similar daily summaries for
    a question, and ``generate_response`` sends a finished prompt to Gemini.
    """

    def __init__(self, weather_key: str, gemini_key: str):
        """Load the embedding model and create the Gemini client.

        Args:
            weather_key: API key sent as ``appid`` to the weather endpoint.
            gemini_key: API key for the Gemini client.
        """
        self.embed_model = SentenceTransformer(Config.EMBEDDING_MODEL_NAME)
        # The google-genai SDK (``from google import genai``) exposes a Client
        # object; it has no module-level ``configure`` or ``GenerativeModel``
        # (those belong to the older google-generativeai package).
        self.gemini_client = genai.Client(api_key=gemini_key)
        self.weather_key = weather_key
        self.index = None  # FAISS index over self.texts, or None when empty
        self.texts = []  # Formatted per-day weather summaries used for RAG
        self.api_response = None  # Raw API response of the last successful fetch

    def fetch_and_index_weather(self, lat: float, lon: float):
        """Fetch the forecast for the coordinates and rebuild the index.

        Instance state (``api_response``, ``texts``, ``index``) is replaced
        only after the payload has been fetched and formatted successfully,
        so a failed refresh never leaves a half-updated assistant.

        Args:
            lat: Latitude in decimal degrees.
            lon: Longitude in decimal degrees.

        Returns:
            Tuple ``(api_response, texts)``: the decoded JSON payload and the
            list of per-day summary strings.

        Raises:
            RuntimeError: On timeout, HTTP/connection failure, undecodable
                JSON, or a payload that lacks the expected fields.
        """
        params = {"lat": lat, "lon": lon, "appid": self.weather_key, "units": "metric"}
        try:
            response = requests.get(Config.WEATHER_BASE_URL, params=params, timeout=10)
            response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
            api_response = response.json()
        except requests.exceptions.Timeout as e:
            logger.error(f"Weather API request timed out for lat={lat}, lon={lon}")
            raise RuntimeError("Weather API request timed out.") from e
        except json.JSONDecodeError as e:
            # Must precede RequestException: requests' own JSONDecodeError
            # derives from both, so the generic handler would shadow this one.
            logger.error(f"Failed to decode JSON from Weather API for lat={lat}, lon={lon}")
            raise RuntimeError("Failed to parse Weather API response.") from e
        except requests.exceptions.RequestException as e:
            logger.error(f"Weather API request failed for lat={lat}, lon={lon}: {e}")
            raise RuntimeError(f"Weather API request failed: {e}") from e

        try:
            texts = self._format_weather_data(api_response)
        except (AttributeError, IndexError, KeyError, TypeError, ValueError) as e:
            logger.error(f"Unexpected Weather API payload for lat={lat}, lon={lon}: {e!r}")
            raise RuntimeError("Unexpected Weather API response format.") from e

        self._build_faiss_index(texts)
        self.api_response = api_response
        self.texts = texts
        logger.info(f"Successfully fetched and indexed weather for lat={lat}, lon={lon}")
        return api_response, texts

    def _format_weather_data(self, api_response: dict) -> list[str]:
        """Collapse the 3-hourly forecast entries into one summary per local day.

        Args:
            api_response: Decoded forecast payload. Reads ``city.timezone``
                (UTC offset in seconds, defaults to 0) and each ``list`` entry's
                ``dt``, ``main.temp_max``, ``main.temp_min``,
                ``weather[0].description`` and optional ``rain['3h']``.

        Returns:
            Summary strings sorted by date, each starting with
            ``"Date: YYYY-MM-DD"``.
        """
        city = api_response.get('city') or {}
        # Bucket entries by the forecast location's calendar day, not UTC's.
        local_tz = timezone(timedelta(seconds=city.get('timezone', 0)))

        daily_data = {}
        for three_hour_forecast in api_response.get('list', []):
            local_datetime = datetime.fromtimestamp(three_hour_forecast['dt'], tz=local_tz)
            date_str = local_datetime.strftime('%Y-%m-%d')

            temp_max = three_hour_forecast['main']['temp_max']
            temp_min = three_hour_forecast['main']['temp_min']
            description = three_hour_forecast['weather'][0]['description']
            rain = three_hour_forecast.get('rain', {}).get('3h', 0)

            day = daily_data.setdefault(date_str, {
                'temp_max': temp_max, 'temp_min': temp_min,
                'description_list': set(), 'rain_total': 0.0,
            })
            day['temp_max'] = max(day['temp_max'], temp_max)
            day['temp_min'] = min(day['temp_min'], temp_min)
            day['description_list'].add(description)
            day['rain_total'] += rain

        texts = []
        for date, data in daily_data.items():
            descriptions = ", ".join(sorted(data['description_list']))
            rain_total_rounded = round(data['rain_total'], 2)
            texts.append(
                f"Date: {date}, Max Temp: {round(data['temp_max'], 1)}°C, "
                f"Min Temp: {round(data['temp_min'], 1)}°C, "
                f"Total Rain: {rain_total_rounded}mm, Conditions: {descriptions}"
            )
        texts.sort()  # ISO dates lead each string, so this is chronological
        logger.debug(f"Formatted weather data for {len(texts)} days.")
        return texts

    def _build_faiss_index(self, texts: list[str]):
        """Embed *texts* and store them in a fresh L2 index on ``self.index``.

        An empty list clears the index (sets it to ``None``).
        """
        if not texts:
            self.index = None
            logger.warning("No texts to build FAISS index.")
            return
        embeddings = self.embed_model.encode(texts, convert_to_numpy=True)
        embeddings = np.ascontiguousarray(embeddings.astype('float32'))
        d = embeddings.shape[1]
        self.index = faiss.IndexFlatL2(d)
        self.index.add(embeddings)
        logger.info(f"FAISS index built with {len(texts)} entries.")

    def search_index(self, query: str) -> list[str]:
        """Return up to ``Config.TOP_K_CONTEXT`` summaries closest to *query*.

        Returns an empty list when nothing is indexed or the search fails.
        """
        if self.index is None or not self.texts:
            logger.warning("FAISS index not built or no texts available for search.")
            return []
        try:
            query_vec = self.embed_model.encode([query], convert_to_numpy=True).astype('float32')
            # Never request more neighbours than there are indexed entries.
            k = min(Config.TOP_K_CONTEXT, len(self.texts))
            _, neighbours = self.index.search(query_vec, k)
            # Keep the bounds check: out-of-range ids are skipped, not raised.
            context = [self.texts[i] for i in neighbours[0] if 0 <= i < len(self.texts)]
            logger.debug(f"Search for '{query}' returned {len(context)} context items.")
            return context
        except Exception as e:
            logger.error(f"Error during FAISS index search for query '{query}': {e}")
            return []

    def generate_response(self, prompt: str) -> str:
        """Send *prompt* to Gemini and return the stripped answer text.

        Never raises: a blocked prompt, an empty completion or any API error
        is turned into a user-facing message string.
        """
        try:
            response = self.gemini_client.models.generate_content(
                model=Config.GEMINI_MODEL_NAME,
                contents=prompt,
            )
            # The SDK reports a blocked prompt on the response object rather
            # than by raising, so inspect the feedback explicitly.
            feedback = getattr(response, "prompt_feedback", None)
            block_reason = getattr(feedback, "block_reason", None)
            if block_reason:
                logger.warning(f"Gemini API blocked prompt: {block_reason}")
                return "I'm sorry, your query was blocked by the safety filters. Please try rephrasing."

            text = response.text
            if not text:
                logger.warning("Gemini API stopped generation: response contained no text.")
                return "I'm sorry, I could not complete the response."

            logger.info("Successfully generated response from Gemini API.")
            return text.strip()
        except Exception as e:
            # UI boundary: report the failure instead of crashing the handler.
            logger.error(f"Gemini API error: {e}")
            return f"Gemini API error: Could not generate response. Details: {e}"

def _generate_weather_rag_prompt(user_query: str, context: list[str], target_date_str: str | None = None) -> str:
    if target_date_str:
        # For specific date queries, ensure the context is focused
        # The RAG system will ideally retrieve the exact date string now.
        # If a single context item contains the target date, make it prominent.
        filtered_context = [text for text in context if target_date_str in text]
        if not filtered_context:
            filtered_context = context # Fallback if RAG didn't find the exact date match
            logger.warning(f"RAG did not return specific data for {target_date_str}. Using general context.")

        return f"""
        You are a weather expert. Provide the weather details **including the date** in your answer.

        ---
        VERIFIED WEATHER DATA:
        {'\n'.join(filtered_context)}
        ---

        QUESTION: {user_query}

        ANSWER FORMAT:
        - Date(s): {target_date_str} (or relevant date from context)
        - Max Temp / Min Temp
        - Weather conditions
        - Recommendation
        """
    else:
        # General RAG prompt
        return f"""
        You are a helpful and concise weather assistant.
        Always include the **date(s)** explicitly in your answer.

        Context:
        ---
        {'\n'.join(context)}
        ---

        Question: {user_query}

        Answer:
        - Date(s): state clearly
        - Weather details (Max/Min temp, rain, conditions)
        - Recommendation
        """


Pipeline

In [None]:
def reverse_geocode(lat: float, lon: float) -> str:
    """Convert coordinates to a human-readable address using Google Geocoding API.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.

    Returns:
        The first result's formatted address; ``"Unknown Location"`` when the
        API returns no usable result; or a string starting with ``"Error:"``
        when the request itself fails. This function never raises for
        request or parsing failures.
    """
    try:
        params = {"latlng": f"{lat},{lon}", "key": Config.GOOGLE_API_KEY}
        response = requests.get(Config.GOOGLE_GEOCODE_URL, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.Timeout:
        logger.error(f"Reverse geocoding request timed out for {lat},{lon}.")
        return "Error: Geocoding request timed out."
    except json.JSONDecodeError:
        # Must precede RequestException: requests' own JSONDecodeError derives
        # from both, so the generic handler would otherwise shadow this one.
        logger.error(f"Failed to decode JSON from Google Geocoding for {lat},{lon}.")
        return "Error: Failed to parse geocoding API response."
    except requests.exceptions.RequestException as e:
        logger.error(f"Reverse geocoding request failed for {lat},{lon}: {e}")
        return f"Error: Geocoding request failed: {e}"

    results = data.get("results") if isinstance(data, dict) else None
    if results:
        formatted_address = results[0].get("formatted_address")
        if formatted_address:
            logger.info(f"Successfully reverse geocoded {lat},{lon} to '{formatted_address}'.")
            return formatted_address

    # Surface an API-level status (e.g. a rejected key) in the log; without it
    # the failure is indistinguishable from a genuinely empty result.
    status = data.get("status") if isinstance(data, dict) else None
    if status not in (None, "OK", "ZERO_RESULTS"):
        logger.error(f"Google Geocoding returned status {status} for {lat},{lon}.")
    logger.warning(f"No results found for reverse geocoding {lat},{lon}.")
    return "Unknown Location"

def use_my_location(lat, lon, assistant_state: gr.State, current_location_name_state: gr.State):
    """Load and index the forecast for the given coordinates.

    Args:
        lat: Latitude in decimal degrees.
        lon: Longitude in decimal degrees.
        assistant_state: The assistant, either as the raw object (what Gradio
            passes to an event handler for a ``gr.State`` input) or wrapped in
            an object exposing ``.value``.
        current_location_name_state: Holder for the current location name.
            It is updated only when it exposes ``.value``; a plain string
            cannot be updated in place.

    Returns:
        A status message for the UI. Errors are reported in the message
        rather than raised.
    """
    # Gradio hands handlers the State's stored value, not the gr.State
    # component, so accept both instead of assuming ``.value`` exists.
    assistant = getattr(assistant_state, "value", assistant_state)
    if not assistant:
        return "Assistant not initialized. Please refresh the page."

    def remember_location(name: str) -> None:
        if hasattr(current_location_name_state, "value"):
            current_location_name_state.value = name

    if lat is None or lon is None:
        logger.warning("use_my_location called without coordinates.")
        return "Error: Could not determine your coordinates. Please enter a location instead."

    try:
        location_name = reverse_geocode(lat, lon)
        # reverse_geocode reports failures as strings; a failed name lookup
        # should not be displayed as the place name, so fall back to the
        # coordinates for the label.
        if location_name == "Unknown Location" or location_name.startswith("Error:"):
            location_name = f"{lat},{lon}"

        assistant.fetch_and_index_weather(lat=lat, lon=lon)
        if assistant.texts:
            remember_location(location_name)
            logger.info(f"Weather data loaded for user's location: {location_name}")
            return f"Location detected: {location_name}. Weather data loaded!"

        remember_location("Failed to load")
        logger.warning(f"Failed to load weather data for user's location: {location_name}")
        return f"Failed to load weather data for: {location_name}"
    except RuntimeError as e:
        remember_location("Error")
        logger.error(f"Error in use_my_location: {e}")
        return f"Error: {e}"
    except Exception as e:
        # UI boundary: report unexpected failures instead of crashing the handler.
        remember_location("Error")
        logger.error(f"An unexpected error occurred in use_my_location: {e}")
        return f"An unexpected error occurred: {e}"

def _parse_lat_lon(text: str):
    """Return ``(lat, lon)`` if *text* is a valid ``lat,lon`` pair, else ``None``."""
    parts = text.split(',')
    if len(parts) != 2:
        return None
    try:
        lat, lon = (float(part) for part in parts)  # float() ignores surrounding spaces
    except ValueError:
        return None
    # The range test also rejects nan/inf, which float() would accept.
    if -90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0:
        return lat, lon
    return None


def update_location_and_index(new_location: str, assistant_state: gr.State, current_location_name_state: gr.State):
    """Validate location with Google Geocoding, then fetch weather via lat/lon.

    The input is treated as coordinates when it is a valid ``lat,lon`` pair
    and as a free-form address otherwise.

    Args:
        new_location: Text typed by the user (place name or ``lat,lon``).
        assistant_state: The assistant, either as the raw object (what Gradio
            passes to an event handler for a ``gr.State`` input) or wrapped in
            an object exposing ``.value``.
        current_location_name_state: Holder for the current location name.
            It is updated only when it exposes ``.value``; a plain string
            cannot be updated in place.

    Returns:
        A status message for the UI. Errors are reported in the message
        rather than raised.
    """
    # Gradio hands handlers the State's stored value, not the gr.State
    # component, so accept both instead of assuming ``.value`` exists.
    assistant = getattr(assistant_state, "value", assistant_state)
    if not assistant:
        return "Assistant not initialized. Please refresh the page."

    clean_location = (new_location or "").strip()
    if not clean_location:
        return "Please enter a location."

    def remember_location(name: str) -> None:
        if hasattr(current_location_name_state, "value"):
            current_location_name_state.value = name

    try:
        coordinates = _parse_lat_lon(clean_location)
        if coordinates is not None:
            lat, lon = coordinates
            # Reverse geocode to get a human-readable name. A failed lookup
            # comes back as a string, so keep the user's input as the label
            # rather than displaying the error as a place name.
            resolved_location = reverse_geocode(lat, lon)
            if resolved_location == "Unknown Location" or resolved_location.startswith("Error:"):
                resolved_location = clean_location
            logger.info(f"Parsed coordinates: {lat},{lon}. Resolved location: {resolved_location}")
        else:
            geo_params = {"address": clean_location, "key": Config.GOOGLE_API_KEY}
            response = requests.get(Config.GOOGLE_GEOCODE_URL, params=geo_params, timeout=10)
            response.raise_for_status()
            geo_data = response.json()

            results = geo_data.get("results")
            if not results:
                # An API-level failure (e.g. a rejected key) also arrives with
                # empty results; do not report it as "location not found".
                status = geo_data.get("status")
                if status not in (None, "OK", "ZERO_RESULTS"):
                    logger.error(f"Google Geocoding returned status {status} for '{clean_location}'.")
                    return f"Error validating location: Geocoding API returned status {status}."
                logger.warning(f"Could not find location matching '{clean_location}'.")
                return f"Could not find any location matching '{clean_location}'."

            location_info = results[0]["geometry"]["location"]
            lat, lon = location_info["lat"], location_info["lng"]
            resolved_location = results[0].get("formatted_address", clean_location)
            logger.info(f"Geocoded '{clean_location}' to {lat},{lon}. Resolved location: {resolved_location}")

    except requests.exceptions.Timeout:
        logger.error(f"Geocoding API request timed out for '{clean_location}'.")
        return "Error validating location: Geocoding request timed out."
    except json.JSONDecodeError:
        # Must precede RequestException: requests' own JSONDecodeError derives
        # from both, so the generic handler would otherwise shadow this one.
        logger.error(f"Failed to decode JSON from Google Geocoding for '{clean_location}'.")
        return "Error validating location: Failed to parse geocoding API response."
    except requests.exceptions.RequestException as e:
        logger.error(f"Geocoding API request failed for '{clean_location}': {e}")
        return f"Error validating location: Geocoding request failed: {e}"
    except Exception as e:
        logger.error(f"An unexpected error occurred during location validation for '{clean_location}': {e}")
        return f"An unexpected error occurred during location validation: {e}"

    try:
        assistant.fetch_and_index_weather(lat=lat, lon=lon)
        if assistant.texts:
            remember_location(resolved_location)
            logger.info(f"Weather data loaded for {resolved_location}.")
            return f"Weather data for {resolved_location} loaded!"

        remember_location("Failed to load")
        logger.warning(f"Failed to load weather data for {resolved_location}.")
        return f"Failed to load weather data for {resolved_location}."
    except RuntimeError as e:
        remember_location("Error")
        logger.error(f"Error fetching weather for {resolved_location}: {e}")
        return f"Error fetching weather: {e}"
    except Exception as e:
        # UI boundary: report unexpected failures instead of crashing the handler.
        remember_location("Error")
        logger.error(f"An unexpected error occurred while fetching weather for {resolved_location}: {e}")
        return f"An unexpected error occurred while fetching weather: {e}"

def get_target_date_string(user_query: str, assistant_state: gr.State) -> str | None:
    """Detect which forecast day a question refers to.

    Recognises "today", "tomorrow", "day after tomorrow" and weekday names
    that fall within the next ``Config.FORECAST_DAYS`` days, evaluated in the
    forecast location's local time. Relative terms take precedence over
    weekday names.

    Args:
        user_query: The question as typed by the user.
        assistant_state: The assistant, either as the raw object or wrapped
            in an object exposing ``.value`` (such as ``gr.State``).

    Returns:
        The date as ``YYYY-MM-DD``, or ``None`` when no weather data is
        loaded, no date term is found, or the date is outside the forecast.
    """
    # Gradio hands handlers the State's stored value, not the gr.State
    # component, so accept both instead of assuming ``.value`` exists.
    assistant = getattr(assistant_state, "value", assistant_state)
    if not assistant or not assistant.api_response:
        return None

    query_lower = (user_query or "").lower()
    city = assistant.api_response.get('city') or {}
    tz_offset = city.get('timezone', 0)
    # "Now" at the forecast location, built timezone-aware instead of with
    # the deprecated naive ``datetime.utcnow()``.
    current_time = datetime.now(timezone(timedelta(seconds=tz_offset)))
    target_date = None

    # Longest phrase first: "day after tomorrow" contains "tomorrow", so
    # testing "tomorrow" first would resolve it to the wrong day.
    relative_terms = (('day after tomorrow', 2), ('tomorrow', 1), ('today', 0))
    for term, days_delta in relative_terms:
        if term in query_lower:
            target_date = current_time + timedelta(days=days_delta)
            break

    if target_date is None:
        day_names = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
        current_weekday = current_time.weekday()
        for i in range(Config.FORECAST_DAYS):  # Limit search to forecast days
            if day_names[(current_weekday + i) % 7] in query_lower:
                target_date = current_time + timedelta(days=i)
                break

    if target_date is None:
        return None

    # Ensure the target date is within the forecast period.
    forecast_end_date = current_time + timedelta(days=Config.FORECAST_DAYS - 1)
    target_str = target_date.strftime('%Y-%m-%d')
    if target_date.date() <= forecast_end_date.date():
        logger.debug(f"Detected target date: {target_str}")
        return target_str

    logger.info(f"Detected date {target_str} is outside the {Config.FORECAST_DAYS}-day forecast.")
    return None

def answer_weather_query(user_query: str, assistant_state: gr.State):
    """Answer a weather question from the indexed forecast.

    Retrieves context from the assistant's index (focused on a specific date
    when the question names one), builds the prompt and returns the model's
    answer.

    Args:
        user_query: The question as typed by the user.
        assistant_state: The assistant, either as the raw object (what Gradio
            passes to an event handler for a ``gr.State`` input) or wrapped in
            an object exposing ``.value``.

    Returns:
        The answer text, or a guidance message when no data is loaded, the
        question is empty, or no context was found.
    """
    from types import SimpleNamespace

    # Gradio hands handlers the State's stored value, not the gr.State
    # component, so accept both instead of assuming ``.value`` exists.
    assistant = getattr(assistant_state, "value", assistant_state)

    if not assistant or not assistant.texts:
        logger.warning("No weather data loaded, cannot answer query.")
        return "Please load the weather data first by entering a location or using 'Use My Location'."

    if not user_query or not user_query.strip():
        return "Please enter a question about the weather."

    # get_target_date_string reads the assistant through ``.value``; wrap the
    # resolved assistant so the lookup works however this handler was called.
    target_date_str = get_target_date_string(user_query, SimpleNamespace(value=assistant))

    if target_date_str:
        # When a specific date is detected, refine the RAG query to include it.
        date_specific_query = f"Weather conditions for {target_date_str}"
        context_for_llm = assistant.search_index(date_specific_query + ". " + user_query)

        # Similarity search is not guaranteed to rank the requested day in
        # the top-k. The summaries contain their date verbatim, so add the
        # exact match directly when retrieval missed it.
        if not any(target_date_str in text for text in context_for_llm):
            exact_matches = [text for text in assistant.texts if target_date_str in text]
            context_for_llm = exact_matches + context_for_llm

        if not context_for_llm:
            # If RAG found nothing for the specific date, try general search
            context_for_llm = assistant.search_index(user_query)
            logger.warning(f"RAG did not find specific context for {target_date_str}. Using general context.")
        else:
            logger.info(f"Using date-specific RAG context for {target_date_str}.")
    else:
        context_for_llm = assistant.search_index(user_query)
        logger.info("Using general RAG context.")

    if not context_for_llm:
        logger.info("No relevant weather information found for query.")
        return "I found no relevant weather information. Please try a different query or ensure the location data is loaded correctly."

    prompt = _generate_weather_rag_prompt(user_query, context_for_llm, target_date_str)
    logger.debug(f"Generated prompt: {prompt}")

    return assistant.generate_response(prompt)


Gradio UI

In [None]:
with gr.Blocks(title="Enhanced OpenWeatherMap Assistant") as demo:
    # Page header.
    gr.Markdown(f"""
        # Enhanced RAG Weather Assistant (OpenWeatherMap)
        Ask about the **{Config.FORECAST_DAYS}-day forecast** (Today + {Config.FORECAST_DAYS - 1} Upcoming Days).
    """)

    # One assistant is built up front and handed to Gradio as state, together
    # with a second state slot for the name of the loaded location.
    initial_assistant_instance = WeatherRAGAssistant(
        gemini_key=Config.GEMINI_API_KEY,
        weather_key=Config.WEATHER_API_KEY,
    )
    assistant_state = gr.State(initial_assistant_instance)
    current_location_name_state = gr.State("No location loaded yet.")

    # Step 1: choose a location.
    with gr.Row():
        location_input = gr.Textbox(
            label="1. Enter Location (City, Country OR Landmark OR lat,lon)",
            placeholder="e.g., Hyderabad, IN or 17.3850,78.4867",
            interactive=True,
            scale=3,
        )
        refresh_button = gr.Button("Refresh Data", scale=1)

    status_output = gr.Textbox(label="Data Status", value=INITIAL_PROMPT, interactive=False)

    # Step 2: ask about the loaded forecast.
    gr.Markdown("## Ask a Question")

    with gr.Row():
        query_input = gr.Textbox(
            label="Your Weather Question",
            placeholder="e.g., What's the highest temperature tomorrow?",
            lines=2,
            scale=4,
        )
        submit_button = gr.Button("Get Answer", scale=1)

    answer_output = gr.Textbox(label="Assistant's Answer", lines=5, interactive=False)

    gr.Markdown(f"*(Data provided by OpenWeatherMap, RAG powered by Sentence-Transformers and {Config.GEMINI_MODEL_NAME})*")

    # Pressing Enter in the location box and clicking "Refresh Data" both
    # reload the forecast and report into the status box.
    for location_trigger in (location_input.submit, refresh_button.click):
        location_trigger(
            fn=update_location_and_index,
            inputs=[location_input, assistant_state, current_location_name_state],
            outputs=[status_output],
        )

    # Clicking "Get Answer" and pressing Enter in the question box both run
    # the query and show the result in the answer box.
    for query_trigger in (submit_button.click, query_input.submit):
        query_trigger(
            fn=answer_weather_query,
            inputs=[query_input, assistant_state],
            outputs=[answer_output],
        )

# share=True asks Gradio to also create a public share link.
demo.launch(share=True)
