# Marseille Food Tour

![food-tour.png](https://i.postimg.cc/7Lqxx9gT/food-tour.png)

Le projet Marseille Food Tour est con√ßu pour exploiter les donn√©es afin d‚Äôoptimiser l‚Äôexp√©rience touristique gastronomique √† Marseille. √Ä cet effet, il int√®gre un wrapper m√©diateur qui centralise et standardise l‚Äôacc√®s aux diff√©rentes sources de donn√©es (bases SQL, API externes, etc.), facilitant ainsi leur int√©gration dans le syst√®me global.

Par ailleurs, le projet met en ≈ìuvre des vues mat√©rialis√©es pour pr√©-calculer et stocker des agr√©gats de donn√©es complexes (Calculs de Distances), permettant une consultation rapide et performante lors des analyses. Ces vues sont automatiquement rafra√Æchies via des cron jobs, assurant que les donn√©es pr√©sent√©es restent toujours √† jour et pertinentes.

### Le portage de notre projet sur Databricks Community Edition est con√ßu pour √™tre enti√®rement scalable et pr√™t √† √©voluer. 

### Voici les points cl√©s :


Les fichiers CSV sources, repr√©sentant des donn√©es historiques, sont import√©s et trait√©s pour constituer nos DataFrames. En parall√®le, nous enrichissons ces donn√©es en consommant avec soin des APIs pour obtenir des informations m√©t√©orologiques en temps r√©el.

Un wrapper m√©diateur centralise l'acc√®s aux diff√©rentes sources de donn√©es, facilitant l'int√©gration de nouvelles sources ou services √† mesure que le projet se d√©veloppe.
De plus, l'utilisation de vues mat√©rialis√©es permet de pr√©-calculer des agr√©gats pour acc√©l√©rer les requ√™tes analytiques, m√™me en cas de volume de donn√©es croissant.

Pour garantir que les vues mat√©rialis√©es et les donn√©es m√©t√©o restent √† jour, nous pouvons mettre en place des Job Scheduler de Databricks qui automatisent leur rafra√Æchissement r√©gulier. Cette solution offre un bon compromis pour garantir la vitesse des requetes et la pertinence des donn√©es.

L'architecture actuelle pr√©pare le projet √† supporter une extension future, que ce soit par l'ajout de nouvelles sources de donn√©es, l'int√©gration de services compl√©mentaires ou le passage √† un environnement de production plus large. Cette approche modulaire ouvre la voie √† un d√©veloppement continu et flexible de notre solution.

### Les donn√©es viennent de data.gouv.fr

### Vous trouverez les Vues Construites un peu plus bas

### Le but de ce projet est de permettre des requetes rapides pour trouver des √©lements de plusieurs tables postgresql les plus proches ( selon la distance g√©ographique )
### Vous trouverez quelques exemples concrets d'utilisations que vous pourrez executer √† la fin du notebook


In [79]:
%pip install SQLAlchemy==2.0.40

Note: you may need to restart the kernel to use updated packages.


In [None]:
import os
import requests
import pandas as pd
from datetime import datetime
from sqlalchemy import create_engine, types, text
import psycopg2
from psycopg2.extras import execute_values


# ---------------------------
# Environment variables setup
# ---------------------------

# You'll need to set up a database

DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "food_tour"
DB_USER = "postgres"
DB_PASSWORD = "root"


In [None]:
API_KEY = ""  # Your OpenWeatherMap API key


In [82]:
# Base abstract class for all wrappers
class BaseWrapper:
    def __init__(self, data_source=None):
        self.data_source = data_source
    def load_data(self, data_source):
        print(f"üìÑ Lecture du fichier CSV : {data_source}")
        df = pd.read_csv(data_source)
        # raise NotImplementedError("Subclasses should implement load_data method")
        return df
    def preprocess_data(self, df):
        raise NotImplementedError("Subclasses should implement preprocess_data method")
    def run(self):
        df = self.load_data(self.data_source)
        df = self.preprocess_data(df)
        return df
    

In [84]:
# Concrete implementation using OpenWeatherMap API
class OpenWeatherMapWrapper(BaseWrapper):
    BASE_URL = "https://api.openweathermap.org/data/2.5/forecast"
    # TABLE_NAME = "meteo_forecast"
    API_KEY = "2ec05eca6b9b8b051818e2aa3aedc23b"  # Your OpenWeatherMap API key
# ---------------------------
    def __init__(self):
        self.api_key = API_KEY
        self.data_source = "Marseille,FR"
        self.column_types = {
            "timestamp": types.TIMESTAMP(timezone=True),
            "temperature": types.NUMERIC(5, 2),
            "humidity": types.INTEGER,
            "wind_speed": types.NUMERIC(5, 2),
            "weather_description": types.TEXT
        }
        # self.table_name = "meteo_forecast"

        
    def load_data(self, location):
        params = {
            "appid": self.api_key,
            "units": "metric"  # Use metric units (temperature in Celsius, wind speed in m/s)
        }
        # Accept location as a city name (e.g., "Marseille,FR") or (lat,lon) tuple.
        if isinstance(location, str):
            params["q"] = location
        elif isinstance(location, tuple) and len(location) == 2:
            params["lat"] = location[0]
            params["lon"] = location[1]
        else:
            raise ValueError("location must be a string (city name) or a (lat, lon) tuple")
        
        response = requests.get(self.BASE_URL, params=params)
        response.raise_for_status()   # Raise an error if the request failed
        return response.json()
    
    def preprocess_data(self, raw_data):
        # Normalize the API's JSON output into a standardized DataFrame.
        # Our goal is to output a DataFrame with the following columns:
        # timestamp, temperature, humidity, wind_speed, weather_description
        records = []
        for entry in raw_data.get("list", []):
            dt = datetime.utcfromtimestamp(entry.get("dt"))
            temp = entry["main"]["temp"]
            humidity = entry["main"]["humidity"]
            wind_speed = entry["wind"]["speed"]
            # There may be several weather entries; we take the first one.
            description = entry["weather"][0]["description"]
            records.append({
                "timestamp": dt,
                "temperature": temp,
                "humidity": humidity,
                "wind_speed": wind_speed,
                "weather_description": description
            })
        return pd.DataFrame(records)
    def run(self):
        data = self.load_data(self.data_source)
        return self.preprocess_data(data)

In [85]:

class RestaurantDataWrapper(BaseWrapper):

    def __init__(self):
        self.data_source = "restaurants_final.csv"
        self.column_types = {
            "nom": types.Text(),
            "description": types.Text(),
            "adresse": types.Text(),
            "code_postal": types.Integer(),
            "commune": types.Text(),
            "latitude": types.Numeric(9, 6),
            "longitude": types.Numeric(9, 6),
            "periode_ouverte": types.Text(),
            "specialites": types.Text(),
            "review_count": types.Integer(),
            "google_note": types.Numeric(2, 1)
        }

    def preprocess_data(self, df):
        return df


In [86]:

# ------------------------
# Run block
# ------------------------
restaurant_wrapper = RestaurantDataWrapper()
restaurant_wrapper.run()

üìÑ Lecture du fichier CSV : restaurants_final.csv


Unnamed: 0,nom,description,adresse,code_postal,commune,latitude,longitude,periode_ouverte,specialites,review_count,google_note
0,Abaco,Cuisine traditionnelle et m√©ridionale raffin√©e...,38 Rue Papety,13007,Marseille,43.290686,5.356852,"Du 02/12 au 31/12, tous les lundis, mardis, me...","Cuisine m√©diterran√©enne, Cuisine traditionnell...",27.0,4.3
1,Adonis du Liban,Plongez au coeur de la culture libanaise : nou...,10-12 Des Trois Rois,13006,Marseille,43.294179,5.384598,"Toute l'ann√©e, tous les lundis, mercredis, jeu...","Cuisine orientale, Cuisine libanaise, Cuisine ...",28.0,3.6
2,Tonton Marius,"Chez Tonton Marius, c'est un cocon convivial e...",9 quai du Lazaret,13002,Marseille,43.306758,5.364598,Toute l'ann√©e. Tous les jours de 10h √† 20h.,"Cuisine m√©diterran√©enne, Cuisine italienne, Sa...",2.0,4.5
3,Al Dente,Nous ouvrons ce restaurant en 1984 et il conna...,10 rue Edmond Rostand,13006,Marseille,43.289296,5.379859,"Toute l'ann√©e, tous les jours.\nOuvert midi et...","Cuisine m√©diterran√©enne, Cuisine italienne",261.0,3.8
4,L'Atelier Dkal√©,L'Atelier est ouvert du mardi au vendredi pour...,17 Rue Mar√©chal Fayolle,13004,Marseille,43.300811,5.398588,"Toute l'ann√©e, tous les mardis, mercredis, jeu...",,1.0,5.0
...,...,...,...,...,...,...,...,...,...,...,...
614,Shanga√Ø Kitchen,Vous ne vous tromperez pas : c'est bien ici qu...,14 cours Jean Ballard,13001,Marseille,43.293250,5.374266,"Toute l'ann√©e, tous les mardis, mercredis, jeu...","Cuisine asiatique, Cuisine chinoise, Plats v√©g...",,
615,Grand Bao,C‚Äôest le nouveau restaurant de la Bao Family e...,3 cours Saint Louis,13001,Marseille,43.296117,5.378259,"Toute l'ann√©e, tous les jours de 12h √† 23h.\nO...","Cuisine asiatique, Cuisine chinoise",,
616,Tigermilk,Cuisine sud-am√©ricaine servie dans un cadre su...,23 rue Saint Sa√´ns,13001,Marseille,43.293412,5.373807,"Toute l'ann√©e, tous les jours de 12h √† 23h.\nO...","Cuisine sud-am√©ricaine, Plats v√©g√©tariens",,
617,MS Club,MS CLUB est bien plus qu'un simple restaurant ...,9 Bd Gay Lussac,13011,Marseille,43.334248,5.374000,"Du 01/01/2024 au 31/12/2025, tous les jours de...","Cuisine m√©diterran√©enne, Cuisine traditionnell...",,


In [88]:
class TransportStopDataInjector(BaseWrapper):
    def __init__(self):
        self.data_source = "cleaned_arrets.csv"
        self.column_types = {
            "id": types.Text(),
            "name": types.Text(),
            "latitude": types.Numeric(9, 6),
            "longitude": types.Numeric(9, 6),
            "description": types.Text(),
            "wheelchair boarding": types.Integer()
        }

    def preprocess_data(self, df):
        """Preprocess the DataFrame: clean column names, split coordinates."""

        # Convert all column names to lowercase (strip whitespace as well)
        df.columns = df.columns.str.strip().str.lower()

        # Manipulate the 'coordinates' column:
        if "coordinates" in df.columns:
            print("üõ†Ô∏è Splitting 'coordinates' into 'latitude' and 'longitude'...")
            df[['latitude', 'longitude']] = df['coordinates'].str.split(',', expand=True)
            df['latitude'] = df['latitude'].astype(float)
            df['longitude'] = df['longitude'].astype(float)
            df = df.drop(columns=['coordinates'])
        else:
            print("‚ÑπÔ∏è No 'coordinates' column found. Check your CSV format.")
        
        return df

In [89]:
arrets_wrapper = TransportStopDataInjector()
arrets_wrapper.run()

üìÑ Lecture du fichier CSV : cleaned_arrets.csv
üõ†Ô∏è Splitting 'coordinates' into 'latitude' and 'longitude'...


Unnamed: 0,id,name,description,wheelchair boarding,latitude,longitude
0,RTM-00001532,3 Fr√®res Barth√©lemy,,2,43.293140,5.385872
1,RTM-00000154,4 Chemins des Aygalades,,2,43.346700,5.366183
2,RTM-00000217,5 Avenues Burel,,2,43.317198,5.389619
3,RTM-00003085,5 Avenues Fond√®re,,2,43.301798,5.399085
4,RTM-00001955,Aiguier Cantini,,2,43.257998,5.403374
...,...,...,...,...,...,...
2661,RTM-00000152,ZI de la Delorme,,1,43.340822,5.365446
2662,RTM-00001938,Z√©natti Ecoles,,2,43.246699,5.385828
2663,RTM-00001947,Z√©natti Ecoles,,2,43.246464,5.386440
2664,RTM-00001927,Z√©natti Lancier,,2,43.246426,5.383798


In [90]:
class MusicEventDataInjector(BaseWrapper):

    def __init__(self):
        self.data_source = "evenement_musical_cleaned.csv"
        self.column_types = {
            "titre": types.Text(),
            "description": types.Text(),
            "nom_lieu": types.Text(),
            "adresse": types.Text(),
            "ville": types.Text(),
            "code_postal": types.Integer(),
            "latitude": types.Numeric(9, 6),
            "longitude": types.Numeric(9, 6),
            "date_debut": types.DateTime(),
            "date_fin": types.DateTime()
        }
        
    def preprocess_data(self, df):
        
        # Normalize column names: strip, lowercase and replace spaces with underscores
        df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
        print("Detected columns:", df.columns.tolist())

        # Ensure correct numeric types for the relevant columns
        df["code_postal"] = pd.to_numeric(df["code_postal"], errors="coerce").astype("Int64")
        df["longitude"] = pd.to_numeric(df["longitude"], errors="coerce")
        df["latitude"] = pd.to_numeric(df["latitude"], errors="coerce")
        return df


In [91]:
music_event_injector = MusicEventDataInjector()
music_event_injector.run()

üìÑ Lecture du fichier CSV : evenement_musical_cleaned.csv
Detected columns: ['titre', 'description', 'nom_lieu', 'adresse', 'ville', 'code_postal', 'latitude', 'longitude', 'date_debut', 'date_fin']


Unnamed: 0,titre,description,nom_lieu,adresse,ville,code_postal,latitude,longitude,date_debut,date_fin
0,Concert & repas de soutien: Roms dans la guerr...,‚ô´‚ô´‚ô´,La Dar Centre Social Autog√©r√©,"127, rue d'Aubagne 13006 Marseille",Marseille,13006,43.293333,5.382115,2025-04-10 19:00:00+02:00,2025-04-10 22:30:00+02:00
1,DESCARGAS CUBANAS #4,‚ô´‚ô´‚ô´,Club 27,"27, rue d'Anvers13004 Marseille",Marseille,13004,43.303056,5.395165,2025-04-10 19:00:00+02:00,2025-04-10 22:00:00+02:00
2,Swinging Papy's,‚ô´JAZZ‚ô´,L'arlequin,"1, rue Missiri 13013 Marseille",Marseille,13014,43.355711,5.410335,2025-04-10 19:30:00+02:00,2025-04-10 23:59:00+02:00
3,LocoMuerte kami no ikari Rising All Star,‚ô´TRASH PUNK METAL‚ô´,Makeda,"103 rue Ferrari, 13005 Marseille",Marseille,13005,43.293709,5.391759,2025-04-10 19:30:00+02:00,2025-04-10 23:00:00+02:00
4,Jam open Mic / Concert,‚ô´‚ô´‚ô´,Bar de La Plaine,"57 Place Jean Jaur√®s, 13005 Marseille",Marseille,13005,43.293987,5.386669,2025-04-10 20:00:00+02:00,2025-04-11 01:00:00+02:00
...,...,...,...,...,...,...,...,...,...,...
419,LA FOUINE,‚ô´‚ô´‚ô´,le D√¥me,"48 Avenue de Saint-Just, 13004 Marseille",Marseille,13013,43.314877,5.404349,2026-01-20 20:00:00+01:00,2026-01-20 23:00:00+01:00
420,LOUANE,‚ô´‚ô´‚ô´,le D√¥me,"48 Avenue de Saint-Just, 13004 Marseille",Marseille,13013,43.314877,5.404349,2026-01-30 20:00:00+01:00,2026-01-30 23:00:00+01:00
421,Ara Malikian,‚ô´‚ô´‚ô´,le Silo,"35 quai du Lazaret, 13002 Marseille",Marseille,13002,43.310347,5.367188,2026-03-23 20:00:00+01:00,2026-03-23 22:00:00+01:00
422,Kim Wilde,‚ô´‚ô´‚ô´,le Silo,"35 quai du Lazaret, 13002 Marseille",Marseille,13002,43.310347,5.367188,2026-04-16 20:30:00+02:00,2026-04-16 23:30:00+02:00


In [92]:
import ast
class TransportLineDataWrapper(BaseWrapper):
    def __init__(self):
        self.data_source = "lignes_transport_with_stop_ids.csv"
        self.column_types = {
            "id": types.Text(),
            "short_name": types.Text(),
            "long_name": types.Text(),
            "route_type": types.Text(),  # 'Bus', 'Tram', etc.
            "color": types.Text()
        }
    def load_data(self, data_source):
        """Read CSV and prepare dataframe."""
        print(f"üìÑ Reading CSV file: {self.data_source}")
        df = pd.read_csv(self.data_source, sep=";")
        return df
    def preprocess_data(self, df):
        # Normalize column names
        df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
        df = df[['id', 'short_name', 'long_name', 'route_type', 'color']]
        df.columns = ['id', 'short_name', 'long_name', 'route_type', 'color']
        return df

In [93]:
#test
transport_line_wrapper = TransportLineDataWrapper()
transport_line_wrapper.run()

üìÑ Reading CSV file: lignes_transport_with_stop_ids.csv


Unnamed: 0,id,short_name,long_name,route_type,color
0,RTM-9,5,M√©tro La Rose - La Parade,Bus,009FE3
1,RTM-14,6,Foch 5 Avenues - Bois Lema√Ætre,Bus,F39200
2,RTM-19,7T,Foch 5 Avenues - Allauch Barbaraou,Bus,FBBA00
3,RTM-13,9,Les Caillols Centre Urbain - St Julien,Bus,189B52
4,AUB-10,10,Gare Aubagne - Font de Mai - La Treille,Bus,E8378D
...,...,...,...,...,...
147,RTM-122,144S,M√©tro La Rose - Allauch,Bus,E73333
148,RTM-121,144,M√©tro La Rose - Allauch,Bus,009FE3
149,RTM-107,530,Canebi√®re Bourse - La Savine Les Pins,Bus,009FE3
150,RTM-108,533,Canebi√®re Bourse - G√©raniums,Bus,009640


In [94]:
import ast
class TransportLineStopsDataWrapper(BaseWrapper):
    def __init__(self):
        self.data_source = "lignes_transport_with_stop_ids.csv"
        self.column_types = {
            "line_id": types.Text(),
            "stop_id": types.Text()
        }
    def load_data(self, data_source):
        """Read CSV and prepare dataframe."""
        print(f"üìÑ Reading CSV file: {self.data_source}")
        df = pd.read_csv(self.data_source, sep=";")
        df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
        # print("Detected columns:", df.columns.tolist())
        
        return df
    def preprocess_data(self, df):
        # Normalize column names
        df = df[['id', 'short_name', 'long_name', 'route_type', 'color', 'geo_point_2d', 'stop_ids']]
        df.columns = ['id', 'short_name', 'long_name', 'route_type', 'color', 'geo_point_2d', 'stop_ids']
        line_stop_pairs = []

        for _, row in df.iterrows():
            line_id = row['id']
            try:
                stop_ids = ast.literal_eval(row['stop_ids'])  # Parse the string into a Python list
                for stop_id in stop_ids:
                    line_stop_pairs.append({'line_id': line_id, 'stop_id': stop_id})
            except Exception as e:
                print(f"‚ö†Ô∏è Error parsing stop_ids for line {line_id}: {e}")

        df_line_stops = pd.DataFrame(line_stop_pairs)
        return df_line_stops

In [95]:
line_stops_wrapper = TransportLineStopsDataWrapper()
line_stops_wrapper.run()

üìÑ Reading CSV file: lignes_transport_with_stop_ids.csv


Unnamed: 0,line_id,stop_id
0,RTM-9,RTM-00003930
1,RTM-9,RTM-00003931
2,RTM-9,RTM-00000738
3,RTM-9,RTM-00000731
4,RTM-9,RTM-00000677
...,...,...
5481,RTM-110,RTM-00003828
5482,RTM-110,RTM-00003828
5483,RTM-110,RTM-00002051
5484,RTM-110,RTM-00000468


In [96]:
class BaignadeDataInjector(BaseWrapper):
    def __init__(self):
        self.data_source = "baignades.csv"
        self.column_types = {
            "nom_du_site": types.Text(),
            "categorie": types.Text(),
            "baignade_surveillee": types.Boolean(),
            "adresse": types.Text(),
            "code_postal": types.Integer(),
            "ville": types.Text(),
            "numero_de_telephone": types.Text(),
            "longitude": types.Numeric(9, 6),
            "latitude": types.Numeric(9, 6)
        }

    def preprocess_data(self, df):
        
        df.columns = df.columns.str.strip().str.lower().str.replace(" ", "_")
        print("Detected columns:", df.columns.tolist())

        # Convert and clean columns
        df["baignade_surveillee"] = df["baignade_surveillee"].str.strip().str.lower().map({
            "oui": True, "non": False, "non concerne": False
        })
        df["code_postal"] = pd.to_numeric(df["code_postal"], errors="coerce").astype("Int64")
        df["longitude"] = pd.to_numeric(df["longitude"], errors="coerce")
        df["latitude"] = pd.to_numeric(df["latitude"], errors="coerce")
        return df

In [97]:
baignade_wrapper = BaignadeDataInjector()
baignade_wrapper.run()

üìÑ Lecture du fichier CSV : baignades.csv
Detected columns: ['nom_du_site', 'categorie', 'baignade_surveillee', 'adresse', 'code_postal', 'ville', 'numero_de_telephone', 'longitude', 'latitude']


Unnamed: 0,nom_du_site,categorie,baignade_surveillee,adresse,code_postal,ville,numero_de_telephone,longitude,latitude
0,Base nautique de l'Huveaune,Bases nautiques,False,109 avenue Pierre Mendes France,13008,Marseille,04 91 76 54 42,5.376404,43.261086
1,Base Nautique Corbieres,Bases nautiques,False,250 plage de l'Estaque,13016,Marseille,04 91 46 19 37,5.293255,43.358719
2,Base Nautique du Roucas Blanc,Bases nautiques,False,2 promenade Georges Pompidou,13008,Marseille,04 91 29 30 40,5.371334,43.266873
3,Base Nautique de Pointe Rouge,Bases nautiques,False,Port de la pointe Rouge,13008,Marseille,04 91 73 74 23,5.370918,43.244462
4,Plage Borely,Plages,True,,13008,Marseille,,5.37506,43.255979
5,Plage de David,Plages,True,,13008,Marseille,,5.372253,43.261648
6,Plage du Grand Roucas,Plages,True,,13008,Marseille,,5.370885,43.263369
7,Plage de l'Huveaune,Plages,True,,13008,Marseille,,5.375597,43.258558
8,Plage de Bonneveine,Plages,True,,13008,Marseille,,5.374763,43.253519
9,Plage de la Vieille Chapelle,Plages,True,,13008,Marseille,,5.373735,43.249989


# Views


In [98]:
sql_vue_mat_baignades_arrets = """ DROP MATERIALIZED VIEW IF EXISTS baignades_nearest_stops CASCADE;

create materialized view if not exists baignades_nearest_stops as
   with ranked_stops as (
      select b.nom_du_site as nom,
                b.categorie,
                b.baignade_surveillee,
                b.adresse,
                b.numero_de_telephone,
             a.id as arret_id,
             a.name as arret_name,
             b.geog as geog_baignade,
             a.geog as geog_arret,
             st_distance(
                b.geog,
                a.geog
             ) as distance,
             row_number()
             over(partition by b.nom_du_site
                  order by b.geog <-> a.geog
             ) as rn
        from baignades b
        join arrets_transport a
      on st_dwithin(
         b.geog,
         a.geog,
         400
      )
   )
   select nom,
          geog_baignade,
            categorie,
            baignade_surveillee,
            adresse,
            numero_de_telephone,
          arret_id,
          arret_name,
          geog_arret,
          distance
     from ranked_stops
    where rn = 1; """

In [99]:
sql_vue_mat_baignade_meteo = """
DROP MATERIALIZED VIEW IF EXISTS baignades_meteo CASCADE;
create materialized view if not exists baignades_meteo as
   select b.nom_du_site,
          b.categorie,
          b.baignade_surveillee,
          b.adresse,
          b.numero_de_telephone,
          b.geog as geog_baignade,
          m.timestamp as forecast_time,
          m.temperature as temperature,
          m.humidity as humidite,
          m.wind_speed as vitesse_vent,
          m.weather_description as description_meteo
     from baignades b
    cross join meteo_forecast m
    order by 
             m.timestamp, b.nom_du_site;"""

In [100]:
sql_vue_mat_baignade_meilleurs_conditions = """
DROP MATERIALIZED VIEW IF EXISTS baignades_best_conditions CASCADE;
CREATE MATERIALIZED VIEW IF NOT EXISTS baignades_best_conditions AS
SELECT nom_du_site, 
          b.categorie,
          b.baignade_surveillee,
          b.adresse,
          b.numero_de_telephone,
          b.geog_baignade,
         forecast_time::date AS date, temperature, humidite, vitesse_vent, description_meteo
FROM baignades_meteo b
WHERE temperature >= 25 AND vitesse_vent <= 20
ORDER BY temperature DESC;
"""

In [101]:
sql_vue_mat_baignade_temperature_moy = """
DROP MATERIALIZED VIEW IF EXISTS avg_temp_per_baignade CASCADE;
CREATE MATERIALIZED VIEW IF NOT EXISTS avg_temp_per_baignade AS
SELECT
  nom_du_site,
  geog_baignade,
  forecast_time::date AS date,
  AVG(temperature) AS avg_temp
FROM baignades_meteo
GROUP BY nom_du_site, forecast_time::date, geog_baignade
ORDER BY date, avg_temp DESC;
"""

In [102]:
sql_vue_mat_baignade_arrets_temp = """
DROP MATERIALIZED VIEW IF EXISTS baignades_with_nearest_stop_and_temp CASCADE;
CREATE MATERIALIZED VIEW IF NOT EXISTS baignades_with_nearest_stop_and_temp AS
SELECT bns.nom,
    bns.categorie,
    bns.geog_baignade,
          bns.baignade_surveillee,
        bns.adresse,
        bns.numero_de_telephone,
            bns.arret_name,
          bns.geog_arret,
          bns.distance,
    atb.avg_temp
FROM baignades_nearest_stops bns
JOIN avg_temp_per_baignade atb ON bns.nom = atb.nom_du_site;
"""

In [103]:
sql_vue_mat_events_arrets ="""drop materialized view if exists evenements_arrets CASCADE;
create MATERIALIZED view if not exists evenements_arrets as
   with ranked_stops as (
      select e.titre as evenement,
                e.description as evenement_description,
                e.nom_lieu as evenement_lieu,
                e.date_debut as debut_evenement,
                e.date_fin as fin_evenement,
             a.id as arret_id,
             a.name as arret_name,
             e.geog as geog_evenement,
             a.geog as geog_arret,
             st_distance(
                e.geog,
                a.geog
             ) as distance,
             row_number()
             over(partition by e.titre
                  order by e.geog <-> a.geog
             ) as rn
        from evenements_musicaux e
        join arrets_transport a
      on st_dwithin(
         e.geog,
         a.geog,
         400
      )
   )
   select evenement,
            evenement_description,
            evenement_lieu,
            debut_evenement,
            fin_evenement,
          arret_id,
          arret_name,
          geog_evenement,
          geog_arret,
          distance
     from ranked_stops
    where rn <= 3;"""

In [104]:
sql_vue_mat_nb_evenement_arret = """
DROP MATERIALIZED VIEW IF EXISTS nb_evenements_par_arret CASCADE;
CREATE MATERIALIZED VIEW IF NOT EXISTS nb_evenements_par_arret AS
SELECT arret_name, COUNT(*) AS nb_evenements
FROM evenements_arrets
GROUP BY arret_name
ORDER BY nb_evenements DESC;
"""

In [105]:
sql_vue_mat_restos_arrets = """DROP MATERIALIZED VIEW IF EXISTS restaurant_nearest_stops CASCADE;

CREATE MATERIALIZED VIEW IF NOT EXISTS restaurant_nearest_stops AS
WITH ranked_stops AS (
    SELECT 
        r.nom AS restaurant_name,
        r.description AS restaurant_description,
        r.specialites,
        r.periode_ouverte,
        r.google_note AS note,
        r.review_count AS review_count,
        r.geog AS geog_restaurant,
        a.id AS arret_id,
        a.name AS arret_name,
        a.geog AS geog_arret,
        ST_Distance(r.geog, a.geog) AS distance,
        ROW_NUMBER() OVER (
            PARTITION BY r.nom, a.name
            ORDER BY r.geog <-> a.geog
        ) AS rn
    FROM restaurants r
    JOIN arrets_transport a
        ON ST_DWithin(r.geog, a.geog, 400)  -- within 400m
)
SELECT 
    restaurant_name,
    restaurant_description,
    specialites,
    periode_ouverte,
    note,
    review_count,
    geog_restaurant,
    arret_id,
    arret_name,
    geog_arret,
    distance,
    rn AS stop_rank
FROM ranked_stops
WHERE rn <= 3
ORDER BY restaurant_name, stop_rank;
"""

In [106]:
sql_mat_event_bon_resto_arret = """DROP MATERIALIZED VIEW IF EXISTS mini_itineraire_event_good_restaurant_arret CASCADE;

CREATE MATERIALIZED VIEW IF NOT EXISTS mini_itineraire_event_good_restaurant_arret AS
WITH nearest_restaurants AS (
    SELECT 
        e.titre AS evenement,
        e.description AS evenement_description,
        e.adresse AS evenement_adresse,
        e.date_debut,
        e.date_fin,
        r.nom AS restaurant_name,
        r.description AS restaurant_description,
        r.specialites,
        r.periode_ouverte,
        r.google_note AS note,
        r.review_count,
        r.geog AS geog_restaurant,
        ST_Distance(e.geog, r.geog) AS distance_event_restaurant,
        e.geog AS geog_evenement,
        ROW_NUMBER() OVER (PARTITION BY e.titre ORDER BY e.geog <-> r.geog) AS rn_restaurant
    FROM evenements_musicaux e
    JOIN restaurants r
      ON ST_DWithin(e.geog, r.geog, 200)
     AND r.google_note >= 3
     AND r.review_count >= 3
),
event_to_restaurant AS (
    SELECT *
    FROM nearest_restaurants
    WHERE rn_restaurant <= 5
),
nearest_stops AS (
    SELECT 
        er.evenement,
        er.evenement_description,
        er.evenement_adresse,
        er.date_debut,
        er.date_fin,
        er.restaurant_name,
        er.restaurant_description,
        er.specialites,
        er.periode_ouverte,
        er.note,
        er.review_count,
        er.geog_evenement,
        er.geog_restaurant,
        er.distance_event_restaurant,
        a.id AS arret_id,
        a.name AS arret_name,
        a.geog AS geog_arret,
        ST_Distance(er.geog_restaurant, a.geog) AS distance_restaurant_arret,
        ROW_NUMBER() OVER (
            PARTITION BY er.evenement
            ORDER BY er.geog_restaurant <-> a.geog
        ) AS rn_stop
    FROM event_to_restaurant er
    JOIN arrets_transport a ON ST_DWithin(er.geog_restaurant, a.geog, 300)
)
SELECT 
    evenement,
    evenement_description,
    evenement_adresse,
    date_debut,
    date_fin,
    restaurant_name,
    restaurant_description,
    specialites,
    periode_ouverte,
    note,
    review_count,
    arret_name,
    arret_id,
    distance_event_restaurant,
    distance_restaurant_arret,
    geog_evenement,
    geog_restaurant,
    geog_arret
FROM nearest_stops
WHERE rn_stop <= 5
ORDER BY distance_event_restaurant, distance_restaurant_arret;
"""

In [107]:
sql_vue_mat_restos_event = """DROP MATERIALIZED VIEW IF EXISTS restaurant_nearest_event CASCADE;

CREATE MATERIALIZED VIEW IF NOT EXISTS public.restaurant_nearest_event AS
WITH ranked_restaurants AS (
    SELECT 
        e.titre AS evenement,
        e.description AS evenement_description,
        e.adresse AS evenement_adresse,
        e.date_debut,
        e.date_fin,
        r.nom AS restaurant_name,
        r.google_note AS note,
        r.review_count AS review_count,
        r.description AS restaurant_description,
        r.specialites AS specialites,
        r.periode_ouverte AS periode_ouverte,
        r.geog AS geog_restaurant,
        ST_Distance(e.geog, r.geog) AS distance,
        e.geog AS geog_evenement,
        ROW_NUMBER() OVER (PARTITION BY e.titre ORDER BY e.geog <-> r.geog) AS rn
    FROM evenements_musicaux e
    JOIN restaurants r ON ST_DWithin(e.geog, r.geog, 400)
)
SELECT 
    evenement,
    evenement_description,
    evenement_adresse,
    date_debut,
    date_fin,
    restaurant_name,
    note,
    review_count,
    restaurant_description,
    specialites,
    periode_ouverte,
    geog_restaurant,
    distance,
    geog_evenement
FROM ranked_restaurants
WHERE rn <= 5;
"""

In [108]:
sql_vue_mat_nb_restos_arrets = """
DROP MATERIALIZED VIEW IF EXISTS nb_restaurants_par_arret CASCADE;
CREATE MATERIALIZED VIEW IF NOT EXISTS nb_restaurants_par_arret AS
SELECT arret_name, COUNT(*) AS nb_restaurants
FROM restaurant_nearest_stops
GROUP BY arret_name
ORDER BY nb_restaurants DESC;
"""

In [109]:
materialized_views = [
    sql_vue_mat_baignades_arrets,
    sql_vue_mat_events_arrets,
    sql_vue_mat_nb_evenement_arret,
    sql_vue_mat_restos_arrets,
    sql_mat_event_bon_resto_arret,
    sql_vue_mat_restos_event,
    sql_vue_mat_nb_restos_arrets,
    sql_vue_mat_baignade_meteo,
    sql_vue_mat_baignade_meilleurs_conditions,
    sql_vue_mat_baignade_temperature_moy,
    sql_vue_mat_baignade_arrets_temp
]

## Mediator

In [None]:
import psycopg2
from sqlalchemy import create_engine, text

class Mediator:
    def __init__(self):
        self.engine = create_engine(
            f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
        )
        self.meteo_wrapper = OpenWeatherMapWrapper()
        self.restaurant_wrapper = RestaurantDataWrapper()
        self.transport_stop_wrapper = TransportStopDataInjector()
        self.music_event_wrapper = MusicEventDataInjector()
        self.baignade_wrapper = BaignadeDataInjector()
        self.transport_line_wrapper = TransportLineDataWrapper()
        self.transport_line_stops_wrapper = TransportLineStopsDataWrapper()
        # list map wrapper table name
        self.wrapper_list = [
            {
                "wrapper": self.meteo_wrapper,
                "table_name": "meteo_forecast"
            },
            {
                "wrapper": self.restaurant_wrapper,
                "table_name": "restaurants"
            },
            {
                "wrapper": self.transport_stop_wrapper,
                "table_name": "arrets_transport"
            },
            {
                "wrapper": self.music_event_wrapper,
                "table_name": "evenements_musicaux"
            },
            {
                "wrapper": self.baignade_wrapper,
                "table_name": "baignades"
            },
            {
                "wrapper": self.transport_line_wrapper,
                "table_name": "lignes_transport"
            },
            {
                "wrapper": self.transport_line_stops_wrapper,
                "table_name": "arrets_lignes_transport"
            }
        ]
        self.wrappers_to_refresh = [
            self.meteo_wrapper,
            self.restaurant_wrapper,
            self.transport_stop_wrapper,
            self.music_event_wrapper,
            self.baignade_wrapper,
        ]
        meteo_view_definitions = {
            "baignades_meteo" : sql_vue_mat_baignade_meteo,
            "baignades_best_conditions" : sql_vue_mat_baignade_meilleurs_conditions,
            "avg_temp_per_baignade" : sql_vue_mat_baignade_temperature_moy,
            "baignades_with_nearest_stop_and_temp" : sql_vue_mat_baignade_arrets_temp
        }
        events_view_definitions = {
            "evenements_arrets" : sql_vue_mat_events_arrets,
            "nb_evenements_par_arret" : sql_vue_mat_nb_evenement_arret,
            "mini_itineraire_event_good_restaurant_arret" : sql_mat_event_bon_resto_arret,
            "restaurant_nearest_event" : sql_vue_mat_restos_event
        }
        restaurant_view_definitions = {
            "restaurant_nearest_stops" : sql_vue_mat_restos_arrets,
            "nb_restaurants_par_arret" : sql_vue_mat_nb_restos_arrets,
            "restaurant_nearest_event" : sql_vue_mat_restos_event,
            "mini_itineraire_event_good_restaurant_arret" : sql_mat_event_bon_resto_arret
        }
        baigande_view_definitions = {
            "baignades_nearest_stops" : sql_vue_mat_baignades_arrets,
            "baignades_meteo" : sql_vue_mat_baignade_meteo,
            "baignades_best_conditions" : sql_vue_mat_baignade_meilleurs_conditions,
            "baignades_with_nearest_stop_and_temp" : sql_vue_mat_baignade_arrets_temp,
            "avg_temp_per_baignade" : sql_vue_mat_baignade_temperature_moy
        }
        arrets_view_definitions = {
            "baignades_nearest_stops" : sql_vue_mat_baignades_arrets,
            "evenements_arrets" : sql_vue_mat_events_arrets,
            "nb_evenements_par_arret" : sql_vue_mat_nb_evenement_arret,
            "restaurant_nearest_stops" : sql_vue_mat_restos_arrets,
            "nb_restaurants_par_arret" : sql_vue_mat_nb_restos_arrets,
            "mini_itineraire_event_good_restaurant_arret" : sql_mat_event_bon_resto_arret
        }
        
        
        self.wrapper_views_list = [
            {
                "wrapper": self.meteo_wrapper,
                "view_name": meteo_view_definitions
            },
            {
                "wrapper": self.music_event_wrapper,
                "view_name": events_view_definitions
            },
            {
                "wrapper": self.restaurant_wrapper,
                "view_name": restaurant_view_definitions
            },
            {
                "wrapper": self.baignade_wrapper,
                "view_name": baigande_view_definitions
            },
            {
                "wrapper": self.transport_stop_wrapper,
                "view_name": arrets_view_definitions
            }
        ]

    def add_postgis_extension(self):
        """Add PostGIS extension to the database."""
        with self.engine.begin() as connection:
            connection.execute(text("CREATE EXTENSION IF NOT EXISTS postgis;"))
            print("üß± Extension PostGIS activ√©e.")

    def create_tables(self):
        """Create tables (Drop and Recreate) using the wrappers."""
        with self.engine.begin() as conn:
            for wrapper in self.wrapper_list:
                # Drop the table if it exists
                conn.execute(text(f"DROP TABLE IF EXISTS {wrapper['table_name']} CASCADE;"))
                
                df = wrapper["wrapper"].run()
                df.to_sql(wrapper["table_name"], conn, if_exists="replace", index=False, dtype=wrapper["wrapper"].column_types)
                print(f"‚úÖ Table '{wrapper['table_name']}' created or replaced.")
                
    def add_geography_columns(self):
        """Add geography columns to the tables if they do not exist."""
        with self.engine.begin() as connection:
            connection.execute(text("CREATE EXTENSION IF NOT EXISTS postgis;"))
            for table_name in ['restaurants', 'evenements_musicaux', 'baignades', 'arrets_transport']:
                connection.execute(text(f"""
                    ALTER TABLE {table_name}
                    ADD COLUMN IF NOT EXISTS geog geography(Point, 4326);
                """))
                connection.execute(text(f"""
                    UPDATE {table_name}
                    SET geog = ST_SetSRID(ST_MakePoint(longitude, latitude), 4326)
                    WHERE longitude IS NOT NULL AND latitude IS NOT NULL;
                """))
                # print(f"üß≠ Geography column 'geog' added and populated for table '{table_name}'.")
    
    def refresh_data_wrapper(self, wrapper):
        """use to_sql replace table"""
        # get table_name from self.wrapper_list using wrapper["wrapper"]
        table_name = next((w["table_name"] for w in self.wrapper_list if w["wrapper"] == wrapper), None)
        
        df = wrapper.run()
        with self.engine.begin() as conn:
            # Drop the table if it exists
            conn.execute(text(f"DROP TABLE IF EXISTS {table_name} CASCADE;"))
            # Create the table and insert data
            df.to_sql(table_name, conn, if_exists="replace", index=False, dtype=wrapper.column_types)
            print(f"‚úÖ Table '{table_name}' Data refreshed successfully.")
        
        self.add_geography_columns()
        self.refresh_views(wrapper)

        
    
    def drop_views(self, wrapper):
        # get view_name from self.wrapper_views_list using wrapper["wrapper"]
        wrapper_name = next((w for w in self.wrapper_views_list if w["wrapper"] == wrapper), None)
        view = wrapper_name["view_name"] if wrapper_name else None

        with self.engine.begin() as connection:
            for view_name, view_sql in view.items():
                print(f"‚è≥ Dropping view {view_name} (if it exists)...")
                # Drop the view if it exists. CASCADE will remove dependent objects.
                connection.execute(text(f"DROP MATERIALIZED VIEW IF EXISTS {view_name} CASCADE;"))

    def create_views(self, wrapper):
        # get view_name from self.wrapper_views_list using wrapper["wrapper"]
        wrapper_name = next((w for w in self.wrapper_views_list if w["wrapper"] == wrapper), None)
        view = wrapper_name["view_name"] if wrapper_name else None

        with self.engine.begin() as connection:
            for view_name, view_sql in view.items():
                print(f"‚è≥ Creating view {view_name}...")
                # Create the materialized view
                connection.execute(text(view_sql))
                print(f"‚úÖ View {view_name} created successfully.")

    def refresh_views(self, wrapper):
        """Refresh all views by dropping and recreating them."""

        print("üîÑ Refreshing views...")
        self.drop_views(wrapper)
        self.create_views(wrapper)
        print("‚úÖ All views refreshed.")
    
    def refresh_all_materialized_views(self):
        """Refresh all materialized views."""
        with self.engine.begin() as connection:
            for view_sql in materialized_views:
                print(f"‚è≥ Refreshing view...")
                connection.execute(text(view_sql))
                print(f"‚úÖ View refreshed successfully.")

In [111]:
mediator = Mediator()
mediator.add_postgis_extension()
mediator.create_tables()
mediator.add_geography_columns()
mediator.refresh_all_materialized_views()

üß± Extension PostGIS activ√©e.
‚úÖ Table 'meteo_forecast' created or replaced.
üìÑ Lecture du fichier CSV : restaurants_final.csv
‚úÖ Table 'restaurants' created or replaced.
üìÑ Lecture du fichier CSV : cleaned_arrets.csv
üõ†Ô∏è Splitting 'coordinates' into 'latitude' and 'longitude'...
‚úÖ Table 'arrets_transport' created or replaced.
üìÑ Lecture du fichier CSV : evenement_musical_cleaned.csv
Detected columns: ['titre', 'description', 'nom_lieu', 'adresse', 'ville', 'code_postal', 'latitude', 'longitude', 'date_debut', 'date_fin']
‚úÖ Table 'evenements_musicaux' created or replaced.
üìÑ Lecture du fichier CSV : baignades.csv
Detected columns: ['nom_du_site', 'categorie', 'baignade_surveillee', 'adresse', 'code_postal', 'ville', 'numero_de_telephone', 'longitude', 'latitude']
‚úÖ Table 'baignades' created or replaced.
üìÑ Reading CSV file: lignes_transport_with_stop_ids.csv
‚úÖ Table 'lignes_transport' created or replaced.
üìÑ Reading CSV file: lignes_transport_with_stop_ids.

## The Following code needs to be executed within a python cron job with the desired frequency
### This will refresh data of the materialized views

In [None]:
# for wrapper in mediator.wrappers_to_refresh:
#     mediator.refresh_data_wrapper(wrapper["wrapper"])

###  Unlike Baignades, Arrets and Restaurants
###  Meteo and Evenements Data are not generally static => They need daily ( or even hourly for meteo forecast ) refresh

In [128]:
# mediator.refresh_data_wrapper(mediator.meteo_wrapper)

## Exploring Views and Queries

In [117]:
%pip install folium shapely

Note: you may need to restart the kernel to use updated packages.


When the mediator receives a request involving tables 
- restaurants
- evenements_musicaux

it uses the materialized view 
> **restaurant_nearest_event**
> 

Then, it applies the desired constraints on the view like the **exemple** below :
- Distance between restaurants and events
- Restaurants Note and Review Counts

In [118]:
from sqlalchemy import create_engine, types, text
import folium
from shapely import wkt


engine = create_engine(f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')


# SQL query converting geometry to WKT for restaurant and event.
sql = """
SELECT restaurant_name, 
    note, 
    review_count, 
    ST_AsText(geog_restaurant) as geog_restaurant,
    ST_AsText(geog_evenement) as geog_evenement,
    distance,
    evenement,
    date_debut, date_fin
FROM restaurant_nearest_event
WHERE note >= 4.0
  AND review_count >= 3 
  AND distance <= 100
  AND date_debut <= NOW()
  
ORDER BY evenement
"""

with engine.begin() as connection:
    df = pd.read_sql_query(text(sql), connection)

# Convert the WKT strings into Shapely geometry objects using the correct column names.
df["restaurant_geom"] = df["geog_restaurant"].apply(wkt.loads)
df["event_geom"] = df["geog_evenement"].apply(wkt.loads)

# Create a Folium map centered on Marseille.
map_center = [43.296482, 5.36978]  # [latitude, longitude]
m = folium.Map(location=map_center, zoom_start=12)

# Loop through each record to add markers to the map.
for idx, row in df.iterrows():
    # Extract coordinates from the Shapely geometry objects.
    # Note: Shapely geometry has .x for longitude and .y for latitude.
    r_lon, r_lat = row["restaurant_geom"].x, row["restaurant_geom"].y
    e_lon, e_lat = row["event_geom"].x, row["event_geom"].y

    # Create a popup for the restaurant.
    popup_restaurant = (
        f"<b>{row['restaurant_name']}</b><br>"
        f"Rating: {row['note']} ‚≠ê<br>"
        f"Reviews: {row['review_count']}<br>"
        f"Distance: {row['distance']:.2f} m"
    )
    folium.Marker(
        location=[r_lat, r_lon],
        popup=popup_restaurant,
        icon=folium.Icon(color="blue", icon="cutlery", prefix="fa")
    ).add_to(m)

    # Create a popup for the event.
    popup_event = f"<b>Event:</b> {row['evenement']}"
    folium.Marker(
        location=[e_lat, e_lon],
        popup=popup_event,
        icon=folium.Icon(color="red", icon="music", prefix="fa")
    ).add_to(m)


m

When the mediator receives a request involving tables 
- arrets
- evenements_musicaux

it uses the materialized view 
> **evenements_arrets**
> 

Then, it applies the desired constraints on the view like the **exemple** below :
- Distance between arrets and events inferior or equal to 50 meters >
- Evenement has not started yet

In [119]:
# Create engine
engine = create_engine(f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')

# Query from the materialized view
sql = """
SELECT evenement,
       evenement_description,
       evenement_lieu,
       debut_evenement,
       fin_evenement,
       arret_name,
       ST_AsText(geog_evenement) AS geog_evenement,
       ST_AsText(geog_arret) AS geog_arret,
       distance
FROM evenements_arrets
WHERE debut_evenement <= NOW() AND distance <= 50
ORDER BY evenement, distance
"""

# Execute the query
with engine.connect() as connection:
    df = pd.read_sql_query(text(sql), connection)

# Convert geometry columns
df["event_geom"] = df["geog_evenement"].apply(wkt.loads)
df["arret_geom"] = df["geog_arret"].apply(wkt.loads)

# Create a Folium map centered on Marseille
map_center = [43.296482, 5.36978]
m = folium.Map(location=map_center, zoom_start=12)

# Track which events have been mapped to avoid duplicate event markers
plotted_events = set()

# Add markers for events and their nearest stops
for _, row in df.iterrows():
    event_coords = [row["event_geom"].y, row["event_geom"].x]
    stop_coords = [row["arret_geom"].y, row["arret_geom"].x]

    # Add the event marker only once
    if row["evenement"] not in plotted_events:
        popup_event = (
            f"<b>Event:</b> {row['evenement']}<br>"
            f"<b>Lieu:</b> {row['evenement_lieu']}<br>"
            f"<b>D√©but:</b> {row['debut_evenement']}<br>"
            f"<b>Fin:</b> {row['fin_evenement']}"
        )
        folium.Marker(
            location=event_coords,
            popup=popup_event,
            icon=folium.Icon(color="red", icon="music", prefix="fa")
        ).add_to(m)
        plotted_events.add(row["evenement"])

    # Add marker for each nearby stop
    popup_stop = (
        f"<b>Arret:</b> {row['arret_name']}<br>"
        f"<b>Distance:</b> {row['distance']:.1f} m"
    )
    folium.Marker(
        location=stop_coords,
        popup=popup_stop,
        icon=folium.Icon(color="green", icon="bus", prefix="fa")
    ).add_to(m)

# Show the map
m

When the mediator receives a request involving tables 
- arrets
- restaurants

it uses the materialized view 
> **restaurant_nearest_stops**
> 

Then, it applies the desired constraints on the view like the **exemple** below :
- Distance between arrets and restos inferior or equal to 50 meters >
- Good restaurants only

In [120]:
engine = create_engine(f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')

# SQL query using the materialized view
sql = """
SELECT restaurant_name,
       restaurant_description,
       specialites,
       periode_ouverte,
       note,
       review_count,
       ST_AsText(geog_restaurant) AS geog_restaurant,
       arret_name,
       ST_AsText(geog_arret) AS geog_arret,
       distance,
       stop_rank
FROM restaurant_nearest_stops
WHERE note >= 4.0 AND review_count >= 3 AND distance <=50
ORDER BY restaurant_name, stop_rank
"""

# Execute the query
with engine.connect() as connection:
    df = pd.read_sql_query(text(sql), connection)

# Convert geometry columns
df["restaurant_geom"] = df["geog_restaurant"].apply(wkt.loads)
df["arret_geom"] = df["geog_arret"].apply(wkt.loads)

# Initialize map centered on Marseille
map_center = [43.296482, 5.36978]
m = folium.Map(location=map_center, zoom_start=13)

# Keep track of plotted restaurants to avoid duplicate markers
plotted_restaurants = set()

# Add markers
for _, row in df.iterrows():
    r_coords = [row["restaurant_geom"].y, row["restaurant_geom"].x]
    a_coords = [row["arret_geom"].y, row["arret_geom"].x]

    # Plot restaurant only once
    if row["restaurant_name"] not in plotted_restaurants:
        popup_restaurant = (
            f"<b>{row['restaurant_name']}</b><br>"
            f"‚≠ê {row['note']} ({row['review_count']} avis)<br>"
            f"<i>{row['specialites']}</i><br>"
            f"<b>Ouverture:</b> {row['periode_ouverte']}<br>"
            f"<br>{row['restaurant_description']}"
        )
        folium.Marker(
            location=r_coords,
            popup=popup_restaurant,
            icon=folium.Icon(color="blue", icon="cutlery", prefix="fa")
        ).add_to(m)
        plotted_restaurants.add(row["restaurant_name"])

    # Plot each nearby stop
    popup_stop = (
        f"<b>Arr√™t:</b> {row['arret_name']}<br>"
        f"<b>Distance:</b> {row['distance']:.1f} m"
    )
    folium.Marker(
        location=a_coords,
        popup=popup_stop,
        icon=folium.Icon(color="green", icon="bus", prefix="fa")
    ).add_to(m)

# Display map
m


When the mediator receives a request involving tables 
- arrets
- restaurants
- evenements_musicaux

it uses the materialized view 
> **mini_itineraire_event_good_restaurant_arret**
> 

Then, it applies the desired constraints on the view like the **exemple** below :
- Distance between arrets and restos, restos and events inferior or equal to 100 meters >
- Events didnt finish yet

In [121]:
engine = create_engine(f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')

# Query from the materialized view
sql = """
SELECT 
    evenement,
    evenement_description,
    evenement_adresse,
    date_debut,
    date_fin,
    restaurant_name,
    restaurant_description,
    specialites,
    periode_ouverte,
    note,
    review_count,
    arret_name,
    arret_id,
    distance_event_restaurant,
    distance_restaurant_arret,
    ST_AsText(geog_evenement) AS geog_evenement,
    ST_AsText(geog_restaurant) AS geog_restaurant,
    ST_AsText(geog_arret) AS geog_arret
FROM mini_itineraire_event_good_restaurant_arret
WHERE date_fin >= NOW() AND distance_event_restaurant <=100 AND distance_restaurant_arret <= 100
ORDER BY distance_event_restaurant, distance_restaurant_arret
"""

# Load data
with engine.connect() as connection:
    df = pd.read_sql_query(text(sql), connection)

# Convert geometries
df["geom_event"] = df["geog_evenement"].apply(wkt.loads)
df["geom_restaurant"] = df["geog_restaurant"].apply(wkt.loads)
df["geom_stop"] = df["geog_arret"].apply(wkt.loads)

# Create map
m = folium.Map(location=[43.296482, 5.36978], zoom_start=13)

# Track which event/resto already plotted
plotted_events = set()
plotted_restaurants = set()

# Add markers
for _, row in df.iterrows():
    # Extract coords
    e_lat, e_lon = row["geom_event"].y, row["geom_event"].x
    r_lat, r_lon = row["geom_restaurant"].y, row["geom_restaurant"].x
    s_lat, s_lon = row["geom_stop"].y, row["geom_stop"].x

    # Event marker
    if row["evenement"] not in plotted_events:
        popup_event = (
            f"<b>{row['evenement']}</b><br>"
            f"<i>{row['evenement_description']}</i><br>"
            f"<b>Adresse:</b> {row['evenement_adresse']}<br>"
            f"<b>Du:</b> {row['date_debut']}<br>"
            f"<b>Au:</b> {row['date_fin']}"
        )
        folium.Marker(
            location=[e_lat, e_lon],
            popup=popup_event,
            icon=folium.Icon(color="red", icon="music", prefix="fa")
        ).add_to(m)
        plotted_events.add(row["evenement"])

    # Restaurant marker
    if row["restaurant_name"] not in plotted_restaurants:
        popup_restaurant = (
            f"<b>{row['restaurant_name']}</b><br>"
            f"<i>{row['specialites']}</i><br>"
            f"‚≠ê {row['note']} ({row['review_count']} avis)<br>"
            f"<b>Ouverture:</b> {row['periode_ouverte']}<br>"
            f"{row['restaurant_description']}"
        )
        folium.Marker(
            location=[r_lat, r_lon],
            popup=popup_restaurant,
            icon=folium.Icon(color="blue", icon="cutlery", prefix="fa")
        ).add_to(m)
        plotted_restaurants.add(row["restaurant_name"])

    # Transport stop marker
    popup_stop = (
        f"<b>Arr√™t:</b> {row['arret_name']}<br>"
        f"<b>Distance restaurant ‚Üí arr√™t:</b> {row['distance_restaurant_arret']:.1f} m"
    )
    folium.Marker(
        location=[s_lat, s_lon],
        popup=popup_stop,
        icon=folium.Icon(color="green", icon="bus", prefix="fa")
    ).add_to(m)

# Show the map
m


When the mediator receives a request involving tables 
- baignades
- arrets

it uses the materialized view 
> **baignades_nearest_stops**
> 


In [122]:

engine = create_engine(f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')

# Query from the materialized view
sql = """
SELECT 
    nom,
    categorie,
    baignade_surveillee,
    adresse,
    numero_de_telephone,
    arret_name,
    arret_id,
    ST_AsText(geog_baignade) AS geog_baignade,
    ST_AsText(geog_arret) AS geog_arret,
    distance
FROM baignades_nearest_stops
ORDER BY nom
"""

# Load data
with engine.connect() as connection:
    df = pd.read_sql_query(text(sql), connection)

# Convert geometries
df["geom_baignade"] = df["geog_baignade"].apply(wkt.loads)
df["geom_stop"] = df["geog_arret"].apply(wkt.loads)

# Create map
m = folium.Map(location=[43.296482, 5.36978], zoom_start=12)

# Add markers
for _, row in df.iterrows():
    b_lat, b_lon = row["geom_baignade"].y, row["geom_baignade"].x
    s_lat, s_lon = row["geom_stop"].y, row["geom_stop"].x

    # Bathing site marker
    popup_baignade = (
        f"<b>{row['nom']}</b><br>"
        f"<i>{row['categorie']}</i><br>"
        f"<b>Surveill√©e:</b> {row['baignade_surveillee']}<br>"
        f"<b>Adresse:</b> {row['adresse']}<br>"
        f"<b>T√©l:</b> {row['numero_de_telephone']}"
    )
    folium.Marker(
        location=[b_lat, b_lon],
        popup=popup_baignade,
        icon=folium.Icon(color="cadetblue", icon="tint", prefix="fa")
    ).add_to(m)

    # Transport stop marker
    popup_stop = (
        f"<b>Arr√™t:</b> {row['arret_name']}<br>"
        f"<b>Distance:</b> {row['distance']:.1f} m"
    )
    folium.Marker(
        location=[s_lat, s_lon],
        popup=popup_stop,
        icon=folium.Icon(color="green", icon="bus", prefix="fa")
    ).add_to(m)

# Show map
m


In [129]:
engine = create_engine(f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')

# Query the most recent forecast per site
sql = """
SELECT DISTINCT ON (nom_du_site)
    nom_du_site,
    categorie,
    baignade_surveillee,
    adresse,
    numero_de_telephone,
    ST_AsText(geog_baignade) AS geog_baignade,
    forecast_time,
    temperature,
    humidite,
    vitesse_vent,
    description_meteo
FROM baignades_meteo
WHERE forecast_time >= NOW()
ORDER BY nom_du_site, forecast_time ASC
"""

with engine.connect() as conn:
    df = pd.read_sql_query(text(sql), conn)

# Convert geometry
df["geom_baignade"] = df["geog_baignade"].apply(wkt.loads)

# Base map
m = folium.Map(location=[43.296482, 5.36978], zoom_start=12)

# Add bathing sites with weather
for _, row in df.iterrows():
    lat, lon = row["geom_baignade"].y, row["geom_baignade"].x

    popup = (
        f"<b>{row['nom_du_site']}</b><br>"
        f"<i>{row['categorie']}</i><br>"
        f"<b>Surveill√©e:</b> {row['baignade_surveillee']}<br>"
        f"<b>Adresse:</b> {row['adresse']}<br>"
        f"<b>T√©l:</b> {row['numero_de_telephone']}<br><br>"
        f"<b>M√©t√©o:</b> {row['description_meteo']}<br>"
        f"üå° {row['temperature']} ¬∞C<br>"
        f"üíß {row['humidite']} %<br>"
        f"üí® {row['vitesse_vent']} km/h<br>"
        f"<small>Pr√©vu √†: {row['forecast_time']}</small>"
    )

    folium.Marker(
        location=[lat, lon],
        popup=popup,
        icon=folium.Icon(color="lightblue", icon="cloud", prefix="fa")
    ).add_to(m)

# Show map
m


In [124]:
engine = create_engine(f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}')

# SQL query
sql = """
SELECT 
    nom,
    arret_name,
    categorie,
    baignade_surveillee,
    adresse,
    numero_de_telephone,
    avg_temp,
    ST_AsText(geog_baignade) AS geog_baignade,
    ST_AsText(geog_arret) AS geog_arret
FROM baignades_with_nearest_stop_and_temp
"""

# Fetch the data
with engine.begin() as connection:
    df = pd.read_sql_query(text(sql), connection)

# Convert WKT to Shapely geometries
df["baignade_geom"] = df["geog_baignade"].apply(wkt.loads)
df["arret_geom"] = df["geog_arret"].apply(wkt.loads)

# Create map centered on Marseille
map_center = [43.296482, 5.36978]
m = folium.Map(location=map_center, zoom_start=12)

# Plot data
for _, row in df.iterrows():
    b_lat, b_lon = row["baignade_geom"].y, row["baignade_geom"].x
    a_lat, a_lon = row["arret_geom"].y, row["arret_geom"].x

    baignade_popup = f"""
    <b>{row['nom']}</b><br>
    Cat√©gorie: {row['categorie']}<br>
    Surveill√©e: {row['baignade_surveillee']}<br>
    Adresse: {row['adresse']}<br>
    Temp√©rature Moyenne: {row['avg_temp']:.1f} ¬∞C
    """
    folium.Marker(
        location=[b_lat, b_lon],
        popup=baignade_popup,
        icon=folium.Icon(color="blue", icon="tint", prefix="fa")
    ).add_to(m)

    arret_popup = f"<b>Arr√™t Proche:</b> {row['arret_name']}"
    folium.Marker(
        location=[a_lat, a_lon],
        popup=arret_popup,
        icon=folium.Icon(color="green", icon="bus", prefix="fa")
    ).add_to(m)

m


Lieux et dates (5 prochains jours) de Baignades sugg√©r√©s selon la temp√©rature

In [None]:
# SQL query to fetch bathing sites with ideal weather
sql = """
SELECT 
    nom_du_site,
    categorie,
    baignade_surveillee,
    adresse,
    numero_de_telephone,
    date,
    temperature,
    humidite,
    vitesse_vent,
    description_meteo,
    ST_AsText(geog_baignade) AS geog_baignade
FROM baignades_best_conditions
WHERE temperature >= 25 AND vitesse_vent <= 20
"""

# Fetch data
with engine.begin() as connection:
    df = pd.read_sql_query(text(sql), connection)

# Convert WKT to Shapely geometries
df["baignade_geom"] = df["geog_baignade"].apply(wkt.loads)

# Create a Folium map centered on Marseille
map_center = [43.296482, 5.36978]
m = folium.Map(location=map_center, zoom_start=12)

# Add markers
for _, row in df.iterrows():
    lat, lon = row["baignade_geom"].y, row["baignade_geom"].x
    popup = f"""
    <b>{row['nom_du_site']}</b><br>
    {row['categorie']} - Surveill√©e: {row['baignade_surveillee']}<br>
    Adresse: {row['adresse']}<br>
    ‚òÄÔ∏è {row['description_meteo']}<br>
    üå° Temp: {row['temperature']}¬∞C<br>
    üí® Vent: {row['vitesse_vent']} km/h<br>
    üíß Humidit√©: {row['humidite']}%<br>
    üìÖ {row['date']}
    """
    folium.Marker(
        location=[lat, lon],
        popup=popup,
        icon=folium.Icon(color="orange", icon="sun", prefix="fa")
    ).add_to(m)

m


Display Plages avec Temp√©rature Moyennes

In [126]:
# SQL query to fetch average temperature data
sql = """
SELECT 
    nom_du_site,
    date,
    avg_temp,
    ST_AsText(geog_baignade) AS geog_baignade
FROM avg_temp_per_baignade
"""

# Fetch data
with engine.begin() as connection:
    df = pd.read_sql_query(text(sql), connection)

# Convert WKT to Shapely geometries
df["baignade_geom"] = df["geog_baignade"].apply(wkt.loads)

# Create a Folium map centered on Marseille
map_center = [43.296482, 5.36978]
m = folium.Map(location=map_center, zoom_start=12)

# Add markers for each bathing site
for _, row in df.iterrows():
    lat, lon = row["baignade_geom"].y, row["baignade_geom"].x
    popup = f"""
    <b>{row['nom_du_site']}</b><br>
    üìÖ {row['date']}<br>
    üå° Avg Temp: {row['avg_temp']}¬∞C
    """
    folium.Marker(
        location=[lat, lon],
        popup=popup,
        icon=folium.Icon(color="blue", icon="thermometer-half", prefix="fa")
    ).add_to(m)

# Display the map
m


# Your Playground
Try our views

In [None]:
sql = """
# Your SQL query here
"""

""" Available views with columns:
- evenements_arrets( evenement, evenement_description, evenement_lieu, debut_evenement, fin_evenement, arret_name, geog_evenement, geog_arret, distance)
- restaurant_nearest_stops(restaurant_name, specialites, periode_ouverte, note, review_count, geog_restaurant, arret_id, arret_name, geog_arret, distance, stop_rank)
- nb_evenements_par_arret( arret_name, nb_evenements)
- nb_restaurants_par_arret( arret_name, nb_restaurants)
- mini_itineraire_event_good_restaurant_arret( evenement, evenement_description, evenement_adresse, date_debut, date_fin, restaurant_name, restaurant_description, specialites, periode_ouverte, note, review_count, arret_name, arret_id, distance_event_restaurant, distance_restaurant_arret, geog_evenement, geog_restaurant, geog_arret)
- restaurant_nearest_event( evenement, evenement_description, evenement_adresse, date_debut, date_fin, restaurant_name, note, review_count, restaurant_description, specialites, periode_ouverte, geog_restaurant, distance, geog_evenement)
- baignades_nearest_stops( nom, arret_name, categorie, baignade_surveillee, adresse, numero_de_telephone, geog_baignade, geog_arret, distance)
- baignades_meteo( nom_du_site, categorie, baignade_surveillee, adresse, numero_de_telephone, forecast_time, temperature, humidite, vitesse_vent, description_meteo, geog_baignade)
- baignades_best_conditions( nom_du_site, categorie, baignade_surveillee, adresse, numero_de_telephone, date, temperature, humidite, vitesse_vent, description_meteo, geog_baignade)
- baignades_with_nearest_stop_and_temp( nom_du_site, categorie, baignade_surveillee, adresse, numero_de_telephone, date, temperature, humidite, vitesse_vent, description_meteo, geog_baignade, arret_name, arret_id, distance)
- avg_temp_per_baignade( nom_du_site, date, avg_temp, geog_baignade)

"""

# Fetch data
with engine.begin() as connection:
    df = pd.read_sql_query(text(sql), connection)


# If you want to visualize the data on a map, you can use Folium

# Convert WKT to Shapely geometries
# df["<geography column>"] = df["<geography column>"].apply(wkt.loads)

# Create a Folium map centered on Marseille

# map_center = [43.296482, 5.36978]
# m = folium.Map(location=map_center, zoom_start=12)

# Add markers

# for _, row in df.iterrows():
#     lat, lon = row["<geography column>"].y, row["<geography column>"].x
    
#     folium.Marker(
#         location=[lat, lon],
#         icon=folium.Icon(color="orange", icon="sun", prefix="fa")
#     ).add_to(m)

# m
