In [29]:
import pandas as pd
import os
import chardet
from darwinutils.log import get_task_logger
from darwinutils.mapreduce import parallel_starmap_p
import numpy as np
import time

logger = get_task_logger(__name__)

global_df_lst = []

class CSV2DF(object):
    def __init__(self, max_byte_num_for_coding_detect=100*1024, max_thread_num=None, csv_max_read_lines = 50000):
        if max_thread_num is None:
            self._max_thread_num = max(os.cpu_count() - 4, 3)
        else:
            self._max_thread_num = max_thread_num
        self._max_byte_num_for_coding_detect = max_byte_num_for_coding_detect
        self._csv_max_read_lines = csv_max_read_lines

    def detect_coder(self, file_path):
        with open(file_path, 'rb') as f:
            if os.stat(file_path).st_size <= self._max_byte_num_for_coding_detect:
                detector = chardet.detect(f.read())
            else:
                detector = chardet.detect(f.read(self._max_byte_num_for_coding_detect))
        """There are some issues while using gb2312 so change to gb18030"""
        if 'gb2312' == detector['encoding'].lower():
            detector['encoding'] = 'gb18030'
        logger.debug("Detected Coder info is {}".format(detector))
        return detector['encoding']

    def read_csv_file(self, file_path, encoding, skiprows=None, read_rows=None, usecols=None):
        func = pd.read_csv
        try:
            file_df = func(file_path, encoding=encoding, skiprows=skiprows, nrows=read_rows,low_memory=False, usecols=usecols)
        except Exception as e:
            """Normally, it caused by out of range for skiprows"""
            print("{} - {} may cause out of range for file {}. Reason({})".format(
                skiprows, read_rows, os.path.basename(file_path), str(e)))
            file_df = None
        return file_df

    def map_column_name_idx(self, column_name_lst, map_column_name_lst):
        map_column_idx = []
        for column_name in map_column_name_lst:
            map_column_idx.append(column_name_lst.index(column_name))
        map_column_idx.sort()
        return map_column_idx
            
    def read_content(self, file_path, usecols=None, encoding=None):
        if not os.path.exists(file_path):
            logger.error("{} does not exist".format(file_path))
            return None
        if encoding is None:
            coder = self.detect_coder(file_path)
        else:
            coder = encoding
        skiprows = 0
        """Get Header"""
        header = self.read_csv_file(file_path, encoding=coder, skiprows=skiprows, read_rows=2)
        columns_name = header.columns.tolist()
        all_df = []
        print("Start Read: Coder:{}".format(coder))
        loop_num = 0
        if usecols is not None:
            if len(set(usecols).intersection(set(columns_name))) != len(usecols):
                print("Error: Wrong usecols setting: {} not in list".format(set(usecols).difference(set(usecols).intersection(set(columns_name)))))
                return None         
            tmp_cols = usecols
            usecols = self.map_column_name_idx(columns_name, usecols)    
            columns_name = list(map(lambda s:columns_name[s], usecols))

        while True:
            param_lst = []
            print("Start Batch Read")
            for cnt in range(self._max_thread_num):
                param_lst.append((file_path, coder, skiprows, self._csv_max_read_lines, usecols))
                skiprows += self._csv_max_read_lines
            file_df_lst = parallel_starmap_p(self.read_csv_file, param_lst)
            file_df_lst = list(file_df_lst)
            print("Batch Read Done")
            if file_df_lst[-1] is None:
                """Read complete"""
                all_df.extend(list(filter(lambda s: s is not None, file_df_lst)))
                break
            else:
                all_df.extend(file_df_lst)
                if file_df_lst[-1].shape[0] != self._csv_max_read_lines:
                    """Read complete"""
                    break
            loop_num += 1
        print("Merge {} pieces of DF together. Total Loop: {}".format(len(all_df), loop_num))
        if len(all_df) > 1:
            for df in all_df:
                df.columns = columns_name
        return all_df

In [30]:
from darwinutils.mapreduce import parallel_starmap_t
import collections
import datetime

