# Lab work: Eco-driving

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestRegressor
from scipy.signal import savgol_filter
import math
from geopy.distance import geodesic

In [None]:
def parse_data(file_path):
    gps_data = []
    acc_data = []
    gyro_data = []
    rot_data = []
    
    try:
        df = pd.read_csv(file_path, header=None)
        
        for _, row in df.iterrows():
            values = row.tolist()
            
            if len(values) < 2:
                continue
                
            data_type = int(values[0])
            
            if data_type == 0:  
                if len(values) >= 7:
                    gps_data.append({
                        'timestamp': int(values[1]),
                        'latitude': float(values[2]),
                        'longitude': float(values[3]),
                        'altitude': float(values[4]),
                        'speed': float(values[5]),
                        'satellites': int(values[6])
                    })
            elif data_type == 1: 
                if len(values) >= 6:
                    acc_data.append({
                        'timestamp': int(values[1]),
                        'timestamp_us': int(values[2]),
                        'x': float(values[3]),
                        'y': float(values[4]),
                        'z': float(values[5])
                    })
            elif data_type == 2:  
                if len(values) >= 6:
                    gyro_data.append({
                        'timestamp': int(values[1]),
                        'timestamp_us': int(values[2]),
                        'x': float(values[3]),
                        'y': float(values[4]),
                        'z': float(values[5])
                    })
            elif data_type == 3:  
                if len(values) >= 7:
                    rot_data.append({
                        'timestamp': int(values[1]),
                        'timestamp_us': int(values[2]),
                        'i': float(values[3]),
                        'j': float(values[4]),
                        'k': float(values[5]),
                        'real': float(values[6])
                    })
                elif len(values) >= 6:
                    rot_data.append({
                        'timestamp': int(values[1]),
                        'timestamp_us': int(values[2]),
                        'i': float(values[3]),
                        'j': float(values[4]),
                        'k': float(values[5]),
                        'real': 0.0
                    })
        
        return {
            'gps': pd.DataFrame(gps_data) if gps_data else pd.DataFrame(),
            'acc': pd.DataFrame(acc_data) if acc_data else pd.DataFrame(),
            'gyro': pd.DataFrame(gyro_data) if gyro_data else pd.DataFrame(),
            'rot': pd.DataFrame(rot_data) if rot_data else pd.DataFrame()
        }
    except Exception as e:
        print(f"Error processing CSV file: {e}")
        return {
            'gps': pd.DataFrame(),
            'acc': pd.DataFrame(),
            'gyro': pd.DataFrame(),
            'rot': pd.DataFrame()
        }

In [None]:
def extract_features(data_dict):
    features = {}
    
    if not data_dict['gps'].empty:
        gps_df = data_dict['gps']
        
        distances = []
        gradients = []
        
        for i in range(1, len(gps_df)):
            point1 = (gps_df['latitude'].iloc[i-1], gps_df['longitude'].iloc[i-1])
            point2 = (gps_df['latitude'].iloc[i], gps_df['longitude'].iloc[i])
            distance = geodesic(point1, point2).meters
            distances.append(distance)
            
            if distance > 0:
                elevation_change = gps_df['altitude'].iloc[i] - gps_df['altitude'].iloc[i-1]
                gradient = elevation_change / distance
            else:
                gradient = 0
            gradients.append(gradient)
        
        gps_df.loc[1:, 'distance'] = distances
        gps_df.loc[1:, 'gradient'] = gradients
        gps_df.loc[0, 'distance'] = 0
        gps_df.loc[0, 'gradient'] = 0
        
        gps_df['speed_change'] = gps_df['speed'].diff().fillna(0)
        gps_df['acceleration'] = gps_df['speed_change'] / (gps_df['timestamp'].diff().fillna(1) / 1000)
        
        features['route'] = gps_df
    
    if not data_dict['acc'].empty:
        acc_df = data_dict['acc']
        
        acc_df['magnitude'] = np.sqrt(acc_df['x']**2 + acc_df['y']**2 + acc_df['z']**2)
        
        acc_df['jerk'] = acc_df['magnitude'].diff().fillna(0) / (acc_df['timestamp'].diff().fillna(1) / 1000)
        
        acc_df['jerk_smooth'] = savgol_filter(acc_df['jerk'], 
                                              min(21, len(acc_df) - (len(acc_df) % 2) - 1), 
                                              3)
        
        features['behavior'] = acc_df
    
    if not data_dict['gyro'].empty:
        gyro_df = data_dict['gyro']
        
        gyro_df['turning_rate'] = np.abs(gyro_df['z'])
        
        features['turning'] = gyro_df
    
    return features

