In [1]:
import fitsio

import numpy as np
import mmap
from posix_ipc import Semaphore, O_CREX, ExistentialError, O_CREAT, SharedMemory, unlink_shared_memory
from ctypes import sizeof, memmove, addressof, create_string_buffer
from structures import MD
from ctypes import Structure, c_int32, c_int64, c_char_p, c_char


## Defining the structure that holds the meta-data

In [2]:
class MD(Structure):
    """Fixed-layout header describing the two payload segments.

    This definition shadows the ``MD`` name imported from ``structures`` in
    the import cell. Field order and widths define the binary layout, so they
    must not be reordered.

    Fields
    ------
    byte_size    : int64 -- size in bytes of the data array (``data.nbytes``).
    data_shape   : int64 -- number of rows in the data array (``data.shape[0]``).
    S2_byte_size : int32 -- size in bytes of the encoded dtype string ``S2``.
    S2_shape     : int32 -- number of elements in ``S2``.
    """

    _fields_ = [
        ("byte_size", c_int64), ("data_shape", c_int64),
        ("S2_byte_size", c_int32), ("S2_shape", c_int32),
    ]


# Zero-filled staging buffer with exactly the size of one MD record; the
# populated structure is copied into it before being written to shared memory.
md_buf = create_string_buffer(sizeof(MD))

## Loading the fits file

In [3]:
# %%timeit -t -n 2 -r 2

# tfile = "MTL_535700SGA_97percent_2465674_gridpoints_main.fits"   # 301 MB
tfile = "MTL_all_SV0_ELG_tiles_0.37.0.fits"    # 6.9 GB

# Open the file read-only inside a context manager so the underlying handle
# is closed as soon as the table has been read into memory.
# Extension 1 is the table HDU that is read here.
with fitsio.FITS(tfile, mode="r") as fits:
    nrows  = fits[1].get_nrows()
    header = fits[1].read_header()
    # rows=None reads every row.
    # NOTE(review): the recorded output shows the table's native structured
    # dtype, so the dtype argument appears to have no effect -- confirm.
    data   = fits[1].read(rows=None, dtype=np.int64)


In [9]:
# Upper bound on the number of rows that will be published to shared memory.
MAX_ROWS = 4000000

# Slicing an array that is already at most MAX_ROWS long is a no-op, so this
# cell gives the same result no matter how many times it is run.
data = data[:MAX_ROWS]

# Measure the length *after* truncation so N always describes the rows that
# are actually kept (the value the recorded output shows).
N = len(data)
N

4000000

## Meta-data
Here the meta-data is the size of the data array, its data type, the byte size of the array, and so on.

In [10]:
# Base name shared by every POSIX object (segments and semaphores) created below.
POSIX_name = 'MTL'

# Description of the data array: row count, total size in bytes, and the
# textual form of its dtype.
data_shape = data.shape[0]
byte_size = data.nbytes
data_type = str(data.dtype)

# Encode the dtype string as one 32-bit integer code point per character so
# it can be stored as a plain numeric array.
S2 = np.array(list(map(ord, data_type)), dtype='i4')
S2_byte_size = S2.nbytes
S2_shape = S2.shape[0]

## Creating the shared memory 
POSIX memory

In [11]:
# Create (or open, if it already exists) the '<POSIX_name>-meta' segment,
# sized to hold exactly one MD record.
md_region = SharedMemory(f'{POSIX_name}-meta', O_CREAT, size=sizeof(MD))

# Map the whole segment into this process; md_buf_ is the writable view.
md_buf_ = mmap.mmap(fileno=md_region.fd, length=md_region.size)

# The descriptor is only needed to establish the mapping; md_buf_ is still
# used after it has been closed.
md_region.close_fd()

In [12]:
# Obtain a brand-new semaphore guarding the data segment.
try:
    # O_CREX fails with ExistentialError if the name is already taken.
    sem = Semaphore(POSIX_name, O_CREX)
except ExistentialError:
    # A semaphore with this name already exists (e.g. left over from an
    # earlier run). Open it, remove the name, and close the stale handle
    # explicitly -- the handle was previously just dropped, and posix_ipc does
    # not close semaphores on garbage collection -- then create a fresh one.
    _stale_sem = Semaphore(POSIX_name, O_CREAT)
    _stale_sem.unlink()
    _stale_sem.close()
    sem = Semaphore(POSIX_name, O_CREX)

