# Installation des différents package

In [1]:
!pip install semantic-kernel python-dotenv google-cloud-bigquery pyyaml google-cloud-bigquery-storage

Defaulting to user installation because normal site-packages is not writeable


# Import des variables d'environnement

N'oubliez pas de copier-coller le fichier .env à la racine du code

In [2]:
from dotenv import load_dotenv
import os
load_dotenv()

True

In [3]:
GOOGLE_APPLICATION_CREDENTIALS="/path/to/your/service-account-key.json"
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="colas-training-service-account.json"

# Import des librairies nécessaires

In [4]:
import asyncio

from semantic_kernel import Kernel
from semantic_kernel.utils.logging import setup_logging
from semantic_kernel.functions import kernel_function
from semantic_kernel.connectors.ai.open_ai import AzureChatCompletion
from semantic_kernel.connectors.ai.function_choice_behavior import FunctionChoiceBehavior
from semantic_kernel.connectors.ai.chat_completion_client_base import ChatCompletionClientBase
from semantic_kernel.contents.chat_history import ChatHistory
from semantic_kernel.functions.kernel_arguments import KernelArguments
import logging
from semantic_kernel.connectors.ai.open_ai.prompt_execution_settings.azure_chat_prompt_execution_settings import (
    AzureChatPromptExecutionSettings,
)
import yaml
import os
from semantic_kernel.functions import kernel_function
from datetime import datetime
import requests
from typing import Annotated, Optional, List, Dict, Any
from google.cloud import bigquery
import json

# Définition du Tool permettant de récupérer la météo

In [5]:
class WeatherPlugin:
    """Weather plugin for Semantic Kernel with Open-Meteo API integration."""

    @kernel_function(
    name="get_current_weather",
    description="Fetches current weather for given coordinates using Open-Meteo API"
    )
    def get_current_weather(
        self,
        latitude: float,
        longitude: float
    ) -> dict:
        """Fetches current weather for given coordinates.

        Args:
            latitude: Latitude coordinate
            longitude: Longitude coordinate

        Returns:
            dict: Weather details with status
        """
        url = "https://api.open-meteo.com/v1/forecast"

        params = {
            "latitude": latitude,
            "longitude": longitude,
            "current": [
                "temperature_2m",
                "relative_humidity_2m",
                "precipitation",
                "weather_code",
                "wind_speed_10m",
            ],
            "timezone": "auto",
        }

        try:
            response = requests.get(url, params=params)

            if response.status_code != 200:
                return {"status": "error", "error_message": f"API Error: {response.text}"}

            data = response.json()
            current = data["current"]

            weather_description = self._get_weather_description(current["weather_code"])

            return {
                "status": "success",
                "latitude": data["latitude"],
                "longitude": data["longitude"],
                "description": weather_description,
                "temperature": current["temperature_2m"],
                "humidity": current["relative_humidity_2m"],
                "wind_speed": current["wind_speed_10m"],
                "precipitation_1h": current["precipitation"],
            }
        except Exception as e:
            return {"status": "error", "error_message": f"Request failed: {str(e)}"}

    @kernel_function(
        name="get_weather_forecast",
        description="Fetches daily weather forecast for up to 16 days using Open-Meteo API"
    )
    def get_weather_forecast(
        self,
        latitude: float,
        longitude: float,
        days: int = 7
    ) -> dict:
        """Fetches daily weather forecast.

        Args:
            latitude: Latitude coordinate
            longitude: Longitude coordinate
            days: Number of days for forecast (default 7, max 16)

        Returns:
            dict: Forecast details with status
        """
        url = "https://api.open-meteo.com/v1/forecast"

        days = min(days, 16)

        params = {
            "latitude": latitude,
            "longitude": longitude,
            "daily": [
                "temperature_2m_max",
                "temperature_2m_min",
                "weather_code",
                "precipitation_sum",
                "wind_speed_10m_max",
                "wind_gusts_10m_max",
                "wind_direction_10m_dominant",
            ],
            "timezone": "auto",
            "forecast_days": days,
        }

        try:
            response = requests.get(url, params=params)

            if response.status_code != 200:
                return {"status": "error", "error_message": f"API Error: {response.text}"}

            data = response.json()
            daily = data["daily"]

            forecasts = []
            for i in range(len(daily["time"])):
                weather_description = self._get_weather_description(daily["weather_code"][i])
                forecasts.append(
                    {
                        "date": daily["time"][i],
                        "temp_max": daily["temperature_2m_max"][i],
                        "temp_min": daily["temperature_2m_min"][i],
                        "description": weather_description,
                        "precipitation": daily["precipitation_sum"][i],
                        "wind_speed_max": daily["wind_speed_10m_max"][i],
                        "wind_gusts_max": daily["wind_gusts_10m_max"][i],
                        "wind_direction": daily["wind_direction_10m_dominant"][i],
                    }
                )

            return {
                "status": "success",
                "latitude": data["latitude"],
                "longitude": data["longitude"],
                "forecasts": forecasts,
            }
        except Exception as e:
            return {"status": "error", "error_message": f"Request failed: {str(e)}"}

    @kernel_function(
        name="get_current_date",
        description="Fetches the current date and time"
    )
    def get_current_date(self) -> dict:
        """Fetches the current date and time.

        Returns:
            dict: Current date and time with status
        """
        try:
            current_date_time = datetime.utcnow().isoformat()
            return {"status": "success", "current_date_time": current_date_time}
        except Exception as e:
            return {"status": "error", "error_message": str(e)}

    def _get_weather_description(self, code: int) -> str:
        """Maps WMO weather code to description.

        Args:
            code: WMO weather code

        Returns:
            str: Weather description
        """
        weather_codes = {
            0: "clear sky",
            1: "mainly clear",
            2: "partly cloudy",
            3: "overcast",
            45: "fog",
            48: "depositing rime fog",
            51: "light drizzle",
            53: "moderate drizzle",
            55: "dense drizzle",
            56: "light freezing drizzle",
            57: "dense freezing drizzle",
            61: "slight rain",
            63: "moderate rain",
            65: "heavy rain",
            66: "light freezing rain",
            67: "heavy freezing rain",
            71: "slight snow",
            73: "moderate snow",
            75: "heavy snow",
            77: "snow grains",
            80: "slight rain showers",
            81: "moderate rain showers",
            82: "violent rain showers",
            85: "slight snow showers",
            86: "heavy snow showers",
            95: "thunderstorm",
            96: "thunderstorm with slight hail",
            99: "thunderstorm with heavy hail",
        }
        return weather_codes.get(code, "unknown")


