# In [None]:
import RPi.GPIO as GPIO
import time
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal

# Take a single force-sensitive-resistor (FSR) sample, report it over `serial`,
# and update the touch-timing state. This section runs once at import time; it
# is not inside a loop.
fsrPin = 0                     # FSR pin connected to analog input 0
fsrThreshold = 50              # Minimum mapped force (0-100 scale) that counts as a touch
touchStartTime = 0             # Millisecond timestamp of touch start; 0 means "no touch in progress"

count = 0                      # Sample counter; loop() increments it and wraps the LUT index with a modulo
dacVal = 0                     # data to send to DAC

currentTime = int(time.time() * 1000)  # Get the current time in milliseconds

# NOTE(review): `analogRead` is not defined or imported anywhere in the visible
# source, so this line raises NameError as written — confirm which ADC driver
# is supposed to provide it.
fsrReading = analogRead(fsrPin)

# Convert the analog reading to a force value
force = int(np.interp(fsrReading, [0, 1023], [0, 100]))  # Maps the reading to a force range (0-100)

# Write the time and force value to the serial port, if one is available.
# NOTE(review): `serial` is likewise never assigned or imported in the visible
# source; presumably it should be a port object or None — verify.
if serial is not None:
    serial.write(f"Time: {currentTime} ms\t".encode())
    serial.write(f"Force: {force}\n".encode())

# Check if the force exceeds the threshold and start tracking touch duration
if force >= fsrThreshold and touchStartTime == 0:
    touchStartTime = currentTime  # Record the touch start time

# Check if the force drops below the threshold and calculate touch duration.
# NOTE(review): `touchDuration` is only bound inside this branch. Because
# touchStartTime starts at 0 and this section runs once, the branch cannot be
# taken on this pass (the check above only sets touchStartTime when
# force >= fsrThreshold), so the later `num_samples = touchDuration` will hit
# an undefined name unless this sampling is repeated elsewhere.
if force < fsrThreshold and touchStartTime > 0:
    touchDuration = currentTime - touchStartTime  # Calculate touch duration
    if serial is not None:
        serial.write(f"Touch Duration: {touchDuration} ms\n".encode())
    touchStartTime = 0  # Reset the touch start time


# Use Broadcom (BCM) channel numbering and configure channels 2 through 9
# as outputs; loop() later writes one bit of dacVal to each of them.
GPIO.setmode(GPIO.BCM)
for pin in range(2, 10):
    GPIO.setup(pin, GPIO.OUT)

# ANN model definition: one input feature (the min-max scaled force), two
# hidden ReLU layers of 20 units each, and three linear outputs.
scaler = MinMaxScaler()
model = Sequential()
model.add(Dense(20, input_dim=1, activation='relu'))
model.add(Dense(20, activation='relu'))
model.add(Dense(3))

# Load the data for RMS prediction
# NOTE(review): '35 N' is listed before '30 N' in both tables — confirm the
# target columns follow the same order.
data1 = {
    'Forces': ['10 N', '15 N', '20 N', '25 N', '35 N', '30 N', '40 N', '45 N', '50 N', '55 N', '60 N', '65 N', '70 N', '75 N'],
    'rms 1': [0.09, 0.22, 0.1, 0.19, 0.09, 0.11, 0.1, 0.08, 0.08, 0.09, 0.08, 0.07, 0.06, 0.05],
    'rms 2': [0.25, 0.29, 0.34, 0.2, 0.2, 0.14, 0.22, 0.12, 0.14, 0.09, 0.09, 0.06, 0.12, 0.1],
    'rms 3': [0.2, 0.14, 0.07, 0.11, 0.22, 0.13, 0.12, 0.11, 0.13, 0.09, 0.07, 0.1, 0.09, 0.05]
}

df1 = pd.DataFrame(data1)
# Take an explicit copy before rewriting the column: assigning into a frame
# selected from df1 is chained assignment and triggers pandas'
# SettingWithCopyWarning.
X1 = df1[['Forces']].copy()
y1 = df1[['rms 1', 'rms 2', 'rms 3']]
# Strip the ' N' unit suffix (literal match, not a regex) and convert to int.
X1['Forces'] = X1['Forces'].str.replace(' N', '', regex=False).astype(int)
X_scaled1 = scaler.fit_transform(X1)
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(X_scaled1, y1, epochs=200, batch_size=10, verbose=1)

# Load the data for HOFMs prediction
# NOTE(review): the same `model` and `scaler` objects fitted above on the RMS
# targets are re-fitted below on the HOFMs targets. After this section `model`
# no longer holds a separate RMS fit; if distinct RMS and HOFMs predictions are
# needed downstream, they require separate model instances.
data2 = {
    'Forces': ['10 N', '15 N', '20 N', '25 N', '35 N', '30 N', '40 N', '45 N', '50 N', '55 N', '60 N', '65 N', '70 N', '75 N'],
    'HOFMs 1': [0.07, 0.21, 0.08, 0.17, 0.06, 0.09, 0.07, 0.06, 0.07, 0.07, 0.06, 0.04, 0.04, 0.02],
    'HOFMs 2': [0.25, 0.29, 0.34, 0.2, 0.2, 0.14, 0.22, 0.12, 0.14, 0.09, 0.09, 0.06, 0.12, 0.1],
    'HOFMs 3': [0.2, 0.14, 0.07, 0.11, 0.22, 0.13, 0.12, 0.11, 0.13, 0.09, 0.07, 0.1, 0.09, 0.05]
}

