# Basic configuration
These settings can be changed to vary the characteristics of the output pattern.

In [None]:
# The message to be encoded. Any valid UTF-8 string can be used. The packet's
# one-byte size field counts the message bytes plus the one-byte CRC, so the
# encoded message can be at most 254 bytes long. Keep in mind that emojis can
# use up to four bytes on their own.
MESSAGE = "Beans 🫘"

# The hue of the HSL color to be used. This color, and its complement, will be
# used to create the pattern.
COLOR_HUE = 185

# How many black frames to add before the animation. Can be any non-negative
# integer; zero adds no black frames.
ADD_BLACK_FRAMES = 6

# Advanced Configuration
Changing these settings can profoundly change the output pattern, but doing so
carries a relatively high risk of breaking my fairly unclean code. Change with caution.

In [None]:
# The frame rate (frames per second) of the output pattern. Keeping this at 24
# is recommended.
FPS = 24

# The number of color cycles to go through for each symbol. Keep this as a
# power of 2, e.g. 0.25, 0.5, 1.0, 2.0, etc.
CYCLES_PER_SYMBOL = 0.5

# The number of symbols encoded per second of the animation.
SYMBOLS_PER_SECOND = 1.0

# Imports and Constants

In [None]:
import numpy as np
import scipy.signal.windows as windows
from PIL import Image, ImageDraw
from io import BytesIO
import apng
import plotly.graph_objects as go
from color import Color
from crc8 import crc8

# The number of samples (animation frames) for each symbol is known at this
# time, since it's based only on other constants. int() truncates, so settings
# that do not divide evenly are silently rounded down.
# NOTE(review): with the default settings (24 * 0.5 / 1.0) this yields 12
# frames per symbol, i.e. two symbols per second at 24 FPS -- confirm that
# scaling by CYCLES_PER_SYMBOL is intended.
SAMPLES_PER_SYMBOL = int(FPS * CYCLES_PER_SYMBOL / SYMBOLS_PER_SECOND)

# 13-symbol complex Barker code, used for its great autocorrelation properties.
# Each code symbol is held for SAMPLES_PER_SYMBOL samples, then the sequence is
# rotated by 45 degrees so it matches our constellation.
MESSAGE_PREAMBLE = np.array(
    [1, 1j, -1, -1j, 1, -1j, 1, -1j, 1, -1j, -1, 1j, 1]
).repeat(SAMPLES_PER_SYMBOL) * (1 + 1j) / np.sqrt(2)

# The number of radians the carrier advances per sample of our message, used
# for carrier creation
PHA_PER_SAMPLE = 2 * np.pi * CYCLES_PER_SYMBOL / SAMPLES_PER_SYMBOL

# Encoding

In [None]:
# Encode the string as raw bytes (str.encode defaults to UTF-8)
message_bytes = MESSAGE.encode()

# Take the 8-bit CRC of the message bytes, used to verify data integrity on the
# decoding side.
message_crc = crc8(message_bytes).to_bytes()

# The size field is a single byte and counts the message *and* its CRC. Fail
# early with a clear error rather than letting int.to_bytes raise an opaque
# OverflowError when the message is too long.
message_size = len(message_bytes) + len(message_crc)
if message_size > 0xFF:
    raise ValueError(
        f"MESSAGE encodes to {len(message_bytes)} bytes, but at most "
        f"{0xFF - len(message_crc)} bytes fit in a packet"
    )

# Create our full packet's data with the following structure, then split it into
# individual bits
# | Size (message + CRC) | 1 byte       |
# | Message              | <= 254 bytes |
# | CRC                  | 1 byte       |
packet = message_size.to_bytes(1) + message_bytes + message_crc
message_bits = np.unpackbits(np.frombuffer(packet, dtype=np.uint8))

# Pair up consecutive bits, most significant first, into 2-bit symbols with
# values 0..3. The packet is a whole number of bytes, so the bit count is
# always even.
high_bits = message_bits[0::2]
low_bits = message_bits[1::2]
message_symbols = ((high_bits << 1) | low_bits).astype(np.float32)

In [None]:
# Total length of the baseband signal: the preamble followed by every message
# symbol held for SAMPLES_PER_SYMBOL samples
n_samples = MESSAGE_PREAMBLE.size + SAMPLES_PER_SYMBOL * message_symbols.size

