# Notebook Test Practices

> A test of the heart-rate (bpm), SpO2, and respiration ("respirat") measurements.

In [1]:
# | default_exp core

In [2]:
class Color:
    "A colored square that displays itself through `_repr_markdown_`."

    def __init__(self, color):
        # Kept as given; it is interpolated into the CSS `background-color` value.
        self.color = color

    def _repr_markdown_(self):
        "Return a `<div>` whose inline style paints a 50px square in `self.color`."
        declarations = [
            f"background-color: {self.color}",
            "width: 50px",
            "height: 50px",
            "margin: 10px",
        ]
        css = "; ".join(declarations)
        return '<div style="' + css + '"></div>'


Color("green")

<div style="background-color: green; width: 50px; height: 50px; margin: 10px"></div>

In [3]:
# | hide
from __future__ import annotations
from nbdev.showdoc import *
from fastcore.test import *

In [4]:
def inc(x):
    "Return `x` plus one."
    successor = x + 1
    return successor


test_eq(inc(3), 4)


def divide(x, y):
    "Return the quotient `x / y`; a zero `y` is not special-cased."
    quotient = x / y
    return quotient


test_fail(lambda: divide(1, 0), contains="division by zero")

In [5]:
# | export
class Number:
    "A number."

    def __init__(self, num):
        # The wrapped value; `__add__` and `__repr__` read it back from here.
        self.num = num

    def __add__(self, other):
        "Sum of this and `other`."
        # `other` only needs a `num` attribute; the result is always a new `Number`.
        total = self.num + other.num
        return Number(total)

    def __repr__(self):
        # Rendered in constructor-call form, e.g. `Number(9)`.
        return "Number({})".format(self.num)

In [6]:
show_doc(Number.__add__)


---

### Number.__add__

>      Number.__add__ (other)

Sum of this and `other`.

In [7]:
Number(5) + Number(4)

Number(9)

## All together

In [8]:
# | hide
import numpy as np


# | export
def all(
    a,  # Input array or object that can be converted to an array.
    axis: int
    | tuple
    | None = None,  # Axis or axes along which a logical AND reduction is performed (default: all).
    out: np.ndarray
    | None = None,  # Alternate output array in which to place the result.
    keepdims: bool = np._NoValue,  # Leave reduced one-dimensional axes in the result?
    where=np._NoValue,  # Elements to include in reduction. See `numpy.ufunc.reduce` for details. New in version 1.20.0.
) -> (
    np.ndarray | bool
):  # A new boolean or array, or a reference to `out` if it's specified.
    "Test whether all array elements along a given axis evaluate to `True`."
    # Stub: the body is only `...`, so the signature, the inline parameter comments and
    # the docstring are the whole content. NOTE(review): the name shadows the builtin `all`.
    ...

In [9]:
class MyArray(np.ndarray):
    "`np.ndarray` subclass whose `all` override takes only `axis` and `out`."

    def all(self, axis=None, out=None):
        "Do nothing and return `None`; the signature has no `keepdims` parameter."
        return None

In [10]:
# Fill a 2x2 `MyArray` with the nested boolean list.
x = [[True, False], [True, True]]
y = MyArray((2, 2))
y[:] = x
np.all(y)  # No TypeError since `keepdims` isn't passed
# With `keepdims=True` the call is expected to fail: `MyArray.all` only takes
# `axis` and `out`, so the extra keyword is rejected.
test_fail(
    lambda: np.all(y, keepdims=True),
    contains="all() got an unexpected keyword argument 'keepdims'",
)

In [11]:
# | hide
import nbdev

nbdev.nbdev_export()

In [12]:
# | export
def say_hello(to):
    "Return the greeting `Hello <to>!` for `to`."
    greeting = "Hello {}!".format(to)
    return greeting

In [13]:
say_hello("Isaac")


'Hello Isaac!'

## hr detect server

In [19]:
# | export
# The sample video comes from
# http://git.xinktech.com/yscz/alg/hr_detect_server/uploads/81dac5cbf6904113c61dee9305e9eada/sample.mp4
video_path = "/home/wangli/projects/hr_detect_server/sample.mp4"  # local path of the sample video
server_ip = "localhost:8293"  # host and port of the server
hr_server_url = f"http://{server_ip}/"  # base URL; endpoint paths are appended to it

In [31]:
import base64
import time
import json
import os
import cv2
import requests


