In [1]:
#截圖、攝影機拍照，存成 Json

from PIL import ImageGrab, Image
import os
import random
from base64 import b64decode, b64encode
import pandas as pd
import numpy as np

# 開資料夾
if "temp" not in os.listdir():
    os.mkdir("temp")
    
if "data" not in os.listdir():
    os.mkdir("./data")

# 攝影機
# pip install opencv-python
from cv2 import VideoCapture, cvtColor, COLOR_RGB2BGR

def getWebCamImg(): 
    global cap
    ret, frame = cap.read()
    frame = cvtColor(np.array(frame), COLOR_RGB2BGR)
    return Image.fromarray(frame).resize((1280, 720), Image.ANTIALIAS)

def getScreenShot():
    screenShot = ImageGrab.grab()
    screenShot = screenShot.resize((1280, 720), Image.ANTIALIAS)
    return screenShot

def getImgB64encode(img):
    randomVal = random.getrandbits(32)
    filename = "./temp/%d.png"%randomVal
    img.save(filename)
    img_encode = b64encode(open(filename, 'rb').read())
    os.remove(filename)
    return img_encode.decode()

['0.json', '1.json', '10.json', '11.json', '12.json', '13.json', '14.json', '2.json', '3.json', '4.json', '5.json', '6.json', '7.json', '8.json', '9.json']


In [2]:
#彙整起來
def grabImgByFrame(timeFrame, recordID="test"):
    if recordID not in os.listdir("./data/"):
        os.mkdir("data/"+ str(recordID))
    
    screenShot = getScreenShot()
    WebCam = getWebCamImg()

    outJson = {
        "webCam": getImgB64encode(WebCam),
        "screenShot": getImgB64encode(screenShot), 
    }

    import json
    filename = "data/"+ recordID+"/"+ str(timeFrame) + ".json"

    with open(filename, 'w') as f:
        json.dump(outJson,f)
    
    return filename

In [3]:
# screenshot task

import time
import threading

is_screenshot = False

def screenshot(is_screenshot):
  timeframe = 0
  global cap
  cap = VideoCapture(0)
  while True:
    grabImgByFrame(timeframe, user_id)
    time.sleep(0.7)
    timeframe += 1
    if not is_screenshot():
      break

def start_screenshot():
    is_screenshot = True
    screenshot_task = threading.Thread(target=screenshot, args=(lambda: is_screenshot, ))
    screenshot_task.start()

def stop_screenshot():
    try:
      global cap
      del cap
      is_screenshot = False
    except:
      pass

In [4]:
# 序列數據處理

from modwt import modwt, imodwt
from scipy import stats
from nolitsa import surrogates
from sklearn import preprocessing
import numpy as np

def signal_filter(signal):
    signal = np.array(signal)
    np.random.seed(20)
    signal_length = signal.shape[0]
    signal = preprocessing.scale(signal)
    (surrogate_signal, _, _) = surrogates.iaaft(signal)
    w = modwt(signal, 'sym5', 5)
    surrogate_w = modwt(surrogate_signal, 'sym5', 5)
    for j in range(w.shape[0]):
        t_score = abs((w[j] - surrogate_w[j].mean()) / surrogate_w[j].std())
        p = 1 - stats.t.cdf(t_score, 2 * signal_length - 2)
        threshold = w[j].std() * np.sqrt(2 * np.log(signal_length))
        for index in range(w[j].shape[0]):
            w[j][p[index] * 2 >= threshold] = surrogate_w[(j, index)]
        
    new_signal = imodwt(w, 'sym5')
    return new_signal

def get_theta_and_low_alpha_band_data(data):
    data = data.reshape((48, 16))
    fft_data = np.abs(np.fft.fft(data)).astype(int)
    
    band_data = np.zeros((fft_data.shape[0], 2))
    for i in range(fft_data.shape[0]):
        band_data[i, 0] = fft_data[i, 4:7 + 1].sum()
        band_data[i, 1] = fft_data[i, 8:9 + 1].sum()

    return band_data

def change_to_sequence_data(data, offset_index=16):
    sequence_data = []
    for start_index in range(0, data.shape[0] - data.shape[0] % offset_index, offset_index):
        sequence_data.append(np.array(data[start_index:start_index + offset_index]))
    
    return np.array(sequence_data)

In [5]:
# model
from tensorflow.keras.models import model_from_json

def load_model(model_name, model_weights_filename):
    with open("model/{}.json".format(model_name), "r") as json_file:
        model = model_from_json(json_file.read())

    model.load_weights("model/{}.h5".format(model_weights_filename))
    model.compile(loss="categorical_crossentropy",
                optimizer="adam",
                metrics=["categorical_accuracy"])
              
    return model

