In [1]:
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from scipy import signal
import os
import glob
from matplotlib import rc
import io
import cv2
from datetime import datetime
from influxdb import InfluxDBClient # module needed to get access and send query to InfluxDB
import re
# Notebook-wide matplotlib font: small and bold, applied to every figure created below.
rc("font", **{'size': 6, 'weight': 'bold'})

# __Preprocessing Class__

In [2]:
from multiprocessing import Pool
class data_preprocessing():
    '''
    Read acceleration data from InfluxDB and turn it into FFT (spectrogram) images.

    Currently, it only extracts the FFT image feature for each acceleration axis (2019-04-04).
    Other feature extraction methods can be added.

    This class uses the multiprocessing module.
    Prefix multi_ means that the method distributes work over a multiprocessing Pool.
    Prefix single_ means that the method is the per-item worker run by a multi_ method.

    Two underscores (__) on both sides of a method name mean it is not recommended to call it directly.
    If it is called directly, error handling, closing of the Pool and validation of the
    returned data are not guaranteed.

    Fields:
    _user: InfluxDB user name
    _port: InfluxDB port
    _passwd: InfluxDB database password
    _addr: InfluxDB address
    _dbname: InfluxDB database name
    _client: instance of InfluxDBClient built from the fields above (None if it could not be built).
    '''
    def __init__(self, addr, port, username, passwd, dbname):
        '''
        Store the connection settings and build the InfluxDB client.

        addr: IP address / host name of the server which runs InfluxDB
        port: port number of InfluxDB
        username: InfluxDB user name
        passwd: InfluxDB password
        dbname: database name

        A failure while building the client is reported on stdout; `_client` is then None.
        '''
        self._user = username
        self._port = port
        self._passwd = passwd
        self._addr = addr
        self._dbname = dbname
        # Make sure the attribute always exists, even when the constructor below fails.
        self._client = None
        try:
            self._client = InfluxDBClient(host=self._addr, port=self._port, username=self._user,
                                          password=self._passwd, database=self._dbname)
        except Exception as error:
            print('Error occured while ' + repr(error))

    def __create_fft_plot__(self, fs, NFFT, var, time, x_ticks=None):
        '''
        Draw the spectrogram (FFT heatmap) of `var` on a new figure and return its axis.

        fs: sampling frequency passed to specgram as Fs
        NFFT: number of samples per FFT window
        var: the signal to analyse
        time: accepted for interface compatibility; not used by the plot
        x_ticks: optional x-axis tick positions. Default is np.arange(0, 800, 200);
                 a different dataset may need different ticks.
        '''
        if x_ticks is None:
            x_ticks = np.arange(0, 800, 200)

        fig, ax1 = plt.subplots(nrows=1, ncols=1)

        # Gray-scale power spectral density in dB, windows do not overlap.
        ax1.specgram(var, NFFT=NFFT, mode='psd', noverlap=0, scale='dB', Fs=fs,
                     vmin=-125, vmax=-15, cmap='gray')

        ax1.set_xlabel('Time [s]', fontweight='bold')
        ax1.set_ylabel('Frequency [Hz]', fontweight='bold')
        ax1.set_xticks(x_ticks)
        # Explicit-axis equivalent of plt.xticks(fontsize=6); does not depend on the current axes.
        ax1.tick_params(axis='x', labelsize=6)
        return ax1

    def __ax_to_ndarray__(self, ax, **kwargs):
        '''
        Render the plotting area of `ax` (without axis decorations) and return it as an ndarray.

        ax: matplotlib axis to rasterise
        kwargs: forwarded to Figure.savefig

        Returns the array produced by plt.imread on the in-memory PNG.
        '''
        ax.axis('off') # remove axis
        ax.figure.canvas.draw() # first, draw image on canvas
        trans = ax.figure.dpi_scale_trans.inverted()
        bbox = ax.bbox.transformed(trans)

        buff = io.BytesIO() # send image to buffer, not file
        # Save the figure that owns `ax`. plt.savefig would save the *current* figure,
        # which is a different one whenever several figures are open.
        ax.figure.savefig(buff, format='png', dpi=ax.figure.dpi, bbox_inches=bbox, **kwargs)
        buff.seek(0) # set file pointer to 0
        im = plt.imread(buff) # read it back, it returns ndarray class
        return im

    # Image Processing
    # *****************************************************
    # Connection to DB or I/O from DB

    def connect_to_InfluxDB(self, addr, port, username, passwd, dbname):
        '''
        (Re)connect to InfluxDB with new settings.

        It needs 5 parameters: addr, port, username, passwd and dbname.
        addr = IP address of Server which operates InfluxDB
        port = port number of InfluxDB
        username = username of InfluxDB
        passwd = password of InfluxDB
        dbname = database name

        On success the stored connection fields are updated together with `_client`.
        A failure is reported on stdout and the previous client is kept.
        '''
        try:
            client = InfluxDBClient(host=addr, port=port, username=username, password=passwd, database=dbname)
        except Exception as error:
            print('Error occured while ' + repr(error))
            return
        self._client = client
        # Keep the documented fields in sync with the client that is actually in use.
        self._addr, self._port = addr, port
        self._user, self._passwd, self._dbname = username, passwd, dbname

    def _normalize_dev_ids(self, dev_id):
        '''
        Return a new list with every device id as a zero-padded 4-character string.

        Integers and numeric strings are padded (25 -> '0025', '71' -> '0071').
        Entries that cannot be read as an integer are reported on stdout and left out.
        '''
        pattern = re.compile('[0-9][0-9][0-9][0-9]') # exactly 4 digits like 0000, 0123, 1430.
        normalized = []
        for dev in dev_id:
            if isinstance(dev, str) and pattern.fullmatch(dev):
                normalized.append(dev)
                continue
            try:
                normalized.append('{:0>4}'.format(int(dev)))
            except (TypeError, ValueError) as err:
                print(err)
                print('Invalid string format. Please write only integer number with [0-9][0-9][0-9][0-9] format.')
        return normalized

    def make_multi_query(self, dev_id = None, timespan = ('now() - 30m', 'now()'),  limit=None):
        '''
        Build one query string per device, suitable for multi_read_from_DB().

        timespan: tuple (past, recent); both are strings which follow InfluxDB query grammar,
                  e.g. ('now() - 2h', 'now()') or ('2019-03-31T10:00:00.000Z', '2019-03-31T11:00:00.000Z').
                  Default value is the last 30 minutes.
        dev_id: list of devices to get data from (a single str/int is also accepted).
                Default value (None) is all devices reported by get_dev_ids().
                The caller's list is not modified.
        limit: integer. Maximum number of rows to get per device. None means no limit.

        Each query looks like
        "select * from acc_data where dev_id = '<dev>' AND time > <past> AND time < <recent> [limit <limit>]"
        '''
        if dev_id is None:
            dev_id = self.get_dev_ids()
        else:
            if isinstance(dev_id, (str, int)):
                # A bare id would otherwise be iterated character by character.
                dev_id = [dev_id]
            dev_id = self._normalize_dev_ids(dev_id)

        suffix = '' if limit is None else f' limit {limit}'
        return [
            f'select * from acc_data where dev_id = \'{dev}\' AND time > {timespan[0]} AND time < {timespan[1]}' + suffix
            for dev in dev_id
        ]

    def multi_read_from_DB(self, query_list, num_Proc=8, valid_check=True):
        '''
        Run every query in its own worker process and return the resulting DataFrames.

        num_Proc : The number of processes in Pool. Default is 8
        query_list : List of queries to request. Recommended to build it with make_multi_query()
        valid_check : When True, DataFrames that fail __single_data_valid_check__ are removed.

        Returns a list of pandas DataFrames (see __single_read_from_DB__ for the columns).
        '''
        query_pool = Pool(num_Proc)
        try:
            df_list = query_pool.map(self.__single_read_from_DB__, query_list)
            if valid_check:
                df_list = self.__multi_data_valid_check__(df_list=df_list, proc_pool=query_pool)
        finally:
            # Release the worker processes even when a query or a check raised.
            query_pool.close()
            query_pool.join()
        return df_list

    @staticmethod
    def _to_epoch_ms(stamp):
        '''
        Convert a 'YYYY-mm-ddTHH:MM:SS[.fraction]Z' string to integer milliseconds since 1970-01-01.

        Raises ValueError when the string matches neither accepted format.
        '''
        # strptime's %f accepts at most 6 digits; drop any extra fractional digits.
        stamp = re.sub(r'(\.\d{6})\d+(?=Z$)', r'\1', stamp)
        log_dt = None
        for fmt in ("%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ"):
            try:
                log_dt = datetime.strptime(stamp, fmt)
                break
            except ValueError:
                continue
        if log_dt is None:
            raise ValueError('time data {!r} does not match a supported format'.format(stamp))
        delta = log_dt - datetime(1970, 1, 1)
        # Pure integer arithmetic: avoids the 1 ms loss that truncating a float product can cause.
        return delta.days * 86400000 + delta.seconds * 1000 + delta.microseconds // 1000

    def __single_read_from_DB__(self, query=None):
        '''
        Send one query to the InfluxDB described in self._client and return the rows.

        query: query to send

        Returns a pandas DataFrame with columns=['dev_id','x','y','z','time']
        dev_id : device id
        x, y, z : acceleration value on each axis.
        time : epoch time (ms) when the acceleration value was observed by the device.
        '''
        result = self._client.query(query)
        acc_df = pd.DataFrame(list(result.get_points()), columns=['dev_id', 'x', 'y', 'z', 'time'])
        acc_df['time'] = acc_df['time'].apply(self._to_epoch_ms)
        return acc_df

    def get_dev_ids(self):
        '''
        Return the device ids found in the result of 'show series'.

        Each id is the text after the first '=' of a series key.
        Returns an empty list when the result contains no series.
        '''
        dev_result = self._client.query('show series')
        series = dev_result.raw.get('series') or []
        if not series:
            return []
        return [row[0].split('=')[1] for row in series[0]['values']]

    def __single_data_valid_check__(self, df, expected_len=60000, max_gap_ms=20):
        '''
        Decide whether one device's DataFrame is usable.

        df: DataFrame with at least the 'dev_id' and 'time' (epoch ms) columns
        expected_len: number of leading samples that must be present and are checked. Default 60000
        max_gap_ms: a difference between consecutive samples of this many ms or more is invalid. Default 20

        Returns True only when df has at least `expected_len` rows and none of the first
        `expected_len` consecutive time differences reaches `max_gap_ms`.
        The reason for a rejection is printed.
        '''
        if len(df) <= 2:
            print('Data which device send has too small data')
            return False

        device = df['dev_id'].iloc[0]
        times = df['time'].to_numpy()[:expected_len]
        if (np.diff(times) >= max_gap_ms).any():
            print('invalid time gap:{} dropped.'.format(device))
            return False
        if len(times) < expected_len:
            print(device, 'Index error. It might have insufficient data.')
            return False
        return True

    def __multi_data_valid_check__(self, df_list, proc_pool):
        '''
        Validate every DataFrame in parallel and drop the invalid ones.

        df_list: list of DataFrames; it is filtered in place
        proc_pool: an open Pool (anything with a compatible map method)

        Returns the same list object, now holding only the valid DataFrames.
        '''
        df_valid_l = proc_pool.map(self.__single_data_valid_check__, df_list)
        df_list[:] = [df for df, valid in zip(df_list, df_valid_l) if valid]
        return df_list

    def multi_save_images(self, df_list, fs=100, NFFT=None,save_dir='.', num_Proc=4):
        '''
        Save one FFT heatmap image per device, using a Pool of `num_Proc` processes.

        df_list: list of DataFrames as returned by multi_read_from_DB
        fs, NFFT, save_dir: forwarded to __single_save_image__
        '''
        save_pool = Pool(num_Proc)
        try:
            param = [[df, fs, NFFT, save_dir] for df in df_list]
            save_pool.starmap(self.__single_save_image__, param)
        finally:
            # Release the worker processes even when saving one image raised.
            save_pool.close()
            save_pool.join()

    def __single_save_image__(self, data, fs=100, NFFT=None,save_dir='.'):
        '''
        Write '<save_dir>/<dev_id>.png' whose three channels are the x, y and z spectrograms.

        data: DataFrame of one device with 'dev_id', 'x', 'y', 'z' and 'time' columns
        fs: sampling frequency. Default 100
        NFFT: samples per FFT window. Default (None) is 2*fs, i.e. a two seconds window
        save_dir: output directory. Default is the current directory

        NOTE: cv2.imwrite interprets the channel order as BGR, so an RGB viewer shows
        z as red, y as green and x as blue. The array layout is kept as it was.
        '''
        filename = data['dev_id'].iloc[0]
        t = data['time']
        t = (t - t.iloc[0]) / 1000 # Convert timestamp to [s]

        if NFFT is None:
            NFFT = 2*fs # Number of samples used in each window for FFT. We set two seconds window.

        plt.close('all') # clearing current figures.
        channels = []
        for axis_name in ('x', 'y', 'z'):
            series = data[axis_name]
            centered = series - np.mean(series) # remove the mean (DC offset)
            ax = self.__create_fft_plot__(fs, NFFT, centered, t)
            channels.append(self.__ax_to_ndarray__(ax)[:, :, 0])
            plt.close(ax.figure) # avoid memory leak

        RGB_xyz = np.stack(channels, axis=-1).astype(np.float64)
        RGB_xyz = (RGB_xyz*255).astype('uint8')
        target = os.path.join(save_dir, '{}.png'.format(filename))
        if not cv2.imwrite(target, RGB_xyz):
            # imwrite reports failure through its return value only.
            print('Failed to write image: ' + target)
        plt.close('all') # avoid memory leak

    # ***************************************************
    # Prototypes

    def __read_from_dir_csv__(self, data_dir='.'):
        '''
        Prototype: the object could also get its data from CSV files on the file system.
        It is meant to read every .csv file in a directory.
        **File type must be CSV.**

        It needs one parameter, data_dir.
        data_dir = Directory in which csv files are located. Default is current directory.

        Not implemented yet: the files are only listed and False is returned.
        '''
        qry = data_dir + '/*.csv'
        files = glob.glob(qry)  # TODO: load these files; currently unused.

        return False # it will be substituted later.
    
    

