# Testing large csv table in compressed format

## read in chunks

In [1]:
import pandas as pd
import numpy as np

Checked externally, that the largest number of species id is 117,582,065 and the largest of fishid (named as wdpaid) 5,090,435. Therefore a `np.uint32` (0 to 4,294,967,295, four bytes) should be sufficient to hold the numbers. This reduces the default memory consumption by half (default is `np.int64`, which takes four bytes, range 0 to 18,446,744,073,709,551,615)

In [None]:
int_types = ["uint8", "int8", "int16", "uint16", "uint32", "int32", 'int64']
for it in int_types:
    print(np.iinfo(it))

The species lookup table contains information not needed. Trim

In [None]:
sis = pd.read_csv("sis_2017.csv")

columns=['id_no', 'kingdom', 'phylum', 'class', 'order_', 'family', 'genus', 'code']
sis_clean = pd.DataFrame(sis[columns])

sis_clean = sis_clean.astype(dtype={"id_no": "uint32",
                                    'kingdom': "category",
                                    'phylum': "category",
                                     'class': "category",
                                     'order_': "category",
                                     'family': "category",
                                     'genus': "category",
                                     'code': "category"})

Use the `id_no` as the index

In [None]:
sis_clean_index = sis_clean.set_index('id_no')

Load species in chunks, testing

In [None]:
species_2 = pd.read_csv("test_result2017_reso10_memo.csv.gz", chunksize=40000, 
                        skipinitialspace=True,
                        dtype={'ID_NO': np.uint32, 'WDPAID': np.uint32})

In [None]:
b1 = species_2.get_chunk()
b2 = species_2.get_chunk()

Merging two dataframes performance

In [None]:
%timeit b1.merge(sis_clean, left_on='ID_NO', right_on='id_no')

Use index join performance

In [None]:
%timeit b1.join(sis_clean_index, how='inner', on='ID_NO')

It appears that the index join method seems slightly quicker - proceed

In [None]:
# def analyse_chunk(df_chunk, species_attributes=sis_clean_index):
#     # join species attributes, which has id_no as index and filtering
#     df_chunk = df_chunk.join(species_attributes, how='inner', on='ID_NO')
#     # pivot and return
#     return df_chunk.pivot_table(index='WDPAID', columns='class', values='ID_NO', fill_value=0, aggfunc=np.count_nonzero)
    

`isin()` seems very fast

In [None]:
%timeit b1[b1.ID_NO.isin(sis_clean_index.index)]

In [None]:
%timeit b1.groupby('WDPAID').count()

In [None]:
bb = b1.groupby('WDPAID').count()

## Single machine version

In [1]:
import pandas as pd
import numpy as np

In [12]:
def analyse_chunk_mk2(df_chunk, species_filter):
    # find relevent species entries using the species_filter, which has id_no as index and filtering
    df_chunk = df_chunk[df_chunk.ID_NO.isin(species_filter.index)]
    # group and simple count
    return df_chunk.groupby('WDPAID').count()

def concat_chunk_results(df_chunks):
    # list of chunks to concat, 
    # more effcient to have a couple of dfs, than appending each df and sum
    df_chunk = pd.concat(df_chunks)
    return df_chunk.groupby('WDPAID').sum()

def chunk_worker(df_chunk, chunksize=40000):
    counter = 0
    
    result = []
    result.append(analyse_chunk_mk2(df_chunk))
        
    if counter % 10 == 0:
        print('Processed rows:{:,}'.format(counter*chunksize))
        result = [concat_chunk_results(result)]
        
    # ensure no un
    return concat_chunk_results(result)


Use the `id_no` as the index

