In [1]:
import traitlets
import os
import pandas as pd
import logging
import random
from functools import wraps
import sys 
sys.path.append('libs/')
from table_v3 import TableChart

# Relative path of the log file that initLog attaches its rotating file handler to.
LOG_FILE = '../data/log/etl.log'

def initLog(name='ETL', log_file=None):
    """Return the logger called ``name``, writing DEBUG and above to a rotating file.

    Args:
        name: Name of the logger to configure.
        log_file: Path of the log file. Defaults to the module-level ``LOG_FILE``.

    Returns:
        The configured ``logging.Logger``.
    """
    # ``import logging`` alone does not load the ``logging.handlers`` submodule,
    # so the handler class is imported explicitly here.
    from logging.handlers import RotatingFileHandler

    path = os.path.abspath(LOG_FILE if log_file is None else log_file)
    log_dir = os.path.dirname(path)
    if log_dir and not os.path.isdir(log_dir):
        # The handler opens the file immediately and fails if the directory is missing.
        os.makedirs(log_dir)

    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)

    # Re-running the notebook cell must not stack a second handler on the same
    # file, otherwise every record is written once per call.
    already_attached = any(
        isinstance(h, RotatingFileHandler) and h.baseFilename == path
        for h in logger.handlers
    )
    if not already_attached:
        handler = RotatingFileHandler(path, maxBytes=1024 * 1024, backupCount=1000)
        fmt = '%(asctime)s - %(filename)s:%(lineno)s - %(name)s - %(message)s'
        handler.setFormatter(logging.Formatter(fmt))
        logger.addHandler(handler)
    return logger

# Module-wide logger used by the classes defined in the following cells.
logger = initLog('ETL')
# initLog already sets the level to DEBUG, so this call repeats that setting.
logger.setLevel(logging.DEBUG) 

def logged(message=None,level=logging.DEBUG,name='ETL'):
    """Build a decorator that emits one log record before every call.

    Args:
        message: Text to log; the wrapped function's name is used when falsy.
        level: Logging level of the emitted record.
        name: Logger name; the wrapped function's module is used when falsy.

    Returns:
        A decorator that wraps a function without changing its result.
    """
    def decorate(func):
        target_logger = logging.getLogger(name or func.__module__)
        text = message or func.__name__

        @wraps(func)
        def wrapper(*args, **kwargs):
            # Log first, then delegate unchanged to the wrapped callable.
            target_logger.log(level, text)
            return func(*args, **kwargs)

        return wrapper

    return decorate

        

