/
carrayExtension.pyx
2374 lines (2020 loc) · 74.4 KB
/
carrayExtension.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#########################################################################
#
# License: BSD
# Created: August 05, 2010
# Author: Francesc Alted - francesc@continuum.io
#
########################################################################
import sys
import numpy as np
import blaze.carray as ca
from blaze.carray import utils, attrs, array2string
import os, os.path
import struct
import shutil
import tempfile
import json
import cython
# Convenience byte-size units
_KB = 1024
_MB = 1024*_KB
# Directories for saving the data and metadata for carray persistency
DATA_DIR = 'data'
META_DIR = 'meta'
SIZES_FILE = 'sizes'
STORAGE_FILE = 'storage'
# For the persistence layer
EXTENSION = '.blp'          # file extension for on-disk chunk files
MAGIC = 'blpk'              # 4-byte magic string of the bloscpack header
BLOSCPACK_HEADER_LENGTH = 16
BLOSC_HEADER_LENGTH = 16
FORMAT_VERSION = 1
MAX_FORMAT_VERSION = 255
MAX_CHUNKS = (2**63)-1      # nchunks is stored as a signed 64-bit int
# The type used for size values: indexes, coordinates, dimension
# lengths, row numbers, shapes, chunk shapes, byte counts...
SizeType = np.int64
# The native int type for this platform
IntType = np.dtype(np.int_)
#-----------------------------------------------------------------
# numpy functions & objects
from definitions cimport import_array, ndarray, dtype, \
malloc, realloc, free, memcpy, memset, strdup, strcmp, \
PyString_AsString, PyString_GET_SIZE, PyString_FromString, \
PyString_FromStringAndSize, \
Py_BEGIN_ALLOW_THREADS, Py_END_ALLOW_THREADS, \
PyArray_GETITEM, PyArray_SETITEM, \
npy_intp, PyBuffer_FromMemory, Py_uintptr_t
#-----------------------------------------------------------------
# Blosc routines
cdef extern from "blosc.h":
    # Constants exported by the Blosc C library.
    cdef enum:
        BLOSC_MAX_OVERHEAD,
        BLOSC_VERSION_STRING,
        BLOSC_VERSION_DATE
    # Version / thread configuration
    void blosc_get_versions(char *version_str, char *version_date)
    int blosc_set_nthreads(int nthreads)
    # Core compress / decompress entry points (callable without the GIL)
    int blosc_compress(int clevel, int doshuffle, size_t typesize,
                       size_t nbytes, void *src, void *dest,
                       size_t destsize) nogil
    int blosc_decompress(void *src, void *dest, size_t destsize) nogil
    int blosc_getitem(void *src, int start, int nitems, void *dest) nogil
    void blosc_free_resources()
    # Introspection of a compressed buffer's header
    void blosc_cbuffer_sizes(void *cbuffer, size_t *nbytes,
                             size_t *cbytes, size_t *blocksize)
    void blosc_cbuffer_metainfo(void *cbuffer, size_t *typesize, int *flags)
    void blosc_cbuffer_versions(void *cbuffer, int *version, int *versionlz)
    void blosc_set_blocksize(size_t blocksize)
#----------------------------------------------------------------------------
# Initialization code
# The numpy API requires this function to be called before
# using any numpy facilities in an extension module.
import_array()
#-------------------------------------------------------------
# Some utilities
def _blosc_set_nthreads(nthreads):
    """
    blosc_set_nthreads(nthreads)

    Sets the number of threads that Blosc can use.

    Parameters
    ----------
    nthreads : int
        The desired number of threads to use.

    Returns
    -------
    out : int
        The previous setting for the number of threads.
    """
    previous = blosc_set_nthreads(nthreads)
    return previous
def blosc_version():
    """
    blosc_version()

    Return a (version, date) tuple for the linked Blosc library.
    """
    cdef char *version_str = <char *>BLOSC_VERSION_STRING
    cdef char *version_date = <char *>BLOSC_VERSION_DATE
    return (version_str, version_date)
