Oczywiście! Poniżej przedstawiam przykład asynchronicznego odczytywania i przetwarzania danych z portu szeregowego przy użyciu biblioteki pyserial-asyncio:

In [None]:
import asyncio
import serial_asyncio


async def odczytuj_dane(ser):
    while True:
        dane = await ser.readline()
        dane = dane.decode().strip()
        if dane:
            print("Odczytano dane:", dane)

async def przetwarzaj_dane():
    while True:
        # Tutaj umieść logikę przetwarzania danych
        await asyncio.sleep(1)  # Przykładowe opóźnienie

async def main():
    loop = asyncio.get_running_loop()

    # Inicjalizacja portu szeregowego
    ser = await serial_asyncio.open_serial_connection(url='/dev/ttyUSB0', baudrate=9600)

    # Uruchomienie zadania odczytu danych
    loop.create_task(odczytuj_dane(ser[0]))

    # Uruchomienie zadania przetwarzania danych
    loop.create_task(przetwarzaj_dane())

    # Główna pętla programu
    while True:
        await asyncio.sleep(1)  # Możesz wykonywać inne operacje w pętli

asyncio.run(main())


W tym przykładzie używamy biblioteki pyserial-asyncio do asynchronicznego odczytywania danych z portu szeregowego. Funkcja odczytuj_dane() jest odpowiedzialna za odczyt danych z portu i wyświetlanie ich na bieżąco. Funkcja przetwarzaj_dane() to przykładowa funkcja przetwarzająca dane, którą można dostosować do swoich potrzeb.

W funkcji main() tworzymy pętlę asyncio, inicjalizujemy port szeregowy przy użyciu serial_asyncio.open_serial_connection() i uruchamiamy zadania odczytu danych i przetwarzania danych przy użyciu loop.create_task(). Następnie w głównej pętli programu można wykonywać inne operacje.

Dzięki użyciu biblioteki pyserial-asyncio odczytywanie danych nie blokuje przetwarzania danych, ponieważ oba zadania działają asynchronicznie.

Mam nadzieję, że ten przykład jest przydatny! Jeśli masz jakiekolwiek pytania, służę pomocą.

Oczywiście! Poniżej przedstawiam zmodyfikowany przykład, w którym funkcja, która odczytuje dane, buforuje je, a funkcja przetwarzająca zapisuje te dane z bufora do pliku:

In [None]:
import asyncio
import serial_asyncio


async def odczytuj_dane(ser, bufor):
    while True:
        dane = await ser.readline()
        dane = dane.decode().strip()
        if dane:
            bufor.append(dane)

async def przetwarzaj_dane(bufor, plik):
    while True:
        await asyncio.sleep(1)  # Przykładowe opóźnienie
        if bufor:
            dane = bufor.pop(0)
            plik.write(dane + '\n')
            plik.flush()  # Wymuszenie zapisu do pliku

async def main():
    loop = asyncio.get_running_loop()

    # Inicjalizacja portu szeregowego
    ser = await serial_asyncio.open_serial_connection(url='/dev/ttyUSB0', baudrate=9600)

    # Inicjalizacja bufora danych i pliku
    bufor = []
    plik = open('dane.txt', 'a')

    # Uruchomienie zadania odczytu danych
    loop.create_task(odczytuj_dane(ser[0], bufor))

    # Uruchomienie zadania przetwarzania danych
    loop.create_task(przetwarzaj_dane(bufor, plik))

    # Główna pętla programu
    while True:
        await asyncio.sleep(1)  # Możesz wykonywać inne operacje w pętli

asyncio.run(main())


W tym zmodyfikowanym przykładzie dodaliśmy bufor danych bufor, który przechowuje odczytane dane z portu szeregowego. Funkcja odczytuj_dane() odczytuje dane z portu i dodaje je do bufora. Funkcja przetwarzaj_dane() sprawdza, czy bufor zawiera dane, a następnie zapisuje je do pliku.

W funkcji main() inicjalizujemy bufor danych oraz otwieramy plik w trybie 'a' (dodawanie do istniejącego pliku lub tworzenie nowego). Następnie uruchamiamy zadania odczytu danych i przetwarzania danych, przekazując im odpowiednie argumenty.

Odczytane dane są dodawane do bufora, a następnie przetwarzane przez funkcję przetwarzaj_dane(), która zapisuje je do pliku za pomocą plik.write(). Funkcja plik.flush() jest wywoływana, aby wymusić zapis danych do pliku.

Mam nadzieję, że ten zmodyfikowany przykład jest przydatny! Jeśli masz jakiekolwiek pytania, służę pomocą.