In [2]:
import pandas as pd
import traitlets
from IPython.display import display,update_display
import ipywidgets as widgets
import bqplot as bp
import random
class EtlGUI(widgets.Box):
    """One-row form for a single ETL command.

    The row shows the operation name, one input widget per argument and a
    confirm button. Pressing the button calls the method of ``cmds`` whose
    name equals the operation label, passing the current input values as
    keyword arguments.
    """

    _cmds = None
    _args = None
    _argsdict = traitlets.Dict()

    def __init__(self,cmds,kwargs=None):
        """Build the form.

        Args:
            cmds: Object whose methods implement the ETL commands.
            kwargs: Mapping of argument name to initial value. The entry
                'operation' names the command; an 'id' entry is ignored.
        """
        super(EtlGUI,self).__init__()
        self._cmds = cmds
        # Keep a private copy without 'id': the caller's dict is never mutated
        # and the default ``None`` behaves like "no arguments".
        self._argsdict = dict((k, v) for k, v in (kwargs or {}).items() if k != 'id')
        self._args = self.initArgs()
        self._btn_ok = widgets.Button(description=u'确认',layout=widgets.Layout(width='10%',))
        self.children = [self._lb_operation,self._args,self._btn_ok]
        self.layout = widgets.Layout(width="100%",display='flex-flow',flex_flow='row', justify_content=u'flex-start')
        self._btn_ok.on_click(self.execute)

    def execute(self,a):
        """Button callback: run the command named by the operation label.

        Raises:
            Exception: If ``cmds`` has no callable attribute with that name.
        """
        cmd_label = self._lb_operation.value
        # A default is required here: plain getattr raises AttributeError and
        # the explicit validity check below would never be reached.
        action_handler = getattr(self._cmds, cmd_label, None)
        if not callable(action_handler):
            raise Exception('label:{} is not a valid ETL command '.format(cmd_label))
        args = {}
        for arg in self._args.children:
            # Each argument group is [name label, input widget].
            args[arg.children[0].value] = arg.children[1].value
        action_handler(**args)

    @property
    def cmdargs(self):
        """Current argument mapping."""
        return self._argsdict

    @cmdargs.setter
    def cmdargs(self,kwargs=None):
        # The Dict trait rejects None, so treat it as an empty mapping.
        self._argsdict = kwargs if kwargs is not None else {}

    @traitlets.observe('_argsdict')
    def updateArgs(self,change=None):
        """Refresh dropdown choices when the argument mapping changes."""
        if not self._args:
            # Fired during __init__ before the input widgets exist.
            return
        for arg in self._args.children:
            arg_name = arg.children[0].value
            editor = arg.children[1]
            # Only dropdowns have selectable options; other input widgets
            # cannot take an options list or a None value.
            if arg_name in self._argsdict and isinstance(editor, widgets.Dropdown):
                editor.options = self._argsdict[arg_name]
                editor.value = None

    def initArgs(self):
        """Create the operation label and one labelled input per argument.

        An int value gets an IntText, a float a FloatText, a list a Dropdown
        and a str a Text; any other value falls back to a Text showing
        ``str(value)``.

        Returns:
            An HBox holding one ``[label, input]`` group per argument.
        """
        kwargs = dict((k, v) for k, v in self._argsdict.items() if k != 'id')
        if getattr(self, '_lb_operation', None) is None:
            # Fallback so the form still builds without an 'operation' entry.
            self._lb_operation = widgets.Label('',layout=widgets.Layout(width='10%'))
        # max(..., 1) avoids a division by zero for an empty mapping.
        width_auto = '{}%'.format(100 / max(len(kwargs), 1))
        input_args = []

        for arg, value in kwargs.items():
            if arg == u'operation':
                self._lb_operation = widgets.Label(value,layout=widgets.Layout(width='10%'))
                continue
            if isinstance(value,int):
                widget_arg = widgets.IntText(value)
            elif isinstance(value,float):
                widget_arg = widgets.FloatText(value=value)
            elif isinstance(value,list):
                widget_arg = widgets.Dropdown(options=value,value=None)
                if arg == 'col':
                    widget_arg.observe(self.colChoose,'value')
            elif isinstance(value,str):
                widget_arg = widgets.Text(value)
            else:
                # Without this branch the name would be unbound, or the
                # previous argument's widget would be reused by mistake.
                widget_arg = widgets.Text(str(value))
            group_arg = widgets.HBox(layout=widgets.Layout(width=width_auto))
            width_arg_label = '{}px'.format(len(arg)*16)
            group_arg.children = [widgets.Label(arg,layout=widgets.Layout(width=width_arg_label)),widget_arg]
            input_args.append(group_arg)

        arg_hbox = widgets.HBox(layout=widgets.Layout(width='80%'))
        arg_hbox.children = input_args
        return arg_hbox

    def colChoose(self,change):
        """Forward the newly chosen column to the command object's table."""
        value = change['new']
        self._cmds._table.colChoose([value])
        

class ColWidget(widgets.Box):
    """Labelled dropdown for picking one column name.

    ``value`` holds the most recent real choice; selecting the placeholder
    entry leaves it untouched.
    """

    value = traitlets.Unicode()

    def __init__(self,label='col',options=None,placehold='please choose a column'):
        super(ColWidget,self).__init__()

        self.layout = widgets.Layout(
            width="100%",
            display='flex-flow',
            flex_flow='row',
            justify_content=u'flex-start',
        )
        self.placehold = placehold
        caption = widgets.Label(label, layout=widgets.Layout(width='64px'))
        if not isinstance(options, list):
            raise Exception('options must be a list')
        # The placeholder is the first entry and the initial selection.
        choices = [self.placehold] + options
        self.inputs = widgets.Dropdown(options=choices, value=self.placehold)
        self.children = [caption, self.inputs]
        self.inputs.observe(self.eventColChoose, 'value')

    def eventColChoose(self,change):
        """Copy a real selection into ``value``; ignore the placeholder."""
        chosen = change['new']
        if chosen == self.placehold:
            return
        self.value = chosen