# This is the same as in utils.py, but works faster in extensions
cdef get_len_of_range(npy_intp start, npy_intp stop, npy_intp step):
    """Return the number of elements in range(start, stop, step)."""
    cdef npy_intp length
    if start >= stop:
        return 0
    # Do not use a cython.cdiv here (do not ask me why!)
    length = ((stop - start - 1) // step + 1)
    return length
cdef clip_chunk(npy_intp nchunk, npy_intp chunklen,
                npy_intp start, npy_intp stop, npy_intp step):
    """Clip a global (start, stop, step) range to chunk `nchunk`.

    Returns the chunk-local (startb, stopb, blen) triple, where `blen`
    is 0 when the range does not intersect the chunk.
    """
    cdef npy_intp sb, eb, nitems, offset
    # Translate global coordinates into chunk-local ones
    sb = start - nchunk * chunklen
    eb = stop - nchunk * chunklen
    # Range lies entirely before or after this chunk -> null size
    if sb >= chunklen or eb <= 0:
        return sb, eb, 0
    # Clamp the local bounds to the chunk
    if sb < 0:
        sb = 0
    if eb > chunklen:
        eb = chunklen
    # With a stride > 1, advance sb to the first step-aligned element;
    # correcting only the start bound is enough.
    if step > 1:
        offset = (nchunk * chunklen + sb) - start
        if offset % step > 0:
            sb += (step - (offset % step))
            if sb > chunklen:
                return sb, eb, 0  # null size
    # Number of elements in the clipped block
    nitems = get_len_of_range(sb, eb, step)
    return sb, eb, nitems
cdef int check_zeros(char *data, int nbytes):
    """Check whether [data, data+nbytes] is zero or not.

    Returns 1 when every byte is zero, 0 otherwise.  Scans word-by-word
    (size_t) for speed, then checks the unaligned tail bytes.
    """
    cdef int i, iszero, chunklen, leftover
    cdef size_t *sdata
    iszero = 1
    sdata = <size_t *>data
    # Number of whole machine words, plus the leftover tail in bytes
    chunklen = cython.cdiv(nbytes, sizeof(size_t))
    leftover = nbytes % sizeof(size_t)
    with nogil:
        for i from 0 <= i < chunklen:
            if sdata[i] != 0:
                iszero = 0
                break
        else:
            # Only reached when no word was nonzero (for/else): scan the
            # trailing bytes that don't fill a whole size_t word.
            data += nbytes - leftover
            for i from 0 <= i < leftover:
                if data[i] != 0:
                    iszero = 0
                    break
    return iszero
cdef int true_count(char *data, int nbytes):
    """Count the number of true values in data (boolean).

    Assumes `data` holds one NumPy bool per byte (0 or 1), so summing
    the raw bytes yields the count of true values.
    """
    cdef int i, count
    with nogil:
        count = 0
        for i from 0 <= i < nbytes:
            count += <int>(data[i])
    return count
#-------------------------------------------------------------
# For member definitions see carrayExtension.pxd ~Stephen
cdef class chunk:
    """
    chunk(array, atom, cparams)

    Compressed in-memory container for a data chunk.

    This class is meant to be used only by the `carray` class.
    """
    # NOTE: for member definitions see carrayExtension.pxd
    cdef char typekind, isconstant
    cdef public int atomsize, itemsize, blocksize
    cdef public int nbytes, cbytes, cdbytes
    cdef int true_count
    cdef char *data
    cdef object atom, constant, dobject
    cdef void _getitem(self, int start, int stop, char *dest)
    cdef compress_data(self, char *data, size_t itemsize, size_t nbytes, object cparams)
    cdef compress_arrdata(self, ndarray array, object cparams, object _memory)

    property dtype:
        "The NumPy dtype for this chunk."
        def __get__(self):
            return self.atom

    def __cinit__(self, object dobject, object atom, object cparams,
                  object _memory=True, object _compr=False):
        """Build a chunk from `dobject`.

        `dobject` may be an already-compressed string (`_compr=True`), a
        pickled string for 'O'bject dtypes, or a NumPy array to compress.
        """
        cdef int itemsize, footprint
        cdef size_t nbytes, cbytes, blocksize
        cdef dtype dtype_
        cdef char *data

        self.atom = atom
        self.atomsize = atom.itemsize
        dtype_ = atom.base
        self.itemsize = itemsize = dtype_.elsize
        self.typekind = dtype_.kind
        self.dobject = None
        footprint = 0

        if _compr:
            # Data comes in an already compressed state inside a Python String
            self.data = PyString_AsString(dobject)
            # Increment the reference so that data don't go away
            self.dobject = dobject
            # Set size info for the instance
            blosc_cbuffer_sizes(self.data, &nbytes, &cbytes, &blocksize)
        elif dtype_ == 'O':
            # The objects should arrive here already pickled
            data = PyString_AsString(dobject)
            nbytes = PyString_GET_SIZE(dobject)
            cbytes, blocksize = self.compress_data(data, 1, nbytes, cparams)
        else:
            # Compress the data object (a NumPy object)
            nbytes, cbytes, blocksize, footprint = self.compress_arrdata(
                dobject, cparams, _memory)
        footprint += 128  # add the (aprox) footprint of this instance in bytes

        # Fill instance data
        self.nbytes = nbytes
        self.cbytes = cbytes + footprint
        self.cdbytes = cbytes
        self.blocksize = blocksize

    cdef compress_arrdata(self, ndarray array, object cparams, object _memory):
        """Compress data in `array` and put it in ``self.data``.

        Returns a (nbytes, cbytes, blocksize, footprint) tuple.
        """
        cdef size_t nbytes, cbytes, blocksize, itemsize, footprint

        # Compute the total number of bytes in this array
        itemsize = array.itemsize
        nbytes = itemsize * array.size
        cbytes = 0
        footprint = 0

        # Check whether incoming data can be expressed as a constant or not.
        # Disk-based chunks are not allowed to do this.
        self.isconstant = 0
        self.constant = None
        if _memory and (array.strides[0] == 0
                        or check_zeros(array.data, nbytes)):
            self.isconstant = 1
            # Get the NumPy constant.  Avoid this NumPy quirk:
            # np.array(['1'], dtype='S3').dtype != s[0].dtype
            if array.dtype.kind != 'S':
                self.constant = array[0]
            else:
                self.constant = np.array(array[0], dtype=array.dtype)
            # Add overhead (64 bytes for the overhead of the numpy container)
            footprint += 64 + self.constant.size * self.constant.itemsize
        if self.isconstant:
            blocksize = 4*1024  # use 4 KB as a cache for blocks
            # Make blocksize a multiple of itemsize
            if blocksize % itemsize > 0:
                blocksize = cython.cdiv(blocksize, itemsize) * itemsize
            # Correct in case we have a large itemsize
            if blocksize == 0:
                blocksize = itemsize
        else:
            if self.typekind == 'b':
                self.true_count = true_count(array.data, nbytes)
            if array.strides[0] == 0:
                # The chunk is made of constants.  Regenerate the actual data.
                array = array.copy()
            # Compress data
            cbytes, blocksize = self.compress_data(array.data, itemsize, nbytes,
                                                   cparams)
        return (nbytes, cbytes, blocksize, footprint)

    cdef compress_data(self, char *data, size_t itemsize, size_t nbytes,
                       object cparams):
        """Compress `data` with `cparams` and return (cbytes, blocksize)."""
        cdef size_t nbytes_, cbytes, blocksize
        cdef int clevel, shuffle, csize
        cdef char *dest

        clevel = cparams.clevel
        shuffle = cparams.shuffle
        dest = <char *>malloc(nbytes+BLOSC_MAX_OVERHEAD)
        # BUG FIX: capture blosc_compress()'s *signed* return value; storing a
        # negative error code into an unsigned size_t wrapped around and
        # silently bypassed the error check below.
        with nogil:
            csize = blosc_compress(clevel, shuffle, itemsize, nbytes,
                                   data, dest, nbytes+BLOSC_MAX_OVERHEAD)
        if csize <= 0:
            free(dest)  # BUG FIX: do not leak the scratch buffer on failure
            raise RuntimeError, "fatal error during Blosc compression: %d" % csize
        # Free the unused data
        self.data = <char *>realloc(dest, csize)
        # Set size info for the instance
        blosc_cbuffer_sizes(self.data, &nbytes_, &cbytes, &blocksize)
        assert nbytes_ == nbytes
        return (cbytes, blocksize)

    def getdata(self):
        """Get a compressed string object out of this chunk (for persistence)."""
        cdef object string
        # BUG FIX: the original asserted a non-empty *tuple*, which is always
        # true; assert the condition with the message as the second operand.
        assert not self.isconstant, \
               "This function can only be used for persistency"
        string = PyString_FromStringAndSize(self.data, <Py_ssize_t>self.cdbytes)
        return string

    def getudata(self):
        """Get an uncompressed string out of this chunk (for 'O'bject types)."""
        cdef int ret
        cdef char *dest

        dest = <char *>malloc(self.nbytes)
        # Fill dest with uncompressed data
        with nogil:
            ret = blosc_decompress(self.data, dest, self.nbytes)
        if ret < 0:
            free(dest)  # BUG FIX: release the buffer on the error path
            raise RuntimeError, "fatal error during Blosc decompression: %d" % ret
        string = PyString_FromStringAndSize(dest, <Py_ssize_t>self.nbytes)
        # BUG FIX: PyString_FromStringAndSize copies the bytes, so the scratch
        # buffer must be freed here; the original leaked it on every call.
        free(dest)
        return string

    cdef void _getitem(self, int start, int stop, char *dest):
        """Decompress items [start, stop) into the `dest` buffer."""
        cdef int ret, bsize, blen, nitems, nstart
        cdef ndarray constants

        blen = stop - start
        bsize = blen * self.atomsize
        nitems = cython.cdiv(bsize, self.itemsize)
        nstart = cython.cdiv(start * self.atomsize, self.itemsize)
        if self.isconstant:
            # The chunk is made of constants; replicate them via a 0-stride view
            constants = np.ndarray(shape=(blen,), dtype=self.dtype,
                                   buffer=self.constant, strides=(0,)).copy()
            memcpy(dest, constants.data, bsize)
            return
        # Fill dest with uncompressed data
        with nogil:
            if bsize == self.nbytes:
                ret = blosc_decompress(self.data, dest, bsize)
            else:
                ret = blosc_getitem(self.data, nstart, nitems, dest)
        if ret < 0:
            # NOTE(review): raising inside a `cdef void` function cannot
            # propagate; Cython only prints a warning.  Consider `cdef int
            # ... except -1` — left unchanged to keep the interface intact.
            raise RuntimeError, "fatal error during Blosc decompression: %d" % ret

    def __getitem__(self, object key):
        """__getitem__(self, key) -> values."""
        cdef ndarray array
        cdef object start, stop, step, clen, idx

        if isinstance(key, (int, long)):
            # Quickly return a single element
            array = np.empty(shape=(1,), dtype=self.dtype)
            self._getitem(key, key+1, array.data)
            return PyArray_GETITEM(array, array.data)
        elif isinstance(key, slice):
            (start, stop, step) = key.start, key.stop, key.step
        elif isinstance(key, tuple) and self.dtype.shape != ():
            # Build an array to guess indices
            clen = cython.cdiv(self.nbytes, self.itemsize)
            idx = np.arange(clen, dtype=np.int32).reshape(self.dtype.shape)
            # BUG FIX: `idx` is an ndarray, so it must be *indexed*; the
            # original `idx(key)` called it and raised TypeError.
            idx2 = idx[key]
            if idx2.flags.contiguous:
                # The slice represents a contiguous slice.  Get start and stop.
                # NOTE(review): `stop` here is the *last* index, not last+1 —
                # looks off-by-one for multi-element contiguous slices; confirm
                # against callers before changing.
                start, stop = idx2.flatten()[[0,-1]]
                step = 1
            else:
                (start, stop, step) = key[0].start, key[0].stop, key[0].step
        else:
            # BUG FIX: `raise IndexError, msg, key` passed `key` as a traceback
            # object (a TypeError at runtime); embed the key in the message.
            raise IndexError, "key not suitable: %s" % repr(key)

        # Get the corrected values for start, stop, step
        clen = cython.cdiv(self.nbytes, self.atomsize)
        (start, stop, step) = slice(start, stop, step).indices(clen)
        # Build a numpy container
        array = np.empty(shape=(stop-start,), dtype=self.dtype)
        # Read actual data
        self._getitem(start, stop, array.data)
        # Return the value depending on the step
        if step > 1:
            return array[::step]
        return array

    @property
    def pointer(self):
        # Address of the compressed payload, past the bloscpack header
        return <Py_uintptr_t> self.data+BLOSCPACK_HEADER_LENGTH

    @property
    def viewof(self):
        # Read-only buffer view over the compressed data
        return PyBuffer_FromMemory(<void*>self.data, <Py_ssize_t>self.cdbytes)

    def __setitem__(self, object key, object value):
        """__setitem__(self, key, value) -> None."""
        raise NotImplementedError

    def __str__(self):
        """Represent the chunk as an string."""
        return str(self[:])

    def __repr__(self):
        """Represent the chunk as an string, with additional info."""
        cratio = self.nbytes / float(self.cbytes)
        # BUG FIX: `chunk` has no `shape` attribute (the original raised
        # AttributeError here); derive the length from the byte counts.
        shape = (cython.cdiv(self.nbytes, self.atomsize),)
        fullrepr = "chunk(%s, %s) nbytes: %d; cbytes: %d; ratio: %.2f\n%r" % \
            (shape, self.dtype, self.nbytes, self.cbytes, cratio, str(self))
        return fullrepr

    def __dealloc__(self):
        """Release C resources before destruction."""
        if self.dobject:
            self.dobject = None  # DECREF pointer to data object
        else:
            free(self.data)      # explicitly free the data area
cdef create_bloscpack_header(nchunks=None, format_version=FORMAT_VERSION):
    """ Create the bloscpack header string.

    Parameters
    ----------
    nchunks : int
        the number of chunks, default: None
    format_version : int
        the version format for the compressed file

    Returns
    -------
    bloscpack_header : string
        the header as string

    Notes
    -----
    The bloscpack header is 16 bytes as follows:

    |-0-|-1-|-2-|-3-|-4-|-5-|-6-|-7-|-8-|-9-|-A-|-B-|-C-|-D-|-E-|-F-|
    | b   l   p   k | ^ |   RESERVED    |           nchunks             |
                 version

    The first four are the magic string 'blpk'. The next one is an 8 bit
    unsigned little-endian integer that encodes the format version. The next
    three are reserved, and the last eight are a signed 64 bit little endian
    integer that encodes the number of chunks

    The value of '-1' for 'nchunks' designates an unknown size and can be
    inserted by setting 'nchunks' to None.

    Raises
    ------
    ValueError
        if the nchunks argument is too large or negative
    struct.error
        if the format_version is too large or negative
    """
    # BUG FIX: the `is not None` guard must be evaluated *first*; the original
    # compared `0 <= None` before checking for None, which relies on Python 2's
    # arbitrary None ordering and is a TypeError on Python 3.
    if nchunks is not None and not (0 <= nchunks <= MAX_CHUNKS):
        raise ValueError(
            "'nchunks' must be in the range 0 <= n <= %d, not '%s'" %
            (MAX_CHUNKS, str(nchunks)))
    return (MAGIC + struct.pack('<B', format_version) + '\x00\x00\x00' +
            struct.pack('<q', nchunks if nchunks is not None else -1))
def decode_byte(byte):
    """Decode a single byte (length-1 string) to its integer value.

    `ord()` is equivalent to the former hex round-trip
    (``int(byte.encode('hex'), 16)``) for one byte, but simpler and not
    dependent on the Python 2-only ``str.encode('hex')`` codec.
    """
    return ord(byte)
def decode_uint32(fourbyte):
    """Decode a 4-byte string as a little-endian unsigned 32-bit integer."""
    (value,) = struct.unpack('<I', fourbyte)
    return value
cdef decode_blosc_header(buffer_):
    """ Read and decode header from compressed Blosc buffer.

    Parameters
    ----------
    buffer_ : string of bytes
        the compressed buffer

    Returns
    -------
    settings : dict
        a dict containing the settings from Blosc

    Notes
    -----
    The Blosc 1.1.3 header is 16 bytes as follows:

    |-0-|-1-|-2-|-3-|-4-|-5-|-6-|-7-|-8-|-9-|-A-|-B-|-C-|-D-|-E-|-F-|
      ^   ^   ^   ^ |     nbytes    |   blocksize   |    ctbytes    |
      |   |   |   |
      |   |   |   +--typesize
      |   |   +------flags
      |   +----------versionlz
      +--------------version

    The first four are simply bytes, the last three are each unsigned ints
    (uint32) each occupying 4 bytes. The header is always little-endian.
    'ctbytes' is the length of the buffer including header and nbytes is the
    length of the data when uncompressed.
    """
    # Single bytes at offsets 0-3, uint32 fields at offsets 4, 8, 12.
    return {'version': decode_byte(buffer_[0]),
            'versionlz': decode_byte(buffer_[1]),
            'flags': decode_byte(buffer_[2]),
            'typesize': decode_byte(buffer_[3]),
            'nbytes': decode_uint32(buffer_[4:8]),
            'blocksize': decode_uint32(buffer_[8:12]),
            'ctbytes': decode_uint32(buffer_[12:16])}
cdef class chunks(object):
    """Store the different carray chunks in a directory on-disk."""
    cdef object _rootdir, _mode
    cdef object dtype, cparams, lastchunkarr
    cdef object chunk_cached
    cdef npy_intp nchunks, nchunk_cached, len

    property mode:
        "The mode used to create/open this object."
        def __get__(self):
            return self._mode
        def __set__(self, value):
            self._mode = value

    property rootdir:
        "The on-disk directory used for persistency."
        def __get__(self):
            return self._rootdir
        def __set__(self, value):
            self._rootdir = value

    property datadir:
        """The directory for data files."""
        def __get__(self):
            return os.path.join(self.rootdir, DATA_DIR)

    def __cinit__(self, rootdir, metainfo=None, _new=False):
        """Open (or create, when `_new`) the chunk store under `rootdir`.

        `metainfo` is a (dtype, cparams, len, lastchunkarr, mode) tuple.
        """
        cdef ndarray lastchunkarr
        cdef void *decompressed, *compressed
        cdef int leftover
        cdef char *lastchunk
        cdef size_t chunksize
        cdef object scomp
        cdef int ret
        cdef int itemsize, atomsize

        self._rootdir = rootdir
        self.nchunks = 0
        self.nchunk_cached = -1  # no chunk cached initially
        self.dtype, self.cparams, self.len, lastchunkarr, self._mode = metainfo
        atomsize = self.dtype.itemsize
        itemsize = self.dtype.base.itemsize

        # For 'O'bject types, the number of chunks is equal to the number of
        # elements
        if self.dtype.char == 'O':
            self.nchunks = self.len

        # Initialize last chunk (not valid for 'O'bject dtypes)
        if not _new and self.dtype.char != 'O':
            self.nchunks = cython.cdiv(self.len, len(lastchunkarr))
            chunksize = len(lastchunkarr) * atomsize
            lastchunk = lastchunkarr.data
            leftover = (self.len % len(lastchunkarr)) * atomsize
            if leftover:
                # Fill lastchunk with data on disk
                scomp = self.read_chunk(self.nchunks)
                compressed = PyString_AsString(scomp)
                with nogil:
                    ret = blosc_decompress(compressed, lastchunk, chunksize)
                if ret < 0:
                    raise RuntimeError(
                        "error decompressing the last chunk (error code: %d)" % ret)

    cdef read_chunk(self, nchunk):
        """Read chunk #`nchunk` from disk and return it in compressed form."""
        dname = "__%d%s" % (nchunk, EXTENSION)
        schunkfile = os.path.join(self.datadir, dname)
        if not os.path.exists(schunkfile):
            raise ValueError("chunkfile %s not found" % schunkfile)
        with open(schunkfile, 'rb') as schunk:
            bloscpack_header = schunk.read(BLOSCPACK_HEADER_LENGTH)
            blosc_header_raw = schunk.read(BLOSC_HEADER_LENGTH)
            blosc_header = decode_blosc_header(blosc_header_raw)
            ctbytes = blosc_header['ctbytes']
            nbytes = blosc_header['nbytes']
            # seek back BLOSC_HEADER_LENGTH bytes in file relative to current
            # position
            schunk.seek(-BLOSC_HEADER_LENGTH, 1)
            scomp = schunk.read(ctbytes)
        return scomp

    def __getitem__(self, nchunk):
        """Return chunk #`nchunk` as a `chunk` object (1-entry cache)."""
        cdef void *decompressed, *compressed

        if nchunk == self.nchunk_cached:
            # Hit!
            return self.chunk_cached
        else:
            scomp = self.read_chunk(nchunk)
            # Data chunk should be compressed already
            chunk_ = chunk(scomp, self.dtype, self.cparams,
                           _memory=False, _compr=True)
            # Fill cache
            self.nchunk_cached = nchunk
            self.chunk_cached = chunk_
        return chunk_

    def __setitem__(self, nchunk, chunk_):
        self._save(nchunk, chunk_)

    def __len__(self):
        return self.nchunks

    def append(self, chunk_):
        """Append a new chunk to the carray."""
        self._save(self.nchunks, chunk_)
        self.nchunks += 1

    cdef _save(self, nchunk, chunk_):
        """Save the `chunk_` as chunk #`nchunk`. """
        if self.mode == "r":
            raise RuntimeError(
                "cannot modify data because mode is '%s'" % self.mode)
        dname = "__%d%s" % (nchunk, EXTENSION)
        schunkfile = os.path.join(self.datadir, dname)
        bloscpack_header = create_bloscpack_header(1)
        with open(schunkfile, 'wb') as schunk:
            schunk.write(bloscpack_header)
            data = chunk_.getdata()
            schunk.write(data)
        # Mark the cache as dirty if needed
        if nchunk == self.nchunk_cached:
            self.nchunk_cached = -1

    def flush(self, chunk_):
        """Flush the leftover chunk."""
        self._save(self.nchunks, chunk_)

    def pop(self):
        """Remove the last chunk and return it."""
        nchunk = self.nchunks - 1
        chunk_ = self.__getitem__(nchunk)
        dname = "__%d%s" % (nchunk, EXTENSION)
        schunkfile = os.path.join(self.datadir, dname)
        if not os.path.exists(schunkfile):
            # BUG FIX: this branch fires when the file is *missing*; the
            # original message said the opposite ("does exist").
            raise RuntimeError("chunk filename %s does not exist" % schunkfile)
        os.remove(schunkfile)
        # When popping a chunk, we must be sure that we don't leave anything
        # behind (i.e. the lastchunk)
        dname = "__%d%s" % (nchunk+1, EXTENSION)
        schunkfile = os.path.join(self.datadir, dname)
        if os.path.exists(schunkfile):
            os.remove(schunkfile)
        self.nchunks -= 1
        return chunk_
cdef class carray:
    """
    carray(array, cparams=None, dtype=None, dflt=None, expectedlen=None, chunklen=None, rootdir=None, mode='a')

    A compressed and enlargeable in-memory data container.

    `carray` exposes a series of methods for dealing with the compressed
    container in a NumPy-like way.

    Parameters
    ----------
    array : a NumPy-like object
        This is taken as the input to create the carray.  It can be any Python
        object that can be converted into a NumPy object.  The data type of
        the resulting carray will be the same as this NumPy object.
    cparams : instance of the `cparams` class, optional
        Parameters to the internal Blosc compressor.
    dtype : NumPy dtype
        Force this `dtype` for the carray (rather than the `array` one).
    dflt : Python or NumPy scalar
        The value to be used when enlarging the carray.  If None, the default
        is filling with zeros.
    expectedlen : int, optional
        A guess on the expected length of this object.  This will serve to
        decide the best `chunklen` used for compression and memory I/O
        purposes.
    chunklen : int, optional
        The number of items that fits into a chunk.  By specifying it you can
        explicitly set the chunk size used for compression and memory I/O.
        Only use it if you know what you are doing.
    rootdir : str, optional
        The directory where all the data and metadata will be stored.  If
        specified, then the carray object will be disk-based (i.e. all chunks
        will live on-disk, not in memory) and persistent (i.e. it can be
        restored in another session, e.g. via the `open()` top-level function).
    mode : str, optional
        The mode that a *persistent* carray should be created/opened.  The
        values can be:

        * 'r' for read-only
        * 'w' for read/write.  During carray creation, the `rootdir` will be
          removed if it exists.  During carray opening, the carray will be
          resized to 0.
        * 'a' for append (possible data inside `rootdir` will not be removed).
    """
    # --- C-level instance state ---
    cdef public int itemsize, atomsize                 # item/atom sizes in bytes
    cdef int _chunksize, _chunklen, leftover
    cdef int nrowsinbuf, _row
    cdef int sss_mode, wheretrue_mode, where_mode      # iterator-mode sentinels
    cdef npy_intp startb, stopb                        # chunk-local bounds
    cdef npy_intp start, stop, step, nextelement       # iteration state
    cdef npy_intp _nrow, nrowsread
    cdef npy_intp _nbytes, _cbytes                     # uncompressed/compressed sizes
    cdef npy_intp nhits, limit, skip
    cdef npy_intp expectedlen
    cdef char *lastchunk                               # pointer into lastchunkarr data
    cdef object lastchunkarr, where_arr, arr1
    cdef object _cparams, _dflt
    cdef object _dtype
    cdef public object chunks
    cdef object _rootdir, datadir, metadir, _mode      # persistence locations/mode
    cdef object _attrs
    cdef ndarray iobuf, where_buf
    # For block cache
    cdef int idxcache
    cdef ndarray blockcache
    cdef char *datacache
    property leftovers:
        "Address (as an int) of the leftovers chunk data."
        def __get__(self):
            # Pointer to the leftovers chunk
            return self.lastchunkarr.ctypes.data

    property nchunks:
        "The number of complete chunks in this object."
        def __get__(self):
            # TODO: do we need to handle the last chunk specially?
            return <npy_intp>cython.cdiv(self._nbytes, self._chunksize)

    property partitions:
        "The (start, stop) row bounds of each complete chunk."
        def __get__(self):
            # Return a sequence of tuples indicating the bounds
            # of each of the chunks.
            nchunks = <npy_intp>cython.cdiv(self._nbytes, self._chunksize)
            chunklen = cython.cdiv(self._chunksize, self.atomsize)
            return [(i*chunklen,(i+1)*chunklen) for i in xrange(nchunks)]

    property leftover_array:
        "The NumPy array backing the leftovers chunk."
        def __get__(self):
            return self.lastchunkarr

    property attrs:
        "The attribute accessor."
        def __get__(self):
            return self._attrs

    property cbytes:
        "The compressed size of this object (in bytes)."
        def __get__(self):
            return self._cbytes

    property chunklen:
        "The chunklen of this object (in rows)."
        def __get__(self):
            return self._chunklen

    property cparams:
        "The compression parameters for this object."
        def __get__(self):
            return self._cparams

    property dflt:
        "The default value of this object."
        def __get__(self):
            return self._dflt

    property dtype:
        "The dtype of this object."
        def __get__(self):
            return self._dtype.base

    property len:
        "The length (leading dimension) of this object."
        def __get__(self):
            if self._dtype.char == 'O':
                return len(self.chunks)
            else:
                # Important to do the cast in order to get a npy_intp result
                return <npy_intp>cython.cdiv(self._nbytes, self.atomsize)

    property mode:
        "The mode used to create/open this object."
        def __get__(self):
            return self._mode
        def __set__(self, value):
            # Keep the chunk store's mode in sync
            self._mode = value
            self.chunks.mode = value

    property nbytes:
        "The original (uncompressed) size of this object (in bytes)."
        def __get__(self):
            return self._nbytes

    property ndim:
        "The number of dimensions of this object."
        def __get__(self):
            return len(self.shape)

    property shape:
        "The shape of this object."
        def __get__(self):
            return tuple((self.len,) + self._dtype.shape)

    property size:
        "The size of this object."
        def __get__(self):
            return np.prod(self.shape)

    property rootdir:
        "The on-disk directory used for persistency."
        def __get__(self):
            return self._rootdir
        def __set__(self, value):
            # Only disk-based carrays may have their rootdir changed
            if not self.rootdir:
                raise ValueError(
                    "cannot modify the rootdir value of an in-memory carray")
            self._rootdir = value
            self.chunks.rootdir = value
    def __cinit__(self, object array=None, object cparams=None,
                  object dtype=None, object dflt=None,
                  object expectedlen=None, object chunklen=None,
                  object rootdir=None, object mode="a"):
        """Create a carray from `array`, or open one from `rootdir`."""
        self._rootdir = rootdir
        if mode not in ('r', 'w', 'a'):
            raise ValueError("mode should be 'r', 'w' or 'a'")
        self._mode = mode

        # An `array` means a fresh carray; a bare `rootdir` means opening a
        # persistent one whose metadata lives on disk.
        if array is not None:
            self.create_carray(array, cparams, dtype, dflt,
                               expectedlen, chunklen, rootdir, mode)
            _new = True
        elif rootdir is not None:
            meta_info = self.read_meta()
            self.open_carray(*meta_info)
            _new = False
        else:
            raise ValueError("You need at least to pass an array or/and a rootdir")

        # Attach the attrs to this object
        self._attrs = attrs.attrs(self._rootdir, self.mode, _new=_new)

        # Cache a len-1 array for accelerating self[int] case
        self.arr1 = np.empty(shape=(1,), dtype=self._dtype)

        # Sentinels
        self.sss_mode = False
        self.wheretrue_mode = False
        self.where_mode = False
        self.idxcache = -1  # cache not initialized
cdef _adapt_dtype(self, dtype, shape):
"""adapt the dtype to one supported in carray.
returns the adapted type with the shape modified accordingly.
"""
if dtype.hasobject:
if dtype != np.object_:
raise TypeError, repr(dtype) + ' is not a supported dtype'
else:
dtype = np.dtype((dtype, shape[1:]))
return dtype
    def create_carray(self, array, cparams, dtype, dflt,
                      expectedlen, chunklen, rootdir, mode):
        """Create a new array.

        There are restrictions creating carray objects with dtypes that have objects
        (dtype.hasobject is True). The only case where this dtype is supported is
        on unidimensionl arrays whose dtype is object (objects in composite dtypes
        are not supported).
        """
        cdef int itemsize, atomsize, chunksize
        cdef ndarray lastchunkarr
        cdef object array_, _dflt

        # Check defaults for cparams
        if cparams is None:
            cparams = ca.cparams()

        if not isinstance(cparams, ca.cparams):
            raise ValueError, "`cparams` param must be an instance of `cparams` class"

        # Convert input to an appropriate type
        if type(dtype) is str:
            dtype = np.dtype(dtype)
        array_ = utils.to_ndarray(array, dtype)

        # if no base dtype is provided, use the dtype from the array.
        if dtype is None:
            dtype = array_.dtype.base

        # Multidimensional array.  The atom will have array_.shape[1:] dims.
        # atom dimensions will be stored in `self._dtype`, which is different
        # than `self.dtype` in that `self._dtype` dimensions are borrowed
        # from `self.shape`.  `self.dtype` will always be scalar (NumPy
        # convention).
        #
        # Note that objects are a special case.  Carray does not support object
        # arrays of more than one dimensions.
        self._dtype = dtype = self._adapt_dtype(dtype, array_.shape)

        # Check that atom size is less than 2 GB
        if dtype.itemsize >= 2**31:
            raise ValueError, "atomic size is too large (>= 2 GB)"

        self.atomsize = atomsize = dtype.itemsize
        self.itemsize = itemsize = dtype.base.itemsize

        # Check defaults for dflt
        _dflt = np.zeros((), dtype=dtype)
        if dflt is not None:
            if dtype.shape == ():
                _dflt[()] = dflt
            else:
                _dflt[:] = dflt
        self._dflt = _dflt

        # Compute the chunklen/chunksize
        if expectedlen is None:
            # Try a guess
            try:
                expectedlen = len(array_)
            except TypeError:
                raise NotImplementedError(
                    "creating carrays from scalar objects not supported")
        try:
            self.expectedlen = expectedlen
        except OverflowError:
            raise OverflowError(
                "The size cannot be larger than 2**31 on 32-bit platforms")
        if chunklen is None:
            # Try a guess: aim at a chunksize that matches the expected
            # working-set size (in MB) heuristics of utils.calc_chunksize
            chunksize = utils.calc_chunksize((expectedlen * atomsize) / float(_MB))
            # Chunksize must be a multiple of atomsize
            chunksize = cython.cdiv(chunksize, atomsize) * atomsize
            # Protection against large itemsizes
            if chunksize < atomsize:
                chunksize = atomsize
        else:
            if not isinstance(chunklen, int) or chunklen < 1:
                raise ValueError, "chunklen must be a positive integer"
            chunksize = chunklen * atomsize
        chunklen = cython.cdiv(chunksize, atomsize)
        self._chunksize = chunksize
        self._chunklen = chunklen