df2 = pd.DataFrame(data2)
X2 = df2[['Forces']].copy()  # explicit copy, same reason as X1
y2 = df2[['HOFMs 1', 'HOFMs 2', 'HOFMs 3']]
X2['Forces'] = X2['Forces'].str.replace(' N', '', regex=False).astype(int)
X_scaled2 = scaler.fit_transform(X2)
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(X_scaled2, y2, epochs=200, batch_size=10, verbose=1)

# Shape parameters for the synthetic EMG signal.
# NOTE(review): touchDuration is computed above as a difference of millisecond
# timestamps and is used here directly as a sample count — confirm that
# one sample per millisecond is intended.
num_samples = touchDuration     # Number of samples in the generated EMG signal
# NOTE(review): num_channels is not referenced anywhere else in the visible source.
num_channels = 1                # Number of channels in the EMG signal
num_clusters = 3                # Number of clusters in the Gaussian Mixture Model

def generate_zero_mean_gmm(num_clusters, num_samples):
    """Draw samples from an equal-weight mixture of zero-mean, unit-covariance Gaussians.

    Every mixture component has mean 0 and identity covariance, so each draw
    is a vector of ``num_samples`` independent standard-normal values. The
    values are therefore drawn directly, which avoids building
    ``num_clusters`` dense ``num_samples x num_samples`` covariance matrices
    and factorising them.

    Args:
        num_clusters: Number of mixture components (each weighted
            ``1 / num_clusters``). Must be at least 1.
        num_samples: Dimension of every draw; also scales the number of
            draws, since each component contributes
            ``int(num_samples * weight)`` rows.

    Returns:
        A 2-D ``numpy.ndarray`` of shape ``(total_rows, num_samples)`` where
        ``total_rows`` is the sum of the per-component row counts. The result
        is always 2-D, including when a component contributes a single row.

    Raises:
        ValueError: If ``num_clusters`` is less than 1.
    """
    if num_clusters < 1:
        raise ValueError("num_clusters must be at least 1")

    weights = np.ones(num_clusters) / num_clusters
    # Same per-component truncation as before, so the total row count is
    # unchanged (it can be slightly below num_samples when the division is
    # not exact).
    rows_per_cluster = [int(num_samples * weight) for weight in weights]
    total_rows = sum(rows_per_cluster)

    # Uses NumPy's global random state, so np.random.seed() still controls it.
    return np.random.standard_normal((total_rows, num_samples))

# Build the DAC lookup table (LUT): model prediction for the measured force
# plus zero-mean Gaussian noise, scaled and offset.
force_input = force
scaled_force = scaler.transform([[force_input]])

# NOTE(review): both values below are produced by the same `model`/`scaler`
# pair, which in this file is fitted on the RMS targets and then re-fitted on
# the HOFMs targets, so the two numbers are identical. Separate model
# instances are needed if they are meant to differ.
accurate_rms = np.mean(model.predict(scaled_force)[0], axis=0)
accurate_hofms = np.mean(model.predict(scaled_force)[0], axis=0)

gmm_samples = generate_zero_mean_gmm(num_clusters, num_samples)
emg_signal = accurate_hofms * accurate_rms + gmm_samples

# Sample-index axis. It must not be called `time`: rebinding that name would
# replace the imported `time` module and break time.sleep() in loop().
time_axis = np.arange(num_samples)

# generate_zero_mean_gmm can return a 2-D array; flatten so that every LUT
# entry is a single value that loop() can convert with int().
LUT = np.ravel(emg_signal * 100 + 400)

# Look up the DAC value for the current count (the index wraps around the LUT).
dacVal = LUT[count % len(LUT)]

def setup():
    """Drive GPIO channels 2 through 9 low.

    These are the eight output lines that loop() writes the bits of dacVal to.
    """
    for pin in range(2, 10):
        GPIO.output(pin, GPIO.LOW)

def loop():
    """Output the next LUT sample on GPIO 2-9, print it, and wait 5 ms.

    Reads the LUT entry at ``count`` (wrapped around the LUT length), stores
    it in the global ``dacVal`` as an int, writes its bits to the GPIO
    outputs, then increments the global ``count``.

    Only the eight lowest bits of ``dacVal`` are written; any higher bits are
    not output.
    """
    global count, dacVal

    lut_index = count % len(LUT)
    dacVal = int(LUT[lut_index])

    # Bit k of dacVal (k = 0..7) is written to GPIO channel k + 2.
    for bit, pin in enumerate(range(2, 10)):
        GPIO.output(pin, (dacVal >> bit) & 1)

    print(dacVal)

    count = count + 1
    time.sleep(0.005)

if __name__ == '__main__':
    try:
        setup()
        # Stream LUT samples until the process is interrupted (e.g. Ctrl-C).
        while True:
            loop()
    finally:
        # Release the GPIO channels configured by this script on any exit
        # path, so they are not left configured as outputs.
        GPIO.cleanup()
