In [1]:
from pynq.overlays.base import BaseOverlay
import os, time, socket, multiprocessing, asyncio

base = BaseOverlay("base.bit")

In [2]:
%%microblaze base.PMODB

#include <unistd.h>
#include "gpio.h"

// Function to write to a PMOD pin
int write_gpio(unsigned int pin, unsigned int val) {
    if (val > 1) {
        // Technically, an error, but just limit the user input to 1
        val = 1;
    }
    gpio pin_out = gpio_open(pin);
    gpio_set_direction(pin_out, GPIO_OUT);
    gpio_write(pin_out, val);

    return 0;
}

// Function to reset the PMOD pins
int reset_pins(void) {
    for (int i = 0; i < 8; i++)
        write_gpio(i, 0);
    return 0;
}

In [3]:
checking_btns = True
btn_num_pressed = -1

async def get_btn_pressed():
    global base, checking_btns, btn_num_pressed
    while True:
        await asyncio.sleep(0.25)
        if not checking_btns:
            continue

        btn_pressed = base.btns_gpio.read()
        if btn_pressed:
            btn_num_pressed_local = -1
            while btn_pressed:
                btn_pressed = btn_pressed >> 1
                btn_num_pressed_local += 1

            btn_num_pressed = btn_num_pressed_local
            checking_btns = False

async def send_to_server(_loop):
    global checking_btns, btn_num_pressed

    server_connected = False
    SERVER_IP = "192.168.8.164"
    LISTENING_PORT = 12345

    while True:
        await asyncio.sleep(0.25)
        if checking_btns:
            continue

        print(f"Button {btn_num_pressed} pressed!")

        # Button 0 connects to the server
        # Button 1 sounds the buzzer
        # Button 2 disconnects from the server
        if btn_num_pressed == 0:
            if not server_connected:
                try:
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    sock.connect((SERVER_IP, LISTENING_PORT))
                except:
                    print(f"Could not connect to {SERVER_IP}:{LISTENING_PORT}")
                print(f"Connected to server!")
                server_connected = True
            else:
                print(f"Already connected to server!")

            checking_btns = True
        
        elif btn_num_pressed == 1:
            try:
                sock.sendall(b'buzz')
            except:
                print(f"Could not send 'buzz' to server")
            print(f"Sent 'buzz' to server!")
            checking_btns = True

        elif btn_num_pressed == 2:
            try:
                sock.sendall(b'stop')
                sock.close()
            except:
                print(f"Could not send 'stop' to server")
            print(f"Sent 'stop' to server!")
            loop.stop()

In [None]:
# freq is the tone frequency in Hz
# pin is the PMOD pin buzzer signal is connected to
def generate_tone(freq = 50, pin = 3):
    if freq == 0:
        print("Cannot pass 0 as tone frequency")
        return

    reset_pins()

    sleep_time = 1.0 / (2 * freq); # In seconds

    start = time.time()
    while True:
        write_gpio(pin, 1)
        time.sleep(sleep_time)
        write_gpio(pin, 0)
        time.sleep(sleep_time)
        # Sound the buzzer for approx. 0.5 seconds
        if time.time() - start > 0.5:
            break

def p_server(_cpu):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 1: Bind the socket to the pynq board <CLIENT-IP> at port <LISTENING-PORT>
    sock.bind(('', 12345))
    sock.listen(1)

    # 2: Accept connections
    conn, addr = sock.accept()
    print(f"Connected by {addr}")

    # 3: Receive bytes from the connection
    # 'buzz' - Generate tone
    # 'stop' - Disconnect
    while True:
        data = conn.recv(1024)
        data = data.decode("utf-8")
        print(f"Data received from client: {data}")
        if data == "buzz":
            generate_tone()
        elif data == "stop":
            sock.close()
        else:
            print(f"Unexpected data received from client: {data}")

def p_client(_cpu):
    loop = asyncio.new_event_loop()
    loop.create_task(send_to_server(loop))
    loop.create_task(get_btn_pressed())
    loop.run_forever()
    loop.close()

# Launch process1 on CPU0
p1 = multiprocessing.Process(target=p_server, args=(0, ))

# Start the process before issuing the taskset OS
# command to bind the CPU affinity. PID is not
# populated until process is started.
p1.start()

# -p is a bit mask (without the "-c" option). CPU0 would be 0x1.
print("\n")
os.system("taskset -p {} {}".format(0x1, p1.pid))

# Launch process2 on CPU1
p2 = multiprocessing.Process(target=p_client, args=(1, ))

# Start the process before issuing the taskset OS
# command to bind the CPU affinity. PID is not
# populated until process is started.
p2.start()

# -p is a bit mask (without the "-c" option), CPU1 would be 2'b10.
print("\n")
os.system("taskset -p {} {}".format(0x2, p2.pid))

p2.join() # wait for process2 to finish
p1.join() # wait for process1 to finish



pid 5721's current affinity mask: 3
pid 5721's new affinity mask: 1


pid 5724's current affinity mask: 3
pid 5724's new affinity mask: 2
