In [1]:
import pandas as pd
import numpy as np
from datetime import datetime
import os

### Разведка 

In [5]:
test_chunk = 10
test_doc_Density = os.path.join('F:\\Projects\\Roman\\New\\02_Density_1_201910.csv.gz')
test_doc_CDensity = os.path.join('F:\\Projects\\Roman\\New\\02_CDensity_1_201910.csv.gz')

In [19]:
print ('\n\n==== 1. DENSITY ====\n\n')
for chunk in pd.read_csv(test_doc_Density, chunksize=test_chunk, delimiter=';'):
    print ('Columns: %s' % chunk.columns)
    print ('Data sample:')
    print (chunk)
    break
    
print ('\n\n==== 2. CDENSITY ====\n\n')
for chunk in pd.read_csv(test_doc_CDensity, chunksize=test_chunk, delimiter=';'):
    print ('Columns: %s' % chunk.columns)
    print ('Data sample:')
    print (chunk)
    break



==== 1. DENSITY ====


Columns: Index(['ts', 'zid', 'home_zid', 'customers_cnt_total', 'customers_cnt_long',
       'customers_cnt_work', 'customers_cnt_loc', 'customers_cnt_long_work',
       'customers_cnt_long_loc', 'customers_cnt_transit'],
      dtype='object')
Data sample:
                 ts  zid  home_zid  customers_cnt_total  customers_cnt_long  \
0  2019.10.31 11:30  131       193                   22                  17   
1  2019.10.31 11:30  197       193                    3                   1   
2  2019.10.31 11:30  197       168                    1                   1   
3  2019.10.31 11:30  197        91                    4                   4   
4  2019.10.31 11:30  197       405                    2                   1   
5  2019.10.12 08:00  197       193                   11                   5   
6  2019.10.12 08:00  197       168                    3                   2   
7  2019.10.12 08:00  197        91                    4                   4   
8  2019

### Утилиты для работы с данными

In [62]:
# Основная фильтрация - по колонкам, времени, zid, home_zid
def basic_filtrator(csv_gz_files, output_csv_file=None, columns=None, time_stamps=None, zid_list=None, zid_max=None, home_zid_list=None, home_zid_max=None, chunk_size=10000, chunk_message_step=5000, start_chunk=0, stop_iteration=None, mode='unite'):
    start_time = datetime.now()
    if columns is None:
            for chunk in pd.read_csv(csv_gz_files[0], chunksize=1, delimiter=';'):
                columns = chunk.columns
                break

    new_df = pd.DataFrame(columns=columns)
    l = 0
    for csv_gz_file in csv_gz_files:
        print ('=== Start basic filtration for doc %s ===\n\n' % csv_gz_file)
        
        i = 0
        for chunk in pd.read_csv(csv_gz_file, chunksize=chunk_size, delimiter=';', usecols=columns):
            if i < start_chunk:
                i+=1
                continue
                
            if (i % chunk_message_step) == 0:
                print ('%s | PROCESSING CHUNK #%s (row #%s) | Current df rows number: %s \n' % (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i, i*chunk_size, new_df.shape[0]))

            filtered_chunk = chunk.copy()
            if not time_stamps is None:
                filtered_chunk = filtered_chunk[filtered_chunk.ts.isin(time_stamps)]
            if not zid_max is None:
                filtered_chunk = filtered_chunk[filtered_chunk.zid <= zid_max]
            elif not zid_list is None:
                filtered_chunk = filtered_chunk[filtered_chunk.zid.isin(zid_list)]
                
            if not home_zid_max is None:
                filtered_chunk = filtered_chunk[filtered_chunk.home_zid <= home_zid_max]
            elif not home_zid_list is None:
                filtered_chunk = filtered_chunk[filtered_chunk.home_zid.isin(home_zid_list)]
            
            if mode == 'unite':
                new_df = new_df.append(filtered_chunk)
            if mode == 'single_files':
                if filtered_chunk.shape[0] > 0:
                    current_name = 'current_work_%s.csv' % l
                    filtered_chunk.to_csv(current_name, sep=';', index=False)
                    l+=1
            i+=1
            
            if not stop_iteration is None:
                if i > stop_iteration:
                    break
            
    if mode == 'unite':
        if output_csv_file is None:
            end_time = datetime.now()
            delta = (end_time - start_time).seconds
            print ('ENDED IN %s seconds' % delta)
            return new_df
        else:
            new_df.to_csv(output_csv_file,sep=';',index=False)
            end_time = datetime.now()
            delta = (end_time - start_time).seconds
            print ('ENDED IN %s seconds' % delta)

In [69]:
csv_gz_file = 'F:\\Projects\\Roman\\New\\02_Density_1_201910.csv.gz'
columns = ['ts','zid','home_zid', 'customers_cnt_total']
time_stamps = ['2019.10.31 11:30', '2019.10.31 11:35']
zid_list = [3]
home_zid_list = None
output_csv_file = 'F:\\Projects\\Roman\\New\\test_output.csv'
stop_iteration = 10

a = basic_filtrator(csv_gz_file = csv_gz_file,
                output_csv_file = output_csv_file,
                columns = columns,
                time_stamps = time_stamps,
                zid_list = zid_list,
                home_zid_list = home_zid_list,
                stop_iteration=stop_iteration)