# | export
def send_frame(frame, frame_id):
    """POST one frame to the server's `process_framebyframe/` endpoint.

    `frame` is JPEG-encoded with `cv2.imencode` and sent as base64 text in a JSON
    body, together with `frame_id` (under the key `image_id`) and the current
    Unix time in whole seconds (under `timestamp`).

    Returns the decoded JSON body when the server answers with status 200,
    otherwise `None`.
    """
    # `imencode` returns a pair; only the encoded buffer (second item) is used here.
    jpeg = cv2.imencode(".jpg", frame)[1]
    # Decode the base64 bytes directly instead of slicing the `b'...'` repr,
    # which depends on how bytes are printed and breaks under `python -bb`.
    base64_data = base64.b64encode(jpeg).decode("ascii")
    payload = {
        "image": base64_data,
        "image_id": frame_id,
        "timestamp": int(time.time()),
    }

    response = requests.post(
        hr_server_url + "process_framebyframe/", data=json.dumps(payload)
    )
    if response.status_code != 200:
        return None
    return response.json()


# | export
def detect_pulse(device):
    """Stream every frame of `device` to the server and collect its measurements.

    `device` is either a directory, read as an image sequence named
    `img_%04d.png`, or anything else `cv2.VideoCapture` accepts (e.g. a video
    file path).

    The function starts a detection session, sends the frames one by one with
    `send_frame` (printing each reply), waits two seconds, and then queries the
    `get_bpm/`, `get_respirat/` and `get_spo2/` endpoints.

    Returns a dict with the keys `"bpm"`, `"spo2"` and `"respirat"`; each maps
    to the list of values taken from the corresponding JSON reply, or to an
    empty list when that reply was `{}`.
    """
    records = {"bpm": [], "spo2": [], "respirat": []}

    if os.path.isdir(device):
        source = os.path.join(device, "img_%04d.png")
    else:
        source = device
    cap = cv2.VideoCapture(source)

    try:
        result = requests.post(
            hr_server_url
            + "start_hr_detect/?camera_fps=30&heart_interval=100&save=False"
        )
        print("start statue:", result)

        frame_num = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:  # reading failed or the source is exhausted
                break
            frame_num += 1
            res = send_frame(frame, "{}".format(frame_num))
            print(res)
    finally:
        # Release the capture even when a request or a read raises.
        cap.release()

    time.sleep(2)  # leave the server 2s to process the frames it received

    # (records key, endpoint, print label); the bpm reply is printed without a label.
    queries = (
        ("bpm", "get_bpm/", None),
        ("respirat", "get_respirat/", "respirat:"),
        ("spo2", "get_spo2/", "spo2 level:"),
    )
    for key, endpoint, label in queries:
        # Parse each response body once and reuse the result.
        payload = requests.get(hr_server_url + endpoint).json()
        if payload != {}:
            records[key] += list(payload.values())
            if label is None:
                print(payload)
            else:
                print(label, payload)

    return records

In [32]:
results = detect_pulse(video_path)


start statue: <Response [200]>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
{'241': [0, [180.17578125], [2.6407256722450256], True]}
spo2 level: {'241': [0, [91.49830183645716], [0], True]}

In [34]:
import plotly.graph_objects as go
import numpy as np


def get_signal_snr(records, name):
    "Return two lists built from `records[name]`: item 0 of each entry's field 1, and item 0 of each entry's field 2."
    sig = []
    for entry in records[name]:
        sig.append(entry[1][0])

    snr = []
    for entry in records[name]:
        snr.append(entry[2][0])

    return sig, snr


# | export
def plot_physics(records):
    "Show one figure with a solid signal curve and a dashed `snr` curve for each of bpm, spo2 and respirat."
    # Series name -> line colour; insertion order fixes the order of the traces.
    series_colors = {"bpm": "red", "spo2": "green", "respirat": "blue"}

    traces = []
    for name, color in series_colors.items():
        sig, snr = get_signal_snr(records, name)
        # The x positions start at 16 for respirat and at 8 for the other series.
        start = 16 if name == "respirat" else 8
        x = np.arange(start, start + len(sig))

        signal_trace = go.Scatter(
            x=x, y=sig, mode="lines+markers", name=name, line=dict(color=color)
        )
        snr_trace = go.Scatter(
            x=x,
            y=snr,
            mode="lines+markers",
            name=name + " snr",
            line=dict(dash="dash", color=color),
        )
        traces.extend([signal_trace, snr_trace])

    fig = go.Figure(data=traces)

    # Set the chart title and the axis labels.
    fig.update_layout(title="Signal Curves", xaxis_title="X-axis", yaxis_title="Y-axis")

    # Display the chart.
    fig.show()

In [35]:
plot_physics(results)


In [33]:
print(results)


{'bpm': [[0, [180.17578125], [2.6407256722450256], True]], 'spo2': [[0, [91.49830183645716], [0], True]], 'respirat': []}