In [None]:
def model_energy_consumption(features):
    
    if 'route' in features and 'behavior' in features:
        route_times = features['route']['timestamp'].values
        behavior_times = features['behavior']['timestamp'].values
        
        combined_data = []
        
        for i, row in features['route'].iterrows():
            gps_time = row['timestamp']
            closest_acc_idx = np.argmin(np.abs(behavior_times - gps_time))
            
            entry = {
                'timestamp': gps_time,
                'speed': row['speed'],
                'acceleration': row['acceleration'],
                'gradient': row['gradient'],
                'acc_x': features['behavior']['x'].iloc[closest_acc_idx],
                'acc_y': features['behavior']['y'].iloc[closest_acc_idx],
                'acc_z': features['behavior']['z'].iloc[closest_acc_idx],
                'jerk': features['behavior']['jerk_smooth'].iloc[closest_acc_idx]
            }
            
            if 'turning' in features:
                turning_times = features['turning']['timestamp'].values
                closest_turn_idx = np.argmin(np.abs(turning_times - gps_time))
                entry['turning_rate'] = features['turning']['turning_rate'].iloc[closest_turn_idx]
            else:
                entry['turning_rate'] = 0
                
            combined_data.append(entry)
        
        combined_df = pd.DataFrame(combined_data)
        
        combined_df['energy_factor'] = calculate_energy_factors(combined_df)
        
        return combined_df
    
    return pd.DataFrame()

def calculate_energy_factors(df):
    mass = 1500
    g = 9.81
    rho = 1.225
    Cd = 0.3
    A = 2.2
    Cr = 0.013
    
    energy_factors = np.zeros(len(df))
    
    for i in range(len(df)):
        speed = df['speed'].iloc[i] / 3.6
        
        gradient_angle = np.arctan(df['gradient'].iloc[i])
        rolling_resistance = Cr * mass * g * np.cos(gradient_angle)
        
        aero_drag = 0.5 * rho * Cd * A * speed**2
        
        gradient_resistance = mass * g * np.sin(gradient_angle)
        
        acceleration_resistance = mass * df['acceleration'].iloc[i]
        
        total_resistance = rolling_resistance + aero_drag + gradient_resistance + acceleration_resistance
        
        energy_factors[i] = total_resistance * speed
    
    energy_factors = energy_factors - np.min(energy_factors)
    
    return energy_factors

In [None]:
def cluster_driving_behaviors(combined_data, n_clusters=3):
    
    features = ['speed', 'acceleration', 'jerk', 'turning_rate']
    X = combined_data[features]
    
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    pca = PCA(n_components=2)
    X_pca = pca.fit_transform(X_scaled)
    
    kmeans = KMeans(n_clusters=n_clusters, random_state=42)
    clusters = kmeans.fit_predict(X_scaled)
    
    combined_data['driver_cluster'] = clusters
    
    cluster_profiles = {}
    for cluster in range(n_clusters):
        cluster_data = combined_data[combined_data['driver_cluster'] == cluster]
        
        profile = {
            'avg_speed': cluster_data['speed'].mean(),
            'avg_acceleration': cluster_data['acceleration'].mean(),
            'avg_jerk': cluster_data['jerk'].mean(),
            'avg_turning_rate': cluster_data['turning_rate'].mean(),
            'energy_efficiency': cluster_data['energy_factor'].mean()
        }
        
        cluster_profiles[cluster] = profile
    
    return combined_data, cluster_profiles, X_pca, clusters

