In [8]:
# Import required libraries
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import seaborn as sns

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

class CarbonFootprintGenerator:
    def __init__(self):
        # Define base profiles
        self.base_profiles = [
            {
                "individual_id": "ECO001",
                "name": "Alex Green",
                "body_type": "normal",
                "sex": "female",
                "diet": "vegetarian",
                "occupation": "professional",
                "lifestyle": "active",
                "vehicle_type": "hybrid"
            },
            {
                "individual_id": "ECO002",
                "name": "Sam Chen",
                "body_type": "overweight",
                "sex": "male",
                "diet": "omnivore",
                "occupation": "student",
                "lifestyle": "moderate",
                "vehicle_type": "petrol"
            },
            {
                "individual_id": "ECO003",
                "name": "Maya Patel",
                "body_type": "normal",
                "sex": "female",
                "diet": "vegan",
                "occupation": "professional",
                "lifestyle": "active",
                "vehicle_type": "electric"
            }
        ]
        
        # Define activity parameters
        self.activity_ranges = {
            "distance_km": (5, 50),
            "tv_pc_hours": (1, 8),
            "internet_hours": (1, 10),
            "cooking_hours": (0.5, 3),
            "grocery_spending": (10, 100),
            "waste_bags": (1, 3)
        }
        
        # Build VAE
        self.vae = self._build_vae()
        
    def _build_vae(self):
        # Number of features we'll track
        n_features = 14  # Adjusted to match our actual features
        
        # Encoder
        encoder_inputs = keras.Input(shape=(7, n_features))
        x = layers.LSTM(32, return_sequences=True)(encoder_inputs)
        x = layers.LSTM(16)(x)
        
        # Latent space
        latent_dim = 8
        z_mean = layers.Dense(latent_dim)(x)
        z_log_var = layers.Dense(latent_dim)(x)
        
        # Sampling layer
        def sampling(args):
            z_mean, z_log_var = args
            epsilon = tf.random.normal(shape=tf.shape(z_mean))
            return z_mean + tf.exp(0.5 * z_log_var) * epsilon
        
        z = layers.Lambda(sampling)([z_mean, z_log_var])
        
        # Decoder
        decoder_inputs = keras.Input(shape=(latent_dim,))
        x = layers.Dense(16)(decoder_inputs)
        x = layers.RepeatVector(7)(x)
        x = layers.LSTM(32, return_sequences=True)(x)
        decoder_outputs = layers.TimeDistributed(layers.Dense(n_features))(x)
        
        # Create models
        vae = keras.Model(encoder_inputs, decoder_outputs)
        vae.compile(optimizer='adam', loss='mse')
        
        return vae
    
    def generate_daily_activities(self, profile, date, is_weekend):
        """Generate realistic daily activities for an individual"""
        
        # Base multipliers based on profile
        lifestyle_multiplier = {
            "active": 1.2,
            "moderate": 1.0,
            "sedentary": 0.8
        }.get(profile["lifestyle"], 1.0)
        
        # Weekend multiplier
        weekend_multiplier = 1.3 if is_weekend else 1.0
        
        # Generate activities with realistic variations
        def vary(base, range_tuple):
            min_val, max_val = range_tuple
            base_val = np.random.uniform(min_val, max_val)
            return base_val * lifestyle_multiplier * weekend_multiplier * np.random.normal(1, 0.1)
        
        activities = {
            "date": date.strftime("%Y-%m-%d"),
            "distance_km": vary(20, self.activity_ranges["distance_km"]),
            "tv_pc_hours": vary(4, self.activity_ranges["tv_pc_hours"]),
            "internet_hours": vary(5, self.activity_ranges["internet_hours"]),
            "cooking_hours": vary(1.5, self.activity_ranges["cooking_hours"]),
            "grocery_spending": vary(40, self.activity_ranges["grocery_spending"]),
            "waste_bags": round(vary(1.5, self.activity_ranges["waste_bags"])),
            "meals_vegetarian": 3 if profile["diet"] in ["vegetarian", "vegan"] else np.random.randint(0, 2),
            "meals_meat": 0 if profile["diet"] in ["vegetarian", "vegan"] else np.random.randint(1, 3),
            "meals_processed": np.random.randint(0, 2),
            "indoor_activity_hours": vary(3, (2, 6)),
            "outdoor_activity_hours": vary(2, (1, 4)),
            "public_transport": int(np.random.random() > 0.7),
            "air_travel": int(np.random.random() > 0.95),
        }
        
        # Calculate carbon footprint
        activities["carbon_footprint"] = self._calculate_footprint(activities, profile)
        
        return activities
    
    def _calculate_footprint(self, activities, profile):
        """Calculate carbon footprint based on activities"""
        footprint = 0
        
        # Transport emissions
        vehicle_emissions = {
            "petrol": 0.2,
            "diesel": 0.18,
            "hybrid": 0.12,
            "electric": 0.05
        }
        footprint += activities["distance_km"] * vehicle_emissions.get(profile["vehicle_type"], 0.15)
        
        # Energy usage
        footprint += (activities["tv_pc_hours"] + activities["internet_hours"]) * 0.1
        footprint += activities["cooking_hours"] * 0.5
        
        # Food emissions
        footprint += activities["meals_meat"] * 3.0
        footprint += activities["meals_processed"] * 2.0
        footprint += activities["meals_vegetarian"] * 1.0
        
        # Add some random variation
        footprint *= np.random.normal(1, 0.05)
        
        return round(footprint, 2)
    
    def generate_time_series(self, num_days=7):
        """Generate time series data for all individuals"""
        start_date = datetime.now() - timedelta(days=num_days)
        all_data = []
        
        for profile in self.base_profiles:
            individual_data = []
            
            for day in range(num_days):
                current_date = start_date + timedelta(days=day)
                is_weekend = current_date.weekday() >= 5
                
                daily_data = self.generate_daily_activities(profile, current_date, is_weekend)
                daily_data.update(profile)  # Add profile information
                individual_data.append(daily_data)
            
            all_data.extend(individual_data)
        
        # Convert to DataFrame
        df = pd.DataFrame(all_data)
        
        # Generate additional variations using VAE
        numerical_cols = df.select_dtypes(include=[np.number]).columns
        normalized_data = self._normalize_data(df[numerical_cols])
        
        # Generate variations
        variations = self.vae.predict(normalized_data)
        
        # Combine original and variations
        final_df = self._combine_data(df, variations, numerical_cols)
        
        return final_df
    
    def _normalize_data(self, df):
        """Normalize numerical data for VAE"""
        normalized = df.copy()
        for col in df.columns:
            normalized[col] = (df[col] - df[col].mean()) / (df[col].std() + 1e-10)
        # Ensure the shape matches the expected input shape for the VAE
        return normalized.values.reshape(-1, 7, len(df.columns))
    
    def _combine_data(self, original_df, variations, numerical_cols):
        """Combine original data with VAE variations"""
        variations = variations.reshape(-1, len(numerical_cols))
        variations_df = pd.DataFrame(variations, columns=numerical_cols)
        
        # Copy non-numerical columns
        for col in original_df.columns:
            if col not in numerical_cols:
                variations_df[col] = original_df[col].values
        
        return pd.concat([original_df, variations_df], ignore_index=True)
    
    def plot_time_series(self, df):
        """Plot time series data"""
        plt.figure(figsize=(15, 10))
        
        # Plot carbon footprint over time for each individual
        for individual in df['individual_id'].unique():
            individual_data = df[df['individual_id'] == individual]
            plt.plot(pd.to_datetime(individual_data['date']), 
                    individual_data['carbon_footprint'], 
                    label=f"{individual_data['name'].iloc[0]}")
        
        plt.title('Daily Carbon Footprint by Individual')
        plt.xlabel('Date')
        plt.ylabel('Carbon Footprint (kg CO2)')
        plt.legend()
        plt.xticks(rotation=45)
        plt.grid(True)
        plt.tight_layout()
        
        return plt.gcf()

# Usage example (copy this to a new cell to run)
def generate_and_visualize_data():
    # Initialize generator
    generator = CarbonFootprintGenerator()
    
    # Generate data
    print("Generating synthetic data...")
    df = generator.generate_time_series(num_days=7)
    
    # Save to CSV
    df.to_csv("carbon_footprint_timeseries.csv", index=False)
    print("Data saved to 'carbon_footprint_timeseries.csv'")
    
    # Create visualization
    fig = generator.plot_time_series(df)
    plt.show()
    
    # Print summary statistics
    print("\nSummary Statistics:")
    print(df.describe())
    
    return df

# Run this in a separate cell
df = generate_and_visualize_data()

Generating synthetic data...


KeyError: 'Exception encountered when calling Functional.call().\n\n\x1b[1m1634545486880\x1b[0m\n\nArguments received by Functional.call():\n  • inputs=tf.Tensor(shape=(3, 7, 14), dtype=float32)\n  • training=False\n  • mask=None'