# Interacting with GPIO from MicroBlaze

In [1]:
from pynq.overlays.base import BaseOverlay
import time
import threading
import multiprocessing
import os
import socket
from multiprocessing import Process, Value, Array, Lock

base = BaseOverlay("base.bit")

In [2]:
%%microblaze base.PMODB

#include "gpio.h"
#include "pyprintf.h"

//Function to turn on/off a selected pin of PMODB
void write_gpioB(unsigned int pin, unsigned int val){
    if (val > 1){
        pyprintf("pin value must be 0 or 1");
    }
    gpio pin_out = gpio_open(pin);
    gpio_set_direction(pin_out, GPIO_OUT);
    gpio_write(pin_out, val);
}

//Function to read the value of a selected pin of PMODB
unsigned int read_gpioB(unsigned int pin){
    gpio pin_in = gpio_open(pin);
    gpio_set_direction(pin_in, GPIO_IN);
    return gpio_read(pin_in);
}

//Add a C++ function to reset all the GPIO pins on the chosen PMOD
void reset_gpioB(){
    for (unsigned int j = 0; j <= 7; j = j + 1) { 
        write_gpioB(j, 0);
        //write_gpio(pin,val)
        //we want the value reset to zero
        //12 pins, starts at 0
    }
}

In [3]:
#LED_Status = [0, 0, 2, 3] #Default Status
#LED_Status = [1, 1, 2, 1] #test
#start_LED = False #when connection is made, LED() will start
#running = True #when client breaks connection, LED() sill stop running

sleep_time = 1
frequency = 15 #for the PWM Brightness
Duty_Cycle = [0.04736614270344993, 0.217637640824031, 0.5310492251033824, 1.0] #25%, 50%, 75%, and 100% brightness.
#time_on = duty_cycle * (1/frequency) && time_off = (1-duty_cycle) * (1/frequency)
PWM_on = []
for i in range(4):
    PWM_on.append(Duty_Cycle[i]/frequency)
PWM_off = []
for i in range(4):
    PWM_off.append((1-Duty_Cycle[i])/frequency)
PWM_run_time = frequency*sleep_time

In [4]:
def LED(LED_Status_pynq,_LL1, running, _LL2,_Lt):
    global PWM_on, PWM_off, PWM_run_time, sleep_time
    blink = True
    Array_Change = False
    LED_Status = []
    with _LL1:
        for i in range(4):
            LED_Status.append(LED_Status_pynq[i])
    while blink == True:
        print('LED Status: ',LED_Status[:],'\n running: ',running.value)
        threadlock = _Lt.acquire(False)
        if threadlock == True:
            if Array_Change == False:
                if LED_Status[0] == 0: #IF OFF
                    time.sleep(sleep_time)
                else: #IF ON
                    if LED_Status[3] == 3: #full bright (NO PWM)
                        if LED_Status[2] == 0: #If NO PWM & Cycle Color
                            for i in range(1,4): #cycle write from 1 (blue), 2 (red) and 3 (green)
                                write_gpioB(i,1) #turn on  LED
                                time.sleep(sleep_time) #sleep with ON
                                write_gpioB(i,0) #turn off LED
                                if LED_Status[1] == 1: #if supposed to blink, sleep
                                    time.sleep(sleep_time)
                        else: #IF NO PWM & Solid Color
                            write_gpioB(LED_Status[2],1)
                            time.sleep(sleep_time)
                            if LED_Status[1] == 1: #if supposed to blink, sleep
                                    write_gpioB(LED_Status[2],0)
                                    time.sleep(sleep_time)
                    else: #If NOT full brightness (PWM needed)
                        if LED_Status[2] == 0: #PWM & cycle thru colors
                            for i in range(1,4):
                                for j in range(PWM_run_time):
                                    write_gpioB(i,1) #turn on LED
                                    time.sleep(PWM_on[LED_Status[3]]) #sleep with ON
                                    write_gpioB(i,0) #turn off LED
                                    time.sleep(PWM_off[LED_Status[3]]) #sleep with OFF
                                if LED_Status[1] == 1: #If blinking
                                    time.sleep(sleep_time)
                        else:
                            for i in range(PWM_run_time):
                                write_gpioB(LED_Status[2],1) #turn on LED
                                time.sleep(PWM_on[LED_Status[3]]) #sleep with ON
                                write_gpioB(LED_Status[2],0) #turn off LED
                                time.sleep(PWM_off[LED_Status[3]]) #sleep with OFF
                            if LED_Status[1] == 1: #If Blinking
                                time.sleep(sleep_time)
                #time.sleep(.0001)
            else:
                with _LL1:
                    LED_Status = LED_Status_pynq
                    Array_Change = False
            _Lt.release() #NO SLEEP Time!
        else:
            with _LL1:
                for i in range(4):
                    if LED_Status[i] != LED_Status_pynq[i]:
                        Array_Change = True
            with _LL2:
                if running.value == 0:
                    blink = False
            time.sleep(.25)

