Skip to content

Commit

Permalink
intel driver: adapt to new way to load driver
Browse files Browse the repository at this point in the history
  • Loading branch information
SilverBzH committed Dec 13, 2023
1 parent 11460c8 commit e552858
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 30 deletions.
3 changes: 1 addition & 2 deletions bumble/drivers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@
import pathlib
import platform
from typing import Dict, Iterable, Optional, Type, TYPE_CHECKING
from . import rtk, intel

from . import rtk
from . import rtk, intel
from .common import Driver

if TYPE_CHECKING:
Expand Down
43 changes: 29 additions & 14 deletions bumble/drivers/intel.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,23 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
from dataclasses import dataclass
from enum import IntEnum
import logging
import os
import pathlib
import platform
from typing import Optional, Tuple

from bumble.hci import (
hci_vendor_command_op_code, # type: ignore
HCI_Command,
HCI_Reset_Command,
STATUS_SPEC, # type: ignore
HCI_SUCCESS,
)
from enum import IntEnum
from typing import Optional
import asyncio
import os
import pathlib
import platform
from bumble.drivers import common

# -----------------------------------------------------------------------------
# Logging
Expand Down Expand Up @@ -217,29 +220,41 @@ async def download_fw_payload(host, fw: bytes, header_offset: int):
frag_len = 0


class Driver:
def __init__(self, host, version: IntelVersionTLV, firmware: bytes):
class Driver(common.Driver):
def __init__(self, host, version: IntelVersionTLV, firmware: bytes, fw_name: str):
self.host = host
self.version = version
self.firmware = firmware
self.fw_name = fw_name

@classmethod
async def for_host(cls, host): # type: ignore
async def for_host(cls, host, force=False): # type: ignore
try:
if not force and not cls.check(host):
return None

version = await fetch_intel_version(host) # type: ignore
fw = prepare_firmware(version)
return cls(host, version, fw)
fw, fw_name = prepare_firmware(version)
return cls(host, version, fw, fw_name)
except Exception:
logging.exception("Error preparing the firmware")
return None

async def init_controller(self):
try:
await download_firmware(self.host, self.version, self.firmware)
await self.host.send_command(HCI_Reset_Command(), check_result=True)
logger.info(f"loaded FW image {self.fw_name}")
except Exception:
logging.exception("Failed to download the firmware")
return None

@staticmethod
def check(host):
if host.hci_metadata.get('driver') == 'intel':
# Forced driver
return True

@staticmethod
def find_binary_path(file_name: str) -> Optional[pathlib.Path]:
# First check if an environment variable is set
Expand Down Expand Up @@ -334,7 +349,7 @@ def fetch_firmware_name(version: IntelVersionTLV) -> str:
return f"ibt-{upper_name:04x}-{lower_name:04x}.sfi"


def prepare_firmware(version: IntelVersionTLV) -> bytes:
def prepare_firmware(version: IntelVersionTLV) -> Tuple[bytes, str]:
fw_name = fetch_firmware_name(version)
logging.debug(f"Firmware: {fw_name}")
fw_path = Driver.find_binary_path(fw_name)
Expand All @@ -346,7 +361,7 @@ def prepare_firmware(version: IntelVersionTLV) -> bytes:
raise ValueError(
"Firmware size is less then the minimum required size of 644 bytes"
)
return fw
return (fw, fw_name)


async def download_firmware(host, version: IntelVersionTLV, fw: bytes):
Expand Down
15 changes: 7 additions & 8 deletions bumble/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,14 +469,13 @@ def supported_le_features(self):
# Packet Sink protocol (packets coming from the controller via HCI)
def on_packet(self, packet: bytes) -> None:
hci_packet = HCI_Packet.from_bytes(packet)
self.on_hci_packet(hci_packet)
# if self.ready or (
# isinstance(hci_packet, HCI_Command_Complete_Event)
# and hci_packet.command_opcode == HCI_RESET_COMMAND
# ):
# self.on_hci_packet(hci_packet)
# else:
# logger.debug('`reset not done`, ignoring packet from controller')
if self.ready or (
isinstance(hci_packet, HCI_Command_Complete_Event)
and hci_packet.command_opcode == HCI_RESET_COMMAND
):
self.on_hci_packet(hci_packet)
else:
logger.debug('reset not done, ignoring packet from controller')

def on_transport_lost(self):
# Called by the source when the transport has been lost.
Expand Down
11 changes: 5 additions & 6 deletions tools/intel_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ async def do_load(transport: str, force: bool):
):
# Create a host to communicate with the device
host = Host(hci_source, hci_sink)
if not force:
await host.reset(driver_factory=None)

driver = await intel.Driver.for_host(host)
driver = await intel.Driver.for_host(host, force)
if not driver:
print("Firmware already loaded or no supported driver for this device.")
return
Expand All @@ -61,12 +59,13 @@ async def do_info(transport: str, force: bool):
):
# Create a host to communicate with the device
host = Host(hci_source, hci_sink)
if not force:
await host.reset(driver_factory=None)
if not force and not intel.Driver.check(host):
print("Device not supported by this Intel driver")
return

version = await intel.fetch_intel_version(host)
if not version:
print("Device not supported by this RTK driver")
print("Device not supported by this Intel driver")
return
try:
fw_name = intel.fetch_firmware_name(version)
Expand Down

0 comments on commit e552858

Please sign in to comment.