class FilterWidget(widgets.Box):
    """Interactive builder for a row-filter expression.

    The first row holds a column picker and three buttons (add condition,
    apply, cancel). Every added condition is a row of
    ``[column label, operator dropdown, text input, 'and' label]``.
    Applying joins all conditions with ``' and '`` into ``value`` and sets
    ``apply`` to True; cancelling clears ``value`` and sets ``apply`` to False.
    """

    value = traitlets.Unicode()
    apply = traitlets.Bool(False)

    def __init__(self,options=None,dtypes=None,placehold='please choose a column'):
        """Build the command row.

        Args:
            options: Column names offered in the picker.
            dtypes: Data types, parallel to ``options``.
            placehold: Accepted for interface compatibility; not used here.
        """
        super(FilterWidget,self).__init__()
        # Tolerate None and any iterable; the column picker requires a list.
        options = list(options or [])
        dtypes = list(dtypes or [])
        self._optType = dict(zip(options,dtypes))
        self.layout = widgets.Layout(display='flex-flow',flex_flow='column')
        cmdSection = widgets.Box(layout=widgets.Layout(display='flex-flow',flex_flow='row', justify_content=u'flex-start'))
        self.wColWidget = ColWidget(options=options,placehold=u'',label='请选择列')
        self.btnAdd = widgets.Button(description=u'增加条件')
        self.btnApply = widgets.Button(description=u'应用筛选')
        self.btnCancel = widgets.Button(description=u'取消筛选')
        cmdSection.children=[self.wColWidget,self.btnAdd,self.btnApply,self.btnCancel]
        self.children = [cmdSection]
        self.btnAdd.on_click(self.andFilter,False)
        self.btnApply.on_click(self.filterApply,False)
        self.btnCancel.on_click(self.filterCancel,False)

    def andFilter(self,a):
        """Button callback: append a condition row for the selected column."""
        column = self.wColWidget.value
        if not column:
            # No column picked yet: a row with an empty field would produce
            # an invalid expression such as "( == )".
            return
        partCol = widgets.Label(column,layout=widgets.Layout(width='100px'))
        partOp = widgets.Dropdown(options=['==','>','>=','<','<=','!=','in','between'],layout=widgets.Layout(width='100px'))
        # The Text trait is called ``placeholder``; the former ``placehold``
        # keyword was not a widget trait, so the hint never appeared.
        partInput = widgets.Text(placeholder=u'请输入筛选范围')
        partAnd = widgets.Label('and',layout=widgets.Layout(width='100px'))
        addCondition = widgets.Box(layout=widgets.Layout(display='flex-flow',flex_flow='row', justify_content=u'flex-start'))
        addCondition.children = [partCol,partOp,partInput,partAnd ]
        self.children = list(self.children) + [addCondition]

    @staticmethod
    def _literal(raw, is_text):
        """Render one operand: quoted and escaped for text columns, verbatim otherwise."""
        if not is_text:
            return raw
        return '"{}"'.format(raw.replace('\\', '\\\\').replace('"', '\\"'))

    @classmethod
    def _condition(cls, field, op, raw, is_text):
        """Build one parenthesised condition.

        ``in`` and ``between`` take comma-separated operands: ``in`` becomes a
        list membership test, ``between`` an inclusive pair of comparisons.

        Raises:
            Exception: If ``between`` is not given exactly two values.
        """
        if op == 'in':
            items = [cls._literal(part.strip(), is_text)
                     for part in raw.split(',') if part.strip()]
            return "({} in [{}])".format(field, ', '.join(items))
        if op == 'between':
            bounds = [part.strip() for part in raw.split(',')]
            if len(bounds) != 2 or not all(bounds):
                raise Exception('between needs two comma separated values, got:{}'.format(raw))
            low = cls._literal(bounds[0], is_text)
            high = cls._literal(bounds[1], is_text)
            return "(({0} >= {1}) and ({0} <= {2}))".format(field, low, high)
        return "({} {} {})".format(field, op, cls._literal(raw, is_text))

    def filterApply(self,a):
        """Button callback: publish the combined expression through ``value``."""
        cond = []
        # children[0] is the command row; every later child is a condition row.
        for row in self.children[1:]:
            field = row.children[0].value
            op = row.children[1].value
            raw = row.children[2].value
            is_text = str(self._optType.get(field)) in ['category','object']
            # TODO: handle other data types (e.g. dates) explicitly.
            cond.append(self._condition(field, op, raw, is_text))
        self.value = ' and '.join(cond)
        self.apply = True

    def filterCancel(self,a):
        """Button callback: clear the published expression."""
        self.value = ''
        self.apply = False
        

