In [1]:
from flask import Flask, render_template, request, redirect, url_for
import pandas as pd
import time
import numpy 
import asyncio
import struct
import datetime
import nest_asyncio
import bleak
from bleak import BleakScanner, BleakClient

In [2]:
# Flask application object; the @app.route handlers defined later in this file register on it.
app = Flask(__name__)

In [3]:
def adv_decrypt(data):
    """Decode fuel level, battery voltage and temperature from a raw payload.

    Layout as read by this function (byte 0 is not used):
      * bytes 1-2 -- fuel level, unsigned, little-endian
      * byte 3    -- battery voltage in tenths of a volt, unsigned
      * byte 4    -- temperature, signed

    Each decoded value is printed as it is extracted.

    Args:
        data: sliceable sequence of bytes (e.g. ``bytes``).

    Returns:
        Tuple ``(fuel_level, battery_volts, temperature)``.
    """
    def _read(start, stop, signed):
        # Slicing (rather than indexing) means a short payload decodes as 0
        # instead of raising.
        return int.from_bytes(data[start:stop], byteorder='little', signed=signed)

    # Fuel level
    fuel_level = _read(1, 3, False)
    print(f"Уровень топлива: {fuel_level}")

    # Built-in battery voltage (stored as tenths of a volt)
    battery_volts = _read(3, 4, False) / 10.0
    print(f"Напряжение встроенной батареи: {battery_volts} В")

    # Temperature
    temperature = _read(4, 5, True)
    print(f"Температура: {temperature} Градус Цельсия")

    return fuel_level, battery_volts, temperature

In [4]:
class MyScanner:
    """Collect BLE advertisement data for devices named ``TD_<serial>``.

    A table (``self.df``) with one row per expected device name is created up
    front; ``detection_callback`` fills in a row's cells the first time the
    corresponding value is seen, and ``run`` keeps the scanner active for
    ``timeout`` seconds.
    """

    # Key looked up in ``advertisement_data.manufacturer_data``; its payload is
    # passed to ``adv_decrypt``.
    MANUFACTURER_ID = 3862

    def __init__(self, timeout, start_serial, end_serial):
        """Prepare the scanner and an empty result table.

        Args:
            timeout: scan duration in seconds.
            start_serial: first serial number of the range (inclusive).
            end_serial: last serial number of the range (inclusive).
        """
        self._scanner = BleakScanner(detection_callback=self.detection_callback)
        self.scanning = asyncio.Event()
        self.address = None
        self.timeout = timeout
        self.start_serial = start_serial
        self.end_serial = end_serial
        self.device_names = ["TD_" + str(i) for i in range(start_serial, end_serial + 1)]

        # One row per expected device; every data cell starts as None so that
        # "not seen yet" can be told apart from a real reading.
        table = {'Name': self.device_names}
        for column in ('MAC', 'RSSI', 'O-L', 'T', 'B-W', 'HK', 'LK'):
            table[column] = [None] * len(self.device_names)
        self.df = pd.DataFrame(table)

    def _set_if_empty(self, row, column, value):
        """Write ``value`` into the table only if that cell is still None."""
        if self.df.at[row, column] is None:
            self.df.at[row, column] = value

    def detection_callback(self, device, advertisement_data):
        """Record data from one advertisement if it comes from an expected device.

        Advertisements from devices whose name is not in ``self.device_names``
        are ignored. For a known device the first observed value of each field
        is kept; later advertisements do not overwrite it.
        """
        if device.name not in self.device_names:
            return

        # Was ``df['Name']`` (an undefined bare name) -- must be ``self.df``.
        row = self.df.index[self.df['Name'] == device.name][0]
        self._set_if_empty(row, 'MAC', device.address)
        self._set_if_empty(row, 'RSSI', advertisement_data.rssi)

        # A matching device may advertise without this manufacturer entry;
        # skip decoding in that case instead of raising KeyError.
        payload = advertisement_data.manufacturer_data.get(self.MANUFACTURER_ID)
        if payload is None:
            return

        oil_level_raw, battery_voltage, TD_temp_raw = adv_decrypt(payload)
        self._set_if_empty(row, 'O-L', oil_level_raw)
        self._set_if_empty(row, 'B-W', battery_voltage)
        self._set_if_empty(row, 'T', TD_temp_raw)

    async def run(self):
        """Scan until ``self.timeout`` seconds elapse or ``self.scanning`` is cleared.

        The scanner is always stopped on the way out, even if the wait is
        interrupted by an exception or cancellation.
        """
        # Use the loop this coroutine is actually running on rather than
        # relying on a module-level ``loop`` variable being defined elsewhere.
        loop = asyncio.get_running_loop()
        await self._scanner.start()
        self.scanning.set()
        try:
            end_time = loop.time() + self.timeout
            while self.scanning.is_set() and loop.time() <= end_time:
                await asyncio.sleep(0.1)
            if self.scanning.is_set():
                print('\t\tScan has timed out so we terminate')
        finally:
            self.scanning.clear()
            await self._scanner.stop()

    def get_address(self):
        """Return ``self.address``.

        Note: it is initialised to None and nothing in this class assigns it.
        """
        return self.address

    def get_df(self):
        """Return the result table (one row per expected device name)."""
        return self.df