class DF_CLEAN:
    def __init__(self, head_flag=True):
        if len(global_df_lst) > 0:
            if head_flag:            
                self._columns = global_df_lst[0].columns.tolist()            
            else:
                slef._columns = ["c%04d" % x for x in range(len(global_df_lst[0].columns))]
                for df in global_df_lst:
                    df.columns = self._columns
            self._columns_dtype_dict = global_df_lst[0].columns.to_series().groupby(global_df_lst[0].dtypes).groups
        else:
            self._columns = None
        self._working_columns = []  # seems pool thread does not support a very long parameter list, have to use this method
        self._missing_value_columns = []
        self._factor_unknown = "unknown"
        self._factor_map_dict = collections.defaultdict(dict)
         
    def _get_missing_value_column_name_lst(self, df_idx):
        missing_value_column_name = []
        #print("Processing {} DF".format(df_idx))
        for name in self._working_columns:
            if global_df_lst[df_idx][name].isnull().any():
                missing_value_column_name.append(name)
        return missing_value_column_name
    
    def _reduce_lst(self, columns_lst, axis=0, method='union'):
        # axis 0 means row dealwith
        # axis 1 means column dealwith
        # method can use union and intersection, difference and so on
        value_c_lst = []
        if axis == 1:
            columns_zip = list(zip(*columns_lst))
            print("Total {} blocks".format(len(columns_zip)))
            for column_block in columns_zip:
                #print("Total {} part per block".format(len(column_block)))
                merged_block = set(column_block[0])
                for merged_part in column_block[1:]:
                    merged_block =eval("set.{}".format(method))(merged_block,set(merged_part)) 
                value_c_lst.append(list(merged_block))
        else:
            last_set = set(columns_lst[0])
            for columns_info in columns_lst[1:]:
                last_set = eval("set.{}".format(method))(last_set,set(columns_info)) 
            value_c_lst = list(last_set)
        return value_c_lst
        
    def check_missing_value_columns(self, column_names=None, df_num=None):
        if column_names is None:
            column_names = self._columns
        if column_names is None:
            print("Error: DF dose not have columns")
            return None
        param_lst = []
        if df_num is None:
            df_num = len(global_df_lst)
        self._working_columns = column_names
        for idx in range(df_num):
            param_lst.append([idx])

        column_name_list = parallel_starmap_p(self._get_missing_value_column_name_lst, param_lst)
        column_name_list = list(column_name_list)
        assert(len(column_name_list)==df_num)     
        self._missing_value_columns = self._reduce_lst(column_name_list, axis=0, method='union')
        return self._missing_value_columns
    
    def _get_same_value_column_name_lst(self, df_idx):
        same_value_column_name = []
        #print("Processing {} DF".format(df_idx))
        for name in self._working_columns:
            #if df_idx == 1:
            #    print("Check same value for column {}".format(name))
            if global_df_lst[df_idx][name].isnull().all():
                #elif global_df_lst[df_idx][name].isna().all():
                same_value_column_name.append(name)
            else:
                tmp_df = global_df_lst[df_idx][name].fillna(self._factor_unknown)
                if len(set(tmp_df.values)) == 1:
                    same_value_column_name.append(name)
            
        #if(df_idx == 1):
        #    print(same_value_column_name)
        return same_value_column_name
        
    def check_same_value_columns(self, column_names=None, df_num=None):
        if column_names is None:
            column_names = self._columns
        if column_names is None:
            print("Error: DF dose not have columns")
            return None
        param_lst = []
        if df_num is None:
            df_num = len(global_df_lst)
        self._working_columns = column_names
        for idx in range(df_num):
            param_lst.append([idx])
        column_name_list = parallel_starmap_p(self._get_same_value_column_name_lst, param_lst)
        column_name_list = list(column_name_list)
        #print(column_name_list)
        assert(len(column_name_list)==df_num)        
        return self._reduce_lst(column_name_list, axis=0, method='intersection')
    
    def _get_factor_values(self, df_idx):
        factor_column_values = []
        #if (df_idx == 0):
        #    print("Calculate Factor of columns: Processing {} DF".format(df_idx))
        #    print(len(self._working_columns))
        for name in self._working_columns:
            tmp_df = global_df_lst[df_idx][name].fillna(self._factor_unknown)
            tmp_lst = list(tmp_df.values)
            tmp_lst.append(self._factor_unknown)
            factor_column_values.append(list(set(tmp_lst)))

        return factor_column_values
    
    def _change_column_type(self, df_idx, column_type):
        #print("Change Column Types: Processing {} DF".format(df_idx))
        for name in self._working_columns:
            global_df_lst[df_idx][name] = global_df_lst[df_idx][name].astype(column_type)
    
    def get_factor_candidate_columns(self, column_names=None,df_num=None):        
        if df_num is None:
            df_num = len(global_df_lst)
        if column_names is None:
            column_names = self._columns_dtype_dict[np.dtype(object)].values

        self._working_columns = column_names
        param_lst = []
        for idx in range(df_num):
            param_lst.append([idx])
        column_name_list = parallel_starmap_p(self._get_factor_values, param_lst)
        column_name_list = list(column_name_list)

        assert(len(column_name_list)==df_num)  
        print("Reduce List")
        column_info_lst = self._reduce_lst(column_name_list, axis=1, method='union')
        return self._working_columns, column_info_lst
    
    @property
    def columns(self):
        return self._columns
    
    ########################################################
    def _set_bool_type_for_column(self, df_idx):
        for name in self._working_columns:
            #if(df_idx == 0):
            #    print("Column: {} Set to Bool".format(name))
            tmp_df = global_df_lst[df_idx][name].fillna(0)
            values = tmp_df.values.tolist()
            for idx, value in enumerate(values):
                if value != 0:
                    values[idx] = 1
            global_df_lst[df_idx][name] = values
            
            
    def set_column_to_bool(self, column_names=None,df_num=None):  
        if df_num is None:
            df_num = len(global_df_lst)
        if column_names is None:
            print("column_names is None")
            return None
     
        self._working_columns = column_names
        param_lst = []
        for idx in range(df_num):
            param_lst.append([idx])
        parallel_starmap_t(self._set_bool_type_for_column, param_lst)   
        
    def _set_factor_value_for_column(self, df_idx, value_dict):
        for name in self._working_columns:
            #if(df_idx == 0):
            #    print("Column: {} Set to Factor Value".format(name))
            tmp_df = global_df_lst[df_idx][name].fillna(self._factor_unknown)
            values = tmp_df.values.tolist()
            for idx, value in enumerate(values):
                try:
                    tmp_value = str(int(float(value)))
                    values[idx] = value_dict[name].get(tmp_value)
                except ValueError:
                    values[idx] = value_dict[name].get(value)
                if values[idx] is None:
                    print("Column {} Value {} not defined".format(name, str(value)))
                    values[idx] = value_dict[name][self._factor_unknown]
                    value_dict[name][value] = value_dict[name][self._factor_unknown]
            global_df_lst[df_idx][name] = values
    
    def set_column_to_factor(self, column_names=None,df_num=None, value_dict=None):  
        if df_num is None:
            df_num = len(global_df_lst)
        if column_names is None:
            print("column_names is None")
            return None
        if value_dict is None:
            print("value_dict is None")
            return None
     
        self._working_columns = column_names
        param_lst = []
        for idx in range(df_num):
            param_lst.append([idx, value_dict])
        parallel_starmap_t(self._set_factor_value_for_column, param_lst)   
        
    def _drop_columns(self, df_idx):
        global_df_lst[df_idx] = global_df_lst[df_idx].drop(self._working_columns, axis=1)
    
    def drop_columns(self, column_names=None,df_num=None):  
        if df_num is None:
            df_num = len(global_df_lst)
        if column_names is None:
            print("column_names is None")
            return None
     
        self._working_columns = column_names
        param_lst = []
        for idx in range(df_num):
            param_lst.append([idx])
        parallel_starmap_t(self._drop_columns, param_lst)   
        
    def _compare_two_columns(self, df_idx, src_column, target_column):
        rst = np.where(global_df_lst[df_idx][src_column] == global_df_lst[df_idx][target_column], True, False)
        return rst.all()
    
    def compare_two_columns(self, src_column, target_column, df_num=None):
        if df_num is None:
            df_num = len(global_df_lst)
        param_lst = []
        for idx in range(df_num):
            param_lst.append([idx, src_column, target_column])
        column_rst_list = parallel_starmap_p(self._compare_two_columns, param_lst)   
        column_rst_list = list(column_rst_list)
        return np.array(column_rst_list).all()
    
    def _transfer_datetime_to_int(self, df_idx, datetime_format, basetime, scale, prefix):
        print_flag = 1
        if prefix is None:
            prefix = ''
        for name in self._working_columns:
            if(df_idx == 0):
                print("Transfer Time on: {}".format(name))
            tmp_df = global_df_lst[df_idx][name].fillna(0)
            values = tmp_df.values.tolist()
            for idx, value in enumerate(values):
                if value == '0':
                    values[idx] = 0
                    continue
                if value != 0:
                    # Remove in future
                    try:
                        tmp_value = str(int(value))
                    except ValueError:
                        tmp_value = str(value)
                    if tmp_value == '' or tmp_value.find(' ')==0:
                        values[idx] = 0
                        continue
                    tmp_value=prefix+tmp_value  

                    try:
                        tmp_dateitme = datetime.datetime.strptime(tmp_value, datetime_format)
                    except Exception as e:
                        print("Error while transfer string to datatime in column {}, line {} value {}. Reason: {}".format(name, idx, value, str(e)))
                        values[idx] = 0
                        continue
                    
                    if scale=="Day":
                        values[idx] = (tmp_dateitme - basetime).days
                    elif scale=="Hour":
                        values[idx] = (tmp_dateitme - basetime).days*24 + int((tmp_dateitme - basetime).seconds/3600)
                    elif scale=="Minute":
                        values[idx] = (tmp_dateitme - basetime).days*24*60 + int((tmp_dateitme - basetime).seconds/60)
                    elif scale=="Second":
                        values[idx] = (tmp_dateitme - basetime).total_seconds
                    else:
                        print("Unsupported scale {}".format(scale))
                        return None
            if(df_idx == 0):
                print(values[:10])
            global_df_lst[df_idx][name] = values
            
        
    def transfer_datetime_to_int(self, column_names=None, datetime_format=None, basetime=None, scale=None, df_num=None, prefix=None):
        if df_num is None:
            df_num = len(global_df_lst)
        if column_names is None:
            print("column_names is None")
            return None
        if basetime is None or type(basetime).__name__ != 'datetime':
            print("basetime is None or format is not datetime")
            return None
        if datetime_format is None:
            print("datetime_format is None")
            return None
        if scale is None or scale not in ["Day", "Hour", "Minute", "Second"]:
            print("scale is None or value is not right")
            return None
        self._working_columns = column_names
        param_lst = []
        for idx in range(df_num):
            param_lst.append([idx, datetime_format, basetime, scale, prefix])
        parallel_starmap_t(self._transfer_datetime_to_int, param_lst)   
        
    def _update_value_difference_by_columns(self, df_num, refer_column_name, new_column_names=None):
        if new_column_names is None:
            for name in self._working_columns:
                #if(df_idx == 0):
                #    print("Transfer Time on: {} Set to Factor Value".format(name))
                global_df_lst[df_idx][name] = global_df_lst[df_idx][name] - global_df_lst[df_idx][refer_column_name]
        else:
            for idx, name in self._working_columns:
                #if(df_idx == 0):
                #    print("Transfer Time on: {} Set to Factor Value".format(name))
                global_df_lst[df_idx][new_column_names[idx]] = global_df_lst[df_idx][name] - global_df_lst[df_idx][refer_column_name]
        
    def update_value_difference_by_columns(self, refer_column_name, columns_names, new_column_names=None, df_num=None):
        if df_num is None:
            df_num = len(global_df_lst)
        if column_names is None:
            print("column_names is None")
            return None
        if type(refer_column_name).__name__ != 'str':
            print("refer_column_name type error")
            return None
        if new_column_names is not None and len(new_column_names) != len(columns_names):
            print("new_column_names param is not correct")
            return None
        
        self._working_columns = column_names
        param_lst = []
        for idx in range(df_num):
            param_lst.append([idx, refer_column_name, new_column_names])
        parallel_starmap_t(self._update_value_difference_by_columns, param_lst)   
        
    def _fill_value_to_nan_cell(self, df_idx, value): 
        for name in self._working_columns:
            if df_idx==0:
                print("Filling column {}".format(name))
            global_df_lst[df_idx][name] = global_df_lst[df_idx][name].fillna(value)
    
    def fill_value_to_nan_cell(self, column_names=None, value=0, df_num=None):
        if df_num is None:
            df_num = len(global_df_lst)
        if column_names is None:
            column_names = self._columns
        self._working_columns = column_names
        param_lst = []
        for idx in range(df_num):
            param_lst.append([idx, value])
        parallel_starmap_t(self._fill_value_to_nan_cell, param_lst)   
        
    def fill_fact_map_dict(self, column_names=None, factor_value=None):
        if column_names is None or factor_value is None:
            print("Wrong input parameter")
            return None
        if len(column_names) != len(factor_value):
            print("Column name list length {} not equate to factor value list length {}".format(len(column_names),len(factor_value)))
            return None
        exist_column_dict_name = self._factor_map_dict.keys()
        if len(exist_column_dict_name) == 0:
            new_column_lst = column_names
        else:
            new_column_lst = list(set(column_names).difference(set(exist_column_dict_name)))
        new_column_lst = list(map(lambda s: column_names.index(s), new_column_lst))
        for column_idx in new_column_lst:
            information =  list(factor_value[column_idx])
            tmp_lst = []
            for v in information:
                if v == ' ' or v =='':
                    continue
                try:
                    tmp_lst.append(str(int(float(v))))
                except:
                    tmp_lst.append(str(v))
            information = list(set(tmp_lst))
            del information[information.index(self._factor_unknown)]
            self._factor_map_dict[column_names[column_idx]][self._factor_unknown] = 0
            information.sort()
            for idx, key in enumerate(information):
                self._factor_map_dict[column_names[column_idx]][key] = idx+1
        exit_column_lst = set(column_names).intersection(set(exist_column_dict_name))
        exit_column_lst = list(map(lambda s: column_names.index(s), exit_column_lst))
        for column_idx in exit_column_lst:
            information =  list(factor_value[column_idx])
            tmp_lst = []
            for v in information:
                if v == ' ' or v =='':
                    continue
                try:
                    tmp_lst.append(str(int(float(v))))
                except:
                    tmp_lst.append(str(v))
            information = list(set(tmp_lst))
            information.sort()
            current_num = max(list(self._factor_map_dict[column_names[column_idx]].values())) + 1
            for idx, key in enumerate(information):
                if self._factor_map_dict[column_names[column_idx]].get(key) is None:
                    self._factor_map_dict[column_names[column_idx]][key] = current_num
                    current_num += 1
        return self._factor_map_dict
    
    def _get_df_by_value_lst(self, df_idx, column_name, reverse_flag, values):
        print("Process get_df_by_value. df_idx {}".format(df_idx))
        if reverse_flag:
            return global_df_lst[df_idx][~global_df_lst[df_idx][column_name].isin(values)]
        else:
            return global_df_lst[df_idx][global_df_lst[df_idx][column_name].isin(values)]
        
        
    def get_df_by_value_lst(self, column_name=None, values=None, reverse_flag=False, df_num=None, concat_flag=True):
        if column_name is None or type(column_name).__name__ != 'str':
            print("Wrong column_name parameter")
            return None
        if values is None or type(values).__name__ != 'list':
            print("Wrong values parameter")
            return None
        if df_num is None:
            df_num = len(global_df_lst)
        param_lst = []
        for idx in range(df_num):
            param_lst.append([idx, column_name, reverse_flag, values])
        df_lst = parallel_starmap_t(self._get_df_by_value_lst, param_lst)   
        df_lst = list(df_lst)
        return_df = None
        for df in df_lst:
            if not df.empty:
                if concat_flag:
                    if return_df is None:
                        return_df = df
                    else:
                        return_df = pd.concat([return_df,df],ignore_index=True)
                else:
                    if return_df is None:
                        return_df = [df]
                    else:
                        return_df.append(df)
        return return_df