In [3]:
class EtlScript(traitlets.HasTraits):
    """Base class for ETL commands operating on an in-memory DataFrame.

    Each command takes keyword arguments, logs itself through ``logged`` and
    calls ``dataRefreshed()`` afterwards via ``eventTrigger``.
    """

    _supportType = ['float32','float64','float', 'int','int8','int16','int32',
                    'int64', 'bool', 'datetime64','timedelta','category','object','dict','list']

    # Number of rows read per chunk.
    _chunksize = 1000
    # Directory where sampled data is cached (hd5 format).
    _cachepath = '../data/cache/'
    # Metadata describing the data.
    _metadata = None
    # Data currently being processed.
    _df = pd.DataFrame()
    # Changes whenever the data has been updated.
    _data_change=traitlets.Float()

    # Original data kept while a filter is applied.
    _df_origin = None
    # Whether a filter is currently applied.
    _filter_applied = False
    # Expression(s) of the filter currently applied.
    _filter_expression = ""

    def __init__(self):
        pass

    # Hook to be implemented by subclasses.
    def dataRefreshed(self):
        pass

    @logged()
    def eventTrigger(func):
        """Decorator: run ``func``, call ``self.dataRefreshed()``, return ``func``'s result."""
        @wraps(func)
        def wrapper(self,*args, **kwargs):
            result = func(self,*args, **kwargs)
            self.dataRefreshed()
            # Hand the result back; dropping it made every decorated method return None.
            return result
        return wrapper

    @logged()
    @eventTrigger
    def drop(self,*args,**kwargs):
        '''
            drop col:column_ref

            Raises Exception when ``col`` is not a column of the data.
        '''
        col_name = kwargs.get('col')
        if col_name in self._df:
            self._df.drop(col_name,axis=1,inplace=True)
        else:
            raise Exception('col is not valid, must be in list[{}]'.format(self._df.columns.values.tolist()))

    @logged()
    @eventTrigger
    def rename(self,*args,**kwargs):
        '''
            rename: col: column_name to:new_col_name

            Raises Exception when ``col`` is unknown or ``to`` already exists.
        '''
        col_name = kwargs.get('col')
        col_new_name = kwargs.get('to')
        if col_name in self._df and col_new_name not in self._df:
            self._df.rename({col_name:col_new_name},inplace=True,axis='columns')
        else:
            raise Exception('col name[{}] must be in list{} and new name[{}] must be not'\
                            .format(col_name,self._df.columns.values.tolist(),col_new_name))

    @logged()
    @eventTrigger
    def settype(self,*args,**kwargs):
        '''
            settype col:column_ref to:new_col_type

            Raises Exception when ``col`` is unknown or ``to`` is not a supported type.
        '''
        col_name = kwargs.get('col')
        col_new_type = kwargs.get('to')
        if col_name not in self._df or col_new_type not in self._supportType:
            # Report the requested type; the old message referenced an
            # undefined name and failed with NameError instead.
            raise Exception('col name[{}] must be in list{} and new type[{}] must be in {} '\
                            .format(col_name,self._df.columns.values.tolist(),col_new_type,self._supportType))
        if col_new_type not in ['dict','list']:
            self._df[col_name] = self._df[col_name].astype(col_new_type,errors='ignore')
        else:
            # TODO: support dict and list data types.
            pass

    @logged()
    @eventTrigger
    def filter(self,*args,**kwargs):
        '''
            filter where:conditional expressions, new:apply filter based on  the beginning or last filter result

            ``new=True`` cancels the current filter first so the expression
            is applied to the unfiltered data. Raises Exception when
            ``where`` is missing or empty.
        '''
        where = kwargs.get('where')
        new = kwargs.get('new')
        if not where:
            raise Exception('where must be a non-empty filter expression')
        if isinstance(new,bool) and new:
            self.cancelFilter()
        # Take the backup first but commit the bookkeeping only after the
        # query succeeded, so a bad expression leaves the state untouched.
        backup = None if self._filter_applied else self._df.copy()
        self._df.query(where,inplace=True)
        if not self._filter_applied:
            self._df_origin = backup
            self._filter_applied = True
        self._filter_expression = (self._filter_expression +" & " + where) if self._filter_expression else where

    @logged()
    @eventTrigger
    def cancelFilter(self,*args,**kwargs):
        '''
            cancelFilter nop

            Restores the data saved when the first filter was applied.
        '''
        if self._filter_applied:
            self._filter_applied = False
            self._df = self._df_origin
            self._df_origin = None
            self._filter_expression = ''
        else:
            logger.info('no filter, no effect of this cancel filter operation')

    @logged()
    @eventTrigger
    def delete(self,*args,**kwargs):
        '''
            delete where:conditional expressions

            NOTE(review): not implemented yet - the argument is read but no rows are removed.
        '''
        where = kwargs.get('where')
        


