In [44]:
from nd2reader import ND2Reader
import numpy as np
import pandas as pd
import datetime
import json
import re
from pathlib import Path
from typing import List, Union

In [315]:
def split_acquisition_metadata_planes(l):
    """Group metadata lines into one sub-list per "Plane" section.

    A section starts at every line that begins with ``Plane`` once leading
    whitespace is ignored, and extends up to (but not including) the next
    such line; the last section extends to the end of the input.  Lines that
    come before the first ``Plane`` line are dropped.

    Parameters
    ----------
    l : sequence of str
        Metadata lines in their original order.

    Returns
    -------
    list
        One slice of ``l`` per section, in order of appearance.  Empty when
        no line starts with ``Plane``.
    """
    # https://stackoverflow.com/questions/69832116/split-a-list-into-sublists-based-on-the-value-of-an-element
    section_starts = []
    for position, line in enumerate(l):
        if re.search("^Plane", line.lstrip()):
            section_starts.append(position)

    # Each section stops where the next one starts; the final one stops at
    # the end of the input.
    section_stops = section_starts[1:]
    section_stops.append(len(l))

    sections = []
    for begin, end in zip(section_starts, section_stops):
        sections.append(l[begin:end])
    return sections


def parse_additional_metadata(acq_metadata):
    """Flatten the per-plane acquisition text into one string per plane.

    The text stored under ``'TextInfoItem_5'`` is split on CR LF line endings
    and grouped into "Plane" sections by ``split_acquisition_metadata_planes``.
    The lines of each section are then joined into a single string and every
    comma in it is replaced by a semicolon.

    Parameters
    ----------
    acq_metadata : dict
        Mapping that contains the acquisition text under ``'TextInfoItem_5'``.

    Returns
    -------
    list of str
        One flattened string per "Plane" section, in order of appearance.
    """
    text_lines = acq_metadata['TextInfoItem_5'].split('\r\n')

    plane_strings = []
    for plane_lines in split_acquisition_metadata_planes(text_lines):
        # The separator is the two characters backslash + n, not a real line
        # break, so every plane stays on a single line.
        single_line = '\\n'.join(plane_lines)
        # Commas are swapped for semicolons - presumably so the text cannot
        # clash with a comma-separated output format; confirm with the caller.
        plane_strings.append(single_line.replace(',', ';'))
    return plane_strings


def get_start_time_abs(metadata_dict, common_metadata):
    """Return the absolute start time of the acquisition, or None if unknown.

    The value stored under ``'date'`` in ``metadata_dict`` is returned
    unchanged when it is present and not None.  Otherwise the text stored
    under ``'TextInfoItem_9'`` in ``common_metadata`` is parsed as
    day/month/year with a 12-hour clock (``'%d/%m/%Y  %I:%M:%S %p'``).

    When neither source yields a timestamp, a warning is issued and None is
    returned instead of raising, so that a caller which checks the result
    against None can carry on without absolute times.

    Parameters
    ----------
    metadata_dict : dict
        Parsed metadata; the ``'date'`` entry is used when available.
    common_metadata : dict
        Text-info metadata; ``'TextInfoItem_9'`` is the fallback source.

    Returns
    -------
    object or None
        The stored ``'date'`` value, or a ``datetime.datetime`` parsed from
        the fallback text, or None when no start time could be determined.
    """
    # Local import: only this function needs it, and it keeps the block
    # self-contained.
    import warnings

    start_time_abs = metadata_dict.get('date')
    if start_time_abs is not None:
        return start_time_abs

    start_time_text = common_metadata.get('TextInfoItem_9')
    if start_time_text is None:
        warnings.warn(
            "No acquisition start time found: 'date' is not set and "
            "'TextInfoItem_9' is missing.",
            stacklevel=2,
        )
        return None

    try:
        return datetime.datetime.strptime(start_time_text, '%d/%m/%Y  %I:%M:%S %p')
    except (TypeError, ValueError):
        # The text exists but is not in the expected day/month/year 12-hour
        # layout; report it rather than aborting the whole extraction.
        warnings.warn(
            "Could not parse acquisition start time from 'TextInfoItem_9': "
            + repr(start_time_text),
            stacklevel=2,
        )
        return None
    