In [31]:
test_read_csv = CSV2DF()
global_df_lst = test_read_csv.read_content("csv201803_clear_2.csv")
clean_df = DF_CLEAN()
missing_columns = clean_df.check_missing_value_columns()
print("Missing:\n{}".format(missing_columns))
print("Type of Columns: {}".format(list(clean_df._columns_dtype_dict.keys())))
print(clean_df._columns_dtype_dict[np.dtype(object)])

Start Read: Coder:ascii
Start Batch Read
Batch Read Done
Start Batch Read
Batch Read Done
Start Batch Read
4050000 - 50000 may cause out of range for file csv201803_clear_2.csv. Reason(No columns to parse from file)
4000000 - 50000 may cause out of range for file csv201803_clear_2.csv. Reason(No columns to parse from file)
4150000 - 50000 may cause out of range for file csv201803_clear_2.csv. Reason(No columns to parse from file)
3900000 - 50000 may cause out of range for file csv201803_clear_2.csv. Reason(No columns to parse from file)
3750000 - 50000 may cause out of range for file csv201803_clear_2.csv. Reason(No columns to parse from file)
4100000 - 50000 may cause out of range for file csv201803_clear_2.csv. Reason(No columns to parse from file)
3850000 - 50000 may cause out of range for file csv201803_clear_2.csv. Reason(No columns to parse from file)
3650000 - 50000 may cause out of range for file csv201803_clear_2.csv. Reason(No columns to parse from file)
3300000 - 50000 may c

