OverflowError: Python int too large to convert to C long #1437

Closed
ankravch opened this Issue Aug 2, 2016 · 11 comments

ankravch commented Aug 2, 2016

I am testing the limits of dd.concat by passing it a list of 1000 dask DataFrames, each with 10M rows:

Traceback (selected):

bcolz_stack_ = dd.concat(list(dask.compute(*concat_dd_list)))
output_dd = input_dd.set_index('global_id', drop=False)

dataframe\core.py in set_index()
return set_index(...)

dataframe\shuffle.py in set_index()
._repartition_quantiles(...)

dask\async.py in get_async()
raise(remote_exception(res,tb))
OverflowError: Python int too large to convert to C long

dask\async.py, line 249, in _execute_task
return func(*args2)

It worked fine for 100 dask DataFrames. I am working on Windows 64-bit.

ankravch commented Aug 2, 2016

Here is the actual script:

import bcolz
import dask
import dask.array as da
import dask.dataframe as dd

@dask.delayed
def prep_dd(input_table, start, stop, chunksize):
    input_dd = dd.from_bcolz(input_table, chunksize=chunksize, lock=False)
    a_dd = da.arange(start,stop, chunks=[chunksize])
    input_dd['global_id'] = dd.from_dask_array(a_dd)
    output_dd = input_dd.set_index('global_id', drop=False)
    return output_dd

@dask.delayed
def read_tables(table_path,keep_var_list):
    return bcolz.ctable(rootdir=table_path)[keep_var_list]

keep_var_list = ['f0','f1']
t0 = [read_tables(table_path,keep_var_list) for table_path in ['wide_table_10M.bcolz']*100]
input_tables = dask.compute(*t0)

concat_dd_list = []
start,chunksize = 0,int(2e6)
for itable in input_tables:
    stop = start + itable.len
    print(start, stop, itable.shape)
    output_dd = prep_dd(itable[keep_var_list], start, stop, chunksize)
    concat_dd_list.append(output_dd)
    start = stop

bcolz_stack_ = dd.multi.concat(list(dask.compute(*concat_dd_list)))

ankravch commented Aug 2, 2016

OverflowError: Python int too large to convert to C long

Is there a way to catch this error and re-raise it with a more helpful message?
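
Something like this generic Python pattern is what I have in mind (just a sketch; set_index is only one place where it could be wrapped):

try:
    output_dd = input_dd.set_index('global_id', drop=False)
except OverflowError as err:
    # re-raise with a hint about the likely cause (platform integer size)
    raise OverflowError(
        "set_index overflowed a C long; the index values may exceed the "
        "platform integer size (32-bit C long on Windows)") from err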

mrocklin commented Aug 3, 2016

There is a lot of mixing of dask things and pandas things going on here. Two things jump out at me:

dd.concat(list(dask.compute(*concat_dd_list)))

Why not do the following instead?

dd.concat(concat_dd_list)

Why are the prep_dd and read_tables functions delayed? They look like they're building up dask graphs lazily. Maybe just run them normally?
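
Roughly what I mean (a sketch reusing your names, untested):

def prep_dd(input_table, start, stop, chunksize):
    # no @dask.delayed: this already builds a lazy dask graph
    input_dd = dd.from_bcolz(input_table, chunksize=chunksize, lock=False)
    a_dd = da.arange(start, stop, chunks=[chunksize])
    input_dd['global_id'] = dd.from_dask_array(a_dd)
    return input_dd.set_index('global_id', drop=False)

def read_tables(table_path, keep_var_list):
    # reads eagerly when called; no delayed wrapper needed
    return bcolz.ctable(rootdir=table_path)[keep_var_list]

# build concat_dd_list as before, then concatenate the lazy dataframes directly
bcolz_stack_ = dd.concat(concat_dd_list)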

mrocklin commented Aug 3, 2016

Also, when you show an error or traceback, can you try to include the entire thing? I get that you're trying to show only the relevant details, but generally it's fine to include everything.

ankravch commented Aug 3, 2016

Why not do the following instead? dd.concat(concat_dd_list)

Right, that's fixed now.

Why are the prep_dd and read_tables functions delayed?

OK, I dropped delayed from prep_dd. But for the read_tables function it takes ~5 s to read a table, and bcolz.ctable(rootdir=table_path) is unfortunately not lazy.

def prep_dd(input_table, start, stop, chunksize):
    input_dd = dd.from_bcolz(input_table, chunksize=chunksize,\
                             categorize=False, lock=False)
    a_dd = da.arange(start,stop, chunks=[chunksize])
    input_dd['global_id'] = dd.from_dask_array(a_dd)
    output_dd = input_dd.set_index('global_id', drop=False)
    return output_dd

@dask.delayed
def read_tables(table_path,keep_var_list):
    return bcolz.ctable(rootdir=table_path)[keep_var_list]

starttime = datetime.now()
keep_var_list = ['f0','f1']
t0 = [read_tables(table_path,keep_var_list) for table_path in ['wide_table_10M.bcolz']*300]
input_tables = dask.compute(*t0)

concat_dd_list = []
start,chunksize = 0,int(2e6)
for itable in input_tables:
    stop = start + itable.len
    print(start, stop)
    output_dd = prep_dd(itable, start, stop, chunksize)
    concat_dd_list.append(output_dd)
    start = stop

bcolz_stack_ = dd.concat(concat_dd_list)

Here is the traceback:

0 10000000
10000000 20000000
20000000 30000000
...
2120000000 2130000000
2130000000 2140000000
2140000000 2150000000
---------------------------------------------------------------------------
OverflowError                             Traceback (most recent call last)
<ipython-input-2-d7afd6a49a44> in <module>()
     21     stop = start + itable.len
     22     print(start, stop)
---> 23     output_dd = prep_dd(itable, start, stop, chunksize)
     24     concat_dd_list.append(output_dd)
     25     start = stop

<ipython-input-2-d7afd6a49a44> in prep_dd(input_table, start, stop, chunksize)
      4     a_dd = da.arange(start,stop, chunks=[chunksize])
      5     input_dd['global_id'] = dd.from_dask_array(a_dd)
----> 6     output_dd = input_dd.set_index('global_id', drop=False)
      7     return output_dd
      8 

...\Anaconda3\lib\site-packages\dask\dataframe\core.py in set_index(self, other, drop, sorted, **kwargs)
   1644         else:
   1645             from .shuffle import set_index
-> 1646             return set_index(self, other, drop=drop, **kwargs)
   1647 
   1648     def set_partition(self, column, divisions, **kwargs):

...\Anaconda3\lib\site-packages\dask\dataframe\shuffle.py in set_index(df, index, npartitions, shuffle, compute, drop, upsample, **kwargs)
     46 
     47     divisions = (index2
---> 48                   ._repartition_quantiles(npartitions, upsample=upsample)
     49                   .compute()).tolist()
     50 

...\Anaconda3\lib\site-packages\dask\base.py in compute(self, **kwargs)
     84             Extra keywords to forward to the scheduler ``get`` function.
     85         """
---> 86         return compute(self, **kwargs)[0]
     87 
     88     @classmethod

...\Anaconda3\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    177         dsk = merge(var.dask for var in variables)
    178     keys = [var._keys() for var in variables]
--> 179     results = get(dsk, keys, **kwargs)
    180 
    181     results_iter = iter(results)

...\Anaconda3\lib\site-packages\distributed\executor.py in get(self, dsk, keys, restrictions, loose_restrictions, **kwargs)
   1052 
   1053         try:
-> 1054             results = self.gather(futures)
   1055         except (KeyboardInterrupt, Exception) as e:
   1056             for f in futures.values():

...\Anaconda3\lib\site-packages\distributed\executor.py in gather(self, futures, errors, maxsize)
    765             return (self.gather(f, errors=errors) for f in futures)
    766         else:
--> 767             return sync(self.loop, self._gather, futures, errors=errors)
    768 
    769     @gen.coroutine

...\Anaconda3\lib\site-packages\distributed\utils.py in sync(loop, func, *args, **kwargs)
    114         e.wait(1000000)
    115     if error[0]:
--> 116         six.reraise(type(error[0]), error[0], traceback[0])
    117     else:
    118         return result[0]

...\Anaconda3\lib\site-packages\six.py in reraise(tp, value, tb)
    684         if value.__traceback__ is not tb:
    685             raise value.with_traceback(tb)
--> 686         raise value
    687 
    688 else:

...\Anaconda3\lib\site-packages\distributed\utils.py in f()
    100     def f():
    101         try:
--> 102             result[0] = yield gen.maybe_future(func(*args, **kwargs))
    103         except Exception as exc:
    104             logger.exception(exc)

...\Anaconda3\lib\site-packages\tornado\gen.py in run(self)
   1006 
   1007                     try:
-> 1008                         value = future.result()
   1009                     except Exception:
   1010                         self.had_exception = True

...\Anaconda3\lib\site-packages\tornado\concurrent.py in result(self, timeout)
    230             return self._result
    231         if self._exc_info is not None:
--> 232             raise_exc_info(self._exc_info)
    233         self._check_done()
    234         return self._result

...\Anaconda3\lib\site-packages\tornado\util.py in raise_exc_info(exc_info)

...\Anaconda3\lib\site-packages\tornado\gen.py in run(self)
   1012 
   1013                     if exc_info is not None:
-> 1014                         yielded = self.gen.throw(*exc_info)
   1015                         exc_info = None
   1016                     else:

...\Anaconda3\lib\site-packages\distributed\executor.py in _gather(self, futures, errors)
    688                         six.reraise(type(d['exception']),
    689                                     d['exception'],
--> 690                                     d['traceback'])
    691                     if errors == 'skip':
    692                         bad_keys.add(key)

...\Anaconda3\lib\site-packages\six.py in reraise(tp, value, tb)
    684         if value.__traceback__ is not tb:
    685             raise value.with_traceback(tb)
--> 686         raise value
    687 
    688 else:

OverflowError: Python int too large to convert to C long

mrocklin commented Aug 3, 2016

In order to get a nicer traceback and get dropped right into the error, I recommend running this under the synchronous single-threaded scheduler:

dask.set_options(get=dask.async.get_sync)

ankravch commented Aug 3, 2016

OK, running under the synchronous single-threaded scheduler:

dask.set_options(get=dask.async.get_sync)

def prep_dd(input_table, start, stop, chunksize):
    input_dd = dd.from_bcolz(input_table, chunksize=chunksize,\
                             categorize=False, lock=False)
    a_dd = da.arange(start,stop, chunks=[chunksize])
    input_dd['global_id'] = dd.from_dask_array(a_dd)
    output_dd = input_dd.set_index('global_id', drop=False)
    return output_dd

@dask.delayed
def read_tables(table_path,keep_var_list):
    return bcolz.ctable(rootdir=table_path)[keep_var_list]

starttime = datetime.now()
keep_var_list = ['f0','f1']
t0 = [read_tables(table_path,keep_var_list) for table_path in ['wide_table_10M.bcolz']*250]
input_tables = dask.compute(*t0)

concat_dd_list = []
start,chunksize = 0,int(2e6)
for itable in input_tables:
    stop = start + itable.len
    print(start, stop)
    output_dd = prep_dd(itable, start, stop, chunksize)
    concat_dd_list.append(output_dd)
    start = stop

bcolz_stack_ = dd.concat(concat_dd_list)

And a traceback:

0 10000000
10000000 20000000
20000000 30000000
...
2120000000 2130000000
2130000000 2140000000
2140000000 2150000000
---------------------------------------------------------------------------
OverflowError                             Traceback (most recent call last)
<ipython-input-8-81c10ede500a> in <module>()
     22     stop = start + itable.len
     23     print(start, stop)
---> 24     output_dd = prep_dd(itable, start, stop, chunksize)
     25     concat_dd_list.append(output_dd)
     26     start = stop

<ipython-input-8-81c10ede500a> in prep_dd(input_table, start, stop, chunksize)
      5     a_dd = da.arange(start,stop, chunks=[chunksize])
      6     input_dd['global_id'] = dd.from_dask_array(a_dd)
----> 7     output_dd = input_dd.set_index('global_id', drop=False)
      8     return output_dd
      9 

...\Anaconda3\lib\site-packages\dask\dataframe\core.py in set_index(self, other, drop, sorted, **kwargs)
   1644         else:
   1645             from .shuffle import set_index
-> 1646             return set_index(self, other, drop=drop, **kwargs)
   1647 
   1648     def set_partition(self, column, divisions, **kwargs):

...\Anaconda3\lib\site-packages\dask\dataframe\shuffle.py in set_index(df, index, npartitions, shuffle, compute, drop, upsample, **kwargs)
     46 
     47     divisions = (index2
---> 48                   ._repartition_quantiles(npartitions, upsample=upsample)
     49                   .compute()).tolist()
     50 

...\Anaconda3\lib\site-packages\dask\base.py in compute(self, **kwargs)
     84             Extra keywords to forward to the scheduler ``get`` function.
     85         """
---> 86         return compute(self, **kwargs)[0]
     87 
     88     @classmethod

...\Anaconda3\lib\site-packages\dask\base.py in compute(*args, **kwargs)
    177         dsk = merge(var.dask for var in variables)
    178     keys = [var._keys() for var in variables]
--> 179     results = get(dsk, keys, **kwargs)
    180 
    181     results_iter = iter(results)

...\Anaconda3\lib\site-packages\dask\async.py in get_sync(dsk, keys, **kwargs)
    521     queue = Queue()
    522     return get_async(apply_sync, 1, dsk, keys, queue=queue,
--> 523                      raise_on_exception=True, **kwargs)
    524 
    525 

...\Anaconda3\lib\site-packages\dask\async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
    488             f(key, res, dsk, state, worker_id)
    489         while state['ready'] and len(state['running']) < num_workers:
--> 490             fire_task()
    491 
    492     # Final reporting

...\Anaconda3\lib\site-packages\dask\async.py in fire_task()
    459         # Submit
    460         apply_async(execute_task, args=[key, dsk[key], data, queue,
--> 461                                         get_id, raise_on_exception])
    462 
    463     # Seed initial tasks into the thread pool

...\Anaconda3\lib\site-packages\dask\async.py in apply_sync(func, args, kwds)
    509 def apply_sync(func, args=(), kwds={}):
    510     """ A naive synchronous version of apply_async """
--> 511     return func(*args, **kwds)
    512 
    513 

...\Anaconda3\lib\site-packages\dask\async.py in execute_task(key, task, data, queue, get_id, raise_on_exception)
    265     """
    266     try:
--> 267         result = _execute_task(task, data)
    268         id = get_id()
    269         result = key, result, None, id

...\Anaconda3\lib\site-packages\dask\async.py in _execute_task(arg, cache, dsk)
    247         func, args = arg[0], arg[1:]
    248         args2 = [_execute_task(a, cache) for a in args]
--> 249         return func(*args2)
    250     elif not ishashable(arg):
    251         return arg

OverflowError: Python int too large to convert to C long

ankravch commented Aug 3, 2016

Here is a minimal example to reproduce the error quickly:

import dask.dataframe as dd
import dask.array as da
import dask, bcolz

N=int(1e7)
ct=bcolz.fromiter(((i,i) for i in range(N)), dtype="i8,i8", count=N, rootdir=r'foo.bcolz', mode='w')

dask.set_options(get=dask.async.get_sync)

def prep_dd(input_table, start, stop, chunksize):
    input_dd = dd.from_bcolz(input_table, chunksize=chunksize,\
                             categorize=False, lock=False)
    a_dd = da.arange(start,stop, chunks=[chunksize])
    input_dd['global_id'] = dd.from_dask_array(a_dd)
    output_dd = input_dd.set_index('global_id', drop=False)
    return output_dd

@dask.delayed
def read_tables(table_path,keep_var_list):
    return bcolz.ctable(rootdir=table_path)[keep_var_list]

keep_var_list = ['f0','f1']
t0 = [read_tables(table_path,keep_var_list) for table_path in ['foo.bcolz']*5]
input_tables = dask.compute(*t0)

concat_dd_list = []
start,chunksize = 2120000000,int(2e6)
for itable in input_tables:
    stop = start + itable.len
    print(start, stop)
    output_dd = prep_dd(itable, start, stop, chunksize)
    concat_dd_list.append(output_dd)
    start = stop

bcolz_stack_ = dd.concat(concat_dd_list)
print(bcolz_stack_)

ankravch commented Aug 3, 2016

Actually, I tested the minimal example above (which failed on Windows 64-bit) on CentOS, and it worked.

It looks like this error ('OverflowError: Python int too large to convert to C long') was discussed in other places, for example: RaRe-Technologies/gensim#321

ankravch commented Aug 3, 2016

Or this one: paramiko/paramiko#353
'The issue is with the low-level protocol decode -- anything that's a uint32 can be incorrectly decoded, or worse cause an integer overflow exception, if the MSB has the value 0xff. The window size is sent as a uint32.'

ankravch commented Aug 3, 2016

Hmm, changing

a_dd = da.arange(start,stop, chunks=[chunksize])

to

a_dd = da.arange(start,stop, chunks=[chunksize], dtype=np.int64)

makes it work on Windows 64-bit.
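
Presumably this is because on 64-bit Windows a C long is 32 bits, so da.arange (following NumPy's default integer type) picks int32, and start/stop values past 2**31 - 1 overflow. A minimal sketch of what I think is happening, using plain NumPy (assuming the default integer dtype tracks the platform's C long):

import numpy as np

# On 64-bit Windows the default integer type is 32-bit (the C long),
# so values past 2**31 - 1 raise the OverflowError seen above.
np.arange(2120000000, 2130000000, 10000000)                  # fails on Windows 64-bit
np.arange(2120000000, 2130000000, 10000000, dtype=np.int64)  # works everywhere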

ankravch closed this Aug 3, 2016
