<a href="https://colab.research.google.com/github/AhUhmm/mediterrania/blob/main/Generatore_piani_v2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
# Prima cella: importazione librerie e caricamento dati
import os
from google.colab import files
import json
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Union
from datetime import datetime
import random
from collections import defaultdict, Counter

In [6]:
class RecipeManager:
    def __init__(self, recipe_file: str):
        """Initialize recipe manager with existing recipe database"""
        with open(recipe_file, 'r', encoding='utf-8') as f:
            db = json.load(f)
            self.recipe_db = {
                'metadata': db['metadata'],
                'colazioni': db.get('colazioni', []),
                'pranzi': db.get('pranzi', []),
                'cene': db.get('cene', []),
                'spuntini': db.get('spuntini', [])
            }
        self.used_recipes = defaultdict(int)

    def get_meal_recipes(self, meal_type: str) -> List[Dict]:
        """Get recipes for specific meal type"""
        meal_map = {
            'Colazione': 'colazioni',
            'Pranzo': 'pranzi',
            'Cena': 'cene',
            'Spuntino Mattina': 'spuntini',
            'Spuntino Pomeriggio': 'spuntini'
        }
        return self.recipe_db[meal_map[meal_type]]

    def check_recipe_compatibility(self, recipe: Dict, user_data: Dict) -> bool:
        """Check if recipe is compatible with user preferences and restrictions"""
        # Check diet type compatibility
        diet_type = user_data.get('Preferenza Dietetica', 'nessuno')
        if diet_type == 'vegetariano' and not recipe['proprieta']['vegetariano']:
            return False
        if diet_type == 'vegano' and not recipe['proprieta']['vegano']:
            return False

        # Check allergies
        allergies = user_data.get('Allergie Alimentari', 'Nessuna')
        if allergies != 'Nessuna':
            allergen_list = [allergies] if isinstance(allergies, str) else allergies
            for allergen in allergen_list:
                if any(allergen.lower() in ing['nome'].lower() for ing in recipe['ingredienti']):
                    return False

        # Check excluded vegetables
        excluded = user_data.get('Esclusioni Verdure', 'Nessuna')
        if excluded != 'Nessuna':
            excluded_list = [x.strip() for x in excluded.split(',')]
            for veg in excluded_list:
                if any(veg.lower() in ing['nome'].lower() for ing in recipe['ingredienti']):
                    return False

        return True

    def adjust_portions(self, recipe: Dict, user_data: Dict) -> Dict:
        """Adjust recipe portions based on user objective"""
        adjusted = recipe.copy()

        if user_data.get('Obiettivo') == 'Aumentare massa muscolare':
            # Increase protein content by 30%
            protein_increase = 0.3

            # Adjust nutrients
            nutrients = adjusted['valori_nutrizionali']
            protein_increase_g = nutrients['proteine'] * protein_increase
            nutrients['proteine'] += protein_increase_g
            nutrients['calorie'] += protein_increase_g * 4  # 4 kcal per gram of protein

            # Adjust protein ingredient quantities
            protein_ingredients = ['carne', 'pesce', 'uova', 'legumi', 'formaggio']
            for ing in adjusted['ingredienti']:
                if any(p in ing['nome'].lower() for p in protein_ingredients):
                    ing['quantita'] *= (1 + protein_increase)

        return adjusted

    def get_recipe(self, meal_type: str, user_data: Dict) -> Dict:
        """Get suitable recipe with minimal repetition"""
        available_recipes = [
            recipe for recipe in self.get_meal_recipes(meal_type)
            if self.check_recipe_compatibility(recipe, user_data)
        ]

        if not available_recipes:
            raise ValueError(f"No compatible recipes found for {meal_type}")

        # Get the appropriate ID field based on meal type
        id_field = 'id' if meal_type in ['Colazione', 'Spuntino Mattina', 'Spuntino Pomeriggio'] else 'id_pasto'

        # Sort by usage count and select least used
        recipe = min(available_recipes,
                    key=lambda x: (self.used_recipes.get(x.get(id_field, ''), 0), random.random()))

        # Track usage using the appropriate ID
        recipe_id = recipe.get(id_field)
        if recipe_id:
            self.used_recipes[recipe_id] += 1

        # Adjust portions if needed
        return self.adjust_portions(recipe, user_data)