In [16]:
def test_main():
    import time
    from datetime import timedelta
    start_time = time.time()
    chunksize = 4000000
    
    # read species attributes
    sis = pd.read_csv("sis_2017.csv")

    columns=['id_no', 'kingdom', 'phylum', 'class', 'order_', 'family', 'genus', 'code']
    sis_clean = pd.DataFrame(sis[columns])
    sis_clean = sis_clean.astype(dtype={"id_no": "uint32",
                                        'kingdom': "category",
                                        'phylum': "category",
                                         'class': "category",
                                         'order_': "category",
                                         'family': "category",
                                         'genus': "category",
                                         'code': "category"})
    
    sis_clean_index = sis_clean.set_index('id_no')

    # read result in chunk
    species = pd.read_csv("test_result2017_reso10_memo.csv.gz", chunksize=chunksize, 
                        skipinitialspace=True,
                        dtype={'ID_NO': np.uint32, 'WDPAID': np.uint32})
    
    # run
    result = []
    total_rows = 0
    counter = 0
    
    for df_chunk in species:
        
#         if counter == 101:
#             break
               
        result.append(analyse_chunk_mk2(df_chunk, sis_clean_index))
        
        if counter % 10 == 0:
            print('Processed rows: {:,} '.format(counter*chunksize))
            result = [concat_chunk_results(result)]
        
        if counter % 100 == 0:
            elapsed = time.time() - start_time
            print('Elapsed time: {}'.format(str(timedelta(seconds=elapsed))))
            print('Memory usage of the output: {:,.1f} KB \n---'.format(result[0].memory_usage().sum()/1024))
            
        counter += 1
        
    # final push
    result = concat_chunk_results(result)
    result.to_csv('output.csv')
    return result

In [14]:
# output = test_main()

Processed rows: 0 
Elapsed time: 0:00:03.385200
Memory usage of the output: 26,681.4 KB 
---
Processed rows: 40,000,000 
Processed rows: 80,000,000 
Processed rows: 120,000,000 
Processed rows: 160,000,000 
Processed rows: 200,000,000 
Processed rows: 240,000,000 
Processed rows: 280,000,000 
Processed rows: 320,000,000 
Processed rows: 360,000,000 
Processed rows: 400,000,000 
Elapsed time: 0:04:19.864800
Memory usage of the output: 77,621.3 KB 
---


In [17]:
# output = test_main()

Processed rows: 0 
Elapsed time: 0:00:03.260400
Memory usage of the output: 26,681.4 KB 
---
Processed rows: 40,000,000 
Processed rows: 80,000,000 
Processed rows: 120,000,000 
Processed rows: 160,000,000 
Processed rows: 200,000,000 
Processed rows: 240,000,000 
Processed rows: 280,000,000 
Processed rows: 320,000,000 
Processed rows: 360,000,000 
Processed rows: 400,000,000 
Elapsed time: 0:04:23.810600
Memory usage of the output: 77,621.3 KB 
---
Processed rows: 440,000,000 
Processed rows: 480,000,000 
Processed rows: 520,000,000 
Processed rows: 560,000,000 
Processed rows: 600,000,000 
Processed rows: 640,000,000 
Processed rows: 680,000,000 
Processed rows: 720,000,000 
Processed rows: 760,000,000 
Processed rows: 800,000,000 
Elapsed time: 0:08:45.023200
Memory usage of the output: 77,817.9 KB 
---
Processed rows: 840,000,000 
Processed rows: 880,000,000 
Processed rows: 920,000,000 
Processed rows: 960,000,000 
Processed rows: 1,000,000,000 
Processed rows: 1,040,000,000 
Pro

## Parallel computing

In [69]:
def worker(df_chunk):
    # find relevent species entries using the species_filter, which has id_no as index and filtering
    df_chunk = df_chunk[df_chunk.ID_NO.isin(sis_clean_index.index)]
    # group and simple count
    return df_chunk.groupby('WDPAID').count()