def Wait_LED(start_p, LED_Status0, running_p, _LP0, _LP1, _LP2):
    global sleep_time
    print('Waiting for LED to start.')
    threads = []
    while True:
        with _LP0: #will keep this for the remainer of the program once start_p==1
            if start_p.value == 1:
                print('Starting LED')
                fork = threading.Lock()
                for i in range(2):
                    #fork is the shared lock
                    #i is the user (led number too)
                    t = threading.Thread(target=LED, args=(LED_Status0, _LP1, running_p, _LP2, fork))
                    threads.append(t) #creates the new thread number
                    t.start()
                reset_gpioB()
                for t in threads:
                    t.join()
                break
        #print('waiting... ', start_p)
        time.sleep(sleep_time)
        
def test(start_LED_s, LED_Status_s, running_s,_LS0,_LS1,_LS2):
    itera = 0
    while True:
        itera +=1
        if itera>3:
            with _LS0:
                start_LED_s.value = 1 #Now we can allow the LED() function to begin.
            print('start_LED = ',start_LED_s.value)
            print('Led status: ', LED_Status_s[:])
            for i in range(2):
                time.sleep(1)
            print('on:')
            data = ['1','1', '3', '0']
            with _LS1:
                for i in range(4): #length of data list ALWAYS = 4, could do 'len(data)' instead though.
                    LED_Status_s[i] = int(data[i])
            for i in range(5):
                time.sleep(1)
            print('LED status change')
            data = ['1','0', '0', '3']
            with _LS1:
                for i in range(4): #length of data list ALWAYS = 4, could do 'len(data)' instead though.
                    LED_Status_s[i] = int(data[i])
            for i in range(10):
                time.sleep(1)
            print('LED status change')
            data = ['1','1', '1', '2']
            with _LS1:
                for i in range(4): #length of data list ALWAYS = 4, could do 'len(data)' instead though.
                    LED_Status_s[i] = int(data[i])
            for i in range(6):
                time.sleep(1)
            print('About to disconnect')
            for i in range(2):
                time.sleep(1)
            with _LS2:#disconnect
                running_s.value = 0
            print('Disconnected!')
            break
        print(itera)
        time.sleep(1)

In [5]:
reset_gpioB()

start_i = Value('i', 0) #SHARED!
running_i = Value('i',1) #SHARED!
LED_Status_i = Array('i', [0, 0, 2, 2]) #SHARED
_L0 = Lock() #Lock0 for Start
_L1 = Lock() #Lock1 For LED status
_L2 = Lock() #Lock2 for running

p_GPIOB = multiprocessing.Process(target=test,args=(start_i,LED_Status_i,running_i,_L0,_L1,_L2))
p_GPIOB.start()
os.system("taskset -p -c {} {}".format(1, p_GPIOB.pid))
p_server = multiprocessing.Process(target=Wait_LED,args=(start_i,LED_Status_i,running_i,_L0,_L1,_L2))
p_server.start()
os.system("taskset -p -c {} {}".format(0, p_server.pid))

