In [48]:
import os
import re
import xml.etree.ElementTree as ET
from decimal import *

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# generate random Gaussian values
from numpy.random import seed
from numpy.random import randn,randint

### Calculate sorption parameters 

In [49]:
# Reference intervals, one row per feature, in the order the features are
# returned by calculate_features (collect_output_text matches features[i]
# against interval_list[i]).
# Each entry is a closed interval [low, high]; an empty list is a placeholder
# that ind_tx skips. The position of the matching interval inside its row is
# later used as a row index into the text table and as an index into
# factor_map in calculate_health_index.
interval_list = [
    # II
    [[0.59, 0.67], [], [0.68, 0.72], [0.45, 0.55], [0.56, 0.58], [], []],
    # III
    [
        [0.340, 0.43], [], [0.44, 0.47], [0.30, 0.33], [0.20, 0.29],
        [0.48, 0.55], [0.100, 0.200], [0.56, 0.700], [0.71, 100], [0, 0.09],
    ],
    # V
    [[0.20, 0.29], [0.040, 0.055], [0.30, 0.35], [0.40, 0.50], [0, 0.19], []],
    # I
    [[0.76, 0.83], [], [0.40, 0.75], [0.84, 1.3], [1.4, 3.0], [3.1, 100], []],
    # IV
    [
        [0.561, 0.659], [0.27, 0.33], [0.66, 0.69], [0.70, 0.75], [0.50, 0.56],
        [0.34, 0.49], [0.76, 0.95], [0.96, 100], [0, 0.26],
    ],
    # T
    [[53, 67], [], [40, 52], [68, 70], [0, 39], [71, 100], [0, 30]],
    # S(30/60)
    [[0.22, 0.26], [], [0.17, 0.21], [0.27, 0.30], [0.31, 0.43], [0.10, 0.16], [0.44, 0.55]],
]

In [50]:
def read_data(path_texts):
    """Load the text lookup table from an Excel workbook.

    Reads the sheet at position 1 (the second sheet) of the workbook found at
    ``path_texts`` (a path or URL accepted by ``pandas.read_excel``), using the
    first row as column headers, and returns it as a DataFrame.
    """
    text_table = pd.read_excel(path_texts, sheet_name=1, header=0)
    return text_table

In [51]:
def tfg_converter(lines):
    """Split the lines of a text-based tfg file into title strings and values.

    The first 8 lines are treated as the title section, lines 8..169 as the
    measurement points. Markup is stripped character-wise with regular
    expressions rather than parsed.

    Parameters
    ----------
    lines : list of str
        File content, one entry per line.

    Returns
    -------
    tuple
        ``(titles, values)``: the cleaned title lines without the first one,
        and the list of point values as floats.
    """
    header_lines = lines[:8]
    point_lines = lines[8:170]

    # Measurement points: drop the markup characters, turn the remaining 'v'
    # (left over from "value") into a separator, use '.' as decimal mark and
    # keep the second field, which is the value.
    values = []
    for raw_point in point_lines:
        stripped = re.sub(r"(?i)[<point time=alue=/>]", "", raw_point)
        fields = stripped.replace('v',' ').replace('"',"").replace(",",".").split()
        values.append(float(fields[1]))

    # Title section: drop markup characters, quotes and a possible BOM.
    titles = [
        re.sub(r"(?i)[mesure<name></name>dciptiolghk=]", "", raw_title)
        .replace('"',"").replace(",",".").replace('\ufeff',"")
        for raw_title in header_lines
    ]
    return titles[1:], values