In [70]:
def test_main_parallel():
    import time
    from datetime import timedelta
    start_time = time.time()
    chunksize = 8000000
    
    # read species attributes
    sis = pd.read_csv("sis_2017.csv")

    columns=['id_no', 'kingdom', 'phylum', 'class', 'order_', 'family', 'genus', 'code']
    sis_clean = pd.DataFrame(sis[columns])
    sis_clean = sis_clean.astype(dtype={"id_no": "uint32",
                                        'kingdom': "category",
                                        'phylum': "category",
                                         'class': "category",
                                         'order_': "category",
                                         'family': "category",
                                         'genus': "category",
                                         'code': "category"})
        
    sis_clean_index = sis_clean.set_index('id_no')
    
    # set up parallel engines
    import ipyparallel as ipp
    c = ipp.Client()
    
    dview = c[:]
    dview.push(dict(sis_clean_index=sis_clean_index))

    # read result in chunk
    species = pd.read_csv("test_result2017_reso10_memo.csv.gz", chunksize=chunksize, 
                        skipinitialspace=True,
                        dtype={'ID_NO': np.uint32, 'WDPAID': np.uint32})
    
    # run
    result = []
    total_rows = 0
    counter = 0
    
    df_chunks = []
    
    for df_chunk in species:
        # debug
#         if counter == 101:                  
#             break
        
        if counter % 10 == 0:
            print('Processed rows: {:,} '.format(counter*chunksize))
        
        if counter % 100 == 0:
            elapsed = time.time() - start_time
            print('Elapsed time: {}'.format(str(timedelta(seconds=elapsed))))
#             print('Memory usage of the output: {:,.1f} KB \n---'.format(result[0].memory_usage().sum()/1024))
                    
        
        df_chunks.append(df_chunk)
        counter += 1
        
        if len(df_chunks) == 8:
            ar = dview.map_async(worker, df_chunks)
            # clear list
            df_chunks = []
            # once all workers have finished, concatenate result
            result = [concat_chunk_results(ar.get() + result)]
        else:
            continue
            

    # check if unprocessed chunks
    if df_chunks:
        ar = dview.map_async(worker, df_chunks)
        result = concat_chunk_results(ar.get() + result)
    else:
        result = result[0]
        
    # final push
    result.to_csv('output_parallel.csv')
    return result

The main bottleneck appears to be I/O. Most of the I/O occur in one process while other engine processes sit idle. Not seeing a massive performance boom due that may benefit from the increase of CPU cores or memory

In [13]:
# test_main_parallel()

Processed rows: 0 
Elapsed time: 0:00:03.057600
Processed rows: 80,000,000 
Processed rows: 160,000,000 
Processed rows: 240,000,000 
Processed rows: 320,000,000 
Processed rows: 400,000,000 
Processed rows: 480,000,000 
Processed rows: 560,000,000 
Processed rows: 640,000,000 
Processed rows: 720,000,000 
Processed rows: 800,000,000 
Elapsed time: 0:06:09.169200
Processed rows: 880,000,000 
Processed rows: 960,000,000 
Processed rows: 1,040,000,000 
Processed rows: 1,120,000,000 
Processed rows: 1,200,000,000 
Processed rows: 1,280,000,000 
Processed rows: 1,360,000,000 


Unnamed: 0_level_0,ID_NO
WDPAID,Unnamed: 1_level_1
61,1
62,1
63,1
64,1
65,1
66,1
67,1
68,1
69,1
70,1


In [20]:
whos

Variable               Type        Data/Info
--------------------------------------------
analyse_chunk_mk2      function    <function analyse_chunk_m<...>k2 at 0x000000000AE0CD90>
c                      Client      <ipyparallel.client.clien<...>ct at 0x000000000807BE80>
chunk_worker           function    <function chunk_worker at 0x000000000AFEBB70>
concat_chunk_results   function    <function concat_chunk_re<...>ts at 0x000000000AFEBAE8>
ipp                    module      <module 'ipyparallel' fro<...>pyparallel\\__init__.py'>
np                     module      <module 'numpy' from 'E:\<...>ges\\numpy\\__init__.py'>
pd                     module      <module 'pandas' from 'E:<...>es\\pandas\\__init__.py'>
test_main_parallel     function    <function test_main_paral<...>el at 0x00000000092F1950>
worker                 function    <function worker at 0x0000000008047950>


## Distributed + parallel

The idea is to have a combination of engines running on the local machine plus any machine that are visible in LAN

It appears significant overheads incur probably due to sending messages across the network

