In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Layer
import mido
import pretty_midi
import time
import threading
import queue
import os
from collections import deque
import random
import soundfile as sf
import warnings
warnings.filterwarnings('ignore')


class CustomAttention(Layer):    
    def __init__(self, attention_dim=64, **kwargs):
        super(CustomAttention, self).__init__(**kwargs)
        self.attention_dim = attention_dim
        
    def build(self, input_shape):
        self.input_dim = input_shape[-1]
        self.query_dense = tf.keras.layers.Dense(self.attention_dim, activation='tanh')
        self.key_dense = tf.keras.layers.Dense(self.attention_dim, activation='tanh')
        self.value_dense = tf.keras.layers.Dense(self.input_dim, activation='tanh')
        super(CustomAttention, self).build(input_shape)
        
    def call(self, inputs):
        query = self.query_dense(inputs)
        key = self.key_dense(inputs)
        value = self.value_dense(inputs)
        attention_scores = tf.keras.backend.batch_dot(query, key, axes=[2, 2])
        attention_scores = tf.nn.softmax(attention_scores, axis=-1)
        attended = tf.keras.backend.batch_dot(attention_scores, value, axes=[2, 1])
        attended = attended + inputs
        
        return attended
    
    def get_config(self):
        config = super(CustomAttention, self).get_config()
        config.update({"attention_dim": self.attention_dim})
        return config