## Test code

In [3]:
# Connection settings are read from the environment so that the password is not
# stored in the notebook. INFLUXDB_PASSWORD must be set before running this cell;
# the other variables fall back to the values used so far.
test_class = data_preprocessing(
    addr=os.environ.get('INFLUXDB_ADDR', '155.230.28.170'),
    port=int(os.environ.get('INFLUXDB_PORT', 8086)),
    username=os.environ.get('INFLUXDB_USER', 'sslab'),
    passwd=os.environ['INFLUXDB_PASSWORD'],
    dbname=os.environ.get('INFLUXDB_DBNAME', 'kmaeq'),
)
# Output directory for images; override with IMG_SAVE_DIR on other machines.
save_dir = os.environ.get('IMG_SAVE_DIR', '/home/knusslab/Sang/Imgdata')

In [4]:
# Query window (InfluxDB time expressions) and the per-device row cap.
limit = 60000
timespan = ('now() - 2h', 'now()')
# Sample subset of devices; not passed below, where dev_id=None selects every device.
dev_ids = ['0025', '0071', '0086']
qry_list = test_class.make_multi_query(dev_id=None, timespan=timespan, limit=limit)

In [5]:
if __name__ == '__main__':
    # Time the parallel read (16 worker processes) including the validity filtering.
    start = datetime.now()
    df_list = test_class.multi_read_from_DB(
        query_list=qry_list,
        num_Proc=16,
        valid_check=True,
    )
    end = datetime.now()

    # Elapsed wall-clock time as a timedelta.
    print(end - start)

