In [1]:
import gc
import os
import shutil

import dask.dataframe as dd
import numpy as np
import pandas as pd


In [None]:
class SelectorData():
    """Attach to every row of a dataset the feature row closest in time.

    The source dataset (comma-separated; its unnamed index column becomes
    ``key_t``) is inner-joined on ``id`` with a tab-separated feature file whose
    unnamed index column becomes ``key_f`` and whose ``buy_time`` column is
    renamed to ``feat_time``. The joined rows are written as ``*_temp.csv``
    partition files into ``cache_directory``. From those files, each source row
    keeps the feature row(s) with the smallest ``|buy_time - feat_time|``. When
    several feature rows share that minimum, only those with ``feat_time``
    strictly before ``buy_time`` are kept.

    The result is written to ``dest_dataset`` (a dask ``to_csv`` path pattern)
    only when its row count equals the source dataset's row count.
    """

    source_feature = ''   # path to the feature file (tab-separated)
    source_dataset = ''   # path to the file with id, vas_id, buy_time
    dest_dataset = ''     # path (pattern) of the result files

    cache_directory = ''  # directory for temporary files

    use_cache = False
    is_train = True

    template_cache_files = ''

    df_source_dataset = None
    df_dest_dataset = None

    def __init__( self, source_dataset, source_feature, dest_dataset, cache_directory, use_cache=False, is_train=True):
        """
        Args:
            source_dataset: path of the comma-separated dataset to enrich.
            source_feature: path of the tab-separated feature file.
            dest_dataset: output path pattern passed to dask ``to_csv``.
            cache_directory: directory holding the intermediate ``*_temp.csv`` files.
            use_cache: reuse existing cache files instead of rebuilding them
                (they are rebuilt anyway if none exist).
            is_train: if True, ``target`` is part of the row key columns.
        """
        self.source_dataset = source_dataset
        self.source_feature = source_feature
        self.dest_dataset = dest_dataset
        self.cache_directory = cache_directory
        self.use_cache = use_cache
        self.is_train = is_train
        self.template_cache_files = f'{self.cache_directory}/*_temp.csv'

    def __read_dataset(self):
        """Lazily read the source dataset; the unnamed index column becomes ``key_t``."""
        self.df_source_dataset = (
            dd.read_csv(self.source_dataset, sep=',')
            .rename(columns={'Unnamed: 0': 'key_t'})
        )

    def __read_feature(self):
        """Lazily read the feature file (``key_f`` index column, ``feat_time`` date)."""
        return (
            dd.read_csv(self.source_feature, sep='\t')
            .rename(columns={'Unnamed: 0': 'key_f', 'buy_time': 'feat_time'})
        )

    def file_index(self, i):
        """Partition-number formatter for dask ``to_csv`` (``7`` -> ``'007'``)."""
        return f'{i:03}'

    def __create_cache_directory(self):
        """Create an empty cache directory, removing any previous contents.

        Removal errors are not ignored: a partly removed directory would keep
        stale ``*_temp.csv`` partitions that the ``template_cache_files`` glob
        would silently read back in.
        """
        if os.path.exists(self.cache_directory):
            shutil.rmtree(self.cache_directory)
        # makedirs (unlike mkdir) also creates missing parent directories
        os.makedirs(self.cache_directory)

    def __cache_exists(self):
        """Return True if the cache directory contains at least one ``*_temp.csv`` file."""
        if not os.path.isdir(self.cache_directory):
            return False
        return any(name.endswith('_temp.csv') for name in os.listdir(self.cache_directory))

    def __create_cache(self):
        """Inner-join the dataset with the features on ``id`` and write the result
        as ``*_temp.csv`` partitions into a freshly emptied cache directory."""
        if self.df_source_dataset is None:
            self.__read_dataset()
        df_feats = self.__read_feature()
        df_joined = dd.merge(self.df_source_dataset, df_feats, how='inner',
                             left_on=['id'], right_on=['id'])
        self.__create_cache_directory()
        df_joined.to_csv(self.template_cache_files, index=False, compute=True,
                         name_function=self.file_index)

    def __group_columns(self):
        """Columns identifying one source row (used as merge / group-by keys)."""
        columns = ['key_t', 'id', 'vas_id', 'buy_time']
        if self.is_train:
            columns.append('target')
        return columns

    def __check_datasets(self):
        """Print and return whether the result has as many rows as the source dataset.

        NOTE(review): this computes the whole merge graph; ``transform`` then
        computes it again in ``to_csv``.
        """
        if (self.df_dest_dataset.shape[0]).compute() == (self.df_source_dataset.shape[0]).compute():
            print('Количество строк соответствует в датасетах')
            return True
        print('Количество строк НЕ СООТВЕТСВУЕТ в датасетах')
        return False

    def __merge_datasets(self):
        """Build ``df_dest_dataset``: for each source row keep the closest-in-time feature row."""
        group_columns = self.__group_columns()  # merge / group-by key columns
        # cache files: the source dataset already joined with the features by id
        df = dd.read_csv(self.template_cache_files)

        # absolute gap between the offer date and the feature-profile date
        df['diff'] = (df['buy_time'] - df['feat_time']).abs()

        # keep only the feature rows with the minimal gap for every source row
        df_min_diff = df.groupby(group_columns)['diff'].min()
        df = dd.merge(df, df_min_diff, on=group_columns, how='left', suffixes=('', '_min'))
        df = df[df['diff'] == df['diff_min']]

        # If more than one row shares the minimal gap, keep only rows whose feature
        # date precedes buy_time.
        df = dd.merge(self.df_source_dataset, df, on=group_columns, how='left')
        df_tie_count = df.groupby(group_columns)['diff_min'].count()
        df = dd.merge(df, df_tie_count, how='left', on=group_columns, suffixes=('', '_count'))
        is_single_match = df['diff_min_count'] == 1
        feature_is_earlier = (df['buy_time'] - df['feat_time']) > 0
        df = df[is_single_match | feature_is_earlier]
        # columns= alone is the form dask's drop supports consistently
        self.df_dest_dataset = df.drop(columns=['diff_min', 'diff_min_count'])

    def transform(self, npartitions=1):
        """Run the selection and save the result if the row counts match.

        Args:
            npartitions: number of partitions (output files) for the saved result.

        Returns:
            The lazy dask DataFrame with the selected rows. It is repartitioned
            only when it was saved.
        """
        if self.use_cache and self.__cache_exists():
            self.__read_dataset()
        else:
            # caching disabled or no cache files on disk: (re)build the cache
            self.__create_cache()
        self.__merge_datasets()
        if self.__check_datasets():
            self.df_dest_dataset = self.df_dest_dataset.repartition(npartitions=npartitions)
            self.df_dest_dataset.to_csv(self.dest_dataset, index=False, compute=True,
                                        name_function=self.file_index)
            print(f'{self.dest_dataset} сохранен')
        return self.df_dest_dataset
 