# Tool Bigquery

In [6]:
from google.cloud import bigquery
from typing import Annotated, Optional
from semantic_kernel.functions import kernel_function
import json
import yaml


class BigQueryPlugin:
    """
    A simplified Semantic Kernel plugin for BigQuery table queries.
    The agent determines the SQL query, executes it, and returns results.
    """
    
    def __init__(self):
        """
        Initialize the BigQuery plugin.
        Configured for table: colas-training.colas_data.data-chantier
        """
        self.client = bigquery.Client(project="colas-training")
        self.project_id = "colas-training"
        self.dataset_id = "colas_data"
        self.table_name = "data-chantier"
        self.full_table_id = f"{self.project_id}.{self.dataset_id}.`{self.table_name}`"
        self.schema_cache = {}
    
    def _get_table_schema_yaml(self) -> str:
        """
        Get table schema in YAML format for the agent to understand the table structure.
        """
        if self.schema_cache:
            return self.schema_cache.get('schema', '')
        
        try:
            table_ref = f"{self.project_id}.{self.dataset_id}.{self.table_name}"
            table = self.client.get_table(table_ref)
            
            schema_dict = {
                'platform': 'BigQuery',
                'project': self.project_id,
                'dataset': self.dataset_id,
                'table': self.table_name,
                'full_path': f"{self.project_id}.{self.dataset_id}.{self.table_name}",
                'description': table.description or 'Colas construction site data',
                'columns': []
            }
            
            for field in table.schema:
                column_info = {
                    'name': field.name,
                    'type': field.field_type,
                }
                if field.description:
                    column_info['description'] = field.description
                schema_dict['columns'].append(column_info)
            
            yaml_schema = yaml.dump(schema_dict, default_flow_style=False)
            self.schema_cache['schema'] = yaml_schema
            return yaml_schema
            
        except Exception as e:
            return f"Error retrieving schema: {str(e)}"
    
    @kernel_function(
        name="get_table_schema",
        description="Gets the schema of the data-chantier table including column names, types, and descriptions. ALWAYS use this first to understand the table structure before creating a query."
    )
    async def get_table_schema(self) -> str:
        """Retrieve table schema so the agent can construct appropriate queries."""
        return self._get_table_schema_yaml()
    
    @kernel_function(
        name="execute_query",
        description="Executes a SQL SELECT query against the data-chantier table and returns the results. The table path is already configured as `colas-training.colas_data.data-chantier`."
    )
    async def execute_query(
        self,
        query: Annotated[str, "The SQL SELECT query to execute. The table is available as `colas-training.colas_data.data-chantier` or you can use backticks for the table name with hyphens."],
        max_results: Annotated[int, "Maximum number of results to return. Default is 100."] = 100
    ) -> str:
        """
        Execute a BigQuery SQL query and return results as JSON.
        """
        # Security: Ensure query is SELECT only
        query_upper = query.strip().upper()
        if not query_upper.startswith('SELECT'):
            return json.dumps({
                'error': 'Only SELECT queries are allowed',
                'row_count': 0,
                'data': []
            })
        
        # Prevent dangerous operations
        dangerous_keywords = ['DROP', 'DELETE', 'INSERT', 'UPDATE', 'CREATE', 'ALTER', 'TRUNCATE']
        if any(keyword in query_upper for keyword in dangerous_keywords):
            return json.dumps({
                'error': 'Query contains prohibited keywords. Only SELECT queries are allowed.',
                'row_count': 0,
                'data': []
            })
        
        try:
            query_job = self.client.query(query)
            results = query_job.result(max_results=max_results)
            
            # Convert to list of dictionaries
            rows = []
            for row in results:
                rows.append(dict(row))
            
            result_data = {
                'row_count': len(rows),
                'total_rows': results.total_rows,
                'data': rows,
                'success': True
            }
            
            return json.dumps(result_data, indent=2, default=str)
            
        except Exception as e:
            return json.dumps({
                'error': f"Error executing query: {str(e)}",
                'row_count': 0,
                'data': [],
                'success': False
            })