In [7]:
class MealPlanGenerator:
    def __init__(self, recipe_file: str):
        self.recipe_manager = RecipeManager(recipe_file)
        self.days = ['Lunedì', 'Martedì', 'Mercoledì', 'Giovedì', 'Venerdì', 'Sabato', 'Domenica']
        self.meal_types = ['Colazione', 'Spuntino Mattina', 'Pranzo', 'Spuntino Pomeriggio', 'Cena']

        # Mapping dei nomi dei campi dal JSON alla versione senza parentesi
        self.field_mapping = {
            'Altezza (cm)': 'Altezza',
            'Peso (kg)': 'Peso',
            'BMI': 'BMI'
        }

        # Definizione delle unità di misura per i valori nutrizionali
        self.nutrient_units = {
            'calorie': 'kcal',
            'proteine': 'g',
            'carboidrati': 'g',
            'grassi': 'g',
            'fibre': 'g',
            'sodio': 'mg'
        }

    def normalize_user_data(self, user_data: Dict) -> Dict:
        """Normalizza i nomi dei campi dei dati utente"""
        normalized_data = {}
        for key, value in user_data.items():
            if key in self.field_mapping:
                normalized_data[self.field_mapping[key]] = value
            else:
                normalized_data[key] = value
        return normalized_data

    def calculate_daily_nutrients(self, daily_plan: Dict) -> Dict:
        """Calculate total daily nutritional values with units"""
        daily_totals = {
            'calorie': {'value': 0, 'unit': 'kcal'},
            'proteine': {'value': 0, 'unit': 'g'},
            'carboidrati': {'value': 0, 'unit': 'g'},
            'grassi': {'value': 0, 'unit': 'g'},
            'fibre': {'value': 0, 'unit': 'g'},
            'sodio': {'value': 0, 'unit': 'mg'}
        }

        for meal in daily_plan.values():
            nutrients = meal['recipe']['valori_nutrizionali']
            for key in daily_totals:
                if key in nutrients:
                    daily_totals[key]['value'] += nutrients[key]

        return daily_totals

    def generate_weekly_plan(self, user_data: Dict) -> Dict:
        """Generate personalized weekly meal plan"""
        normalized_user_data = self.normalize_user_data(user_data)
        weekly_plan = {}

        for day in self.days:
            daily_plan = {}

            for meal_type in self.meal_types:
                try:
                    recipe = self.recipe_manager.get_recipe(meal_type, normalized_user_data)
                    # Tutte le ricette usano id_pasto
                    recipe_id = recipe.get('id_pasto')

                    daily_plan[meal_type] = {
                        'recipe': recipe,
                        'recipe_id': recipe_id,
                        'meal_type': meal_type
                    }
                except ValueError as e:
                    print(f"Warning for user {normalized_user_data.get('ID Utente')}, {day}, {meal_type}: {e}")
                    continue

            daily_nutrients = self.calculate_daily_nutrients(daily_plan)
            weekly_plan[day] = {
                'meals': daily_plan,
                'daily_totals': daily_nutrients
            }

        return weekly_plan

    def generate_ml_dataset(self, survey_data: Dict) -> Dict:
        """Generate machine learning ready dataset"""
        dataset = {
            "metadata": {
                "generated_date": datetime.now().isoformat(),
                "total_users": len(survey_data['data']),
                "recipe_metadata": self.recipe_manager.recipe_db['metadata'],
                "nutrient_units": self.nutrient_units
            },
            "meal_plans": {}
        }

        for user in survey_data['data']:
            user_id = f"user_{user['ID Utente']}"
            try:
                weekly_plan = self.generate_weekly_plan(user)

                # Calculate weekly averages
                weekly_nutrients = defaultdict(lambda: {'value': 0.0, 'unit': ''})
                for day_data in weekly_plan.values():
                    for nutrient, data in day_data['daily_totals'].items():
                        weekly_nutrients[nutrient]['value'] += data['value']
                        weekly_nutrients[nutrient]['unit'] = data['unit']

                for nutrient in weekly_nutrients:
                    weekly_nutrients[nutrient]['value'] /= len(self.days)

                dataset['meal_plans'][user_id] = {
                    "user_features": {
                        "demographic": {
                            "gender": user['Sesso'],
                            "age": user['Età']
                        },
                        "biometric": {
                            "height": user['Altezza (cm)'],
                            "weight": user['Peso (kg)'],
                            "bmi": user['BMI']
                        },
                        "preferences": {
                            "objective": user['Obiettivo'],
                            "diet_type": user['Preferenza Dietetica'],
                            "excluded_vegetables": user.get('Esclusioni Verdure', 'Nessuna'),
                            "milk_preferences": user.get('Preferenze Latte Vegetale', 'Nessuna'),
                            "allergies": user.get('Allergie Alimentari', 'Nessuna')
                        },
                        "fruit_consumption": {
                            "portions": user.get('Porzioni Frutta al Giorno', 0),
                            "timing": user.get('Momento Consumo Frutta', [])
                        }
                    },
                    "meal_plan": {
                        "weekly_schedule": {
                            day: {
                                meal_type: {
                                    "recipe_id": meal_data['recipe_id'],
                                    "nutritional_values": {
                                        k: {'value': v, 'unit': self.nutrient_units[k]}
                                        for k, v in meal_data['recipe']['valori_nutrizionali'].items()
                                    }
                                }
                                for meal_type, meal_data in day_data['meals'].items()
                            }
                            for day, day_data in weekly_plan.items()
                        },
                        "nutritional_analysis": {
                            "daily_averages": dict(weekly_nutrients),
                            "meets_targets": {
                                "calories": self._check_calorie_target(
                                    weekly_nutrients['calorie']['value'], user),
                                "protein": self._check_protein_target(
                                    weekly_nutrients['proteine']['value'], user),
                                "carbs": self._check_carb_target(
                                    weekly_nutrients['carboidrati']['value'], user)
                            }
                        }
                    }
                }

            except Exception as e:
                print(f"Error generating plan for user {user_id}: {str(e)}")
                continue

        return dataset

    def _check_calorie_target(self, avg_calories: float, user_data: Dict) -> bool:
        """Check if average calories meet user target"""
        target = self._calculate_target_calories(user_data)
        return {
            'meets_target': 0.9 * target <= avg_calories <= 1.1 * target,
            'target': {'value': target, 'unit': 'kcal'},
            'actual': {'value': avg_calories, 'unit': 'kcal'}
        }

    def _check_protein_target(self, avg_protein: float, user_data: Dict) -> bool:
        """Check if average protein meets user target"""
        weight = float(user_data['Peso (kg)'])
        if user_data['Obiettivo'] == 'Aumentare massa muscolare':
            min_protein = 2.0 * weight  # 2g per kg bodyweight
        else:
            min_protein = 1.2 * weight  # 1.2g per kg bodyweight

        return {
            'meets_target': avg_protein >= min_protein,
            'target': {'value': min_protein, 'unit': 'g'},
            'actual': {'value': avg_protein, 'unit': 'g'}
        }

    def _check_carb_target(self, avg_carbs: float, user_data: Dict) -> bool:
        """Check if average carbs meet user target"""
        weight = float(user_data['Peso (kg)'])
        if user_data['Obiettivo'] == 'Perdere peso':
            max_carbs = 3.0 * weight  # 3g per kg bodyweight
            return {
                'meets_target': avg_carbs <= max_carbs,
                'target': {'value': max_carbs, 'unit': 'g'},
                'actual': {'value': avg_carbs, 'unit': 'g'}
            }
        return {
            'meets_target': True,
            'target': {'value': None, 'unit': 'g'},
            'actual': {'value': avg_carbs, 'unit': 'g'}
        }

    def _calculate_target_calories(self, user_data: Dict) -> float:
        """Calculate target calories based on user data"""
        weight = float(user_data['Peso (kg)'])
        height = float(user_data['Altezza (cm)'])
        age = int(user_data['Età'])

        # Basic BMR calculation using Harris-Benedict formula
        if user_data['Sesso'] == 'M':
            bmr = 88.362 + (13.397 * weight) + \
                  (4.799 * height) - (5.677 * age)
        else:
            bmr = 447.593 + (9.247 * weight) + \
                  (3.098 * height) - (4.330 * age)

        # Activity factor (using moderate as default)
        activity_factor = 1.55

        # Calculate TDEE (Total Daily Energy Expenditure)
        tdee = bmr * activity_factor

        # Adjust based on objective
        if user_data['Obiettivo'] == 'Perdere peso':
            return tdee * 0.85  # 15% deficit
        elif user_data['Obiettivo'] == 'Aumentare massa muscolare':
            return tdee * 1.1   # 10% surplus
        else:
            return tdee         # Maintenance