In [52]:
def parse_covid_tfg(data_path):
    """Parse an XML tfg file into its name and signal values.

    Parameters
    ----------
    data_path : str or path-like
        Location of the XML file.

    Returns
    -------
    tuple
        ``(meta_info, values)`` where ``meta_info`` is the text of the root's
        ``name`` child (``None`` when that element is absent) and ``values`` is
        a float64 array built from the integer ``value`` attribute of every
        ``point`` element, in document order.

    Raises
    ------
    xml.etree.ElementTree.ParseError
        If the file is not well-formed XML.
    KeyError, ValueError
        If a ``point`` has no ``value`` attribute or it is not an integer.
    """
    # `ET` must be xml.etree.ElementTree; it is imported at the top of the file.
    root = ET.parse(data_path).getroot()

    name_node = root.find('name')
    # Guard against files without a <name> element instead of failing with an
    # AttributeError on None.
    meta_info = name_node.text if name_node is not None else None

    tf_arr = [int(point.attrib['value']) for point in root.iter('point')]
    return meta_info, np.array(tf_arr, dtype=np.float64)


In [53]:
def get_arr(filename):
    """Read a text-based tfg file and return its title info and signal.

    The file is read as UTF-8, split into right-stripped lines and handed to
    ``tfg_converter``. The resulting values are re-expressed relative to the
    first sample as ``values[0] - values``.

    Parameters
    ----------
    filename : str or path-like
        File to read.

    Returns
    -------
    tuple
        ``(meta_info, tf_arr)``: the title strings from ``tfg_converter`` and a
        NumPy array with the shifted signal.

    Raises
    ------
    Exception
        Whatever ``open`` or ``tfg_converter`` raise for unreadable or
        differently formatted files; ``IndexError`` when no values were found.
    """
    # The context manager closes the file even when parsing fails; the previous
    # explicit close() was skipped on any exception.
    with open(filename, encoding='utf-8') as file:
        lines = [line.rstrip() for line in file]

    meta_info, tf_values = tfg_converter(lines)

    tf_arr = np.array(tf_values)
    # Express every sample as the drop from the first sample.
    tf_arr = tf_arr[0] - tf_arr

    return meta_info, tf_arr

def get_arr_from_covid_tfg(filename):
    """Return ``(meta_info, tf_arr)`` for an XML tfg file.

    Thin wrapper that forwards ``filename`` to ``parse_covid_tfg`` and passes
    its two results through.
    """
    meta_info, tf_arr = parse_covid_tfg(filename)
    return meta_info, tf_arr

In [54]:
#interval membership detector
def detect(list, x):
    """Report whether ``x`` lies inside the closed interval ``list``.

    ``list`` holds the lower bound at index 0 and the upper bound at index 1;
    both are converted to ``Decimal`` through their string form before the
    comparison. Returns 1 when ``lower <= x <= upper``, otherwise 0.

    Side effect: sets the precision of the current decimal context to 2.
    """
    getcontext().prec = 2
    lower_bound = Decimal(str(list[0]))
    upper_bound = Decimal(str(list[1]))

    not_below = x >= lower_bound
    not_above = x <= upper_bound
    return 1 if (not_below and not_above) else 0

In [55]:
#list of active intervals
def ind_tx(feature_intervals, fv):
    """Return the index of the interval in ``feature_intervals`` containing ``fv``.

    Empty entries are skipped. When no interval matches, -1 is returned. When
    several match, the one with the lowest upper bound wins (the first such
    interval on ties) and its first position in ``feature_intervals`` is
    returned.
    """
    matching = [
        position
        for position, bounds in enumerate(feature_intervals)
        if len(bounds) != 0 and detect(bounds, fv) == 1
    ]

    if not matching:
        return -1
    if len(matching) == 1:
        return matching[0]

    # Overlapping/nested intervals: prefer the one that ends first.
    tightest = min(
        (feature_intervals[position] for position in matching),
        key=lambda bounds: bounds[1],
    )
    return feature_intervals.index(tightest)

