In [349]:
import struct
import numpy as np
import pandas as pd
from math import ceil, floor
from datetime import datetime, timezone

SAMPLE_SIZE = 32 # 32bit floats
COMPLEX_SAMPLES = True # read in to_codif why changing this would be bed

# B = unsigned char (8b)
# H = unsigned short (16b)
# I = unsigned int (32b)
# Q = unsigned long long (64b)
header_format = "".join([
	"<", # whole thing is little-endian
	# word 0
	"I", "I",
	# word 1
	"B", "B", "H", "H", "H",
	# word 2
	"H", "H", "H", "H",
	# word 3
	"H", "H", "I",
	# word 4
	"Q",
	# word 5
	"I", "H", "H",
	# word 6-7
	"Q", "Q",
])

def encode_small_fields(complex_samples):
	# 0010 = sample representation (float32)
	sample_representation = (0b0010 << 4)
	# 0 = calibration signal (false, not present)
	# 0 = complex (default to false, using real numbers)
	complex = (0b1 if complex_samples else 0b0) << 2
	# 0 = invalid (false)
	# 0 = atypical (false)
	# 111 = dummy vdif version number (codif is 7)
	protocol = (0b111 << 13)
	# 00011 = codif version number (using 3)
	codif_version = (0b00011 << 8)
	combined = (sample_representation | complex | protocol | codif_version)
	return combined


def encode_frame(frame_number, timestamp, complex_samples, alignment_period, samples_per_alignment, samples):
	# first, construct infer-able values
	reference_epoch = ((timestamp.year - 2020) * 2) + (1 if timestamp.month >= 6 else 0)
	epoch_timestamp = datetime(year=timestamp.year, month=(6 if timestamp.month >= 6 else 1), day=1, tzinfo=timezone.utc)
	epoch_offset = int((epoch_timestamp - epoch_timestamp).total_seconds())
	channels_per_thread = samples.shape[1]
	num_samples = samples.shape[0]
	sample_size = SAMPLE_SIZE
	sample_block_length = int(sample_size * channels_per_thread * (2 if complex_samples else 1) / 8) # bytes
	if sample_block_length % 8 != 0:
		sample_block_length += 8 - (sample_block_length % 8)
	data_array_length = sample_block_length * num_samples
	# now, pack the small field values into a byte
	small_fields = encode_small_fields(complex_samples)
	# now, pack the values into a bytearray
	header_values = (
		frame_number,
		epoch_offset,
		reference_epoch,
		sample_size,
		small_fields,
		0, # reserved field
		alignment_period,
		0, # thread_id
		0, # group_id
		0, # secondary_id
		sum([ord(c) << 8 * i for i, c in enumerate("XX")]), # station_id
		# (arbitrary 2-char ASCII code, hopefully obvious enough to show not a real station)
  		# avoids known ids such as from e.g.
  		# - SKED (https://github.com/nvi-inc/sked_catalogs/blob/main/stations.cat)
		# - IVS (https://ivscc.gsfc.nasa.gov/about/org/components/ns-list.html)
		channels_per_thread,
		int(sample_block_length / 8), # units of 8 bytes
		int(data_array_length / 8), # units of 8 bytes
		samples_per_alignment,
		0xFEEDCAFE, # sync_sequence
		0, # metadata_id
		0, 0, 0, # metadata (blank)
	)
	frame_bytes = bytearray(struct.pack(header_format, *header_values))
	# now pack the samples into a bytearray and append
	for i in range(num_samples):
		sample_bytes = bytearray()
		for sample in samples[i]:
			if complex_samples:

				sample_bytes.extend(struct.pack("ff", sample.real, sample.imag))
			else:
				if sample.imag != 0: print("AHHH", sample)
				sample_bytes.extend(struct.pack("f", sample))
		# zero-pad if necessary to sample_block_length
		for _ in range(sample_block_length - len(sample_bytes)):
			sample_bytes.append(0)
		# now append to the overall frame
		frame_bytes.extend(sample_bytes)
	return frame_bytes

def get_alignment(timestamps):
	global thing
	thing = timestamps
	# so, CODIF frames handle the fact that samplerate might be infinitesimally small
	# (and small numbers are a nightmare to store precisely when bit depth is at a premium)
	# by instead storing alignment period
 	# (number of seconds between when sample time lines up with a whole second again)
	# and samples per alignment period
	# thus samplerate = (samples_per_alignment / alignment_period) per second
	mask = (timestamps.dt.microsecond == 0) & (timestamps.dt.nanosecond == 0)
	whole_timestamps = timestamps[mask]
	alignment_periods = whole_timestamps.diff(1).dropna().unique().tolist()
	alignment_periods = [int(x.total_seconds()) for x in alignment_periods]
	# now to get samples per period
	alignment_indices = timestamps.reset_index().rename(columns={"index":"sample_number"}).sample_number
	alignment_indices = alignment_indices[mask]
	first_alignment, last_alignment = (int(alignment_indices.iloc[0]), int(alignment_indices.iloc[-1]))
	if len(whole_timestamps) < 2:
		raise ValueError(f"Sample rate cannot be inferred from data which contains less than one complete alignment period")
	samples_per_alignments = alignment_indices.diff(1).dropna().unique().tolist()
	if last_alignment != len(timestamps) - 1:
		hanging_samples = len(timestamps) - last_alignment
		if hanging_samples > max(samples_per_alignments):
			samples_per_alignments.append(hanging_samples)
	if len(alignment_periods) > 1 or len(samples_per_alignments) > 1:
		low = min(samples_per_alignments) / max(alignment_periods) / 1e6 # MHz
		high = max(samples_per_alignments) / min(alignment_periods) / 1e6 # MHz
		raise ValueError(f"Sample rate is inconsistent between {int(low)} and {int(high)} MHz" \
				   f"\nalignment_periods: {alignment_periods}\nsamples_per_alignment: {samples_per_alignments}")
	alignment_period = int(alignment_periods[0])
	samples_per_alignment = int(samples_per_alignments[0])
	alignment_offset = int(samples_per_alignment - first_alignment)
	return alignment_period, samples_per_alignment, alignment_offset