In [32]:
label_df = pd.read_csv("label.csv")

In [33]:
month_type_11 = label_df.loc[label_df['month']==201803]


In [34]:
clean_df = DF_CLEAN()
month_type_11_uuid = list(month_type_11['UUID'].values)
print(len(month_type_11_uuid))
rst = clean_df.get_df_by_value_lst(column_name='UUID', values=month_type_11_uuid)

597
Process get_df_by_value. df_idx 0Process get_df_by_value. df_idx 1
Process get_df_by_value. df_idx 2
Process get_df_by_value. df_idx 3
Process get_df_by_value. df_idx 4
Process get_df_by_value. df_idx 5

Process get_df_by_value. df_idx 6Process get_df_by_value. df_idx 7

Process get_df_by_value. df_idx 8
Process get_df_by_value. df_idx 9
Process get_df_by_value. df_idx 10
Process get_df_by_value. df_idx 11
Process get_df_by_value. df_idx 12
Process get_df_by_value. df_idx 13
Process get_df_by_value. df_idx 14
Process get_df_by_value. df_idx 15
Process get_df_by_value. df_idx 16
Process get_df_by_value. df_idx 17
Process get_df_by_value. df_idx 18
Process get_df_by_value. df_idx 19
Process get_df_by_value. df_idx 20
Process get_df_by_value. df_idx 21
Process get_df_by_value. df_idx 22
Process get_df_by_value. df_idx 23
Process get_df_by_value. df_idx 24
Process get_df_by_value. df_idx 25
Process get_df_by_value. df_idx 26
Process get_df_by_value. df_idx 27
Process get_df_by_value. d

