In [None]:
# --------------------------
import multiprocessing as mp
import os
import pandas as pd
import subprocess
import numpy as np
pd.set_option('display.max_row', 1000)
# --------------------------

# -------------------------
# Input CSV to process (an earlier run used '16_new.csv').
infile = '/Users/l.sergeev/Downloads/old_experiment_4_raw.csv'

# Parameter names pulled out of the 'params.name' / 'params.value' columns
# and appended to the output as separate columns.
# Currently disabled candidates: 'track_duration', 'duration'.
columns_2 = [
    'subscription_type',
    'from',
    'progress',
    'track_id',
]

# Number of rows per pandas chunk read by each worker process.
chunk_size = 10000

# Remove the per-process part files once they have been merged.
autoclean = True

# Print per-process progress and timing information.
verbose = False
# -------------------------

In [None]:
# Peek at the first few rows only: the date and event name of the first
# record are used below to build the output file name.
data = pd.read_csv(infile, nrows=10)
date_file = data.loc[0, 'dtEvent']
event_file = data.loc[0, 'eventName']

# Substring that 'idAppVersionTitle' must contain for a row to be kept.
version = "5.1"

In [None]:
# Final merged CSV: <prefix>_<event name>_<event date>.csv
outfile = '_'.join(('/Users/l.sergeev/Downloads/result_android', event_file, date_file)) + '.csv'
outfile

In [None]:
# ------------------------------------------------------------------------------------
# Make sure the input file is readable before doing anything else.
try:
    f = open(infile, 'rb')
except IOError:
    print('Unable to open file ' + "'" + infile + "'")
    # Non-zero status: failing to open the input is an error, not a success.
    raise SystemExit(1)

# Count the lines in Python instead of shelling out to `wc -l`. `wc` counts
# newline characters only, so a file whose last line has no trailing newline
# would lose its final row; it is also unavailable on some platforms.
n_newlines = 0
last_block = b''
with f:  # guarantees the handle is closed
    for block in iter(lambda: f.read(1 << 20), b''):
        n_newlines += block.count(b'\n')
        last_block = block

total_rows = n_newlines
if last_block and not last_block.endswith(b'\n'):
    total_rows += 1  # final line without a terminating newline
total_rows = max(total_rows - 1, 0)  # exclude the header line

print('total_rows:', f'{total_rows:_}')

n_procs = mp.cpu_count()  # number of available CPU cores
print('n_procs:', n_procs, end='\n\n')
# ------------------------------------------------------------------------------------


# ---------------------------------------------
def str_to_list(s):
    """Split a stringified list such as "['a','b']" into its items.

    The first and last characters (the brackets) are dropped, every single
    quote is removed and the remainder is split on commas. Whitespace around
    the items is kept as is.
    """
    inner = s[1:-1]                        # drop the surrounding brackets
    unquoted = ''.join(inner.split("'"))   # remove every single quote
    return unquoted.split(',')
# ---------------------------------------------


# -------------------------------------------------------
def remove_file(filename, no_warn=False):
    """Delete ``filename`` if it exists.

    Args:
        filename: Path of the file to delete.
        no_warn: When False (default), print a message if the file does not
            exist; when True, stay silent.

    The removal is attempted directly instead of checking for existence
    first, so a file that disappears between the check and the removal
    (for example, deleted by another process) cannot raise an error.
    """
    try:
        os.remove(filename)
    except (FileNotFoundError, NotADirectoryError):
        if not no_warn:
            print(f"File '{filename}' does not exist")
# -------------------------------------------------------


class Timer:
    """Accumulating stopwatch based on ``time.perf_counter``.

    Each ``start()`` / ``stop()`` pair adds the elapsed time to ``dt``.

    Attributes:
        t1: Timestamp of the most recent ``start()``.
        t2: Timestamp of the most recent effective ``stop()``.
        dt: Total accumulated time in seconds.
    """

    # Imported in the class body, as before, so that the class is
    # self-contained and `Timer.time` keeps working for existing users.
    import time

    def __init__(self):
        self._clear()

    def _clear(self):
        # Shared by __init__ and reset so reset() does not re-run the constructor.
        self.t1 = self.t2 = 0
        self.dt = 0
        self._running = False

    def start(self):
        """Begin (or restart) a measured interval."""
        self.t1 = self.time.perf_counter()
        self._running = True

    def stop(self):
        """End the current interval and add its duration to ``dt``.

        Does nothing when no interval is running. Without this guard a stray
        or repeated stop() would add a bogus duration measured from a stale
        (or zero) start timestamp.
        """
        if not self._running:
            return
        self.t2 = self.time.perf_counter()
        self.dt += self.t2 - self.t1
        self._running = False

    def reset(self):
        """Discard all accumulated time."""
        self._clear()

    def __str__(self):
        return f'{self.dt:.3f} s'