In [8]:
def validate_json_file(content: str, file_type: str) -> tuple[bool, Union[str, dict]]:
    """
    Valida la struttura del file JSON in base al tipo

    Args:
        content: Contenuto del file JSON
        file_type: Tipo di file ('survey' o 'recipes')

    Returns:
        tuple[bool, Union[str, dict]]: (successo, dati caricati o messaggio di errore)
    """
    try:
        data = json.loads(content)

        if file_type == 'survey':
            # Verifica la struttura del file survey
            if 'metadata' not in data:
                return False, "File survey non valido: manca la chiave 'metadata'"
            if 'data' not in data:
                return False, "File survey non valido: manca la chiave 'data'"

            # Verifica che ci siano i campi necessari nei dati utente
            required_fields = {
                'ID Utente', 'Sesso', 'Età', 'Altezza (cm)', 'Peso (kg)',
                'BMI', 'Obiettivo', 'Preferenza Dietetica'
            }

            for user in data['data']:
                missing_fields = required_fields - set(user.keys())
                if missing_fields:
                    return False, f"Dati utente incompleti: mancano i campi {missing_fields}"

                # Validazione dei valori
                if user['Sesso'] not in ['M', 'F']:
                    return False, f"Valore non valido per il campo 'Sesso' dell'utente {user['ID Utente']}"
                if not (18 <= user['Età'] <= 100):
                    return False, f"Valore non valido per il campo 'Età' dell'utente {user['ID Utente']}"
                if not (140 <= user['Altezza (cm)'] <= 220):
                    return False, f"Valore non valido per il campo 'Altezza' dell'utente {user['ID Utente']}"
                if not (40 <= user['Peso (kg)'] <= 200):
                    return False, f"Valore non valido per il campo 'Peso' dell'utente {user['ID Utente']}"
                if user['Obiettivo'] not in ['Perdere peso', 'Mantenere peso', 'Aumentare massa muscolare']:
                    return False, f"Valore non valido per il campo 'Obiettivo' dell'utente {user['ID Utente']}"

        elif file_type == 'recipes':
            # Verifica la struttura del ricettario
            required_keys = ['metadata', 'colazioni', 'pranzi', 'cene', 'spuntini']
            missing_keys = [key for key in required_keys if key not in data]
            if missing_keys:
                return False, f"File ricette non valido: mancano le chiavi {missing_keys}"

            # Verifica la struttura delle ricette
            for meal_type in ['colazioni', 'pranzi', 'cene', 'spuntini']:
                for recipe in data[meal_type]:
                    # Verifica che ogni ricetta abbia gli attributi necessari
                    required_recipe_fields = {
                        'id_pasto', 'nome', 'ingredienti', 'valori_nutrizionali',
                        'proprieta', 'tempo_preparazione', 'difficolta', 'porzioni'
                    }
                    missing_fields = required_recipe_fields - set(recipe.keys())
                    if missing_fields:
                        return False, f"Ricetta '{recipe.get('nome', 'sconosciuta')}' incompleta: mancano i campi {missing_fields}"

                    # Verifica la struttura dei valori nutrizionali
                    required_nutrients = {'calorie', 'proteine', 'carboidrati', 'grassi'}
                    if not all(nutrient in recipe['valori_nutrizionali'] for nutrient in required_nutrients):
                        return False, f"Valori nutrizionali incompleti per la ricetta '{recipe['nome']}'"

                    # Verifica la presenza di ingredienti
                    if not recipe['ingredienti']:
                        return False, f"Nessun ingrediente specificato per la ricetta '{recipe['nome']}'"

                    # Verifica le proprietà necessarie
                    required_properties = {'vegano', 'vegetariano', 'senza_glutine', 'basso_sodio', 'adatto_diabetici'}
                    if not all(prop in recipe['proprieta'] for prop in required_properties):
                        return False, f"Proprietà mancanti per la ricetta '{recipe['nome']}'"

        else:
            return False, f"Tipo di file non riconosciuto: {file_type}"

        return True, data

    except json.JSONDecodeError as e:
        return False, f"File JSON non valido: {str(e)}"
    except Exception as e:
        return False, f"Errore durante la validazione: {str(e)}"