class SensorToMIDISystem:
    def __init__(self, model_path="best_model.h5", data_path="midiMusic.csv", output_dir="MIDI", sequence_length=20, feature_dim=9):
        self.model_path = model_path
        self.data_path = data_path
        self.output_dir = output_dir
        self.sequence_length = sequence_length
        self.feature_dim = feature_dim
        os.makedirs(self.output_dir, exist_ok=True)
        self.model = None
        self.data_buffer = deque(maxlen=sequence_length)
        self.sensor_data = None
        self.current_index = 0
        self.running = False
        self.current_class = None
        self.previous_class = None
        self.class_stability_counter = 0
        self.min_stability_frames = 3
        self.midi_data = pretty_midi.PrettyMIDI()
        self.current_time = 0.0
        self.tempo = 120  
        self.active_notes = []
        self.load_model()
        self.load_sensor_data()
        self.initialize_music_mapping()
        
    def load_model(self):
        print("Loading pre-trained model...")
        try:
            custom_objects = {'CustomAttention': CustomAttention}
            self.model = load_model(self.model_path, custom_objects=custom_objects)
            print("Model loaded successfully!")
            self.model.summary()
        except Exception as e:
            print(f"Error loading model: {e}")
            raise
    
    def load_sensor_data(self):
        print("Loading sensor data...")
        try:
            self.sensor_data = pd.read_csv(self.data_path)
            print(f"Loaded {len(self.sensor_data)} data points")
            self.feature_cols = ['Temperature', 'Humidity', 'Acc_X', 'Acc_Y', 'Acc_Z','Gyro_X', 'Gyro_Y', 'Gyro_Z', 'LDR']
            self.features = self.sensor_data[self.feature_cols].values
            print(f"Features shape: {self.features.shape}")
            
        except Exception as e:
            print(f"Error loading sensor data: {e}")
            raise
    
    def initialize_music_mapping(self):
        self.weather_classes = ['Sunny', 'Rainy', 'Stormy', 'Windy']
        self.music_mapping = {
            0: { 'name': 'Sunny', 'scale': [60, 62, 64, 65, 67, 69, 71], 'chord_type': 'major', 'base_note': 60, 'instrument': 1, 'tempo': 120, 'velocity': 80, 'rhythm_pattern': [1, 0, 1, 0, 1, 0, 1, 0], 'chord_progression': [60, 65, 67, 60], 'color': 'bright' }, 1: { 'name': 'Rainy', 'scale': [57, 59, 60, 62, 64, 65, 67], 'chord_type': 'minor', 'base_note': 57, 'instrument': 46, 'tempo': 90, 'velocity': 60, 'rhythm_pattern': [1, 0, 0, 1, 0, 1, 0, 0], 'chord_progression': [57, 60, 62, 57], 'color': 'melancholic' }, 2: { 'name': 'Stormy', 'scale': [62, 63, 66, 67, 70, 71, 74], 'chord_type': 'diminished', 'base_note': 38, 'instrument': 39, 'tempo': 140, 'velocity': 110, 'rhythm_pattern': [1, 1, 0, 1, 1, 0, 1, 1], 'chord_progression': [62, 67, 70, 62], 'color': 'dark' }, 3: { 'name': 'Windy', 'scale': [64, 66, 68, 69, 71, 73, 75], 'chord_type': 'major7', 'base_note': 64, 'instrument': 73, 'tempo': 110, 'velocity': 90, 'rhythm_pattern': [1, 0, 1, 1, 0, 1, 0, 1], 'chord_progression': [64, 69, 71, 64], 'color': 'airy' }
        }
    
    def normalize_sequence(self, sequence):
        sequence = np.array(sequence)
        normalized = (sequence - np.mean(sequence, axis=0, keepdims=True)) / (np.std(sequence, axis=0, keepdims=True) + 1e-8)
        return normalized
    
    def predict_weather_class(self, sensor_sequence):
        if len(sensor_sequence) < self.sequence_length:
            return None
        input_data = np.array(sensor_sequence).reshape(1, self.sequence_length, self.feature_dim)
        input_data = self.normalize_sequence(input_data)
        prediction = self.model.predict(input_data, verbose=0)
        predicted_class = np.argmax(prediction[0])
        confidence = np.max(prediction[0])
        
        return predicted_class, confidence, prediction[0]
    
    def smooth_class_transition(self, new_class):
        if self.current_class is None:
            self.current_class = new_class
            self.class_stability_counter = 0
            return new_class
        
        if new_class == self.current_class:
            self.class_stability_counter = min(self.class_stability_counter + 1, 10)
            return self.current_class
        else:
            self.class_stability_counter -= 1
            if self.class_stability_counter <= -self.min_stability_frames:
                self.previous_class = self.current_class
                self.current_class = new_class
                self.class_stability_counter = 0
                return new_class
            else:
                return self.current_class
    
    def generate_chord(self, weather_class, base_note=None):
        mapping = self.music_mapping[weather_class]
        if base_note is None:
            base_note = mapping['base_note']
        
        chord_type = mapping['chord_type']
        chord_notes = []
        if chord_type == 'major':
            chord_notes = [base_note, base_note + 4, base_note + 7]
        elif chord_type == 'minor':
            chord_notes = [base_note, base_note + 3, base_note + 7]
        elif chord_type == 'diminished':
            chord_notes = [base_note, base_note + 3, base_note + 6]
        elif chord_type == 'major7':
            chord_notes = [base_note, base_note + 4, base_note + 7, base_note + 11]
        
        return chord_notes
    
    def generate_melody_note(self, weather_class, step=0):
        mapping = self.music_mapping[weather_class]
        scale = mapping['scale']
        rhythm = mapping['rhythm_pattern']
        if rhythm[step % len(rhythm)] == 0:
            return None
        note_index = (step // 2) % len(scale)
        if random.random() < 0.3:  
            note_index = (note_index + random.choice([-1, 1])) % len(scale)
        
        return scale[note_index]
    
    def add_midi_note(self, note, velocity, start_time, duration, instrument=1):        
        if len(self.midi_data.instruments) == 0 or self.midi_data.instruments[-1].program != instrument:
            new_instrument = pretty_midi.Instrument(program=instrument)
            self.midi_data.instruments.append(new_instrument)
        
        for instr in self.midi_data.instruments:
            if instr.program == instrument:
                midi_note = pretty_midi.Note(
                    velocity=velocity,
                    pitch=note,
                    start=start_time,
                    end=start_time + duration
                )
                instr.notes.append(midi_note)
                break
    
    def process_sensor_data_step(self, step_index):
        if step_index >= len(self.features):
            return False
        current_reading = self.features[step_index]
        self.data_buffer.append(current_reading)
        if len(self.data_buffer) == self.sequence_length:
            result = self.predict_weather_class(list(self.data_buffer))
            if result is not None:
                predicted_class, confidence, probabilities = result
                stable_class = self.smooth_class_transition(predicted_class)
                self.generate_music_step(stable_class, step_index, confidence)
                class_name = self.music_mapping[stable_class]['name']
                print(f"Step {step_index}: {class_name} (confidence: {confidence:.3f})")
                
                return True
        
        return True
    
    def generate_music_step(self, weather_class, step_index, confidence):
        mapping = self.music_mapping[weather_class]
        beat_duration = 60.0 / mapping['tempo']  
        note_duration = beat_duration * 0.8  
        melody_note = self.generate_melody_note(weather_class, step_index)
        
        if melody_note is not None:
            self.add_midi_note(
                note=melody_note,velocity=int(mapping['velocity'] * confidence),start_time=self.current_time,duration=note_duration,instrument=mapping['instrument']
            )
        
        if step_index % 4 == 0:  
            chord_progression = mapping['chord_progression']
            chord_root = chord_progression[(step_index // 4) % len(chord_progression)]
            chord_notes = self.generate_chord(weather_class, chord_root)    
            for chord_note in chord_notes:
                self.add_midi_note(
                    note=chord_note,
                    velocity=int(mapping['velocity'] * 0.6),  
                    start_time=self.current_time,
                    duration=beat_duration * 2,  
                    instrument=1  
                )
        if step_index % 8 == 0:  
            bass_note = mapping['base_note'] - 12  
            self.add_midi_note(
                note=bass_note,
                velocity=int(mapping['velocity'] * 0.8),
                start_time=self.current_time,
                duration=beat_duration * 4,  
                instrument=33  
            )
        self.current_time += beat_duration / 2  
    
    def run_simulation(self, max_steps=None, step_delay=0.1):
        print("Starting real-time sensor-to-MIDI simulation...")
        print(f"Processing {len(self.features)} data points...")
        self.running = True
        processed_steps = 0
        try:
            while self.running and self.current_index < len(self.features):
                if max_steps and processed_steps >= max_steps:
                    break
                success = self.process_sensor_data_step(self.current_index)
                if not success:
                    break
                self.current_index += 1
                processed_steps += 1
                time.sleep(step_delay)
                if processed_steps % 50 == 0:
                    print(f"Processed {processed_steps} steps...")
        
        except KeyboardInterrupt:
            print("Simulation interrupted by user")
        finally:
            self.running = False
            print(f"Simulation completed. Processed {processed_steps} steps.")
    
    def save_midi_file(self, filename=None):
        if filename is None:
            filename = os.path.join(self.output_dir, "sensor_music.mid")
        tempo_changes = [pretty_midi.TempoChange(tempo=120, time=0)]
        self.midi_data.tempo_changes = tempo_changes
        self.midi_data.write(filename)
        print(f"MIDI file saved as: {filename}")
        return filename
    
    def midi_to_audio(self, midi_file, audio_file=None, sample_rate=44100):
        if audio_file is None:
            audio_file = os.path.join(self.output_dir, "sensor_music.wav")
        try:
            midi_data = pretty_midi.PrettyMIDI(midi_file)
            audio = midi_data.synthesize(fs=sample_rate)
            sf.write(audio_file, audio, sample_rate)
            print(f"Audio file saved as: {audio_file}")
            return audio_file
        
        except Exception as e:
            print(f"Error converting MIDI to audio: {e}")
            return None
    
    def generate_summary_report(self):
        report_file = os.path.join(self.output_dir, "generation_report.txt")
        with open(report_file, 'w') as f:
            f.write("Sensor-to-MIDI Music Generation Report\n")
            f.write("=" * 50 + "\n\n")
            f.write(f"Total steps processed: {self.current_index}\n")
            f.write(f"Total music duration: {self.current_time:.2f} seconds\n\n")
            f.write("Weather Class Mapping:\n")
            for class_id, mapping in self.music_mapping.items():
                f.write(f"  {class_id}: {mapping['name']}\n")
                f.write(f"    - Instrument: {mapping['instrument']}\n")
                f.write(f"    - Tempo: {mapping['tempo']} BPM\n")
                f.write(f"    - Base Note: {mapping['base_note']}\n")
                f.write(f"    - Chord Type: {mapping['chord_type']}\n\n")
            
            f.write("Generated Files:\n")
            f.write("  - sensor_music.mid (MIDI file)\n")
            f.write("  - sensor_music.wav (Audio file)\n")
            f.write("  - generation_report.txt (This report)\n")
        
        print(f"Report saved as: {report_file}")

def main():
    print("Initializing Sensor-to-MIDI Music System...")
    system = SensorToMIDISystem(
        model_path="best_model.h5",data_path="midiMusic.csv",output_dir="MIDI"
    )
    print("Starting music generation simulation...")
    system.run_simulation(max_steps=200, step_delay=0.05)
    print("Saving generated music...")
    midi_file = system.save_midi_file()
    audio_file = system.midi_to_audio(midi_file)
    system.generate_summary_report()
    print("\nMusic generation completed!")
    print(f"Files saved in '{system.output_dir}' directory:")
    print("  - sensor_music.mid (MIDI file)")
    print("  - sensor_music.wav (Audio file)")
    print("  - generation_report.txt (Generation report)")

if __name__ == "__main__":
    main()