# Map each symbol to its constellation phase: quarter-turn steps offset by 45
# degrees, with every phase held for the duration of one symbol
quarter_turns = message_symbols.astype(np.float32) / 4
symbols_angle = np.repeat(
    quarter_turns * (2 * np.pi) + (np.pi / 4),
    SAMPLES_PER_SYMBOL,
)

# The packet is the preamble followed by the unit-magnitude data phasors
data_phasors = np.exp(symbols_angle * 1j)
qpsk = np.concatenate([MESSAGE_PREAMBLE, data_phasors])

In [None]:
# Plot the in-phase (I) and quadrature (Q) parts of our QPSK modulated packet,
# so we can get an idea of what it looks like
fig = go.Figure()

sample_axis = np.linspace(0, qpsk.size, qpsk.size, endpoint=False)
for trace_name, component in (("I", qpsk.real), ("Q", qpsk.imag)):
    fig.add_scatter(x=sample_axis, y=component, name=trace_name)

# Mark every symbol boundary with a dotted line
for boundary in range(SAMPLES_PER_SYMBOL, qpsk.size, SAMPLES_PER_SYMBOL):
    fig.add_vline(boundary, line=go.layout.shape.Line(dash="dot"))

# Mark the end of our preamble with a solid line
preamble_end = MESSAGE_PREAMBLE.size
fig.add_vline(preamble_end)

fig.show()

In [None]:

# Create our carrier to upmix our QPSK signal: a unit phasor whose phase starts
# at zero and advances by PHA_PER_SAMPLE radians every sample
carrier = np.exp(np.linspace(0, PHA_PER_SAMPLE * n_samples, n_samples, endpoint=False) * 1j)
upmixed = qpsk * carrier

# np.trapz is deprecated since NumPy 2.0 in favor of np.trapezoid; use
# whichever one this NumPy provides so the cell keeps working on both.
integrate = getattr(np, "trapezoid", None) or np.trapz