In [35]:
label_id = [1]*len(rst.index)
rst['label'] = label_id

In [36]:
np.random.seed(133)
rst_idx = []
while len(rst_idx) < 100:
    size = 100 - len(rst_idx)
    rst_idx.extend(list(np.random.randint(len(rst.index), size=(size))))
    rst_idx = list(set(rst_idx))
test_df = pd.DataFrame(rst, index=rst_idx)

In [37]:
test_df.to_csv("csv201803_test_type1.csv", index=False, encoding='utf-8')

In [38]:
train_idx = list(set(range(len(rst.index))).difference(set(rst_idx)))
print(len(train_idx))
train_df = pd.DataFrame(rst, index=train_idx)
train_df.to_csv("csv201803_train_type1.csv", index=False, encoding='utf-8')

497


In [39]:
rst = clean_df.get_df_by_value_lst(column_name='UUID', values=month_type_11_uuid, reverse_flag=True, concat_flag=False)

Process get_df_by_value. df_idx 0
Process get_df_by_value. df_idx 1
Process get_df_by_value. df_idx 2
Process get_df_by_value. df_idx 3
Process get_df_by_value. df_idx 4
Process get_df_by_value. df_idx 5
Process get_df_by_value. df_idx 6
Process get_df_by_value. df_idx 7
Process get_df_by_value. df_idx 8
Process get_df_by_value. df_idx 9
Process get_df_by_value. df_idx 10
Process get_df_by_value. df_idx 11
Process get_df_by_value. df_idx 12
Process get_df_by_value. df_idx 13
Process get_df_by_value. df_idx 14
Process get_df_by_value. df_idx 15
Process get_df_by_value. df_idx 16
Process get_df_by_value. df_idx 17
Process get_df_by_value. df_idx 18
Process get_df_by_value. df_idx 19
Process get_df_by_value. df_idx 20
Process get_df_by_value. df_idx 21
Process get_df_by_value. df_idx 22
Process get_df_by_value. df_idx 23
Process get_df_by_value. df_idx 24Process get_df_by_value. df_idx 25