In [185]:
def test_main_parallel_distribute():
    import time
    from datetime import timedelta
    start_time = time.time()
    chunksize = 8_000_000
    
    # read species attributes
    sis = pd.read_csv("sis_2017.csv")

    columns=['id_no', 'kingdom', 'phylum', 'class', 'order_', 'family', 'genus', 'code']
    sis_clean = pd.DataFrame(sis[columns])
    sis_clean = sis_clean.astype(dtype={"id_no": "uint32",
                                        'kingdom': "category",
                                        'phylum': "category",
                                         'class': "category",
                                         'order_': "category",
                                         'family': "category",
                                         'genus': "category",
                                         'code': "category"})
        
    sis_clean_index = sis_clean.set_index('id_no')
    
    # worker functions

    
    # set up parallel engines
    import ipyparallel as ipp
    c = ipp.Client()
    n_chunks = len(c.ids)
    v = c[:]

    # read result in chunk
    species = pd.read_csv("test_result2017_reso10_memo.csv.gz", chunksize=chunksize, 
                        skipinitialspace=True,
                        dtype={'ID_NO': np.uint32, 'WDPAID': np.uint32})
    
    # run
    result = []
    total_rows = 0
    counter = 0
    
    
    
    df_chunks = []
    
    for df_chunk in species:
        # add chunk to df_chunks until its
        df_chunks.append(df_chunk)
        counter += 1
        
        # if
        if counter % n_chunks != 0:
            continue
            
        # map async
        amr = v.map(analyse_chunk_mk3, df_chunks)

        pending = set(amr.msg_ids)
        while pending:
            try:
                # check every 2 seconds to aggregate
                c.wait(pending, 2)
            except parallel.TimeoutError:
                # ignore timeouterrors, since they only mean that at least one isn't done
                pass

            # finished is the set of msg_ids that are complete
            finished = pending.difference(c.outstanding)

            # debug, if more than 100 chunks been processed
            counter += len(finished)
            print('Processed rows: {:,} '.format(counter*chunksize))
            elapsed = time.time() - start_time
            print('Elapsed time: {}'.format(str(timedelta(seconds=elapsed))))

            if counter > 100:
                break

            # update pending to exclude those that just finished
            pending = pending.difference(finished)

            # get results already available
            df_chunks = [c.get_result(msg_id) for msg_id in finished]

            result = [concat_chunk_results(df_chunks + result)]

    result[0].to_csv('output_parallel_dis.csv')
    return result[0]
        

In [181]:
from __future__ import print_function

import time, random

import ipyparallel as ipp

# create client & view
rc = ipp.Client()
print(rc.ids)
v = rc.load_balanced_view()

def sleep_here(count, t):
    """simple function that takes args, prints a short message, sleeps for a time, and returns the same args"""
    import time
    time.sleep(t*count)
    return count,t

amr = v.map(sleep_here, range(100), [ random.random() for i in range(100) ], chunksize=2)

pending = set(amr.msg_ids)
while pending:
    try:
        rc.wait(pending, 1e-3)
    except parallel.TimeoutError:
        # ignore timeouterrors, since they only mean that at least one isn't done
        pass
    # finished is the set of msg_ids that are complete
    finished = pending.difference(rc.outstanding)
    # update pending to exclude those that just finished
    pending = pending.difference(finished)
    for msg_id in finished:
        # we know these are done, so don't worry about blocking
        ar = rc.get_result(msg_id)
        print("job id {} finished on engine {}".format(msg_id, ar.engine_id))
        print("with stdout:")
        print('    ' + ar.stdout.replace('\n', '\n    ').rstrip())
        print("and results:")
        
        # note that each job in a map always returns a list of length chunksize
        # even if chunksize == 1
        for (count,t) in ar.get():
            print("  item {}: slept for {}".format(count, t))

[13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]


In [2]:
import ipyparallel as ipp
c = ipp.Client()
v = c[:]

In [3]:
c.ids, v

