# Ingest

Ingest all *.rdf files recursively, convert to a structured numpy array and upload to S3 for easy public download via numpy.DataSource().open(url)

See [file formats](https://github.com/braingeneers/braingeneers/wiki/File-Formats) for details on the source file format and [layout](https://github.com/braingeneers/braingeneers/wiki/data-repository-layout) for details on the repository structure.

Note: The original Intan code saves the data as float64 which effectively increaes the size of the data by 4x vs its original source as 16 bit samples. The below code is modified to keep things in 16 bit form and as a result does not scale the amplifier_data structure leaving that to the analysis phase. There is still significant overhead added vs. the raw file (82M vs. 44M) but its much better then just storing the float64

In [3]:
import os
import sys
import glob
import json
import datetime
import numpy as np

In [4]:
#! /bin/env python
#
# Michael Gibson 17 July 2015
# Modified Adrian Foy Sep 2018
# Modified to leave amplifier_data as 16 bit unsigned int and to be less chatty
# This is all a bit of a hack IMHO and needs to be re-structured/cleaned up from the source intan code

import sys, struct, math, os, time
import numpy as np

from intanutil.read_header import read_header
from intanutil.get_bytes_per_data_block import get_bytes_per_data_block
from intanutil.read_one_data_block import read_one_data_block
from intanutil.notch_filter import notch_filter
from intanutil.data_to_result import data_to_result

def read_data(filename):
    """
    Reads Intan Technologies RHD2000 data file generated by evaluation board GUI.
    Data are returned in a dictionary, for future extensibility.
    """

    tic = time.time()
    fid = open(filename, 'rb')
    filesize = os.path.getsize(filename)

    header = read_header(fid)

    print('{} amplifier channels'.format(header['num_amplifier_channels']))
    print('{} auxiliary input channels'.format(header['num_aux_input_channels']))
    print('{} supply voltage channels'.format(header['num_supply_voltage_channels']))
    print('{} board ADC channels'.format(header['num_board_adc_channels']))
    print('{} board digital input channels'.format(header['num_board_dig_in_channels']))
    print('{} board digital output channels'.format(header['num_board_dig_out_channels']))
    print('{} temperature sensors channels'.format(header['num_temp_sensor_channels']))

    # Determine how many samples the data file contains.
    bytes_per_block = get_bytes_per_data_block(header)

    # How many data blocks remain in this file?
    data_present = False
    bytes_remaining = filesize - fid.tell()
    if bytes_remaining > 0:
        data_present = True

    if bytes_remaining % bytes_per_block != 0:
        raise Exception('Something is wrong with file size : should have a whole number of data blocks')

    num_data_blocks = int(bytes_remaining / bytes_per_block)

    num_amplifier_samples = header['num_samples_per_data_block'] * num_data_blocks
    num_aux_input_samples = int((header['num_samples_per_data_block'] / 4) * num_data_blocks)
    num_supply_voltage_samples = 1 * num_data_blocks
    num_board_adc_samples = header['num_samples_per_data_block'] * num_data_blocks
    num_board_dig_in_samples = header['num_samples_per_data_block'] * num_data_blocks
    num_board_dig_out_samples = header['num_samples_per_data_block'] * num_data_blocks

    record_time = num_amplifier_samples / header['sample_rate']

    if data_present:
        print('File contains {:0.3f} seconds of data.  Amplifiers were sampled at {:0.2f} kS/s.'.format(record_time, header['sample_rate'] / 1000))
    else:
        print('Header file contains no data.  Amplifiers were sampled at {:0.2f} kS/s.'.format(header['sample_rate'] / 1000))

    if data_present:
        # Pre-allocate memory for data.
        print('')
        print('Allocating memory for data...')

        data = {}
        if (header['version']['major'] == 1 and header['version']['minor'] >= 2) or (header['version']['major'] > 1):
            data['t_amplifier'] = np.zeros(num_amplifier_samples, dtype=np.int)
        else:
            data['t_amplifier'] = np.zeros(num_amplifier_samples, dtype=np.uint)

        # NOTE: Changed from uint to uint16
        data['amplifier_data'] = np.zeros([header['num_amplifier_channels'], num_amplifier_samples], dtype=np.uint16)
        data['aux_input_data'] = np.zeros([header['num_aux_input_channels'], num_aux_input_samples], dtype=np.uint)
        data['supply_voltage_data'] = np.zeros([header['num_supply_voltage_channels'], num_supply_voltage_samples], dtype=np.uint)
        data['temp_sensor_data'] = np.zeros([header['num_temp_sensor_channels'], num_supply_voltage_samples], dtype=np.uint)
        data['board_adc_data'] = np.zeros([header['num_board_adc_channels'], num_board_adc_samples], dtype=np.uint)
        
        # by default, this script interprets digital events (digital inputs and outputs) as booleans
        # if unsigned int values are preferred(0 for False, 1 for True), replace the 'dtype=np.bool' argument with 'dtype=np.uint' as shown
        # the commented line below illustrates this for digital input data; the same can be done for digital out
        
        #data['board_dig_in_data'] = np.zeros([header['num_board_dig_in_channels'], num_board_dig_in_samples], dtype=np.uint)
        data['board_dig_in_data'] = np.zeros([header['num_board_dig_in_channels'], num_board_dig_in_samples], dtype=np.bool)
        data['board_dig_in_raw'] = np.zeros(num_board_dig_in_samples, dtype=np.uint)
        
        data['board_dig_out_data'] = np.zeros([header['num_board_dig_out_channels'], num_board_dig_out_samples], dtype=np.bool)
        data['board_dig_out_raw'] = np.zeros(num_board_dig_out_samples, dtype=np.uint)

        # Read sampled data from file.
        print('Reading data from file...')

        # Initialize indices used in looping
        indices = {}
        indices['amplifier'] = 0
        indices['aux_input'] = 0
        indices['supply_voltage'] = 0
        indices['board_adc'] = 0
        indices['board_dig_in'] = 0
        indices['board_dig_out'] = 0

        print_increment = 10
        percent_done = print_increment
        for i in range(num_data_blocks):
            read_one_data_block(data, header, indices, fid)

            # Increment indices
            indices['amplifier'] += header['num_samples_per_data_block']
            indices['aux_input'] += int(header['num_samples_per_data_block'] / 4)
            indices['supply_voltage'] += 1
            indices['board_adc'] += header['num_samples_per_data_block']
            indices['board_dig_in'] += header['num_samples_per_data_block']
            indices['board_dig_out'] += header['num_samples_per_data_block']            

            fraction_done = 100 * (1.0 * i / num_data_blocks)
            if fraction_done >= percent_done:
                print('.', end='', flush=True)
#                 print('{}% done...'.format(percent_done))
                percent_done = percent_done + print_increment

        # Make sure we have read exactly the right amount of data.
        bytes_remaining = filesize - fid.tell()
        if bytes_remaining != 0: raise Exception('Error: End of file not reached.')

    # Close data file.
    fid.close()

    if (data_present):
        print('Parsing data...')

        # Extract digital input channels to separate variables.
        for i in range(header['num_board_dig_in_channels']):
            data['board_dig_in_data'][i, :] = np.not_equal(np.bitwise_and(data['board_dig_in_raw'], (1 << header['board_dig_in_channels'][i]['native_order'])), 0)

        # Extract digital output channels to separate variables.
        for i in range(header['num_board_dig_out_channels']):
            data['board_dig_out_data'][i, :] = np.not_equal(np.bitwise_and(data['board_dig_out_raw'], (1 << header['board_dig_out_channels'][i]['native_order'])), 0)

        # Scale voltage levels appropriately.
        # NOTE: Commented out to reduce size of file by 4x by storing 16 bit ints in the resulting .npy
#         data['amplifier_data'] = np.multiply(0.195, (data['amplifier_data'].astype(np.int32) - 32768))      # units = microvolts
        data['aux_input_data'] = np.multiply(37.4e-6, data['aux_input_data'])               # units = volts
        data['supply_voltage_data'] = np.multiply(74.8e-6, data['supply_voltage_data'])     # units = volts
        if header['eval_board_mode'] == 1:
            data['board_adc_data'] = np.multiply(152.59e-6, (data['board_adc_data'].astype(np.int32) - 32768)) # units = volts
        elif header['eval_board_mode'] == 13:
            data['board_adc_data'] = np.multiply(312.5e-6, (data['board_adc_data'].astype(np.int32) - 32768)) # units = volts
        else:
            data['board_adc_data'] = np.multiply(50.354e-6, data['board_adc_data'])           # units = volts
        data['temp_sensor_data'] = np.multiply(0.01, data['temp_sensor_data'])               # units = deg C

        # Check for gaps in timestamps.
        num_gaps = np.sum(np.not_equal(data['t_amplifier'][1:]-data['t_amplifier'][:-1], 1))
        assert num_gaps == 0  # We don't handle missing samples in all our downstream analysis

        # Scale time steps (units = seconds).
        data['t_amplifier'] = data['t_amplifier'] / header['sample_rate']
        data['t_aux_input'] = data['t_amplifier'][range(0, len(data['t_amplifier']), 4)]
        data['t_supply_voltage'] = data['t_amplifier'][range(0, len(data['t_amplifier']), header['num_samples_per_data_block'])]
        data['t_board_adc'] = data['t_amplifier']
        data['t_dig'] = data['t_amplifier']
        data['t_temp_sensor'] = data['t_supply_voltage']

        # If the software notch filter was selected during the recording, apply the
        # same notch filter to amplifier data here.
        assert header['notch_filter_frequency'] == 0
        if header['notch_filter_frequency'] > 0:
            print('Applying notch filter...')

            print_increment = 10
            percent_done = print_increment
            for i in range(header['num_amplifier_channels']):
                data['amplifier_data'][i,:] = notch_filter(data['amplifier_data'][i,:], header['sample_rate'], header['notch_filter_frequency'], 10)

                fraction_done = 100 * (i / header['num_amplifier_channels'])
                if fraction_done >= percent_done:
                    print('{}% done...'.format(percent_done))
                    percent_done += print_increment
    else:
        data = [];

    # Move variables to result struct.
    result = data_to_result(header, data, data_present)

    print('Done!  Elapsed time: {0:0.1f} seconds'.format(time.time() - tic))
    return result

In [31]:
guid = "2018-12-14"  # ISO 8601
date = "2018-12-14"

os.chdir("/public/groups/braingeneers/archive")
os.makedirs("derived/{}".format(guid), exist_ok=True)

for path in glob.glob("original/{}/**/*.rhd".format(guid), recursive=True):
    print(path)

    data = read_data(path)
    
    signal = data["amplifier_data"]
    
    # Only include smaller items ie redact the actual data fields
    metadata = {k: v for k, v in data.items() if sys.getsizeof(v) < 8192}
     
    metadata["date"] = date
    metadata["source_file_path"] = path  # Add annotation of source file path and name relative to archive/

    dest = "derived/{}/{}".format(guid, os.path.basename(path)[:-4].replace(" ","-"))
    print("Saving numpy and json to {}".format(dest))
    np.save(dest + ".npy", signal)
    with open(dest + ".json", "w") as f:
        json.dump(metadata, f, sort_keys=True)  # Sort so consequitive runs generate identical output
    print()

original/2018-12-14/R_UCSC355A_64D26_181214_132046.rhd

Reading Intan Technologies RHD2000 Data File, Version 1.4

n signal groups 7
64 amplifier channels
0 auxiliary input channels
0 supply voltage channels
0 board ADC channels
3 board digital input channels
0 board digital output channels
0 temperature sensors channels
File contains 60.012 seconds of data.  Amplifiers were sampled at 20.00 kS/s.

Allocating memory for data...
Reading data from file...
.........Parsing data...
Done!  Elapsed time: 8.1 seconds
Saving numpy and json to derived/2018-12-14/R_UCSC355A_64D26_181214_132046

original/2018-12-14/R_UCSC355A_64D26_181214_132146.rhd

Reading Intan Technologies RHD2000 Data File, Version 1.4

n signal groups 7
64 amplifier channels
0 auxiliary input channels
0 supply voltage channels
0 board ADC channels
3 board digital input channels
0 board digital output channels
0 temperature sensors channels
File contains 60.012 seconds of data.  Amplifiers were sampled at 20.00 kS/s.

Alloca

In [33]:
# Upload all of the .npy files to S3 replacing spaces with -
!aws --profile {os.getenv("AWS_PROFILE")} --endpoint {os.getenv("AWS_S3_ENDPOINT")} \
    s3 sync /public/groups/braingeneers/archive/derived/'{guid}' \
    s3://braingeneers/archive/derived/{guid} \
    --delete --acl public-read

upload: derived/2018-12-14/R_UCSC355A_64D26_181214_132346.json to s3://braingeneers/archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132346.json
upload: derived/2018-12-14/R_UCSC355A_64D26_181214_132246.json to s3://braingeneers/archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132246.json
upload: derived/2018-12-14/R_UCSC355A_64D26_181214_132046.json to s3://braingeneers/archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132046.json
upload: derived/2018-12-14/R_UCSC355A_64D26_181214_132146.json to s3://braingeneers/archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132146.json
upload: derived/2018-12-14/R_UCSC355A_64D26_181214_132446.json to s3://braingeneers/archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132446.json
upload: derived/2018-12-14/R_UCSC355A_64D26_181214_132046.npy to s3://braingeneers/archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132046.npy
upload: derived/2018-12-14/R_UCSC355A_64D26_181214_132546.json to s3://braingeneers/archive/derived/2018-12-14/R_UCSC355

In [34]:
!aws --profile {os.getenv("AWS_PROFILE")} --endpoint {os.getenv("AWS_S3_ENDPOINT")} \
    s3 ls --recursive s3://braingeneers/archive/derived/{guid}

2019-01-10 22:47:24      28108 archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132046.json
2019-01-10 22:47:36  153630848 archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132046.npy
2019-01-10 22:47:24      28108 archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132146.json
2019-01-10 22:47:38  153630848 archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132146.npy
2019-01-10 22:47:24      28109 archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132246.json
2019-01-10 22:47:39  153630848 archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132246.npy
2019-01-10 22:47:24      28106 archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132346.json
2019-01-10 22:47:39  153630848 archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132346.npy
2019-01-10 22:47:25      28108 archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132446.json
2019-01-10 22:47:40  153630848 archive/derived/2018-12-14/R_UCSC355A_64D26_181214_132446.npy
2019-01-10 22:47:36      28108 archive/derived/2018-12-

In [35]:
# Mirror local to prp s3
!aws --profile {os.getenv("AWS_PROFILE")} --endpoint {os.getenv("AWS_S3_ENDPOINT")} \
    s3 sync /public/groups/braingeneers/archive/ s3://braingeneers/archive/ \
    --delete --acl public-read --dryrun

In [47]:
# Read a single numpy file in directly from S3 to verify its the same as above
with np.DataSource(None).open("https://s3.nautilus.optiputer.net/braingeneers/archive/derived/2018-12-11/organoid-1-recording-background-100Hz-filter_181208_130030.npy", "rb") as f:
    signal = np.load(f)

signal == derived

array([[ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       ...,
       [ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True],
       [ True,  True,  True, ...,  True,  True,  True]])