In [4]:
class CsvData(EtlScript):
    """EtlScript backed by a CSV file.

    Rows are sampled chunk by chunk from the file, or loaded from an HDF
    cache of an earlier sample, and kept in ``_df``.
    """

    _datapath = traitlets.Unicode()
    _sep = ','
    _header = 0
    _codec = None
    # Defaults so these attributes exist even when the constructor rejects
    # the corresponding argument or the data file is missing.
    _samples = 10000
    _usecache = True
    _linetotal = 0

    @logged()
    def __init__(self,datapath,samples=10000,sep=',',header=0,codec='utf8', chunksize=1000,usecache=True,cachepath=None):
        """Store the read options; invalid values leave the class defaults.

        Args:
            datapath: Path of the CSV file.
            samples: Number of rows to sample.
            sep: Field separator.
            header: Row number of the header line.
            codec: Text encoding of the file.
            chunksize: Number of rows read per chunk.
            usecache: Whether to read and write the sample cache.
            cachepath: Existing directory for the cache file.
        """
        if samples and isinstance(samples,int) and (samples>0):
            self._samples = samples
        if chunksize and isinstance(chunksize,int) and (chunksize>0):
            self._chunksize = chunksize
        if header and isinstance(header,int) and (header>0):
            self._header = header
        if isinstance(usecache,bool):
            self._usecache = usecache
        if sep:
            self._sep = sep
        if codec:
            self._codec = codec
        if os.path.isfile(datapath):
            self._datapath = datapath
            self._linetotal = self.estimateLen()
        else:
            # Not fatal here: a cached sample can still be loaded by name.
            logger.warning('data file:{} does not exist'.format(datapath))
        if cachepath and os.path.isdir(cachepath):
            self._cachepath = cachepath

    def _cachefile(self):
        """Default cache file: ``<cachepath>/<data file name without extension>.hd5``."""
        name = os.path.splitext(os.path.basename(self._datapath))[0]
        return os.path.join(self._cachepath,name+'.hd5')

    @logged()
    def refreshdataType(self):
        """Rebuild ``_metadata`` with each column's dtype and ``describe()`` summary."""
        self._metadata = {}
        for col in self._df:
            fld = {}
            fld['type'] = self._df[col].dtype
            fld['sumary'] = self._df[col].describe()
            self._metadata[col] = fld

    @logged()
    @EtlScript.eventTrigger
    def loadSamples(self,samples=None,usecache=True,cachename=None):
        """Load the sample from the cache, sampling the CSV when that fails.

        Args:
            samples: Optional new sample size, used when the CSV is sampled.
            usecache: Whether to try the cache first.
            cachename: Cache file to read instead of the default one.

        Returns:
            ``self``.
        """
        if samples and isinstance(samples,int) and (samples>0):
            # Previously this argument was accepted but silently ignored.
            self._samples = samples
        if isinstance(usecache,bool):
            self._usecache = usecache
        if not(self._usecache):
            self.takeSamples()
        else:
            filename = cachename if cachename else self._cachefile()
            try:
                self._df = pd.read_hdf(filename)
                logger.info('success in loading cache from file:{}'.format(filename))
            except Exception as err:
                # Best effort: any cache problem falls back to fresh sampling.
                # The message now actually includes the error text.
                logger.exception('fail in loading cache from file:{}\nbegin taking {} samples,\nbecause {}'
                                 .format(filename,self._samples,str(err)))
                self.takeSamples()
        self.refreshdataType()
        return self

    @logged()
    def takeSamples(self,samples=None):
        """Replace ``_df`` with a fresh sample of roughly ``_samples`` rows.

        An equal share is drawn from every chunk. A shortfall is topped up
        from rows of the last chunk that were not drawn yet, and a surplus is
        trimmed by sampling down. The result is cached when caching is on.

        Raises:
            Exception: If the data file does not exist.

        Returns:
            ``self``.
        """
        logger.debug('begin taking sample')
        if samples and isinstance(samples,int) and (samples>0):
            self._samples = samples
        if not os.path.isfile(self._datapath):
            raise Exception('data file:{} does not exist'.format(self._datapath))
        if self._linetotal <= 0:
            self._linetotal = self.estimateLen()

        chunker = pd.read_csv(self._datapath,sep=self._sep,header=self._header,chunksize=self._chunksize,encoding=self._codec)
        expected_chunks = max(float(self._linetotal) / self._chunksize, 1.0)
        # At least one row per chunk so small sample sizes still span the file.
        per_chunk = max(int(round(self._samples / expected_chunks)), 1)

        picked = []
        total = 0
        spare = None
        for piece in chunker:
            # Never ask for more rows than the (possibly short) chunk holds.
            drawn = piece.sample(n=min(per_chunk, len(piece)))
            picked.append(drawn)
            total += len(drawn)
            # Rows of this chunk that are still available for the top-up.
            spare = piece.drop(drawn.index)

        shortfall = self._samples - total
        if shortfall > 0 and spare is not None and len(spare) > 0:
            extra = spare.sample(n=min(shortfall, len(spare)))
            picked.append(extra)
            total += len(extra)

        # Start from scratch: repeated calls must not pile up on old samples.
        # A single concat avoids re-copying the frame once per chunk.
        sampled = pd.concat(picked) if picked else pd.DataFrame()
        if total > self._samples:
            sampled = sampled.sample(n=self._samples).sort_index()
        elif total < self._samples:
            logger.info('only {} of {} requested samples available'.format(total,self._samples))
        self._df = sampled

        if self._usecache:
            self.cacheSamples()
        self.object2category()
        return self

    @logged()
    def cacheSamples(self):
        """Write ``_df`` to the default cache file when the location is writable.

        Raises:
            Exception: If the cache file exists but is not writable.

        Returns:
            ``self``.
        """
        logger.debug('begin caching sample')
        filename = self._cachefile()
        if os.access(self._cachepath, os.W_OK):
            if not(os.path.isfile(filename)) or os.access(filename,os.W_OK):
                # ``key`` is passed by keyword, which every pandas version accepts.
                self._df.to_hdf(filename,key='default')
            else:
                raise Exception('can not create file:{}'.format(filename))
        return self

    @logged()
    def estimateLen(self, probe=150):
        """Estimate the file's line count from the mean size of its first lines.

        Args:
            probe: Maximum number of leading lines to measure.

        Returns:
            Estimated number of lines; 0 for an empty file.
        """
        size = 0
        lines = 0
        # Binary mode: sizes are bytes, matching os.path.getsize, and no
        # decoding with a possibly wrong default encoding takes place.
        with open(self._datapath,'rb') as file:
            for line in file:
                size += len(line)
                lines += 1
                if lines >= probe:
                    break
        if not size:
            return 0
        # Divide by the lines actually read, not by a fixed constant.
        lineSize = float(size) / lines
        return int(os.path.getsize(self._datapath) / lineSize)

    # Object columns whose unique values are fewer than 50% of the rows are
    # converted to category to save memory.
    @logged()
    @EtlScript.eventTrigger
    def object2category(self):
        """Convert low-cardinality object columns to category. Returns ``self``."""
        num_total = self._df.shape[0]
        for col in self._df:
            if self._df[col].dtype == 'object':
                num_unique = len(self._df[col].unique())
                if num_unique < num_total *0.5:
                    self._df[col] = self._df[col].astype('category')
        return self

    @logged()
    @EtlScript.eventTrigger
    def convertType(self,col,typename):
        """Cast column ``col`` to ``typename``.

        Raises:
            Exception: If the type is unsupported or the column is unknown.

        Returns:
            ``self``.
        """
        if typename not in self._supportType:
            raise Exception('type must be in list[{}]'.format(','.join( self._supportType)))
        if col not in self._df:
            raise Exception('col[{}] is not a column of the data'.format(col))
        self._df[col] = self._df[col].astype(typename,errors='ignore')
        return self

    @property
    def header(self):
        """Column names as a list."""
        return self._df.columns.values.tolist()

    @header.setter
    @EtlScript.eventTrigger
    def header(self,header=[]):
        self._df.columns = header

    @property
    def data(self):
        """Rows as a list of lists."""
        return self._df.values.tolist()

    @property
    def datatypes(self):
        """Column dtypes, in header order, taken from ``_metadata``."""
        return [self._metadata[f]['type'] for f in self.header]

    # Data-update notification.
    @logged()
    def dataRefreshed(self):
        """Refresh the metadata and change ``_data_change`` so observers are notified."""
        self.refreshdataType()
        self._data_change = random.random()
    

        