def process_chunk(proc_id, n_procs, total_rows, chunk_size, infile, outfile):
    """Process this worker's slice of ``infile`` and write it to a part file.

    The data rows of ``infile`` are split evenly between ``n_procs`` workers;
    the last worker also takes the remainder. Each worker reads its slice in
    chunks of ``chunk_size`` rows, keeps the rows whose ``idAppVersionTitle``
    contains the module-level ``version`` string, unpacks the parameters named
    in the module-level ``columns_2`` from the ``params.name`` /
    ``params.value`` columns into separate columns, and appends the result
    (without header) to ``<outfile stem>_<proc_id>.csv``.

    Worker 0 additionally writes the new header line to ``outfile``.

    Args:
        proc_id: Zero-based index of this worker.
        n_procs: Total number of workers.
        total_rows: Number of data rows in ``infile`` (header excluded).
        chunk_size: Number of rows per pandas chunk.
        infile: Path of the input CSV.
        outfile: Path of the final merged CSV; part file names derive from it.
    """
    proc_outfile = outfile.split('.csv')[0] + '_' + str(proc_id) + '.csv'

    # Remove a stale part file from a previous run, if any
    # (result_0.csv, result_1.csv, etc.).
    remove_file(proc_outfile, no_warn=True)

    # Number of rows handled by this worker.
    n_rows_per_process = total_rows // n_procs

    # First row handled by this worker.
    start_row = n_rows_per_process * proc_id

    # The last worker also takes the rows left over by the integer division.
    if proc_id == n_procs - 1:
        n_rows_per_process += total_rows % n_procs

    # File line 0 is the header, so data rows are shifted by one.
    if proc_id != 0:
        start_row += 1

    if verbose:
        print('process ', proc_id, ': processing ', n_rows_per_process,
              ' rows [', start_row + int(proc_id == 0), ',', start_row + n_rows_per_process - int(proc_id != 0), ']', sep='')

    chunker = pd.read_csv(
        infile,
        skiprows=range(1, start_row),
        nrows=n_rows_per_process,
        chunksize=chunk_size,
    )

    processed = 0

    # Worker 0 writes the new header to the merged output file.
    if proc_id == 0:
        hdr = pd.read_csv(infile, nrows=0)
        del hdr['params.name']
        del hdr['params.value']
        for field in columns_2:
            hdr[field] = []
        hdr.to_csv(outfile, index=False)

    for chunk in chunker:
        # Literal substring match: with the default regex=True the '.' in a
        # version such as "5.1" would match any character. na=False drops
        # rows with a missing version instead of producing a NaN mask.
        mask = chunk.idAppVersionTitle.str.contains(version, regex=False, na=False)
        chunk = chunk.loc[mask]

        columns_1 = list(chunk.columns)
        columns_1.remove('params.name')
        columns_1.remove('params.value')

        # Explicit copy: columns are added below and must not write into a
        # view of `chunk`.
        df = chunk[columns_1].copy()

        to_insert = {k: [] for k in columns_2}

        for _, row in chunk.iterrows():
            # ---------------------------------------
            if verbose:
                TIME['to_list'].start()

            names = str_to_list(row['params.name'])
            values = str_to_list(row['params.value'])

            if verbose:
                TIME['to_list'].stop()
            # ---------------------------------------

            # -----------------------------------------
            if verbose:
                TIME['dict'].start()

            # name -> value; for a repeated name the last occurrence wins.
            params = dict(zip(names, values))

            if verbose:
                TIME['dict'].stop()
            # -----------------------------------------

            # Always append exactly one entry per row (None when the
            # parameter is absent) so every list stays aligned with `df`.
            for field in columns_2:
                to_insert[field].append(params.get(field))

        # -------------------------
        if verbose:
            TIME['merge'].start()

        for field in columns_2:
            df[field] = to_insert[field]

        if verbose:
            TIME['merge'].stop()
        # -------------------------

        if verbose:
            processed += len(df.index)
            print('process ', proc_id, ': ', processed, ' rows processed', sep='')

        df.to_csv(proc_outfile, mode='a', header=False, index=False)

    if verbose:
        print('process ', proc_id, ': FINISHED', sep='', end='\t[')
        print('TIME:', end=' ')
        for k, v in TIME.items():
            print(k, '-', v, end=', ')
        print('\b\b]')


# --------------------------------------------------------------------------------------------------------
import shutil

# Start from a clean merged output file.
remove_file(outfile, no_warn=True)

# Per-stage timers used by process_chunk when `verbose` is enabled.
TIME = {}
for field in ['to_list', 'dict', 'merge']:
    TIME[field] = Timer()

# One worker process per CPU core.
processes = [
    mp.Process(target=process_chunk, args=(proc_id, n_procs, total_rows, chunk_size, infile, outfile))
    for proc_id in range(n_procs)
]

# Start the workers in reverse order: the last ones have to skip the most
# rows when reading the CSV.
for p in reversed(processes):
    p.start()

for proc_id, p in enumerate(processes):
    # Wait for the next worker in order, so parts are merged in row order.
    p.join()

    part_file = outfile.split('.csv')[0] + '_' + str(proc_id) + '.csv'

    if p.exitcode != 0:
        print('process ', proc_id, ' exited with code ', p.exitcode, sep='')

    # Append the worker's part file to the merged output. Copying in Python
    # avoids building a shell command from file paths, which breaks on paths
    # with spaces or shell metacharacters and needs `cat` to be available.
    if os.path.exists(part_file):
        with open(part_file, 'rb') as src, open(outfile, 'ab') as dst:
            shutil.copyfileobj(src, dst)

    if autoclean:
        remove_file(part_file)

print('Finished !')
# --------------------------------------------------------------------------------------------------------