In [None]:
def develop_control_strategy(combined_data, cluster_profiles):
    
    features = ['speed', 'acceleration', 'gradient', 'turning_rate', 'driver_cluster']
    X = pd.get_dummies(combined_data[features], columns=['driver_cluster'])
    y = combined_data['energy_factor']
    
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(X, y)
    
    feature_importance = dict(zip(X.columns, model.feature_importances_))
    
    control_strategies = {}
    
    for cluster, profile in cluster_profiles.items():
        if profile['avg_jerk'] > 0.5:
            strategy = {
                'acceleration_smoothing': True,
                'smooth_factor': min(1.0, profile['avg_jerk'] / 2),
                'anticipation_distance': 100 + (profile['avg_speed'] * 5),
                'eco_mode_bias': 0.7
            }
        elif profile['avg_acceleration'] > 1.0:
            strategy = {
                'acceleration_smoothing': True,
                'smooth_factor': 0.4,
                'anticipation_distance': 80 + (profile['avg_speed'] * 3),
                'eco_mode_bias': 0.8
            }
        elif profile['avg_turning_rate'] > 0.3:
            strategy = {
                'acceleration_smoothing': False,
                'smooth_factor': 0.2,
                'anticipation_distance': 120 + (profile['avg_speed'] * 4),
                'eco_mode_bias': 0.5
            }
        else:
            strategy = {
                'acceleration_smoothing': False,
                'smooth_factor': 0.1,
                'anticipation_distance': 60 + (profile['avg_speed'] * 2),
                'eco_mode_bias': 0.3
            }
        
        control_strategies[cluster] = strategy
    
    return model, feature_importance, control_strategies

In [None]:
# Main function
def run_eco_driving_analysis(data_file):
    data_dict = parse_data(data_file)
    
    features = extract_features(data_dict)
    
    combined_data = model_energy_consumption(features)
    
    if combined_data.empty:
        return "Insufficient data for analysis"
    
    n_clusters = 3
    clustered_data, cluster_profiles, X_pca, clusters = cluster_driving_behaviors(combined_data, n_clusters)
    
    model, feature_importance, control_strategies = develop_control_strategy(clustered_data, cluster_profiles)
    
    results = {
        'data': clustered_data,
        'cluster_profiles': cluster_profiles,
        'pca_data': X_pca,
        'clusters': clusters,
        'energy_model': model,
        'feature_importance': feature_importance,
        'control_strategies': control_strategies
    }
    
    return results

In [None]:
# Visualization function
def visualize_results(results):
    
    plt.figure(figsize=(12, 10))
    
    plt.subplot(2, 2, 1)
    scatter = plt.scatter(results['pca_data'][:, 0], results['pca_data'][:, 1], 
                         c=results['clusters'], cmap='viridis', alpha=0.8)
    plt.title('Driver Behavior Clusters')
    plt.xlabel('PCA Component 1')
    plt.ylabel('PCA Component 2')
    plt.colorbar(scatter, label='Cluster')
    
    plt.subplot(2, 2, 2)
    cluster_energy = results['data'].groupby('driver_cluster')['energy_factor'].mean()
    cluster_energy.plot(kind='bar', color='teal')
    plt.title('Average Energy Consumption by Driver Cluster')
    plt.xlabel('Driver Cluster')
    plt.ylabel('Energy Factor')
    
    plt.subplot(2, 2, 3)
    importance_df = pd.DataFrame({
        'Feature': list(results['feature_importance'].keys()),
        'Importance': list(results['feature_importance'].values())
    }).sort_values('Importance', ascending=False)
    
    importance_df.plot(x='Feature', y='Importance', kind='bar', color='coral')
    plt.title('Feature Importance for Energy Consumption')
    plt.xticks(rotation=45, ha='right')
    plt.tight_layout()
    
    plt.subplot(2, 2, 4)
    plt.scatter(results['data']['timestamp'], results['data']['energy_factor'], 
               c=results['data']['driver_cluster'], cmap='viridis', alpha=0.7)
    plt.title('Energy Consumption Over Time')
    plt.xlabel('Time')
    plt.ylabel('Energy Factor')
    plt.colorbar(label='Driver Cluster')
    
    plt.tight_layout()
    plt.savefig('eco_driving_results.png')
    
    plt.figure(figsize=(10, 6))
    strategies = results['control_strategies']
    clusters = list(strategies.keys())
    
    metrics = ['smooth_factor', 'eco_mode_bias']
    for i, metric in enumerate(metrics):
        values = [strategies[c][metric] for c in clusters]
        plt.subplot(1, 2, i+1)
        plt.bar(clusters, values, color='darkgreen')
        plt.title(f'Control Strategy: {metric}')
        plt.xlabel('Driver Cluster')
        plt.ylabel('Value')
    
    plt.tight_layout()
    plt.savefig('control_strategies.png')

