In [None]:
import ipywidgets as wdg
import serial
from serial.tools import list_ports
import time
import threading

import numpy as np
import bqplot as bq

In [None]:
RESOLUTION = 12

In [None]:
port_list = [port.device for port in list_ports.comports()]
port_list.sort()

In [None]:
tabs = []


port = wdg.Dropdown(options=port_list, value=port_list[0], description='Port:')
connect = wdg.Button(description='Connect')
disconnect = wdg.Button(description='Disconnect', disabled=True)
tabs.append(('Connection', wdg.VBox([port, connect, disconnect])))

request = wdg.Button(description='Request', disabled=True)
request_interval = wdg.FloatLogSlider(value=0.1, min=-3, max=2, step=0.1, description='Interval (s):')
request_continuous = wdg.Checkbox(value=False, description='Continuous')
tabs.append(('Request', wdg.VBox([request, request_interval, request_continuous])))

response = wdg.Label(value='')
response_display = wdg.IntProgress(value=0, min=0, max=2**RESOLUTION-1, description='Value:')


x_data = np.array([])
y_data = np.array([])

x_sc = bq.LinearScale()
y_sc = bq.LinearScale(min=0, max=2**RESOLUTION-1)

line = bq.Lines(x=x_data, y=y_data, scales={'x': x_sc, 'y': y_sc})

ax_x = bq.Axis(scale=x_sc, label='Time')
ax_y = bq.Axis(scale=y_sc, orientation='vertical', label='Value')

fig = bq.Figure(marks=[line], axes=[ax_x, ax_y], title='ADC Value vs Time')

top_bar = wdg.Tab(children=[tab for title, tab in tabs], titles=[title for title, tab in tabs])
layout = wdg.VBox([top_bar, response, response_display, fig])

In [None]:
fpga = None

requesting = False
request_thread = None

times = []
voltages = []

In [None]:
class FPGA_Sar:

    def __init__(self, port):
        self.port = port

    def __enter__(self):
        self.ser = serial.Serial(self.port, 115200, timeout=1)
        return self

    def request(self):
        self.ser.write(b's')
        response = self.ser.readline().decode('utf-8').strip()
        return int(response, 2)
    
    def __exit__(self, exc_type, exc_value, traceback):
        self.ser.close()

    def __del__(self):
        self.ser.close()

In [None]:
def connect_to_port(port_name):
    global fpga

    fpga = FPGA_Sar(port_name)

    try:
        with fpga as f:
            pass
    except serial.SerialException as e:
        print(e)

    connect.disabled = True
    disconnect.disabled = False
    request.disabled = False

    connect.description = 'Connected'
    top_bar.set_title(0, 'Connection: ' + port_name)

def disconnect_from_port(b):
    global fpga

    fpga = None
    
    connect.disabled = False
    disconnect.disabled = True
    request.disabled = True

    connect.description = 'Connect'
    top_bar.set_title(0, 'Connection')

def request_thread_entry():
    global requesting, x_data, y_data

    last_time = time.time()
    start_time = last_time

    with fpga as f:
        while requesting:
            loop_start_time = time.time()
            
            value = f.request()

            elapsed_time = time.time() - last_time
            last_time = time.time()
            response.value = f"{value:b} ({value}) @ {1/elapsed_time:.2f} Hz"
            response_display.value = value

            times.append(last_time - start_time)
            voltages.append(value)

            # constrain times and voltages to last 10 seconds
            while times[-1] - times[0] > 10:
                times.pop(0)
                voltages.pop(0)

            line.x = times
            line.y = voltages

            if request_interval.value - (time.time() - loop_start_time) > 0:
                time.sleep(request_interval.value - (time.time() - loop_start_time))

def request_data(b):
    global requesting, request_thread

    if request_thread is not None:
        requesting = False
        request_thread.join()
        request_thread = None
        request.description = 'Request'
        return

    if not request_continuous.value:
        
        with fpga as f:
            value = f.request()

        response.value = f"{value:b} ({value})"

    else:
        requesting = True
        request.description = 'Stop'

        request_thread = threading.Thread(target=request_thread_entry)
        request_thread.start()

In [None]:
connect.on_click(lambda b: connect_to_port(port.value))
disconnect.on_click(disconnect_from_port)
request.on_click(request_data)

In [None]:
display(layout)

In [None]:
import pygame as pg
import numpy as np

fpga = FPGA_Sar(port_list[0])

player_y = 0
player_vy = 0
player_ay = 0