def load_file(file_type: str, default_filename: str) -> tuple[bool, dict]:
    """
    Verifica l'esistenza del file di default e chiede all'utente se vuole usarlo
    solo se esiste

    Args:
        file_type: Tipo di file ('survey' o 'recipes')
        default_filename: Nome del file di default

    Returns:
        tuple[bool, dict]: (successo, dati caricati)
    """

    # Verifica se il file esiste
    if os.path.exists(default_filename):
        while True:
            use_default = input(f"\nVuoi utilizzare il file {default_filename} in memoria? (s/n): ").lower()

            if use_default not in ['s', 'n']:
                print("Per favore, rispondi 's' per sì o 'n' per no.")
                continue

            if use_default == 's':
                try:
                    with open(default_filename, 'r', encoding='utf-8') as f:
                        content = f.read()
                    valid, result = validate_json_file(content, file_type)
                    if valid:
                        print(f"File {default_filename} caricato con successo")
                        return True, result
                    else:
                        print(f"Errore nel file di default: {result}")
                        print("Procediamo con il caricamento di un nuovo file")
                        break
                except Exception as e:
                    print(f"Errore nella lettura del file: {str(e)}")
                    print("Procediamo con il caricamento di un nuovo file")
                    break
            else:
                break
    else:
        print(f"\nNessun file {default_filename} trovato in memoria")

    # Se arriviamo qui, dobbiamo caricare un nuovo file
    print(f"\nCarica un nuovo file {file_type}...")
    while True:
        uploaded = files.upload()
        if not uploaded:
            print("Nessun file caricato. Riprova.")
            continue

        filename = list(uploaded.keys())[0]
        if not filename.endswith('.json'):
            print("Il file deve essere in formato JSON. Riprova.")
            continue

        content = uploaded[filename].decode('utf-8')
        valid, result = validate_json_file(content, file_type)

        if valid:
            print(f"File {filename} caricato con successo")
            # Salva il nuovo file come default per il prossimo utilizzo
            with open(default_filename, 'w', encoding='utf-8') as f:
                f.write(content)
            return True, result
        else:
            print(f"Errore: {result}")
            continue

