In [20]:
%serialconnect --port=/dev/cu.usbserial-7 --baud=115200 

import display

display = display.Display()

[34mConnecting to --port=/dev/cu.usbserial-7 --baud=115200 [0m
[34mReady.
[0m

In [21]:
import sys

sys.path.append("")

from micropython import const

import uasyncio as asyncio
import aioble
from bluetooth import UUID

import random
import struct
import json

class BleClient:
    SERVER_NAME = "SnakeMultiplayer"
    SNAKE_SERVICE_UUID = UUID("ca07faee-7e95-4856-b44b-bbaef52ec7b4")
    APPLE_POSITION_CHARACTERISTIC_UUID = UUID("cec23f9f-cdc0-4577-8798-8dd4b01724d8")
    SNAKE1_POSITIONS_CHARACTERISTIC_UUID = UUID("e72c988f-7279-4b82-b808-42884a4ba48f")
    SNAKE2_POSITIONS_CHARACTERISTIC_UUID = UUID("1673be03-ef60-4fb9-869c-4513a8e212f1")
    SNAKE2_DIRECTIONS_CHARACTERISTIC_UUID = UUID("1533130f-7251-4a87-a976-1b5bc0fae798")

    transmitted_snake1_tails = []
    transmitted_snake2_tails = []

    def __init__(self) -> None:
        self.snake_service_connection = None
        self.snake1_tails_characteristic = None
        self.snake2_tails_characteristic = None
        self.apple_position_characteristic = None

    async def read_snake1_tails(self):
        dir(self.snake1_tails_characteristic)
        value = await self.snake1_tails_characteristic.read()
        data = list(json.loads(value))
        snakeLength = data[0]
        newSnakeTails = data[1]

        for i in range(0, len(newSnakeTails)):
            self.transmitted_snake1_tails.append(newSnakeTails[i])

        while len(self.transmitted_snake1_tails) > snakeLength:
            self.transmitted_snake1_tails.pop(0)

        return self.transmitted_snake1_tails
    
    async def read_snake2_tails(self):
        value = await self.snake2_tails_characteristic.read()
        print(value)
        data = list(json.loads(value))
        snakeLength = data[0]
        newSnakeTails = data[1]

        for i in range(0, len(newSnakeTails)):
            self.transmitted_snake2_tails.append(newSnakeTails[i])

        while len(self.transmitted_snake2_tails) > snakeLength:
            self.transmitted_snake2_tails.pop(0)

        return self.transmitted_snake2_tails
    
    async def read_apple_posistion(self):
        # value = await self.apple_position_characteristic.read()
        # return list(json.loads(value))
        return [0,0]
        

    async def find_server(self):
        if (self.snake_service_connection is not None):
            return True
        
        async with aioble.scan(5000, interval_us=30000, window_us=30000, active=True) as scanner:
            async for result in scanner:
                if result.name() == self.SERVER_NAME and self.SNAKE_SERVICE_UUID in result.services():
                    print("its a match")
                    device = result.device
                    connection = await device.connect()
                    self.snake_service_connection = await connection.service(self.SNAKE_SERVICE_UUID)
                    self.snake1_tails_characteristic = await self.snake_service_connection.characteristic(self.SNAKE1_POSITIONS_CHARACTERISTIC_UUID)
                    self.snake2_tails_characteristic = await self.snake_service_connection.characteristic(self.SNAKE2_POSITIONS_CHARACTERISTIC_UUID)
                    self.apple_position_characteristic = await self.snake_service_connection.characteristic(self.APPLE_POSITION_CHARACTERISTIC_UUID)
                    # await self.snake1_tails_characteristic.subscribe(notify=True)
                    return True
            
        return False

In [22]:
BLOCK_SIZE = 4

DISPLAY_WIDTH = 128
DISPLAY_HEIGHT = 64

GAME_WIDTH = int(DISPLAY_WIDTH / BLOCK_SIZE)
GAME_HEIGHT = int(DISPLAY_HEIGHT / BLOCK_SIZE)

def draw_game(snake1, snake2, apple):
    
    display.clear()
    
    display.text("Score", 0, 0)
    for s1 in snake1:
        draw_point(s1[0], s1[1])
    
    for s2 in snake2:
        draw_point(s2[0], s2[1])

    draw_point(apple[0], apple[1])

    display.show()



def draw_point(x, y):
    display.fill_rect(x * BLOCK_SIZE, y * BLOCK_SIZE, BLOCK_SIZE, BLOCK_SIZE)



In [23]:
from time import sleep
bleClient = BleClient()

oldSnake1Tails = []

async def main():
    global bleClient, draw_game

    while True:
        result = await bleClient.find_server()
        if result is False:
            continue

        snake1Tails = await bleClient.read_snake1_tails()
        snake2Tails = await bleClient.read_snake2_tails()
        applePos = await bleClient.read_apple_posistion()
    
        print(snake1Tails)




        draw_game(snake1Tails, snake2Tails, applePos)

        sleep(0.02)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

its a match


In [None]:
# import sys

# sys.path.append("")

# from micropython import const

# import uasyncio as asyncio
# import aioble
# import bluetooth

# import random
# import struct

# # org.bluetooth.service.environmental_sensing
# _ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# # org.bluetooth.characteristic.temperature
# _ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)


# # Helper to decode the temperature characteristic encoding (sint16, hundredths of a degree).
# def _decode_temperature(data):
#     return struct.unpack("<h", data)[0] / 100


# async def find_temp_sensor():
#     # Scan for 5 seconds, in active mode, with very low interval/window (to
#     # maximise detection rate).
#     async with aioble.scan(5000, interval_us=30000, window_us=30000, active=True) as scanner:
#         async for result in scanner:
#             # See if it matches our name and the environmental sensing service.
#             if result.name() == "mpy-temp" and _ENV_SENSE_UUID in result.services():
#                 return result.device
#     return None


# async def main():
#     device = await find_temp_sensor()
#     if not device:
#         print("Temperature sensor not found")
#         return

#     try:
#         print("Connecting to", device)
#         connection = await device.connect()
#     except asyncio.TimeoutError:
#         print("Timeout during connection")
#         return

#     async with connection:
#         try:
#             temp_service = await connection.service(_ENV_SENSE_UUID)
#             temp_characteristic = await temp_service.characteristic(_ENV_SENSE_TEMP_UUID)
#         except asyncio.TimeoutError:
#             print("Timeout discovering services/characteristics")
#             return

#         while True:
#             temp_deg_c = _decode_temperature(await temp_characteristic.read())
#             print("Temperature: {:.2f}".format(temp_deg_c))
#             await asyncio.sleep_ms(1000)


# asyncio.run(main())

[31mNo serial connected
[0m  %serialconnect to connect
  %esptool to flash the device
  %lsmagic to list commands