In [14]:
class EtlManager():
    """Connects a CsvData sample, its table display and the filter widget."""

    _data = None
    _output = None
    _show = False
    _cmd = None

    def __init__(self,path):
        """Load the sample for ``path``, build the table and watch for data changes."""
        self._data = self.loadData(path)
        self._output = self.initTable()
        self._data.observe(self.eventDatachanged,'_data_change')

    def _headerSpec(self):
        """List of ``(column name, dtype)`` pairs describing the current data."""
        return list(zip(self._data.header,self._data.datatypes))

    @logged()
    def eventDatachanged(self,change):
        """Observer callback: redraw the table after the data changed."""
        self.refreshOutput()

    def refreshOutput(self):
        """Push the current header and rows into the table model."""
        self._output.model.header = self._headerSpec()
        self._output.model.data = self._data.data

    @logged()
    def loadData(self,path,samples=30,chunksize=11,usecache=True):
        """Create the CsvData for ``path`` and load its sample.

        Returns:
            The loaded ``CsvData``.
        """
        data = CsvData(path,samples=samples,sep=',',chunksize=chunksize,usecache=usecache)
        # Forward usecache: loadSamples defaults it to True and would
        # otherwise override the value given to the constructor.
        data.loadSamples(usecache=usecache)
        return data

    def initTable(self):
        """Build the table chart from the current header and rows."""
        return TableChart(header=self._headerSpec(),body=self._data.data)

    def run(self):
        """Show the interactive filter prompt."""
        self.cmdPrompt()

    def cmdPrompt(self):
        """Create the filter widget on first use, then display it."""
        if not(self._cmd):
            self._cmd = FilterWidget(options=self._data.header,dtypes=self._data.datatypes,placehold=u'请选择列')
            self._cmd.observe(self.applyFilter,'value')
        display(self._cmd)

    def showTable(self,enable=True):
        """Display the table when ``enable`` is True, close it otherwise.

        Raises:
            Exception: If ``enable`` is not a bool.
        """
        if isinstance(enable,bool):
            self._show = enable
        else:
            raise Exception('display must be bool value')
        if self._show:
            self.show_handle = display(self._output)
        else:
            self._output.close()
            self._show = False

    def applyFilter(self,change):
        """Observer callback for the filter widget's ``value``."""
        strFilter = change['new']
        if strFilter:
            # The widget always publishes its complete expression, so apply
            # it to the unfiltered data rather than on top of the previous
            # result; otherwise relaxing a condition could not bring rows back.
            self._data.filter(where=strFilter,new=True)
        else:
            self._data.cancelFilter()
        
        

In [15]:
# Build the manager for the iris CSV (this loads the sample) and show the table widget.
m = EtlManager('../data/iris.csv')
m.showTable()

A Jupyter Widget

In [37]:
# Cast the 'species' column to the category dtype.
# NOTE(review): execution counts in this notebook are out of order; confirm it runs top to bottom on a fresh kernel.
m._data.settype(col='species',to='category')

In [22]:
# Keep only the rows matching the query expression.
m._data.filter(where='(species!="setosa" & petalLength>2.1 & petalWidth>1.9)')

In [23]:
# Undo the filter and restore the data saved before it was applied.
m._data.cancelFilter()

In [11]:
# Remove the 'sepalLength' column from the working data.
m._data.drop(col='sepalLength')

In [16]:
# Display the interactive filter widget.
m.run()

A Jupyter Widget

eventColChoose:species
eventColChoose:petalLength


In [17]:
# Inspect the expression currently published by the filter widget (depends on the interaction above).
m._cmd.value

'(species == "setosa")'