In [646]:
# -*- coding: UTF-8 -*-
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import math
import time
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from tqdm import tqdm
from scipy.fftpack import fft
from matplotlib.pylab import mpl
import csv
import sqlite3 
import array
import sqlite3
import pprint


%matplotlib qt5
# Global matplotlib style: draw axis ticks pointing into the plot area.
plt.rcParams['xtick.direction'] = 'in'
plt.rcParams['ytick.direction'] = 'in'
mpl.rcParams['axes.unicode_minus'] = False  # render the minus sign on negative tick labels correctly

In [647]:
# Work inside the data directory; relative output files written later land here.
# NOTE(review): absolute, machine-specific paths - adjust before running elsewhere.
os.chdir(r'E:\data\vallen')
# The two database files of one recording share a base name:
# .pridb is opened as `conn_pri`, .tradb as `conn_tra` in the next cell.
path_pri = r'E:\data\vallen\Ni-tension test-electrolysis-1-0.01-AE-20201031.pridb'
path_tra = r'E:\data\vallen\Ni-tension test-electrolysis-1-0.01-AE-20201031.tradb'
# Base names of recordings noted by the author (swap into the two paths above):
# Ni-tension test-electrolysis-1-0.01-AE-20201031
# Ni-tension test-pure-1-0.01-AE-20201030

In [648]:
# Open both SQLite databases and load every row of the two views into memory.
# The connections are left open; they are not closed anywhere in this notebook.
conn_tra = sqlite3.connect(path_tra)
conn_pri = sqlite3.connect(path_pri)
# Each row of result_tra: (Time, Chan, Thr, SampleRate, Samples, TR_mV, Data, TRAI).
result_tra = conn_tra.execute("Select Time, Chan, Thr, SampleRate, Samples, TR_mV, Data, TRAI FROM view_tr_data").fetchall()
# Each row of result_pri: (SetID, Time, Chan, Thr, Amp, RiseT, Dur, Eny, RMS, Counts, TRAI).
result_pri = conn_pri.execute("Select SetID, Time, Chan, Thr, Amp, RiseT, Dur, Eny, RMS, Counts, TRAI FROM view_ae_data").fetchall()

In [518]:
# Keep only the view_ae_data rows whose last column (TRAI) is neither None nor 0.
valid_idx = [idx for idx, row in enumerate(result_pri) if row[-1] not in [None, 0]]
valid_pri = np.array(result_pri)[valid_idx]