=== Start basic filtration for doc F:\Projects\Roman\New\02_Density_1_201910.csv.gz ===


PROCESSING CHUNK #0



In [34]:
# Получение таблицы по указанным zid и timestamps, с фильтрацией по указанным home_zid
def calculate_basic_sum_table(csv_gz_files, output_csv_file=None, statistics_column='customers_cnt_total', time_stamps=None, zid_list=None, zid_max=None, home_zid_list=None, home_zid_max=None, chunk_size=10000, chunk_message_step=5000, stop_iteration=None, mode='with_external'):
    # modes:
    # without_external - ignore home_zid=-1
    # with_external - include home_zid=-1
    # external_only - use only home_zid=-1
    
    start_time = datetime.now()
    basic_columns=['ts','zid','home_zid'], 

    # 1. Create empty structure table
    template_array = []
    for zid in zid_list:
        template_array.append([zid]+[0]*len(time_stamps))
    
    final_df = pd.DataFrame(template_array, columns=['zid']+time_stamps)
    
    # 2. go through data
    aggregated_rows = 0
    for csv_gz_file in csv_gz_files:
        print ('=== Start calculate_basic_sum_table for doc %s ===\n\n' % csv_gz_file)
        
        i = 0
        for chunk in pd.read_csv(csv_gz_file, chunksize=chunk_size, delimiter=';', usecols=['ts','zid','home_zid',statistics_column]):
            if (i % chunk_message_step) == 0:
                print ('%s | PROCESSING CHUNK #%s (row #%s) | Current aggregated rows number: %s \n' % (datetime.now().strftime('%Y-%m-%d %H:%M:%S'), i, i*chunk_size, aggregated_rows))

            filtered_chunk = chunk.copy()
            filtered_chunk = filtered_chunk[filtered_chunk.ts.isin(time_stamps)]
        
            if not zid_max is None:
                filtered_chunk = filtered_chunk[filtered_chunk.zid <= zid_max]
            elif not zid_list is None:
                filtered_chunk = filtered_chunk[filtered_chunk.zid.isin(zid_list)]
            
            if not home_zid_max is None:
                filtered_chunk = filtered_chunk[filtered_chunk.home_zid <= home_zid_max]
            elif not home_zid_list is None:
                filtered_chunk = filtered_chunk[filtered_chunk.home_zid.isin(home_zid_list)]
            
            if mode == 'without_external':
                filtered_chunk = filtered_chunk[filtered_chunk.home_zid != -1]
            if mode == 'external_only':
                filtered_chunk = filtered_chunk[filtered_chunk.home_zid == -1]
            
            
            if filtered_chunk.shape[0] > 0:
                print ('======')
                print (filtered_chunk)
                print ('======')
            
            aggregated_rows += filtered_chunk.shape[0]
            # count statistics immediately?
            filtered_grouped_chunk = filtered_chunk[['ts','zid',statistics_column]].groupby(['ts','zid']).sum()
            
            for index, row in filtered_grouped_chunk.iterrows():
                final_df.loc[((final_df['zid'] == index[1])),index[0]] += row.values[0]
            
            i+=1
            
            if not stop_iteration is None:
                if i > stop_iteration:
                    break
    
    if not (output_csv_file is None):
        final_df.to_csv(output_csv_file,sep=';',index=False)
        end_time = datetime.now()
        delta = (end_time - start_time).seconds
        print ('ENDED IN %s seconds. Aggregated %s rows' % (delta, aggregated_rows))
    else:
        end_time = datetime.now()
        delta = (end_time - start_time).seconds
        print ('ENDED IN %s seconds. Aggregated %s rows' % (delta, aggregated_rows))
        return final_df

In [35]:
import pandas as pd
import numpy as np
from datetime import datetime
import os

csv_gz_files = ['F:\\Projects\\Roman\\New\\02_Density_1_201910.csv.gz']
statistics_column = 'customers_cnt_total'
time_stamps = ['2019.10.23 02:00']
zid_list = [1]
home_zid_list = None
output_csv_file = 'F:\\Projects\\Roman\\New\\test_output2.csv'
stop_iteration = None
mode='external_only'

a = calculate_basic_sum_table(csv_gz_files = csv_gz_files,
                            output_csv_file = output_csv_file,
                            statistics_column = statistics_column,
                            time_stamps = time_stamps,
                            zid_list = zid_list,
                            home_zid_list = home_zid_list,
                            stop_iteration=stop_iteration,
                            mode=mode)

=== Start calculate_basic_sum_table for doc F:\Projects\Roman\New\02_Density_1_201910.csv.gz ===


2021-01-18 19:59:33 | PROCESSING CHUNK #0 (row #0) | Current aggregated rows number: 0 

                        ts  zid  home_zid  customers_cnt_total
22162551  2019.10.23 02:00    1        -1                 2927
2021-01-18 20:01:02 | PROCESSING CHUNK #5000 (row #50000000) | Current aggregated rows number: 1 



KeyboardInterrupt: 

In [20]:
final_df

NameError: name 'final_df' is not defined