In [None]:
# Simulation method
class AdaptiveEcoDrivingController:
    def __init__(self, energy_model, control_strategies):
        self.energy_model = energy_model
        self.control_strategies = control_strategies
        self.current_cluster = None
        self.driver_history = []
        self.strategy = None
    
    def update_driver_profile(self, recent_driving_data):
        features = ['speed', 'acceleration', 'jerk', 'turning_rate']
        X = recent_driving_data[features]
        
        scaler = StandardScaler()
        X_scaled = scaler.fit_transform(X)
        
        kmeans = KMeans(n_clusters=len(self.control_strategies), random_state=42)
        kmeans.fit(X_scaled)
        
        clusters = kmeans.predict(X_scaled)
        self.current_cluster = np.bincount(clusters).argmax()
        
        self.strategy = self.control_strategies[self.current_cluster]
        
        return self.current_cluster
    def recommend_actions(self, current_state):
        if self.strategy is None:
            return None
        
        state = {
            'speed': current_state.get('speed', 0),
            'gradient': current_state.get('gradient', 0),
            'turning_rate': current_state.get('turning_rate', 0),
            'acceleration': current_state.get('acceleration', 0)
        }
        
        actions = {}
        
        target_acceleration = state['acceleration']
        if self.strategy['acceleration_smoothing']:
            if target_acceleration > 0:
                target_acceleration *= (1 - self.strategy['smooth_factor'])
        
        actions['target_acceleration'] = target_acceleration
        
        if state['gradient'] > 0.03:
            actions['speed_adjustment'] = -max(0, state['speed'] * 0.1 * self.strategy['eco_mode_bias'])
        elif state['gradient'] < -0.03:
            actions['speed_adjustment'] = min(5, state['speed'] * 0.05 * (1 - self.strategy['eco_mode_bias']))
        else:
            actions['speed_adjustment'] = 0
        
        if state['turning_rate'] > 0.2:
            actions['speed_adjustment'] -= state['turning_rate'] * 5 * self.strategy['smooth_factor']
        
        base_energy = self.predict_energy_consumption(state)
        adjusted_state = state.copy()
        adjusted_state['acceleration'] = target_acceleration
        adjusted_state['speed'] += actions['speed_adjustment']
        adjusted_energy = self.predict_energy_consumption(adjusted_state)
        
        actions['expected_energy_saving'] = max(0, base_energy - adjusted_energy)
        
        return actions
    
    def predict_energy_consumption(self, state):
        X = pd.DataFrame([state])
        
        for i in range(len(self.control_strategies)):
            X[f'driver_cluster_{i}'] = 1 if i == self.current_cluster else 0
        
        energy = self.energy_model.predict(X)[0]
        
        return energy

if __name__ == "__main__":
    data_file = "sensor_data.txt"
    
    results = run_eco_driving_analysis(data_file)
    
    visualize_results(results)
    
    controller = AdaptiveEcoDrivingController(
        results['energy_model'],
        results['control_strategies']
    )
    
    sample_state = {
        'speed': 60,
        'gradient': 0.02,
        'turning_rate': 0.1,
        'acceleration': 1.2
    }
    
    sample_recent_data = results['data'].tail(100)
    driver_cluster = controller.update_driver_profile(sample_recent_data)
    
    actions = controller.recommend_actions(sample_state)
    
    print(f"Driver Cluster: {driver_cluster}")
    print(f"Recommended Actions: {actions}")
    
    results['data'].to_csv('eco_driving_data.csv', index=False)
    
    with open('cluster_profiles.txt', 'w') as f:
        for cluster, profile in results['cluster_profiles'].items():
            f.write(f"Cluster {cluster}:\n")
            for key, value in profile.items():
                f.write(f"  {key}: {value:.4f}\n")
            f.write("\n")
    
    with open('control_strategies.txt', 'w') as f:
        for cluster, strategy in results['control_strategies'].items():
            f.write(f"Cluster {cluster}:\n")
            for key, value in strategy.items():
                f.write(f"  {key}: {value}\n")
            f.write("\n")