def get_standard_field_id_mapping(df):
    """Map each ``field_id`` to a position-based ``standard_field_id``.

    The stage x/y coordinates are averaged per ``field_id`` and rounded to
    whole numbers.  The distinct rounded positions are then ordered by
    descending y and, within equal y, ascending x, and labelled ``'1'``,
    ``'2'``, ... (as strings) in that order.  Fields whose rounded positions
    coincide receive the same label.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns ``field_id``, ``stage_x_abs`` and
        ``stage_y_abs``.  It is not modified.

    Returns
    -------
    pandas.DataFrame
        One row per ``field_id`` with the columns ``field_id`` and
        ``standard_field_id``.
    """
    coord_cols = ['stage_x_abs', 'stage_y_abs']

    # Mean stage position of every field, rounded to whole units.
    per_field = df[['field_id'] + coord_cols].groupby('field_id').mean()
    per_field[coord_cols] = per_field[coord_cols].round()
    per_field['XYCoordinates'] = per_field[coord_cols].apply(tuple, axis=1)
    per_field = per_field.reset_index()

    # Number fields from top-left to bottom-right (increase x first)
    ordered_coords = sorted(
        set(per_field['XYCoordinates']),
        key=lambda xy: [-xy[1], xy[0]],
    )
    label_by_coord = {
        coord: "%0d" % rank
        for rank, coord in enumerate(ordered_coords, start=1)
    }

    # keep this as StandardFieldID
    per_field['standard_field_id'] = per_field['XYCoordinates'].map(label_by_coord)
    return per_field[['field_id', 'standard_field_id']]
    
    