# Peak frequency of every waveform whose matching hit has Counts > 1.
# The cell that writes the feature file consumes `freq_max` in order, one entry
# per such hit, so exactly one value is appended for each of them.
freq_max = []
# Order the waveforms by TRAI (last column).
# NOTE(review): row `idx` of the sorted waveforms is paired with row `idx` of
# `valid_pri`; this assumes both sequences have the same length and TRAI order - confirm.
result_tra = sorted(result_tra, key=lambda x: x[-1])
for idx, i in enumerate(tqdm(result_tra)):
    # Data blob -> signed 16-bit samples, scaled by TR_mV * 1000 (presumably microvolts).
    sig = np.multiply(array.array('h', bytes(i[-2])), i[-3] * 1000)
    thr = i[2]
    Fs = i[3]
    Ts = 1 / Fs
    if valid_pri[idx][-2] > 1:
        valid_wave_idx = np.where(abs(sig) >= thr)[0]
        if valid_wave_idx.size == 0:
            # No sample reaches the threshold: there is nothing to transform.
            # Record NaN so that freq_max stays aligned with the hits.
            freq_max.append(np.nan)
            continue
        # Trim the waveform to the span between the first and last threshold crossing.
        valid_data = sig[valid_wave_idx[0]:(valid_wave_idx[-1] + 1)]
        N = valid_data.shape[0]
        fft_y = fft(valid_data)
        abs_y = np.abs(fft_y)
        normalization = abs_y / N
        normalization_half = normalization[:N // 2]
        frq = (np.arange(N) / N) * Fs
        half_frq = frq[:N // 2]
        if N < 2:
            # The one-sided spectrum is empty for a single sample; fall back to
            # the first frequency bin (this is the case the former bare
            # `except:` was covering).
            freq_max.append(frq[0])
        else:
            freq_max.append(half_frq[np.argmax(normalization_half)])

100%|█████████████████████████████████████████████████████████████████████████| 64482/64482 [00:02<00:00, 25604.03it/s]


In [521]:
# Write one text line per hit with Counts > 1, followed by its peak frequency.
# Column order of a `valid_pri` row:
#   SetID, Time, Chan, Thr, Amp, RiseT, Dur, Eny, RMS, Counts, TRAI
# Units noted by the author for the output columns:
#   ID, Time(s), Chan, Thr(μV), Thr(dB), Amp(μV), Amp(dB), RiseT(s), Dur(s), Eny(aJ), RMS(μV), Counts, Frequency(Hz)
header_line = 'SetID, TRAI, Time, Chan, Thr, Amp, RiseT, Dur, Eny, RMS, Counts, Frequency(Hz)\n'
row_format = '{}, {}, {:.8f}, {}, {:.7f}, {:.7f}, {:.2f}, {:.2f}, {:.7f}, {:.7f}, {}, {:.7f}\n'

with open('Ni-tension test-electrolysis-1-0.01-AE-20201031.txt', 'w') as f:
    f.write(header_line)
    # `freq_max` holds one entry per hit with Counts > 1, in the same order,
    # so the running index over the filtered hits selects the matching value.
    multi_count_hits = (rec for rec in valid_pri if rec[-2] > 1)
    for j, rec in enumerate(multi_count_hits):
        # SetID and TRAI first, then columns 1..9 (Time .. Counts), then frequency.
        f.write(row_format.format(rec[0], rec[-1], *rec[1:10], freq_max[j]))

## Plot

In [None]:
# Three stacked panels for the waveform currently held in `valid_data`:
# the trimmed signal, its two-sided spectrum and its one-sided spectrum.
# Reads `valid_data`, `Fs`, `frq`, `half_frq`, `normalization` and
# `normalization_half` as left behind by the frequency computation cells.
titles = ['Original Waveform',
          'Bilateral amplitude spectrum (normalized)',
          'Unilateral amplitude spectrum (normalized)']
colors = ['purple', 'green', 'blue']
x_label = ['Time (s)', 'Freq (Hz)', 'Freq (Hz)']
y_label = ['Amplitude (μV)', '|Y(freq)|', '|Y(freq)|']

# `time_label` was not defined anywhere in the notebook, so this cell raised
# NameError on a fresh kernel. Build it as the sample times of `valid_data`,
# measured from its first sample.
# NOTE(review): assumes `Fs` is the sample rate of the waveform that produced
# `valid_data` - confirm if cells were run out of order.
time_label = np.arange(len(valid_data)) / Fs

xs = [time_label, frq, half_frq]
ys = [valid_data, normalization, normalization_half]

# A new figure is created on every run, so repeated runs do not draw on top of
# an earlier figure.
fig, axes = plt.subplots(3, 1)
for ax, x, y, title, color, xlabel, ylabel in zip(axes, xs, ys, titles, colors, x_label, y_label):
    ax.plot(x, y, color=color)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.set_title(title, color=color)
fig.tight_layout()

## Validation

In [None]:
# Inspect the current trimmed waveform, its threshold and its number of samples.
valid_data, thr, valid_data.shape[0]

In [645]:
# Features
# Recompute the hit features of one waveform directly from its samples, for
# comparison against the values stored in the database.
# Column order of a row: Time, Chan, Thr, SampleRate, Samples, TR_mV, Data, TRAI
i = result_tra[14148]
# Data blob -> signed 16-bit samples, scaled by TR_mV * 1000 (presumably microvolts).
sig = np.multiply(array.array('h', bytes(i[-2])), i[-3] * 1000)
# One timestamp per sample, starting at the row's Time and spaced 1 / SampleRate apart.
# Named `time_axis` rather than `time` so the imported `time` module is not overwritten.
time_axis = np.linspace(i[0], i[0] + pow(i[-5], -1) * (i[-4] - 1), i[-4])

thr = i[2]
# Indices of all samples whose magnitude reaches the threshold.
valid_wave_idx = np.where(abs(sig) >= thr)[0]
start = time_axis[valid_wave_idx[0]]
end = time_axis[valid_wave_idx[-1]]
# Time differences are multiplied by 1e6 (seconds -> microseconds, given that
# SampleRate is in Hz - TODO confirm).
duration = (end - start) * pow(10, 6)
max_idx = np.argmax(abs(sig))
amplitude = max(abs(sig))
rise_time = (time_axis[max_idx] - start) * pow(10, 6)
# Samples between the first and the last threshold crossing, inclusive.
valid_data = sig[valid_wave_idx[0]:(valid_wave_idx[-1] + 1)]
# Sum of squared samples, each weighted by 1e6 / SampleRate.
energy = np.sum(np.multiply(pow(valid_data, 2), pow(10, 6) / i[3]))
# NOTE: raises ZeroDivisionError if only a single sample reaches the threshold
# (duration == 0).
RMS = math.sqrt(energy / duration)
N = len(valid_data)
# Counts: number of upward crossings of the positive threshold, i.e. adjacent
# sample pairs with previous <= thr < current.
count = int(np.count_nonzero((valid_data[:-1] <= thr) & (valid_data[1:] > thr)))
print(i[0], amplitude, rise_time, duration, energy / pow(10, 4), count, i[-1])

2469.08934745 468.8239626513659 95.94999983164598 2673.149999736779 544.8985155804419 702 14149


In [536]:
# Frequency
# Peak frequency of a single waveform, computed the same way as in the loop
# that fills `freq_max`. Leaves `valid_data`, `frq`, `half_frq`,
# `normalization` and `normalization_half` behind for inspection and plotting.
i = result_tra[478]
raw_samples = np.asarray(array.array('h', bytes(i[-2])))
sig = raw_samples * (i[-3] * 1000)
thr = i[2]
Fs = i[3]
Ts = 1 / Fs

# Trim to the span between the first and the last sample reaching the threshold.
valid_wave_idx = np.flatnonzero(np.abs(sig) >= thr)
first_hit, last_hit = valid_wave_idx[0], valid_wave_idx[-1]
valid_data = sig[first_hit:last_hit + 1]
N = len(valid_data)

# Amplitude spectrum normalised by the number of samples.
fft_y = fft(valid_data)
abs_y = np.abs(fft_y)
normalization = abs_y / N
frq = (np.arange(N) / N) * Fs

# One-sided view: first half of the bins only.
half_len = N // 2
normalization_half = normalization[:half_len]
half_frq = frq[:half_len]

# Cell output: (TRAI, frequency of the largest one-sided spectral bin).
i[-1], half_frq[normalization_half.argmax()]

(479, 677966.1016949152)

## Read

In [5]:
def sqlite_read(path):
    """
    Print the tables of an SQLite database file and the column names of each.

    First prints the list of ``(name,)`` tuples found in ``sqlite_master`` for
    entries of type ``'table'``, then pretty-prints one list of column names
    per table, in the same order.

    Parameters
    ----------
    path : str
        Path of the SQLite database file to inspect.

    Returns
    -------
    None
        All results are written to standard output.
    """
    mydb = sqlite3.connect(path)
    try:
        # Decode text as GBK and drop undecodable bytes.
        mydb.text_factory = lambda x: str(x, 'gbk', 'ignore')
        cur = mydb.cursor()

        # Collect the table names.
        cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        Tables = cur.fetchall()                 # list of 1-tuples
        print(Tables)

        for (tbl_name,) in Tables:
            # Quote the identifier so names containing spaces, hyphens or
            # keywords are accepted; embedded double quotes are doubled.
            quoted_name = '"{}"'.format(tbl_name.replace('"', '""'))
            # Only the cursor description is needed, so request no rows.
            cur.execute("SELECT * FROM {} LIMIT 0".format(quoted_name))
            col_name_list = [column[0] for column in cur.description]
            pprint.pprint(col_name_list)
    finally:
        # Release the file handle even if one of the queries fails.
        mydb.close()

#     # 获取表结构的所有信息
#     cur.execute("PRAGMA table_info({})".format(tbl_name))
#     pprint.pprint(cur.fetchall())

In [None]:
# List the tables and their column names in the .tradb file.
sqlite_read(path_tra)

In [None]:
# Column metadata of the view queried when loading `result_tra`.
# `conn` is not defined in this notebook; `view_tr_data` is read through
# `conn_tra`, so use that connection.
conn_tra.execute("PRAGMA table_info(view_tr_data)").fetchall()