In [1]:
import pandas as pd
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import os
import requests
import urllib.parse
from dotenv import load_dotenv

load_dotenv()

# config
INPUT_SIZE = 10
HIDDEN_SIZE = 8
CLASSES = 3
CSV_FILENAME = 'mitbih_test.csv'

# google cloud config
GCP_API_KEY = os.getenv("GCP_API_KEY") 
GCP_BUCKET_NAME = "heart-data-repo-1" 
GCP_OBJECT_NAME = "mitbih_test.csv"

def get_google_cloud_url():
    """Constructs the authenticated Google Cloud Storage API URL."""
    encoded_name = urllib.parse.quote(GCP_OBJECT_NAME, safe='')
    url = f"https://storage.googleapis.com/storage/v1/b/{GCP_BUCKET_NAME}/o/{encoded_name}?alt=media&key={GCP_API_KEY}"
    return url

def force_download_data(): 
    # to make sure that if the file is not available locally that we can download it on google cloud
    if os.path.exists(CSV_FILENAME) and os.path.getsize(CSV_FILENAME) > 1000:
        print(f"Data present: {CSV_FILENAME}")
        return

    target_url = get_google_cloud_url()
    print(f"Downloading data from Google Cloud Storage...")
    
    try:
        response = requests.get(target_url)
        if response.status_code == 200:
            with open(CSV_FILENAME, 'wb') as f:
                f.write(response.content)
            print("Download successful!")
        else:
            print(f"Download failed (Status: {response.status_code})")
            print(f"Response: {response.text}")
    except Exception as e:
        print(f"Network error: {e}")

def load_data():
    force_download_data()
    
    if not os.path.exists(CSV_FILENAME):
        print("Error, File not found.")
        return None, None
        
    print("Loading dataset...")
    df = pd.read_csv(CSV_FILENAME, header=None)
    
    # only keep roles that deal with normal (0), s-type (1), and v-type (2) 
    df = df[df[187].isin([0.0, 1.0, 2.0])] 
    
    # Train on FIRST 16,000 samples
    X = df.iloc[:16000, 20:20+INPUT_SIZE].values
    # Slice y to match X length
    y = df.iloc[:16000, 187].values.astype(int)
    
    # one hot encoding, converting column numbers to vectors  
    y = tf.keras.utils.to_categorical(y, num_classes=CLASSES) 
    return X, y

def quantize_and_export_manual(model):
    #  LAYER 1 (Hidden)
    weights1, biases1 = model.layers[0].get_weights()
    
    w1_max = np.max(np.abs(weights1))
    # maps largest weight to maximum 8-bit integer value
    w1_scale = w1_max / 127.0 
    
    # scale up floats to 8 bit integers by dividing by scale and truncating
    w1_int = (weights1 / w1_scale).astype(np.int8) 
    
    #  LAYER 2 (Output) 
    weights2, biases2 = model.layers[1].get_weights() 
    
    w2_max = np.max(np.abs(weights2))
    # different scale than w1 because the max is different
    w2_scale = w2_max / 127.0 
    w2_int = (weights2 / w2_scale).astype(np.int8)
    
    #  GENERATE C HEADER CONTENT
    c_code = "// Auto-generated by train_tinyml.py\n"
    c_code += "#ifndef MODEL_WEIGHTS_H\n#define MODEL_WEIGHTS_H\n\n"
    c_code += "#include <stdint.h>\n\n"

    c_code += f"// Layer 1 (Hidden) - Explicit Quantization\n"
    c_code += f"const float W1_SCALE = {w1_scale:.12f};\n"
    c_code += f"const int8_t W1[{INPUT_SIZE}][{HIDDEN_SIZE}] = {{\n"
    for i in range(INPUT_SIZE):
        row = ", ".join([f"{w:4d}" for w in w1_int[i]])
        c_code += f"    {{{row}}},\n"
    c_code += "};\n"
    c_code += f"const float B1[{HIDDEN_SIZE}] = {{ " + ", ".join([f"{b:.6f}" for b in biases1]) + " };\n\n"

    c_code += f"// Layer 2 (Output) - Explicit Quantization\n"
    c_code += f"const float W2_SCALE = {w2_scale:.12f};\n"
    # must print 8-bit int for pico to run more efficiently, but pico will scale it back up before running inference.
    c_code += f"const int8_t W2[{HIDDEN_SIZE}][{CLASSES}] = {{\n"
    for i in range(HIDDEN_SIZE):
        row = ", ".join([f"{w:4d}" for w in w2_int[i]])
        c_code += f"    {{{row}}},\n"
    c_code += "};\n"
    c_code += f"const float B2[{CLASSES}] = {{ " + ", ".join([f"{b:.6f}" for b in biases2]) + " };\n"
    
    c_code += "\n#endif // MODEL_WEIGHTS_H\n"

    #  SAVE TO FILE (ROBUST PATH FINDING) 
    possible_paths = [
        "../firmware/src/model_weights.h",        # If running from training/ folder
        "firmware/src/model_weights.h",           # If running from root folder
        "src/model_weights.h",                    # If running from inside firmware/
        "/content/model_weights.h"                # Google Colab Default
    ]

    saved = False
    for path in possible_paths:
        dir_name = os.path.dirname(path)
        if os.path.exists(dir_name) or dir_name == "/content":
            try:
                with open(path, "w") as f:
                    f.write(c_code)
                print(f"SUCCESS: Auto-updated header file at: {os.path.abspath(path)}")
                print("You can now Build the firmware immediately!")
                saved = True
                break
            except Exception as e:
                pass 

    if not saved:
        filename = "model_weights.h"
        with open(filename, "w") as f:
            f.write(c_code)
        print(f"WARNING: Could not find firmware folder automatically.")
        print(f"Saved '{filename}' to current directory.")
        print("ACTION: Please move this file into 'firmware/src/' manually.")

def main():
    X, y = load_data() 
    if X is None: return

    print(f"Training on {len(X)} samples...")
    
    model = Sequential([
        Dense(HIDDEN_SIZE, input_dim=INPUT_SIZE, activation='relu'),
        # converting scores to probabilities
        Dense(CLASSES, activation='softmax') 
    ])
    
    # we use adam optimizer becayse idk?
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
    
    # Train
    model.fit(X, y, epochs=20, batch_size=32, validation_split=0.2, verbose=1)
    
    loss, acc = model.evaluate(X, y, verbose=1)
    print(f"Model Accuracy: {acc*100:.2f}%")
    
    # Run the automated export
    quantize_and_export_manual(model)

if __name__ == "__main__":
    main()

ModuleNotFoundError: No module named 'dotenv'