In [56]:
def calculate_features(tf_arr):
    """Compute the seven curve features that are matched against ``interval_list``.

    Parameters
    ----------
    tf_arr : numpy.ndarray
        One-dimensional float signal. Samples up to index 70 are read, so at
        least 71 samples are required.

    Returns
    -------
    list
        ``[f1, f2, f3, f4, f5, f6, f7]`` where

        * f1 = a(40/60), f2 = a(30/60), f3 = a(20/60), f4 = a(40/70),
          f5 = a(20/30): ratios of the samples at those indices, as
          ``Decimal`` rounded to two significant digits;
        * f6 = T: twice the index preceding the first sample (searched in
          indices 1..60) that is not ``<=`` half of the signal maximum, as int;
        * f7 = S(30)/S(60): ratio of the trapezoid areas between samples
          4..30 and 4..60, as ``Decimal`` rounded to two significant digits.

    Raises
    ------
    IndexError
        If the signal has fewer than 71 samples.
    ValueError
        If no sample in indices 1..60 exceeds half of the maximum (previously
        this surfaced as ``TypeError`` from ``None * 2``).
    decimal.DecimalException
        If a denominator sample is zero.

    Notes
    -----
    Sets the precision of the current decimal context to 2 and leaves it that
    way, exactly as before; ``detect`` does the same.
    The unused noise generation was removed: it reseeded NumPy's global random
    generator on every call without contributing to the result.
    """
    # Two significant digits for every Decimal division below.
    getcontext().prec = 2

    def integral_sum(a, b):
        # Area of the single trapezoid spanned by samples a and b.
        return (tf_arr[a] + tf_arr[b]) / 2.0 * (b - a)

    def tau_l(ref, arr):
        # Scan indices 1..60 and report the index just before the first sample
        # that is not <= ref.
        for i in range(1, 61):
            if not (arr[i] <= ref):
                return i - 1
        raise ValueError(
            "no sample in indices 1..60 exceeds half of the signal maximum"
        )

    # f1 goes through str() while f2..f5 and f7 convert the float directly;
    # kept as in the original so results stay numerically identical.
    #a(40/60)
    f1 = Decimal(str(tf_arr[40])) / Decimal(str(tf_arr[60]))

    #a(30/60)
    f2 = Decimal(tf_arr[30]) / Decimal(tf_arr[60])

    #a(20/60)
    f3 = Decimal(tf_arr[20]) / Decimal(tf_arr[60])

    #a(40/70)
    f4 = Decimal(tf_arr[40]) / Decimal(tf_arr[70])

    #a(20/30)
    f5 = Decimal(tf_arr[20]) / Decimal(tf_arr[30])

    #T
    # Half of the maximum, rounded half-up to an integer.
    ref = Decimal(np.amax(tf_arr) / 2).to_integral_value(rounding=ROUND_HALF_UP)
    # NOTE(review): the factor 2 presumably converts an index into time units;
    # confirm against the measurement format.
    f6 = tau_l(ref, tf_arr) * 2

    #S(30)/S(60)
    f7 = Decimal(integral_sum(4, 30)) / Decimal(integral_sum(4, 60))

    return [f1, f2, f3, f4, f5, f6, f7]

In [57]:
def collect_output_text(features, interval_list, kuchmagic_table):
    """Translate feature values into table texts and interval indices.

    For every feature ``features[i]`` the matching interval inside
    ``interval_list[i]`` is looked up with ``ind_tx``. The text is taken from
    ``kuchmagic_table`` at row = interval index and column position ``i + 1``,
    stripped of surrounding whitespace.

    Parameters
    ----------
    features : sequence
        Feature values, in the same order as the rows of ``interval_list``.
    interval_list : list
        One list of ``[low, high]`` intervals per feature.
    kuchmagic_table : pandas.DataFrame
        Text table; column 0 is not read here, columns 1.. hold the texts.

    Returns
    -------
    tuple
        ``(text_out, intervals)``: one text per feature (``'out of intervals'``
        when nothing matched) and the interval index per feature (-1 when
        nothing matched).
    """
    text_alarm = 'out of intervals'
    text_out = []
    intervals = []

    for i, f in enumerate(features):
        # index of the interval that contains the value, or -1
        row = ind_tx(interval_list[i], f)
        if row == -1:
            text_out.append(text_alarm)
        else:
            # Single positional lookup. The former iloc[row][i+1] relied on
            # Series[int] falling back to positional access, which pandas has
            # deprecated and which is ambiguous with integer column labels.
            text_out.append(kuchmagic_table.iloc[row, i + 1].strip())
        intervals.append(row)
    return text_out, intervals