# Pulse shape our mixed waveform to make smoother color transitions. The window
# is scaled to unit area so that filtering roughly preserves the signal level.
pulse_filter = windows.blackmanharris(FPS // 2)
pulse_filter /= integrate(pulse_filter)
shaped = np.convolve(upmixed, pulse_filter, mode="same")

In [None]:
# Plot the real and imaginary parts of the finished signal, after upmixing and
# pulse shaping.
fig = go.Figure()

shaped_axis = np.linspace(0, shaped.size, shaped.size, endpoint=False)
shaped_traces = (
    ("I (upmixed)", shaped.real),
    ("Q (upmixed)", shaped.imag),
)
for trace_name, component in shaped_traces:
    fig.add_scatter(x=shaped_axis, y=component, name=trace_name)

# Mark every symbol boundary with a dotted line
for boundary in range(SAMPLES_PER_SYMBOL, upmixed.size, SAMPLES_PER_SYMBOL):
    fig.add_vline(boundary, line=go.layout.shape.Line(dash="dot"))

fig.show()

In [None]:
# Reinterpret our complex-valued data as components of our HSL color palette:
# the real part drives the saturation (with its sign flipped for hues of 180
# and above) and the imaginary part drives the lightness.
saturation = shaped.real if COLOR_HUE < 180 else -shaped.real

# Rescale the imaginary part so that zero maps to a lightness of 0.5
lightness = 0.5 + shaped.imag * 0.5

In [None]:
# Plot our HSL colors: hue is drawn against the left axis, saturation and
# lightness share the right one
fig = go.Figure(
    layout=go.Layout(
        yaxis=dict(title="hue"),
        yaxis2=dict(title="saturation / lightness", side="right", overlaying='y'),
    )
)

# The hue is the same for every sample, so its trace is a flat line
constant_hue = np.ones(saturation.shape) * COLOR_HUE
fig.add_scatter(
    x=np.linspace(0, constant_hue.size, constant_hue.size, endpoint=False),
    y=constant_hue,
    name="hue",
)

for trace_name, channel in (("saturation", saturation), ("lightness", lightness)):
    fig.add_scatter(
        x=np.linspace(0, channel.size, channel.size, endpoint=False),
        y=channel,
        name=trace_name,
        yaxis="y2",
    )

# Mark every symbol boundary with a dotted line
for boundary in range(SAMPLES_PER_SYMBOL, constant_hue.size, SAMPLES_PER_SYMBOL):
    fig.add_vline(boundary, line=go.layout.shape.Line(dash="dot"))

fig.show()

In [None]:
# Build one (hue, saturation, lightness) row per sample, convert the rows to
# RGB, and transpose the result so it unpacks into one array per channel
hue_column = np.ones(saturation.shape) * COLOR_HUE
hsl_array = np.stack([hue_column, saturation, lightness], axis=1)
rgb_rows = Color.from_hsl_array(hsl_array).as_rgb()
(red, green, blue) = rgb_rows.T

In [None]:
# Plot each of our RGB color channels against the sample index
fig = go.Figure()

for trace_name, channel in (("blue", blue), ("red", red), ("green", green)):
    fig.add_scatter(
        x=np.linspace(0, channel.size, channel.size, endpoint=False),
        y=channel,
        name=trace_name,
    )

# Mark every symbol boundary with a dotted line
for boundary in range(SAMPLES_PER_SYMBOL, red.size, SAMPLES_PER_SYMBOL):
    fig.add_vline(boundary, line=go.layout.shape.Line(dash="dot"))

fig.show()

In [None]:
# Here's a different way to visualize our colors: a strip image in which each
# column of pixels is one frame of our finished animation
frame = Image.new("RGB", (red.size, 100))
draw = ImageDraw.Draw(frame)
for column, rgb in enumerate(zip(red, green, blue)):
    # The first column of every symbol is drawn black to mark the boundary
    if column % SAMPLES_PER_SYMBOL == 0:
        fill = (0, 0, 0)
    else:
        fill = rgb
    x = column + 0.5
    draw.line((x, 0, x, 100), fill=fill)

frame

In [None]:
def _solid_frame_png(fill, size=(100, 100)):
    """Return an in-memory PNG of a single solid-colored frame.

    ``fill`` is handed to ImageDraw as the rectangle's fill color and ``size``
    is the (width, height) of the frame in pixels. The returned BytesIO is
    rewound to the start so it can be read immediately.
    """
    image = Image.new("RGB", size)
    ImageDraw.Draw(image).rectangle((0, 0, size[0], size[1]), fill=fill)
    buffer = BytesIO()
    image.save(buffer, "png")
    buffer.seek(0)
    return buffer


# Create our animation one frame at a time: a few black lead-in frames if
# desired, followed by one solid frame per color sample.
frames = [_solid_frame_png((0, 0, 0)) for _ in range(ADD_BLACK_FRAMES)]
frames.extend(_solid_frame_png((r, g, b)) for (r, g, b) in zip(red, green, blue))

# APNG stores each frame's delay as the fraction delay / delay_den seconds.
# For a whole-number frame rate, 1 / FPS is exact, whereas truncating to whole
# milliseconds (1000 // 24 == 41) would play the animation slightly too fast.
if float(FPS).is_integer():
    frame_timing = dict(delay=1, delay_den=int(FPS))
else:
    frame_timing = dict(delay=round(1000 / FPS), delay_den=1000)
animation = apng.APNG.from_files(frames, **frame_timing)

# Save our animation as an animated PNG
animation.save("colors.png")

# Decoding

In [None]:
# load the animation from the file
animation = apng.APNG.open("colors.png")

recovered_colors = np.empty((len(animation.frames), 3), dtype=np.float32)
# Recover the color shown in each frame of the animation
for i, (png, _control) in enumerate(animation.frames):
    data = BytesIO()
    png.save(data)
    data.seek(0)
    # Normalize to RGB so every color is an (r, g, b) triple
    im = Image.open(data).convert("RGB")
    # Allow as many distinct colors as there are pixels so getcolors() can
    # never give up and return None. Each entry is a (count, color) pair; keep
    # the most frequent color rather than the least frequent one.
    colors = im.getcolors(im.width * im.height)
    _count, dominant_color = max(colors, key=lambda entry: entry[0])
    recovered_colors[i, :] = dominant_color

# convert from RGB to HSL
(recovered_hue, recovered_saturation, recovered_lightness) = Color.from_rgb_array(recovered_colors).as_hsl().T

# Frames whose hue lies above the average hue get their saturation negated.
# NOTE(review): presumably this undoes the encoder's signed saturation, which
# shows up in the image as the complementary hue -- confirm against Color.
hue_avg = np.sum(recovered_hue) / recovered_hue.size
recovered_saturation[recovered_hue > hue_avg] *= -1

# rebuild our complex values: saturation is the real part, and lightness
# (re-centered on zero and rescaled) is the imaginary part
recovered_complex = recovered_saturation + (recovered_lightness - 0.5) * 2j

In [None]:
# Plot our recovered complex values together with the hue, since the hue is
# what decides whether a saturation value is negative or positive.
fig = go.Figure(
    layout=go.Layout(
        yaxis=dict(title="hue", range=[-10, 370]),
        yaxis2=dict(title="saturation / lightness", side="right", overlaying='y'),
    )
)

# (trace name, number of points, values, y axis) for every trace
recovered_traces = (
    ("hue", recovered_hue.size, recovered_hue, "y1"),
    ("saturation", recovered_saturation.size, recovered_complex.real, "y2"),
    ("lightness", recovered_lightness.size, recovered_complex.imag, "y2"),
)
for trace_name, length, values, axis in recovered_traces:
    fig.add_scatter(
        x=np.linspace(0, length, length, endpoint=False),
        y=values,
        name=trace_name,
        yaxis=axis,
    )

fig.show()

In [None]:
# Downmix our signal with a carrier rotating in the opposite direction. This
# carrier starts at phase zero on the first recovered frame; any constant
# phase difference that results is removed by angle_offset below.
frame_count = recovered_hue.size
recovery_carrier = np.exp(-np.linspace(0, PHA_PER_SAMPLE * frame_count, frame_count, endpoint=False) * 1j)
recovered_qpsk = recovered_complex * recovery_carrier

# Perform correlation to find where in our animation our packet is
correlation = np.correlate(recovered_qpsk, MESSAGE_PREAMBLE, mode="full")

# Wherever our correlation is the strongest, assume that's the start of the
# packet's data. In "full" mode the peak index lines up with the last sample of
# the preamble, so it marks (to within a sample) where the data begins.
message_start = np.argmax(np.abs(correlation))

# Find the phase offset of our correlation peak, which tells us how much to
# rotate our recovered QPSK by in order to align it for demodulation. Only the
# peak sample's angle is needed, so only that one is computed.
angle_offset = np.angle(correlation[message_start])
recovered_qpsk *= np.exp(angle_offset * -1j)

# np.trapz is deprecated since NumPy 2.0 in favor of np.trapezoid; use
# whichever one this NumPy provides so the cell keeps working on both.
integrate = getattr(np, "trapezoid", None) or np.trapz

# Weigh the middle of a symbol more than the edges (arbitrarily using the
# blackmanharris window), scaled to unit area
symbol_kernel = windows.blackmanharris(SAMPLES_PER_SYMBOL)
symbol_kernel /= integrate(symbol_kernel)

In [None]:
# Plot our IQ values before and after weighing the center more than the edges,
# as well as our correlation so we can visualize our packet.
fig = go.Figure()

sample_axis = np.linspace(0, recovered_qpsk.size, recovered_qpsk.size, endpoint=False)

# Filter once and reuse the result for both the I and Q traces
filtered_qpsk = np.convolve(symbol_kernel, recovered_qpsk, mode="same")

iq_traces = (
    ("I", recovered_qpsk.real),
    ("Q", recovered_qpsk.imag),
    ("I (filtered)", filtered_qpsk.real),
    ("Q (filtered)", filtered_qpsk.imag),
)
for trace_name, values in iq_traces:
    fig.add_scatter(x=sample_axis, y=values, name=trace_name)

# np.trapz is deprecated since NumPy 2.0 in favor of np.trapezoid; use
# whichever one this NumPy provides so the cell keeps working on both.
integrate = getattr(np, "trapezoid", None) or np.trapz

# The "full" correlation has more points than the signal itself, so it gets an
# x axis of its own length; x must be as long as y for the whole curve to be
# drawn.
correlation_axis = np.linspace(0, correlation.size, correlation.size, endpoint=False)
fig.add_scatter(
    x=correlation_axis,
    y=np.abs(correlation / integrate(MESSAGE_PREAMBLE)),
    name="Preamble Correlation",
)

# Add a solid line showing where we believe the start of our packet to be
fig.add_vline(message_start)

# Add a dotted line between each symbol
for boundary in range(message_start, recovered_qpsk.size, SAMPLES_PER_SYMBOL):
    fig.add_vline(boundary, line=go.layout.shape.Line(dash="dot"))

fig.show()

In [None]:
# Recover the size of the message so we know how much to decode

# The size byte occupies the first four 2-bit symbols after message_start.
# Crop our complex values to that region, then weigh the centers of the
# symbols more.
size_stop = message_start + SAMPLES_PER_SYMBOL * 4
filtered_qpsk_size = np.convolve(
    recovered_qpsk[message_start:size_stop],
    symbol_kernel,
    mode="same",
)

# Take one sample per symbol, from the middle of each symbol interval
symbol_step = int(SAMPLES_PER_SYMBOL)
recovered_points_size = filtered_qpsk_size[symbol_step // 2::symbol_step]

# Recover the angle of each of those points, wrapped to one full turn and with
# the constellation's 45 degree offset removed
wrapped_angles_size = np.angle(recovered_points_size) % (2 * np.pi)
recovered_phases_size = wrapped_angles_size - (np.pi / 4)

# Convert each angle back into its respective symbol by rounding to the
# nearest quarter turn
recovered_symbols_size = np.round(2 * recovered_phases_size / np.pi).astype(np.uint8)

# Expand every symbol to eight bits, keep only the two lowest bits of each,
# and interleave them back into a single bit stream
exploded_bits_size = np.unpackbits(recovered_symbols_size)
upper_bits_size = exploded_bits_size[6::8]
lower_bits_size = exploded_bits_size[7::8]
recovered_bits_size = np.stack([upper_bits_size, lower_bits_size]).flatten("F")

# Convert the recovered data back into an integer as our size
size_field = bytes(np.packbits(recovered_bits_size))
recovered_size = int.from_bytes(size_field)

print(f"Message size: {recovered_size}")

In [None]:
# Recover remainder of message + CRC

# The message and its CRC follow the four size symbols, with four symbols for
# each of the recovered_size bytes. Crop our complex values to that region,
# then weigh the centers of the symbols more.
body_start = message_start + SAMPLES_PER_SYMBOL * 4
body_stop = body_start + SAMPLES_PER_SYMBOL * 4 * recovered_size
match_filtered_phases = np.convolve(
    recovered_qpsk[body_start:body_stop],
    symbol_kernel,
    mode="same",
)

# Take one sample per symbol, from the middle of each symbol interval
symbol_step = int(SAMPLES_PER_SYMBOL)
recovered_point = match_filtered_phases[symbol_step // 2::symbol_step]

# Recover the angle of each of those points, wrapped to one full turn and with
# the constellation's 45 degree offset removed
wrapped_angles = np.angle(recovered_point) % (2 * np.pi)
recovered_phases = wrapped_angles - (np.pi / 4)

# Convert each angle back into its respective symbol by rounding to the
# nearest quarter turn
recovered_symbols = np.round(2 * recovered_phases / np.pi).astype(np.uint8)

# Expand every symbol to eight bits, keep only the two lowest bits of each,
# and interleave them back into a single bit stream
exploded_bits = np.unpackbits(recovered_symbols)
upper_bits = exploded_bits[6::8]
lower_bits = exploded_bits[7::8]
recovered_bits = np.stack([upper_bits, lower_bits]).flatten("F")

# Convert the message data back into python's raw bytes
recovered_bytes = bytes(np.packbits(recovered_bits))

# The last byte is the received CRC and everything before it is the message.
# Check the CRC of the message; a result of zero means our message contains
# valid data, so decode it and print it!
recovered_message = recovered_bytes[:-1]
crc_result = crc8(recovered_message, recovered_bytes[-1])
if crc_result != 0:
    print("Message CRC check failed!")
else:
    print("Message CRC passed!")
    print(f"Message contents: '{bytes(recovered_message).decode()}'")