Process get_df_by_value. df_idx 26
Process get_df_by_value. df_idx 27
Process get_df_by_value. df_id

In [41]:
total = 0
for df in rst:
    #print(len(df.index))
    total += len(df.index)
print(total)

2928242


In [42]:
with open("csv201803_train.csv", "w") as f:
    train_df.to_csv(f, mode="w", index=False, header=True)     
    each_df_times = 4
    each_sample_num = len(train_df.index)*5
    for cnt, df in enumerate(rst):
        print("Write df {}".format(cnt))
        label_id = [0]*len(df.index)
        df['label'] = label_id
        rst_idx = []
        while len(rst_idx) < each_sample_num*each_df_times:
            size = each_sample_num*each_df_times - len(rst_idx)
            rst_idx.extend(list(np.random.randint(len(df.index), size=(size))))
            rst_idx = list(set(rst_idx))
        window_size = int(round(len(rst_idx)/each_df_times))
        start_point = 0
        for idx in range(each_df_times):
            tmp_df = pd.DataFrame(df, index=rst_idx[start_point:min(start_point+window_size, len(rst_idx))])
            tmp_df.to_csv(f, mode="a", index=False, header=False)
            train_df.to_csv(f, mode="a", index=False, header=False)     
            start_point += window_size
            if start_point >= len(rst_idx):
                break
    