Data which device send has too small data
Data which device send has too small data
Data which device send has too small data
invalid time gap:0060 dropped.
Data which device send has too small data
invalid time gap:0033 dropped.
Data which device send has too small data
invalid time gap:0016 dropped.
invalid time gap:0055 dropped.
Data which device send has too small data
Data which device send has too small data
Data which device send has too small data
Data which device send has too small data
Data which device send has too small data
Data which device send has too small data
Data which device send has too small data
Data which device send has too small data
Data which device send has too small data
Data which device send has too small data
Data which device send has too small data
Data which device send has too small data
Data which device send has too small data
invalid time gap:0227 dropped.
invalid time gap:0254 dropped.
Data which device send has too small data
Data which devic

In [33]:
# Device ids parsed from the result of the 'show series' query.
ids = test_class.get_dev_ids()

In [7]:
# Write one spectrogram PNG per validated device, using 16 worker processes.
test_class.multi_save_images(
    df_list=df_list,
    fs=100,
    NFFT=200,
    save_dir='../../Imgdata',
    num_Proc=16,
)

In [None]:
# Single-process check on the first device; with the default save_dir the PNG is
# written to the current directory. The method is named __single_save_image__;
# the previous spelling (__save_single_image__) does not exist on the class.
test_class.__single_save_image__(data=df_list[0], fs=100, NFFT=200)