In [58]:
# get meta info and final list of texts from tfg file
# used for output of text values of parameters
def get_text_from_tfg(data_path, interval_list, kuchmagic_table):
    """Return the meta info and the list of parameter texts for a tfg file.

    The file is first read with ``get_arr``; if that fails for any reason it
    is read with ``get_arr_from_covid_tfg`` instead. When the resulting signal
    is non-empty, its features are computed and translated to texts with
    ``collect_output_text``.

    Parameters
    ----------
    data_path : str or path-like
        tfg file to read.
    interval_list : list
        Per-feature interval definitions.
    kuchmagic_table : pandas.DataFrame
        Text lookup table.

    Returns
    -------
    tuple
        ``(meta_info, text)``; ``text`` is an empty list for an empty signal.
    """
    try:
        meta_info, tf_arr = get_arr(data_path)
    except Exception:
        # Fall back to the XML reader. `Exception` instead of a bare `except`
        # so that KeyboardInterrupt/SystemExit are not swallowed.
        meta_info, tf_arr = get_arr_from_covid_tfg(data_path)

    text = []
    if tf_arr.size > 0:
        features = calculate_features(tf_arr)
        text, _ = collect_output_text(features, interval_list, kuchmagic_table)

    return meta_info, text

In [59]:
# If a measurement was taken on the left hand, only the tokens describing the left hand are kept in the final text;
# likewise for the right hand.
def parse_left_right(texts, measurement_name=None):
    """Drop the "/"-separated tokens that describe the other body side.

    Parameters
    ----------
    texts : iterable of str
        Texts whose alternatives are separated by ``"/"``.
    measurement_name : str, optional
        String that tells the side of the measurement by containing one of the
        markers 'слева'/'левая' (left) or 'справа'/'правая' (right).
        The original body read this from an undefined name ``f``; for backward
        compatibility a module-level ``f`` is still used when this argument is
        omitted, and an empty string (no side known) when there is none.
        NOTE(review): ``f`` was presumably the measurement file name; confirm.

    Returns
    -------
    list of str
        One text per input. When a side is known, tokens containing
        'слева и справа' and tokens mentioning the opposite side are removed;
        the remaining tokens are re-joined with ``"/"``.
    """
    if measurement_name is None:
        measurement_name = globals().get('f', '')

    left_markers = ('слева', 'левая')
    right_markers = ('справа', 'правая')

    def mentions(markers, text):
        # True when any of the markers occurs in the text.
        return any(marker in text for marker in markers)

    measured_left = mentions(left_markers, measurement_name)
    measured_right = mentions(right_markers, measurement_name)

    new_texts = []
    for text in texts:
        new_text = []
        for token in text.split("/"):
            if (measured_left or measured_right) and 'слева и справа' in token:
                continue
            if measured_left and mentions(right_markers, token):
                continue
            if measured_right and mentions(left_markers, token):
                continue
            new_text.append(token)
        new_texts.append("/".join(new_text))
    return new_texts

### Calculate health index

In [60]:
# Intervals for the health index, wrapped in an outer list so that the value
# can be passed to collect_output_text as a single "feature" (row 0).
health_index_interval = [
    [
        [89.5, 100],
        [80.5, 89.4],
        [70.0, 80.4],
        [59.5, 69.9],
        [54.5, 59.4],
        [49.5, 54.4],
        [29.5, 49.4],
        [10, 29.4],
    ],
]