def predict(data):
    data = np.array(data)

    data = data.reshape((768))
    data = signal_filter(data)
    data = data.reshape((3, 256))
    data = get_theta_and_low_alpha_band_data(data)
    data = change_to_sequence_data(data)
    print(data.shape)
    predictions[user_id] += model.predict(data).reshape((3)).tolist()

In [6]:
# 藍芽連線到 ESP32 task

import serial
import time

flag_inputdata = 0

def SerialWrite(command):
    ser.write(bytes(command,"utf-8"))
    ser.flushInput()

def reconnect():
     ser.close()
     error = 1
     while error == 1:
        try:
            ser.open()
            error = 0
        except:
            error = 1

def get_esp32_data():
    oo = 0
    while 1:
        oo = oo + 1
        time.sleep(0.5)

        if ser.in_waiting:  #收到('\n')指令旗標
            data = ser.readline().decode()[:-2].split(",")
            predict(data)
            print(predictions)
            oo = 0
        if oo > 3:
            reconnect()
            oo = 0

def start_record_esp32():
    SerialWrite("s")
    flag_inputdata = 1

    global is_get_esp32_data
    
    is_get_esp32_data = True
    get_esp32_data_task = threading.Thread(target=get_esp32_data)
    get_esp32_data_task.start()

def stop_record_esp32():
    SerialWrite("e")
    flag_inputdata = 0

    global is_get_esp32_data

    is_get_esp32_data = False

In [7]:
import matplotlib.pyplot as plt
from io import BytesIO
import base64
import numpy as np
from PIL import Image
import cv2

def get_base64_image():
    plt.figure(figsize=(3, 2), dpi=400)
    plt.plot([time for time in range(len(predictions[user_id]))], predictions[user_id])
    plt.savefig("./{}/focused_score.png".format(user_id))

    image = np.array(Image.open("./data/{}/focused_score.png".format(user_id)).resize((960, 540)))
    image = Image.fromarray(image)
    buff = BytesIO()
    image.save(buff, format="png")
    base64_image = base64.b64encode(buff.getvalue()).decode("utf-8")

    return base64_image

# Flask API 的部分

In [8]:
import numpy as np
import pandas as pd
import json
import webbrowser
import hashlib
import datetime

from flask import Flask, request, abort
app = Flask(__name__)

from flask_cors import CORS
CORS(app)

# 連線 ESP32
# ser = serial.Serial("COM6", 9600, timeout=2)

# 開啟 model
model = load_model("model", "model")

# 開啟網頁
webbrowser.open("index.html")

md5 = hashlib.md5()

user_id = ""
is_screenshot = False
predictions = {}

@app.route("/startRecord")
def start_record(local_user_id=""):
    global user_id

    if local_user_id != "":
        user_id = local_user_id
        predictions[user_id] = []
        start_screenshot()
        start_record_esp32()

        return "", 204

    else:
        md5.update(str(datetime.datetime.now()).encode())
        user_id = str(md5.hexdigest())
        predictions[user_id] = []
        start_screenshot()
        start_record_esp32()

        return json.dumps({"user_id": user_id}), 200

@app.route("/stopRecord")
def stop_record():
    stop_record_esp32()
    stop_screenshot()
    md5.update(str(datetime.datetime.now()).encode())
    hash_value = md5.hexdigest()
    with open("./data/{}/{}_{}.csv".format(user_id, user_id, hash_value), 'w') as f:
        for user_focused_score in predictions[user_id]:
            f.writelines(str(user_focused_score) + "\n")

    return "", 204

@app.route("/getPrediction")
def get_prediction():
    base64_image = get_base64_image()
    return json.dumps({"base64Image": base64_image}), 200

@app.route("/getLastPredictionValue")
def get_last_prediction_value():
    return json.dumps({"last_predictionvalue": predictions[user_id][-1]}), 200

@app.route("/getImg")
def getImg():
    return json.dumps({"status": "Good", "filename":filename})

@app.route("/getImageJsonPath")
def get_image_json_path():
    user_file_path = os.listdir("data/{}/".format("e69158f93111fe0b12001e00d79a5d54"))

    image_json_files_path = []
    for index in range(len(user_file_path)):
        if ".json" in user_file_path[index]:
            image_json_files_path.append(user_file_path[index])
    
    return json.dumps({"imageJsonPaths": image_json_files_path})

# uvicorn filename:app --port 8001 --workers 1 --proxy-headers
if __name__ == "__main__":
    app.run(host = '0.0.0.0', port=13523)

SerialException: could not open port 'COM6': OSError(22, '信號等待逾時。', None, 121)

## 然後
只要在瀏覽器打上 http://localhost:13520 就可以呼叫API囉<br>
比方 http://localhost:13520/test 就會回應 "Your test is successful."<br>
http://localhost:13520/getImg 就會回應 Json 圖片檔案