In [9]:

async def main():
    # Initialize the kernel
    kernel = Kernel()

    # Add Azure OpenAI chat completion
    chat_completion = AzureChatCompletion(
        deployment_name=os.getenv("DEPLOYMENT_NAME"),
        api_key=os.getenv("API_KEY"),
        base_url=os.getenv("BASE_URL"),
    )
    kernel.add_service(chat_completion)

    # Set the logging level for  semantic_kernel.kernel to DEBUG.
    setup_logging()
    logging.getLogger("kernel").setLevel(logging.DEBUG)

    # Add BigQuery plugin
    bq_plugin = BigQueryPlugin()
    kernel.add_plugin(
        bq_plugin,
        plugin_name="BigQuery"
    )

    # Enable planning
    execution_settings = AzureChatPromptExecutionSettings()
    execution_settings.function_choice_behavior = FunctionChoiceBehavior.Auto()

    # Create a history of the conversation
    history = ChatHistory()

    # Initiate a back-and-forth chat
    userInput = None
    while True:
        # Collect user input
        userInput = input("User > ")

        # Terminate the loop if the user says "exit"
        if userInput == "exit":
            break

        # Add user input to the history
        history.add_user_message(userInput)

        # Get the response from the AI
        result = await chat_completion.get_chat_message_content(
            chat_history=history,
            settings=execution_settings,
            kernel=kernel,
        )

        # Print the results
        print("Assistant > " + str(result))

        # Add the message from the agent to the chat history
        history.add_message(result)

await main()

Assistant > Here are the first 5 rows of the table colas-training.colas_data.data-chantier:

1) ID_chef_chantier: C149
   NOM_chef_chantier: Durand Olivier
   COOR_chantier: 47.9029;1.9093
   TYPE_chantier: Aménagement paysager

2) ID_chef_chantier: C199
   NOM_chef_chantier: Durand Olivier
   COOR_chantier: 47.9029;1.9093
   TYPE_chantier: Aménagement paysager

3) ID_chef_chantier: C118
   NOM_chef_chantier: Petit Nathalie
   COOR_chantier: 44.8378;-0.5792
   TYPE_chantier: Aménagement paysager

4) ID_chef_chantier: C168
   NOM_chef_chantier: Petit Nathalie
   COOR_chantier: 44.8378;-0.5792
   TYPE_chantier: Aménagement paysager

5) ID_chef_chantier: C107
   NOM_chef_chantier: Richard Michel
   COOR_chantier: 48.8566;2.3522
   TYPE_chantier: Aménagement paysager

Would you like me to display more rows, filter by a column (for example by chef name or type), or convert the COOR_chantier field into separate latitude/longitude columns?
Assistant > I filtered the table for TYPE_chantier = 