In [61]:
def read_data_health_index(path_texts):
    """Load the health-index text table from an Excel workbook.

    Reads the sheet at position 2 (the third sheet) of the workbook found at
    ``path_texts`` (a path or URL accepted by ``pandas.read_excel``), using the
    first row as column headers, and returns it as a DataFrame.
    """
    health_table = pd.read_excel(path_texts, sheet_name=2, header=0)
    return health_table

In [62]:
# get final list of texts and corresponding intervals 
# used for health index calculation
def get_features_and_intervals(filename, interval_list, kuchmagic_table):
    """Return features, matched interval indices, signal and meta info of a tfg file.

    The file is first read with ``get_arr``; if that fails it is read with
    ``get_arr_from_covid_tfg``. For a non-empty signal the features are
    computed and matched against ``interval_list``.

    Parameters
    ----------
    filename : str or path-like
        tfg file to read.
    interval_list : list
        Per-feature interval definitions.
    kuchmagic_table : pandas.DataFrame
        Text lookup table (needed by ``collect_output_text``).

    Returns
    -------
    tuple
        ``(features, intervals, tf_arr, meta_info)``. ``features`` and
        ``intervals`` are empty lists when the signal is empty; before, that
        case raised ``UnboundLocalError`` at the return statement.
    """
    # Defaults for the empty-signal case.
    features = []
    intervals = []

    try:
        meta_info, tf_arr = get_arr(filename)
    except Exception:
        # Fall back to the XML reader. `Exception` instead of a bare `except`
        # so that KeyboardInterrupt/SystemExit are not swallowed.
        meta_info, tf_arr = get_arr_from_covid_tfg(filename)

    if tf_arr.size > 0:
        features = calculate_features(tf_arr)
        # The texts are not needed here, only the interval indices.
        _, intervals = collect_output_text(features, interval_list, kuchmagic_table)

    return features, intervals, tf_arr, meta_info

In [63]:
def calculate_health_index(filename, interval_list, kuchmagic_table=None):
    """Compute the health index (0..100 scale) for a tfg file.

    Every feature's interval index selects a factor from ``factor_map``; the
    factors are combined in a weighted sum ``SP``, normalised with
    ``min_sp``/``max_sp`` and inverted, so that a smaller ``SP`` gives a
    higher index.

    Parameters
    ----------
    filename : str or path-like
        tfg file to evaluate.
    interval_list : list
        Per-feature interval definitions (seven rows).
    kuchmagic_table : pandas.DataFrame, optional
        Text lookup table passed on to ``get_features_and_intervals``. When
        omitted, the module-level ``data`` is used, which is what the function
        always did before; passing it explicitly removes the dependency on
        that global being defined first.

    Returns
    -------
    tuple
        ``(health_index, tf_arr, meta_info)`` with ``health_index`` as a
        ``Decimal`` rounded to three significant digits.

    Raises
    ------
    ValueError
        If the number of interval indices differs from the number of rows in
        ``factor_map`` (e.g. for an empty signal).
    """
    if kuchmagic_table is None:
        kuchmagic_table = data

    # Weighted sum when every factor is 1 / when every factor is the row's last.
    min_sp = 6.9
    max_sp = 20.2

    factor_map = [[1, 1, 1.5, 1.5, 2, 2.5, 3],
                 [1, 1, 1.5, 1.5, 2, 2, 2.5, 2.5, 2.5, 3],
                 [1, 1, 1.5, 2, 3, 3],
                 [1, 1, 1.5, 1.5, 2, 2.5, 3],
                 [1, 1, 1.5, 1.5, 1.5, 2, 2, 2.5, 3],
                 [1, 1, 1.5, 2, 2, 2.5, 3],
                 [1, 1, 1.5, 1.5, 2, 2.5, 2.5]]
    # Weight of each feature's factor in the sum, in feature order.
    weights = [1, 1, 1, 0.9, 1, 1.0, 1]

    features, intervals, tf_arr, meta_info = get_features_and_intervals(
        filename, interval_list, kuchmagic_table
    )

    if len(intervals) != len(factor_map):
        raise ValueError(
            "expected %d interval indices, got %d" % (len(factor_map), len(intervals))
        )

    # An index of -1 ("out of intervals") is an ordinary negative index here
    # and therefore picks the last factor of the row.
    # NOTE(review): confirm that this is the intended penalty.
    SP = sum(
        factors[index] * weight
        for factors, index, weight in zip(factor_map, intervals, weights)
    )

    norm_result = (SP - min_sp) / (max_sp - min_sp)

    health_index = (1 - norm_result) * 100
    with localcontext() as ctx:
        # Multiplying by 1 under precision 3 rounds to three significant digits.
        ctx.prec = 3
        health_index = Decimal(health_index) * Decimal(1.0)

    return health_index, tf_arr, meta_info