1
Waiting for LED to start.


0

2
3
start_LED = Starting LED 
1
Led status: LED Status:   LED Status:  [0, 0, 2, 2][0, 0, 2, 2]
[0, 0, 2, 2] 
 running:   
 running: 1 
1
LED Status:  [0, 0, 2, 2] 
 running:  1
LED Status:  [0, 0, 2, 2] 
 running:  1
LED Status:  [0, 0, 2, 2]LED Status:   
 running: [0, 0, 2, 2] 1 

 running:  1
LED Status:  [0, 0, 2, 2] 
 running:  1
LED Status:  [0, 0, 2, 2] 
 running:  1
on:
LED Status:  [0, 0, 2, 2] 
 running: LED Status:   1[0, 0, 2, 2] 

 running:  1
LED Status:  [0, 0, 2, 2] 
 running:  1
LED Status:  [0, 0, 2, 2] 
 running:  1
LED Status:  [0, 0, 2, 2] LED Status: 
 running:   [0, 0, 2, 2]1 
LED Status: 
 running:   [1, 1, 3, 0]1
 
 running:  1
LED Status:  [1, 1, 3, 0] 
 running:  1
LED Status:  [1, 1, 3, 0] 
 running:  1
LED Status:  [1, 1, 3, 0] 
 running: LED Status:   [0, 0, 2, 2]1 

 running:  1
LED Status:  [0, 0, 2, 2] 
 running:  1
LED Status:  [0, 0, 2, 2] 
 running:  1
LED Status:  [0, 0, 2, 2] 
 running:  1
LED Status:  [0, 0, 2, 2] 
 running:  1
LED Status:  [0, 0

Process Process-2:


LED Status: 

Traceback (most recent call last):


 [1, 1, 1, 2]

  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()


 

  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)



 running: 

  File "<ipython-input-4-fc8dd15409c2>", line 82, in Wait_LED
    t.join()
  File "/usr/lib/python3.8/threading.py", line 1011, in join
    self._wait_for_tstate_lock()


 

  File "/usr/lib/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):


0


KeyboardInterrupt


LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0
LED Status:  [1, 1, 1, 2] 
 running:  0


In [None]:
def run_pynq_server_R(start_LED_s, LED_Status_s, running_s,_LS0,_LS1,_LS2):
    global 
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(('192.168.2.99', 12345))
    sock.listen()
    conn, addr = sock.accept()
    print("Ramin's Server is waiting for a connection!")
    with conn:
        print("{} connected to Ramin's Server!".format(addr))
        with _LS0:
            start_LED_s.value = 1 #Now we can allow the LED() function to begin.
        while True:
            time.sleep(0.0001)
            data = conn.recv(1024)
            data = data.decode()
            if (data == 'disconnect'):
                print("Client is disconnecting from Ramin's Server.")
                with _LS2:
                    running_s.value = 0
                break
            data = list(data) #Here we convert from string into list, then update all indexes from str to int
            with _LS1:
                for i in range(4): #length of data list ALWAYS = 4, could do 'len(data)' instead though.
                    LED_Status_s[i] = int(data[i])
    print("Client has disconnected from Ramin's Server.")


In [None]:
start_i = Value('i', 0) #SHARED!
running_i = Value('i',1) #SHARED!
LED_Status_i = Array('i', [1, 0, 2, 2]) #SHARED
_L0 = Lock() #Lock0 for Start
_L1 = Lock() #Lock1 For LED status
_L2 = Lock() #Lock2 for running

p_server = multiprocessing.Process(target=run_pynq_server_R,args=(start_i,LED_Status_i,running_i,_L0,_L1,_L2))
p_server.start()
os.system("taskset -p -c {} {}".format(0, p_server.pid))

p_GPIOB = multiprocessing.Process(target=Wait_LED,args=(start_i,LED_Status_i,running_i,_L0,_L1,_L2))
p_GPIOB.start()
os.system("taskset -p -c {} {}".format(1, p_GPIOB.pid))

p_server.join();
p_GPIOB.join();