def to_codif(infile_path, outfile_path, num_channels=None):
	df = pd.read_csv(infile_path, parse_dates=True)
	# for this to work, it must have the structure [timestamp, ch0, ..., chN]
	# timestamps must be ISO-8601 (preferably utc) timestamps
	timestamps = df[df.columns[0]]
	try:
		timestamps = pd.to_datetime(timestamps, errors="raise").astype("datetime64[ns]")
	except (pd.errors.ParserError, ValueError) as error:
		error.add_note(timestamps.iloc[0])
		error.add_note("The first column in data must contain only valid ISO-8601 timestamps.")
		raise error
	# samples must be numeric, either real or complex numbers
	samples = df[df.columns[1:]]
	complex_samples = COMPLEX_SAMPLES # read the bottom of this conversion block
	for column in samples.columns:
		if not pd.api.types.is_numeric_dtype(samples[column]):
			# method 1: they are just numbers but pandas ignored that
			try:
				samples[column] = pd.to_numeric(samples[column])
			except: pass
			# method 2: they are complex numbers
			if not pd.api.types.is_numeric_dtype(samples):
				try:
					samples[column] = samples[column].apply(lambda x: np.complex(x))
					complex_samples = True
				except: pass
			# method 2: they are complex numbers but use i notation instead of j
			if not pd.api.types.is_numeric_dtype(samples):
				try:
					samples[column] = samples[column].apply(lambda x: np.complex(x.replace("i", "j")))
					complex_samples = True
				except:
					# if they're still not numbers by here, throw an error because we have no idea
					raise ValueError("Sample columns must be numeric in either real or complex format.")
	# because detection of complex numbers which may have an imaginary component of 0 is fraught in python
	# easier to always assume (and output in)complex mode
	samples = samples.astype(complex)
	# pad into certain number of channels if required
	if num_channels is not None:
		num_existing_channels = len(samples.columns)
		if num_channels < num_existing_channels:
			# trim some channels off
			samples = samples[[samples.columns[:num_channels]]]
		elif num_channels > num_existing_channels:
			# add some blank ones
			for i in range(num_channels - num_existing_channels):
				samples[f"black_ch{i+1}"] = 0+0j if complex else 0
	# now to write some data!
	outfile = open(outfile_path, "wb")
	samples_per_frame = 1000
	alignment_period, samples_per_alignment, alignment_offset = get_alignment(timestamps)
	frames_per_alignment_period = int(samples_per_alignment / samples_per_frame)
	num_frames = ceil(len(samples) / samples_per_frame)
	# now cull the list of timestamps to just the relevant one for each frame start
	timestamps = timestamps.tolist()[0::samples_per_frame]
	for i in range(num_frames):
		timestamp = timestamps[i]
		frame_number = (i + alignment_offset) % frames_per_alignment_period
		frame_samples = samples.loc[i * samples_per_frame: (i + 1) * samples_per_frame - 1].to_numpy() # loc is inclusive of end index
		# if samples cut off, (presumably in the last frame) pad with zeroes
		if frame_samples.shape[0] < samples_per_frame:
			np.vstack([frame_samples, [[0] * num_channels] * (samples_per_frame - frame_samples.shape[0])])
		frame_data = encode_frame(frame_number, timestamp, complex_samples, alignment_period, samples_per_alignment, frame_samples)
		outfile.write(frame_data)
	outfile.close()
	return

In [283]:
def create_data(frequency, num_samples, channel_functions):
	num_channels = len(channel_functions)
	df = pd.DataFrame(columns=["timestamp"] + [f"ch{i+1}" for i in range(num_channels)])
	timestamp = np.datetime64("now")
	def get_timestep(frequency):
		ns = float(1e9)
		nanoseconds = floor(ns / float(frequency))
		if int(ns / float(frequency)) != nanoseconds:
			raise ValueError("Frequency granularity too high, the nanosecond component of the sample timestep is a non-integer.")
		return np.timedelta64(nanoseconds, "ns")
	timestep = get_timestep(frequency)
	for i in range(num_samples):
		row = [timestamp] + [f(i) for f in channel_functions]
		df.loc[i] = row # insert into frame
		timestamp += timestep # increment time
	return df

In [359]:
from math import log10, pow

f = [ # add a function or lambda for each channel you want to produce
	lambda x: x / 10,
	lambda x: pow(10, log10(x + 1) - 1) - 1,
	lambda x: round(float(x) / 1000.0) * 1000.0
] # no judgies, I honestly have no clue what kind of number voltage values should have
df = create_data(2e3, 10000, f) # 2kHz, 100000 samples, channels produced using functions f
# (tiny frequency is because at least >1second of data must be made so the minimum samples scales with frequency
# and generating a billion samples takes like a minute each time)
df.to_csv("/Users/mars/csv_to_codif.csv", index=False)
df.head()

Unnamed: 0,timestamp,ch1,ch2,ch3
0,2024-04-19 07:56:14.000000,0.0,-0.9,0.0
1,2024-04-19 07:56:14.000500,0.1,-0.8,0.0
2,2024-04-19 07:56:14.001000,0.2,-0.7,0.0
3,2024-04-19 07:56:14.001500,0.3,-0.6,0.0
4,2024-04-19 07:56:14.002000,0.4,-0.5,0.0


In [360]:
to_codif("/Users/mars/csv_to_codif.csv", "/Users/mars/csv_to_codif_test.codif", num_channels=8)