def main():
    """Main execution function"""
    #from google.colab import files

    # Caricamento dati survey
    print("CARICAMENTO DATI SURVEY")
    survey_success, survey_data = load_file('survey', 'survey_data.json')
    if not survey_success:
        print("Errore nel caricamento dei dati survey")
        return

    # Caricamento ricettario
    print("\nCARICAMENTO RICETTARIO")
    recipes_success, recipe_data = load_file('recipes', 'ricettario.json')
    if not recipes_success:
        print("Errore nel caricamento del ricettario")
        return

    try:
        print("\nInizializzazione generatore piani alimentari...")
        # Creiamo un file temporaneo con le ricette per il RecipeManager
        temp_recipe_file = 'temp_recipes.json'
        with open(temp_recipe_file, 'w', encoding='utf-8') as f:
            json.dump(recipe_data, f, ensure_ascii=False)

        generator = MealPlanGenerator(temp_recipe_file)

        print("Generazione dataset piani alimentari...")
        dataset = generator.generate_ml_dataset(survey_data)

        # Save output
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f'meal_plans_dataset_{timestamp}.json'
        print(f"\nSalvataggio dataset in {output_file}...")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(dataset, f, ensure_ascii=False, indent=2)
        print("Dataset salvato con successo")

        # Download the generated file
        print("\nDownload del dataset generato...")
        files.download(output_file)

    except Exception as e:
        print(f"\nErrore durante l'elaborazione: {str(e)}")

    finally:
        # Pulizia file temporaneo
        import os
        if os.path.exists(temp_recipe_file):
            os.remove(temp_recipe_file)

if __name__ == "__main__":
    main()

CARICAMENTO DATI SURVEY

Nessun file survey_data.json trovato in memoria

Carica un nuovo file survey...


Saving survey_responses (4).json to survey_responses (4).json
File survey_responses (4).json caricato con successo

CARICAMENTO RICETTARIO

Nessun file ricettario.json trovato in memoria

Carica un nuovo file recipes...


Saving ricette.json to ricette.json
File ricette.json caricato con successo

Inizializzazione generatore piani alimentari...
Generazione dataset piani alimentari...

Salvataggio dataset in meal_plans_dataset_20241106_094936.json...
Dataset salvato con successo

Download del dataset generato...


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>