def extract_metadata_and_save(
    in_file_path : Union[str,Path],
    out_path : Union[str,Path]) -> pd.DataFrame:
    """Extract per-image metadata from an ND2 file and write it to disk.

    Three files named after the stem of ``in_file_path`` are written into
    ``out_path``:

    * ``<stem>.json`` - the decoded ``SLxImageTextInfo`` text entries,
    * ``<stem>_metadata.csv`` - the metadata table (without the index),
    * ``<stem>_metadata.pkl`` - the same table, pickled.

    The table holds image size, objective name, pixel size, stage x/y/z
    positions, relative acquisition times, a z-level id, a field id, the
    expected OME-TIFF file name per field, a position-based
    ``standard_field_id`` and one ``metadata_string_acquisition_<i>`` column
    per "Plane" section of the acquisition text.  An ``acquisition_time_abs``
    column is added only when an absolute start time is available.

    Parameters
    ----------
    in_file_path : str or Path
        ND2 file to read.
    out_path : str or Path
        Existing directory that receives the output files.

    Returns
    -------
    pandas.DataFrame
        The metadata table that was written to disk.
    """
    file_stem = Path(in_file_path).stem
    out_dir = Path(out_path)

    # The reader is used as a context manager so the file is closed again,
    # even if parsing fails.  It is handed a plain string so that a Path
    # argument (allowed by the annotation) also works - NOTE(review): some
    # nd2reader versions appear to accept only str paths; confirm.
    with ND2Reader(str(in_file_path)) as nd2_file:
        # NOTE(review): `_raw_metadata` is a private nd2reader attribute and
        # may change between library versions.
        raw_metadata = nd2_file.parser._raw_metadata
        acquisition_times = list(raw_metadata.acquisition_times)
        common_metadata = {
            key.decode(): val.decode()
            for key, val in raw_metadata.image_text_info[b'SLxImageTextInfo'].items()
        }

        # save 'SLxImageTextInfo' as JSON
        with open(out_dir / (file_stem + '.json'), "w") as outfile:
            json.dump(common_metadata, outfile)

        # parse metadata
        metadata_dict = raw_metadata.__dict__
        additional_metadata = parse_additional_metadata(common_metadata)

        # Axis sizes default to 1 when an axis is not listed in `sizes`.
        # NOTE(review): nd2reader appears to omit the 'z' and 'v' axes when
        # they have a single entry - confirm against the library.
        n_timepoints = nd2_file.sizes.get('t', 1)
        n_fields = nd2_file.sizes.get('v', 1)
        n_z_levels = nd2_file.sizes.get('z', 1)

        # combine into dataframe
        # The id columns are built for an ordering in which z changes fastest,
        # then field, then time point; the raw position/time arrays are
        # assumed to follow the same order - TODO confirm.
        metadata_df = pd.DataFrame(
            data={
                'n_pixels_y' : metadata_dict['height'],
                'n_pixels_x' : metadata_dict['width'],
                'objective_name' : common_metadata['TextInfoItem_13'],
                'pixel_size_microns' : metadata_dict['pixel_microns'],
                'stage_x_abs' : raw_metadata.x_data,
                'stage_y_abs' : raw_metadata.y_data,
                'stage_z_abs' : raw_metadata.z_data,
                'acquisition_time_rel' : acquisition_times,
                'stage_z_id' : list(metadata_dict['z_levels'])*(n_timepoints*n_fields),
                'field_id' : list(np.repeat(range(1,1 + n_fields),n_z_levels))*n_timepoints})

        start_time_abs = get_start_time_abs(metadata_dict,common_metadata)

    metadata_df['filename_ome_tiff'] = [file_stem + '_' + str(f).zfill(4) + '.ome.tiff' for f in metadata_df['field_id']]

    if (start_time_abs is not None):
        metadata_df['acquisition_time_abs']=[start_time_abs + datetime.timedelta(seconds=x) for x in metadata_df['acquisition_time_rel']]

    # standardise field id (top-left to bottom-right)
    standard_field_id_mapping = get_standard_field_id_mapping(metadata_df)
    metadata_df = pd.merge(metadata_df, standard_field_id_mapping,on = 'field_id', how = 'left')

    # add additional metadata as columns
    # Each plane string becomes a constant column.  Unlike a cross merge with
    # a one-row frame, this keeps every row when no plane section was found.
    additional_columns = {
        'metadata_string_acquisition_' + str(i): plane_text
        for i, plane_text in enumerate(additional_metadata)
    }
    metadata_df = metadata_df.assign(**additional_columns)

    # write metadata to file
    # Plain paths are passed on directly: Path objects are not context
    # managers on current Python versions.
    metadata_df.to_csv(out_dir / (file_stem + '_metadata.csv'), index=False)
    metadata_df.to_pickle(out_dir / (file_stem + '_metadata.pkl'))

    return metadata_df

In [324]:
# Input ND2 file to process and the directory that receives the extracted
# metadata files.
# NOTE(review): these are absolute, user-specific paths - adjust them before
# running the notebook in another environment.
in_file_path = '/srv/scratch/berrylab/z3532965/Nikon_AX_QPI/20221010_BleachChase_POLR2A/20221010_144534_824/WellE05_ChannelGFP,AF647_Seq0001.nd2'
# Alternative input file, currently disabled:
#in_file_path = '/srv/scratch/berrylab/z3536241/NikonNSTORM/221216/221212_Pbody_DDXImmuno/20221216_140317_611/Well01_ChannelAG_647_FISH,AG_568_FISH,AG_488NHS,AG_DAPI_Seq0000.nd2'
out_path = '/srv/scratch/berrylab/z3532965/Nikon_AX_QPI/'

In [325]:
# Run the extraction on the configured file: this writes the JSON, CSV and
# pickle outputs into `out_path` and keeps the returned DataFrame in `tmp`.
tmp = extract_metadata_and_save(in_file_path,out_path)