([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
 <DirectView [0, 1, 2, 3,...]>)

In [5]:
len(c.ids)

17

In [6]:
import pandas as pd
import numpy as np
species = pd.read_csv("test_result2017_reso10_memo.csv.gz", chunksize=200000, 
                        skipinitialspace=True,
                        dtype={'ID_NO': np.uint32, 'WDPAID': np.uint32})

In [16]:
chunk = species.get_chunk()

In [41]:
analyse_chunk_mk3(chunk)

Unnamed: 0_level_0,ID_NO
WDPAID,Unnamed: 1_level_1
927009,1
927010,1
927011,1
927012,1
927013,1
927014,1
927015,1
927016,1
927033,1
927034,1


In [34]:
ar = v.map_async(analyse_chunk_mk3, [chunk])

In [35]:
ar.ready()

True

In [7]:
def dummy(chunk):
    pass
    

Testing it out

In [95]:
v = c[:]

In [98]:
len(c.ids)

10

In [92]:
import pandas as pd
import numpy as np
import time
species = pd.read_csv("test_result2017_reso10_memo.csv.gz", chunksize=8000000, 
                        skipinitialspace=True,
                        dtype={'ID_NO': np.uint32, 'WDPAID': np.uint32})



chunks = []
counter = 0
for chunk in species:
    chunks.append(chunk)
    counter += 1
    if counter == 10:
        break

In [99]:
def concat_chunk_results(df_chunks):
    # list of chunks to concat, 
    # more effcient to have a couple of dfs, than appending each df and sum
    df_chunk = pd.concat(df_chunks)
    return df_chunk.groupby('WDPAID').sum()

def analyse_chunk_mk3(df_chunk):
#     df_chunk = df_chunk[df_chunk.ID_NO.isin(sis_clean_index.index)]
    return df_chunk.groupby('WDPAID').count()

In [104]:
def multi_version_dv():
    import ipyparallel as ipp
    c = ipp.Client()
    v = c[:]
    
    start = time.time()
    
    amr = v.map_async(analyse_chunk_mk3, chunks)
    result = concat_chunk_results(amr.get())
    
    print(time.time() - start)
    return result

def multi_version_load():
    import ipyparallel as ipp
    c = ipp.Client()
    v = c.load_balanced_view()
    
    start = time.time()
    
    amr = v.map_async(analyse_chunk_mk3, chunks)
    result = concat_chunk_results(amr.get())
    
    print(time.time() - start)
    return result

In [105]:
def single_version():
    start = time.time()
    
    result = concat_chunk_results(analyse_chunk_mk3(chunk) for chunk in chunks)
    
    print(time.time() - start)
    return result


In [107]:
multi_version_dv()

7.612799882888794


Unnamed: 0_level_0,ID_NO
WDPAID,Unnamed: 1_level_1
216938,1
216939,1
216940,1
216941,1
216942,1
216943,1
216944,1
216945,1
216946,1
216947,1


In [106]:
multi_version_load()

7.924800157546997


Unnamed: 0_level_0,ID_NO
WDPAID,Unnamed: 1_level_1
216938,1
216939,1
216940,1
216941,1
216942,1
216943,1
216944,1
216945,1
216946,1
216947,1


# TEST

In [34]:
def dummy(sleep_time):
    import time
    time.sleep(sleep_time/10.0)
    
    return sleep_time

import ipyparallel as ipp
c = ipp.Client()
v = c[:]

In [35]:
c.ids

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [44]:
amr = v.map_async(dummy, [1,2,3,4,5,6,7,8,9,2,3,5,6,7,10])

In [49]:
amr.ready()

True

In [52]:
amr.get()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 5, 6, 7, 10]

In [50]:
amr.msg_ids

['849c1a3a-c31532a26a147538bb325b30',
 '41d0f9da-fe60d23a9c414003ac7e7bb4',
 '9b5f4e89-bc19265d84f1591ba682b425',
 '9951792b-b580ee9eb126bdf3063a6a2c',
 '4489a66a-4142f438b8f8a347048427a1',
 'd618a70d-9a3e636058f3bde40af4c757',
 'b009145f-236f6555fe22e33daf2cd294',
 'd9a5faab-46495f2101bd1ab283a41649',
 '417476ea-5c13b6bc3f9b1fb4e9c7e616',
 '06da5303-08de2687af26f6444c7b8838']

In [54]:
c.get_result('849c1a3a-c31532a26a147538bb325b30').get()

[1, 2]