Write df 0


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy
  


Write df 1
Write df 2
Write df 3
Write df 4
Write df 5
Write df 6
Write df 7
Write df 8
Write df 9
Write df 10
Write df 11
Write df 12
Write df 13
Write df 14
Write df 15
Write df 16
Write df 17
Write df 18
Write df 19
Write df 20
Write df 21
Write df 22
Write df 23
Write df 24
Write df 25
Write df 26
Write df 27
Write df 28
Write df 29
Write df 30
Write df 31
Write df 32
Write df 33
Write df 34
Write df 35
Write df 36
Write df 37
Write df 38
Write df 39
Write df 40
Write df 41
Write df 42
Write df 43
Write df 44
Write df 45
Write df 46
Write df 47
Write df 48
Write df 49
Write df 50
Write df 51
Write df 52
Write df 53
Write df 54
Write df 55
Write df 56
Write df 57
Write df 58


In [43]:
with open("csv201803_test.csv", "w") as f:
    test_df.to_csv(f, mode="w", index=False, header=True)     
    each_df_times = 1
    each_sample_num = 100
    for cnt, df in enumerate(rst):
        print("Write df {}".format(cnt))
        rst_idx = []
        while len(rst_idx) < 100:
            size = 100 - len(rst_idx)
            rst_idx.extend(list(np.random.randint(len(df.index), size=(size))))
            rst_idx = list(set(rst_idx))

        tmp_df = pd.DataFrame(df, index=rst_idx)
        tmp_df.to_csv(f, mode="a", index=False, header=False)

Write df 0
Write df 1
Write df 2
Write df 3
Write df 4
Write df 5
Write df 6
Write df 7
Write df 8
Write df 9
Write df 10
Write df 11
Write df 12
Write df 13
Write df 14
Write df 15
Write df 16
Write df 17
Write df 18
Write df 19
Write df 20
Write df 21
Write df 22
Write df 23
Write df 24
Write df 25
Write df 26
Write df 27
Write df 28
Write df 29
Write df 30
Write df 31
Write df 32
Write df 33
Write df 34
Write df 35
Write df 36
Write df 37
Write df 38
Write df 39
Write df 40
Write df 41
Write df 42
Write df 43
Write df 44
Write df 45
Write df 46
Write df 47
Write df 48
Write df 49
Write df 50
Write df 51
Write df 52
Write df 53
Write df 54
Write df 55
Write df 56
Write df 57
Write df 58


In [27]:
print(test_df)

                                 UUID  OUUID  ORD_NO   CLR_DT  RE_BUS_CNL  \
1    97d1edeef744463da8ed8c3dc382b32d      0       0  30240.0           2   
5    dd39a6fd79e44fe2a355ba1371ea4a8b      0       0  31680.0           2   
520  0f6843a9e4eb4f378b9822c0df2a0744      0       0  33120.0           2   
17   d9cd34c1003a4d00b4f705e9442d85a6      0       0  33120.0           2   
25   d382f0af9c304736b52e7279d4b5db1d      0       0  33120.0           2   
31   b88ca558e1834d57a4bead7351817732      0       0  31680.0           2   
35   4d328f4068fe4e649f51eaf0cc2e0259      0       0  31680.0           2   
40   9c9e4daf7a014863a6ee77d47713fca6      0       0  33120.0           2   
47   74bd8aec0560432a9af21fbbbe132f33      0       0  33120.0           2   
55   5d38fbcc22474c6faafcf7a40204f9e6      0       0  31680.0           2   
56   1064557857e84dbca2cb008fca2e5f52      0       0  31680.0           2   
59   8bc353340ea646a8844f159ccb055275      0       0  31680.0           2   