### Подключение к серверу

In [64]:
#pip install anvil-uplink

In [65]:
import anvil.server
import anvil.media
import anvil.mpl_util
# The uplink key is a credential and must not live in the notebook source.
# It is read from the ANVIL_UPLINK_KEY environment variable; the key that was
# previously hard-coded here should be treated as leaked and rotated.
_anvil_uplink_key = os.environ.get("ANVIL_UPLINK_KEY")
if not _anvil_uplink_key:
    raise RuntimeError(
        "Set the ANVIL_UPLINK_KEY environment variable to the Anvil uplink key "
        "before running this cell."
    )
anvil.server.connect(_anvil_uplink_key)

In [66]:
# Both lookup tables live in the same workbook. Fetch it once and take the
# sheets at positions 1 and 2 (the ones read_data and read_data_health_index
# select) instead of downloading the file twice.
KUCH_MAGIC_URL = 'https://raw.githubusercontent.com/BogoroditskayaEkaterina/electro-nose/main/kuch_magic.xlsx'
_kuch_magic_sheets = pd.read_excel(KUCH_MAGIC_URL, sheet_name=[1, 2], header=0)
data = _kuch_magic_sheets[1]
health_data = _kuch_magic_sheets[2]

In [68]:
@anvil.server.callable
def make_plot(tf_arr):
    """Render the signal as a line plot and return it as an Anvil image.

    Parameters
    ----------
    tf_arr : sequence of numbers
        Signal values; they are plotted against their index.

    Returns
    -------
    The object produced by ``anvil.mpl_util.plot_image()`` for the figure.
    """
    # A dedicated figure per call that is closed afterwards. Previously every
    # call left a new pyplot figure open, which accumulates in a long-running
    # server process.
    fig, ax = plt.subplots(figsize=(10, 5), dpi=80)
    try:
        ax.grid()
        ax.set_xlabel('time')
        ax.set_ylabel('frequency')
        ax.plot(list(range(len(tf_arr))), tf_arr)
        # plot_image() is expected to snapshot the current pyplot figure (as
        # the original code relied on); the figure created above is current.
        return anvil.mpl_util.plot_image()
    finally:
        plt.close(fig)

@anvil.server.callable
def get_health_index(filename):
    """Server entry point: evaluate an uploaded tfg file.

    ``filename`` is handed to ``anvil.media.TempFile``, which yields the path
    that is then evaluated with ``calculate_health_index`` against the
    module-level ``interval_list``.

    Returns
    -------
    tuple
        ``(meta_info, health_index_as_float, signal_as_list_of_floats,
        (texts, intervals))`` where the last element is the result of
        ``collect_output_text`` for the health index, looked up in
        ``health_index_interval`` / ``health_data``.
    """
    with anvil.media.TempFile(filename) as local_path:
        health_indices, tf_arr, meta_info = calculate_health_index(local_path, interval_list)

        # Plain Python floats, one per sample.
        signal_values = [float(sample) for sample in tf_arr]

    health_text = collect_output_text([health_indices], health_index_interval, health_data)
    return (meta_info, float(health_indices), signal_values, health_text)