In [5]:
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    landing_page = render_template('index.html')
    return landing_page

@app.route('/scan', methods=['POST'])
def scan():
    """Run a BLE scan with the submitted settings and render the results.

    Form fields (all integers): ``timeout`` (seconds), ``start_serial`` and
    ``end_serial`` (inclusive serial range).

    Returns:
        The rendered ``scan.html`` template with the result table as ``df``,
        or a plain-text 400 response when the form values are not usable.
    """
    try:
        timeout = int(request.form['timeout'])
        start_serial = int(request.form['start_serial'])
        end_serial = int(request.form['end_serial'])
    except ValueError:
        return 'timeout, start_serial and end_serial must be integers', 400
    if timeout < 0 or start_serial > end_serial:
        return 'timeout must be >= 0 and start_serial must be <= end_serial', 400

    async def _scan():
        # MyScanner takes its settings in the constructor and run() takes
        # none. Building it inside the coroutine keeps its asyncio objects on
        # the loop that actually executes the scan.
        my_scanner = MyScanner(timeout, start_serial, end_serial)
        await my_scanner.run()
        return my_scanner.get_df()

    # asyncio.run creates and closes a fresh loop, so this also works when the
    # handler runs in a thread that has no current event loop.
    df = asyncio.run(_scan())

    return render_template('scan.html', df=df)

@app.route('/report', methods=['POST'])
def report():
    """Write the submitted table to ``report.xlsx`` and return to the index page.

    The ``data`` form field is expected to hold the table serialised as JSON.
    The workbook is written to the current working directory, replacing any
    existing file of that name.
    """
    from io import StringIO

    # Passing a literal JSON string to read_json is deprecated in recent
    # pandas releases; a file-like wrapper is accepted by all versions.
    df = pd.read_json(StringIO(request.form['data']))
    filename = 'report.xlsx'
    df.to_excel(filename, index=False)
    return redirect(url_for('index'))

# Start Flask's built-in server when this code runs as the main module
# (debug mode off). app.run() blocks until the server is stopped.
if __name__ == '__main__':
    app.run(debug=False)

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
Press CTRL+C to quit
127.0.0.1 - - [29/May/2023 15:56:48] "GET / HTTP/1.1" 200 -


In [None]:
# Stand-alone scan without the web UI.
# MyScanner requires its settings at construction time. The values below are
# examples only -- set them to the timeout and serial range you need.
SCAN_TIMEOUT_S = 10
START_SERIAL = 1
END_SERIAL = 10

# A notebook kernel already has a running event loop; nest_asyncio allows
# run_until_complete to be called from inside it.
nest_asyncio.apply()

my_scanner = MyScanner(SCAN_TIMEOUT_S, START_SERIAL, END_SERIAL)
loop = asyncio.get_event_loop()
loop.run_until_complete(my_scanner.run())
address = my_scanner.get_address()