# Increment once so the first acquire() succeeds.
# NOTE(review): assumes the semaphore is created with an initial value of 0.
sem.release()

In [13]:
# Obtain a brand-new semaphore guarding the dtype-string segment.
try:
    # O_CREX fails with ExistentialError if the name is already taken.
    sem2 = Semaphore(POSIX_name+'2', O_CREX)
except ExistentialError:
    # A semaphore with this name already exists (e.g. left over from an
    # earlier run). Open it, remove the name, and close the stale handle
    # explicitly -- the handle was previously just dropped, and posix_ipc does
    # not close semaphores on garbage collection -- then create a fresh one.
    _stale_sem2 = Semaphore(POSIX_name+'2', O_CREAT)
    _stale_sem2.unlink()
    _stale_sem2.close()
    sem2 = Semaphore(POSIX_name+'2', O_CREX)

# Increment once so the first acquire() succeeds.
# NOTE(review): assumes the semaphore is created with an initial value of 0.
sem2.release()

In [14]:
# Sanity check: byte size and element count of the data array, then of the
# encoded dtype string.
print(f"{byte_size} {data_shape}")
print(f"{S2_byte_size} {S2_shape}")

1936000000 4000000
10484 2621


## Populating the meta-data portion in the shared memory

In [15]:
# Fill one MD record with the sizes computed above.
md = MD(
    byte_size=byte_size,
    data_shape=data_shape,
    S2_byte_size=S2_byte_size,
    S2_shape=S2_shape,
)

# Copy the raw bytes of the record into the staging buffer ...
memmove(md_buf, addressof(md), sizeof(MD))

# ... and from there into the mapped metadata segment (lengths must match).
md_buf_[:] = md_buf.raw

## Populating the data portion in the shared memory
shm_region holds the data array

In [16]:
# Create (or open) the data segment, sized to hold the full data array, and
# map it into this process. The previous `shm_region = None` / `if not
# shm_region` guard was always true, so the segment is set up unconditionally.
shm_region = SharedMemory(POSIX_name, O_CREAT, size=byte_size)
shm_buf = mmap.mmap(shm_region.fd, byte_size)
# The descriptor is only needed to establish the mapping.
shm_region.close_fd()

# Copy the array's bytes into the segment while holding the semaphore.
# try/finally guarantees the semaphore is released even if the copy raises,
# so other users of the semaphore are never left blocked.
sem.acquire()
try:
    shm_buf[:] = data.tobytes()
finally:
    sem.release()

shm_region2 is similar to shm_region, but it holds the data-type string (encoded as integers) that the reader code uses

In [17]:
# Create (or open) the segment for the encoded dtype string and map it into
# this process. The previous `shm_region2 = None` / `if not shm_region2`
# guard was always true, so the segment is set up unconditionally.
shm_region2 = SharedMemory(POSIX_name+'2', O_CREAT, size=S2_byte_size)
shm_buf2 = mmap.mmap(shm_region2.fd, S2_byte_size)
# The descriptor is only needed to establish the mapping.
shm_region2.close_fd()

# Copy the encoded dtype string into the segment while holding its semaphore.
# try/finally guarantees the semaphore is released even if the copy raises.
sem2.acquire()
try:
    shm_buf2[:] = S2.tobytes()
finally:
    sem2.release()

In [18]:
# Show the three metadata values, one per line: total bytes, dtype string,
# and row count.
print(byte_size, data_type, data_shape, sep='\n')