In [48]:
SOURCE_FEATURES = 'features.csv'
SOURCE_DATASET = 'data_train.csv'
DEST_DATASET = 'train_dataset/*_train.csv'
CACHE_DIRECTORY = 'train_temp_csv'

cl_select_train = SelectorData(SOURCE_DATASET,SOURCE_FEATURES, DEST_DATASET, CACHE_DIRECTORY, use_cache=True)
%time df = cl_select_train.transform()

Количество строк соответствует в датасетах
train_dataset/*_train.csv сохранен
Wall time: 1h 8min 2s


In [None]:
# Drop the train selector (it keeps references to its dask dataframes) before the test run.
del cl_select_train

In [6]:
SOURCE_FEATURES = 'features.csv'
SOURCE_DATASET = 'data_test.csv'
DEST_DATASET = 'test_dataset/*_test.csv'
CACHE_DIRECTORY = 'test_temp_csv'

cl_select_train = SelectorData(SOURCE_DATASET,SOURCE_FEATURES, DEST_DATASET, CACHE_DIRECTORY, is_train=False)
%time df=cl_select_train.transform()
df.head()

Количество строк соответствует в датасетах
test_dataset/*_test.csv сохранен
Wall time: 17min 27s


Unnamed: 0,key_t,id,vas_id,buy_time,key_f,feat_time,0,1,2,3,...,244,245,246,247,248,249,250,251,252,diff
0,116,406253,8.0,1546808400,2087733,1533502800,-94.679971,113.010888,-108.620786,60.403202,...,-315.770792,-25.996269,42.369552,-241.747724,-25.832889,-0.694428,-12.175933,-0.45614,0.0,13305600
1,199,2806263,2.0,1547413200,1176267,1541365200,349.810029,355.280888,335.869214,302.673202,...,-613.770792,-18.996269,-22.630448,-282.747724,-24.832889,-0.694428,-12.175933,-0.45614,1.0,6048000
2,329,3641829,5.0,1548018000,4129269,1534712400,26.060029,118.010888,12.119214,77.263202,...,-613.770792,-25.996269,-37.630448,-306.747724,-25.832889,-0.694428,-12.175933,-0.45614,0.0,13305600
3,706,631009,2.0,1546808400,3195637,1533502800,-96.799971,-111.569112,-110.740786,-164.176798,...,-613.770792,-25.996269,-33.630448,-269.747724,-24.832889,-0.694428,-11.175933,-0.45614,0.0,13305600
4,1524,3303517,1.0,1548018000,1198007,1544994000,26.080029,-81.229112,12.139214,-47.416798,...,2350.229208,-10.996269,-22.630448,5486.252276,13.167111,1.305572,-12.175933,-0.45614,0.0,3024000


In [7]:
# Free the test selector and force a garbage collection so its dataframes can be
# reclaimed; the trailing ';' hides gc.collect()'s return value (an object count).
del cl_select_train
gc.collect();

20