def iterate_player(screen, fpga):
    global player_y, player_vy, player_ay
    
    # player_ay = (2048 - fpga.request()) / 2048 * 0.1
    # player_vy += player_ay
    player_vy = (2048 - fpga.request()) / 2048 * 5
    player_y += player_vy

    if player_y < 0:
        player_y = 0
        player_vy = 0

    if player_y > screen.get_height() - 20:
        player_y = screen.get_height() - 20
        player_vy = 0

    pg.draw.rect(screen, (255, 0, 0), (50, player_y, 20, 20))

pipes = []

def spawn_pipes(screen):
    global pipes
    pipes.append([screen.get_width(), np.random.randint(50, screen.get_height() - 200), np.random.randint(50, 100)])

def iterate_pipes(screen):
    global pipes
    for pipe in pipes:
        pipe[0] -= 3
        pg.draw.rect(screen, (39, 166, 0), (pipe[0], 0, 50, pipe[1]))
        pg.draw.rect(screen, (39, 166, 0), (pipe[0], pipe[1] + pipe[2], 50, screen.get_height() - pipe[1] - pipe[2]))

# cloud is of format [distance from left, distance from bottom]
x_tot = 0
clouds = []
while x_tot < 1100:
    clouds.append([x_tot, np.random.randint(10, 150)])
    x_tot += np.random.randint(20, 60)

x_tot = 0
clouds2 = []
while x_tot < 1100:
    clouds2.append([x_tot, np.random.randint(150, 200)])
    x_tot += np.random.randint(20, 60)

def draw_pixelated_circle(screen, x, y, radius, color):
    for i in range(x - radius, x + radius, 5):
        for j in range(y - radius, y + radius, 5):
            if (i - x) ** 2 + (j - y) ** 2 < radius ** 2:
                # draw rect of size 10x10
                pg.draw.rect(screen, color, (i, j, 10, 10))

def iterate_clouds(screen):
    global clouds, clouds2

    for cloud in clouds2:
        cloud[0] -= 1
        if cloud[0] < -100:
            cloud[0] = 900
        draw_pixelated_circle(screen, cloud[0], screen.get_height() - cloud[1], 50, (140, 220, 220))
        pg.draw.rect(screen, (140, 220, 220), (cloud[0] - 50, screen.get_height() - cloud[1], 110, 200))

    # draw clouds
    for cloud in clouds:
        cloud[0] -= 2
        if cloud[0] < -100:
            cloud[0] = 900
        draw_pixelated_circle(screen, cloud[0], screen.get_height() - cloud[1], 50, (252, 251, 242))
        pg.draw.rect(screen, (252, 251, 242), (cloud[0] - 50, screen.get_height() - cloud[1], 110, 200))

last_pipe_ticks = 0

def check_rect_collision(rect1, rect2):
    return not (rect1[0] > rect2[0] + rect2[2] or rect1[0] + rect1[2] < rect2[0] or rect1[1] > rect2[1] + rect2[3] or rect1[1] + rect1[3] < rect2[1])

def check_collision():
    global player_y, pipes

    for pipe in pipes:
        if check_rect_collision((50, player_y, 20, 20), (pipe[0], 0, 50, pipe[1])) or check_rect_collision((50, player_y, 20, 20), (pipe[0], pipe[1] + pipe[2], 50, 600 - pipe[1] - pipe[2])):
            # draw game over screen
            screen.fill((70, 180, 200))
            pg.font.init()
            font = pg.font.SysFont('Comic Sans MS', 30)
            text = font.render('Game Over!', False, (0, 0, 0))
            screen.blit(text, (300, 300))
            text = font.render(f"Score: {len(pipes) - 3}", False, (0, 0, 0))
            screen.blit(text, (300, 350))
            text = font.render('Press any key to restart', False, (0, 0, 0))
            screen.blit(text, (300, 400))
            pg.display.flip()

            pg.event.clear()
            while True:
                event = pg.event.wait()
                if event.type == pg.KEYDOWN:
                    break
                if event.type == pg.QUIT:
                    pg.quit()
                    return

            pipes.clear()

def run_game_iteration(screen, fpga):

    global last_pipe_ticks

    # draw sky
    screen.fill((70, 180, 200))
    
    # add random pipes every 100 iterations
    if pg.time.get_ticks() - last_pipe_ticks > 2000:
        spawn_pipes(screen)
        last_pipe_ticks = pg.time.get_ticks()

    iterate_clouds(screen)
    iterate_pipes(screen)
    iterate_player(screen, fpga)

    check_collision()

pg.init()
screen = pg.display.set_mode((800, 600))
pg.display.set_caption('FPGA SAR ADC Demo - Flying Block')
clock = pg.time.Clock()

running = True

with fpga as f:
    try:
        while running:
            for event in pg.event.get():
                if event.type == pg.QUIT:
                    running = False
    
            run_game_iteration(screen, f)
    
            pg.display.flip()
            clock.tick(60)
    except Exception as e:
        pass

pg.quit()