1936000000
[('NUMOBS_MORE', '>i8'), ('RELEASE', '>i2'), ('BRICKID', '>i4'), ('BRICKNAME', '<U8'), ('BRICK_OBJID', '>i4'), ('MORPHTYPE', '<U4'), ('RA', '>f8'), ('RA_IVAR', '>f4'), ('DEC', '>f8'), ('DEC_IVAR', '>f4'), ('EBV', '>f4'), ('FLUX_G', '>f4'), ('FLUX_R', '>f4'), ('FLUX_Z', '>f4'), ('FLUX_IVAR_G', '>f4'), ('FLUX_IVAR_R', '>f4'), ('FLUX_IVAR_Z', '>f4'), ('MW_TRANSMISSION_G', '>f4'), ('MW_TRANSMISSION_R', '>f4'), ('MW_TRANSMISSION_Z', '>f4'), ('FRACFLUX_G', '>f4'), ('FRACFLUX_R', '>f4'), ('FRACFLUX_Z', '>f4'), ('FRACMASKED_G', '>f4'), ('FRACMASKED_R', '>f4'), ('FRACMASKED_Z', '>f4'), ('FRACIN_G', '>f4'), ('FRACIN_R', '>f4'), ('FRACIN_Z', '>f4'), ('NOBS_G', '>i2'), ('NOBS_R', '>i2'), ('NOBS_Z', '>i2'), ('PSFDEPTH_G', '>f4'), ('PSFDEPTH_R', '>f4'), ('PSFDEPTH_Z', '>f4'), ('GALDEPTH_G', '>f4'), ('GALDEPTH_R', '>f4'), ('GALDEPTH_Z', '>f4'), ('FLUX_W1', '>f4'), ('FLUX_W2', '>f4'), ('FLUX_W3', '>f4'), ('FLUX_W4', '>f4'), ('FLUX_IVAR_W1', '>f4'), ('FLUX_IVAR_W2', '>f4'), ('FLUX_IVAR_W3', 

In [19]:
# Peek at the first ten entries of S2: the integer code points of the first
# ten characters of the dtype string.
S2[:10]


array([91, 40, 39, 78, 85, 77, 79, 66, 83, 95], dtype=int32)

In [20]:
# Number of encoded characters in S2 (the same value stored as S2_shape).
len(S2)

2621

In [21]:
# Display only the first three records rather than the whole array.
data[:3]

array([(100, 8001, 638477, '2713p682', 1784, 'PSF', 271.57822094, 1.1226929e+14, 68.23066885, 1.1204010e+14, 0.04494829, 13.377395, 12.291068,   8.760663, 245.06502 , 118.429504, 71.51117 , 0.875416 , 0.91427064, 0.9511019, 2.6097623e-04, 1.7119080e-03, 8.2329474e-04, 0.00389323, 4.0909229e-03, 0.01162072, 0.9997574 , 0.9998439 , 0.99894214, 3, 3, 3, 663.42334, 205.63574, 79.37791 , 483.33347, 151.64256, 43.039734,  2.222856,  1.387598, -11.353064 ,   3.7152333, 26.60402 , 4.6205597, 0.02932631, 0.00034518, 0.99241155, 0.99533284, 0.99900275, 0.99962336, 0, 0, 0, 10.414153,  9.5684595,  6.8200793, 10.414153,  9.5684595,  6.8200793, 2015.5, 0, 0, 0, 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 2258343419699174784, 'G2', 19.754745, 210.92673, 19.676823,  20.26173, 19.599525,  16.327147, 1.2792153, 0.        , False, 0.5978343 ,  True, 2.4843044 ,   7.7688184, -4.6271515,  2.2121403, 0.49411172,  2.7031229, 'N', 35191448101979896,        8225, 0.59664697, 65535, 3050, 1, 16046,

# KILL POSIX Data

In [22]:
def _unlink_if_present(name):
    """Unlink the POSIX shared-memory segment `name`; ignore it if already gone."""
    try:
        unlink_shared_memory(name)
    except ExistentialError:
        pass


def _teardown(semaphore, buffers, segment_names):
    """Close `buffers` and unlink `segment_names` while holding `semaphore`.

    The semaphore is always released and closed afterwards, even if closing
    or unlinking fails. If the semaphore itself raises ExistentialError on
    acquire, the teardown is skipped, matching the original best-effort
    behaviour of silently ignoring that error.
    """
    try:
        semaphore.acquire()
    except ExistentialError:
        return
    try:
        for buf in buffers:
            buf.close()
        for name in segment_names:
            _unlink_if_present(name)
    finally:
        semaphore.release()
        semaphore.close()


# Data segment plus the metadata segment; the '-meta' segment was previously
# never unlinked and therefore outlived the notebook.
_teardown(sem, (md_buf_, shm_buf), (POSIX_name + '-meta', POSIX_name))

# dtype-string segment. Handled independently so that a problem with the
# first group can no longer skip this one.
_teardown(sem2, (shm_buf2,), (POSIX_name + '2',))

In [120]:
# Scratch check of the character <-> code point round trip used to build S2:
# ord() maps a character to its integer code point and chr() is the inverse.
# Only the last expression (chr(69)) is displayed by the notebook.
ord('E')
chr(69)

'E'