Skip to content

Commit

Permalink
RFCOMM: Slightly refactor and correct constants
Browse files Browse the repository at this point in the history
  • Loading branch information
zxzxwu committed Jan 31, 2024
1 parent 071fc27 commit 9bdaf7a
Showing 1 changed file with 24 additions and 83 deletions.
107 changes: 24 additions & 83 deletions bumble/rfcomm.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import logging
import asyncio
import dataclasses
import enum
from typing import Callable, Dict, List, Optional, Tuple, Union, TYPE_CHECKING

Expand Down Expand Up @@ -60,23 +61,13 @@

RFCOMM_PSM = 0x0003


# Frame types
RFCOMM_SABM_FRAME = 0x2F # Control field [1,1,1,1,_,1,0,0] LSB-first
RFCOMM_UA_FRAME = 0x63 # Control field [0,1,1,0,_,0,1,1] LSB-first
RFCOMM_DM_FRAME = 0x0F # Control field [1,1,1,1,_,0,0,0] LSB-first
RFCOMM_DISC_FRAME = 0x43 # Control field [0,1,0,_,0,0,1,1] LSB-first
RFCOMM_UIH_FRAME = 0xEF # Control field [1,1,1,_,1,1,1,1] LSB-first
RFCOMM_UI_FRAME = 0x03 # Control field [0,0,0,_,0,0,1,1] LSB-first

RFCOMM_FRAME_TYPE_NAMES = {
RFCOMM_SABM_FRAME: 'SABM',
RFCOMM_UA_FRAME: 'UA',
RFCOMM_DM_FRAME: 'DM',
RFCOMM_DISC_FRAME: 'DISC',
RFCOMM_UIH_FRAME: 'UIH',
RFCOMM_UI_FRAME: 'UI'
}
class FrameType(enum.IntEnum):
SABM = 0x2F # Control field [1,1,1,1,_,1,0,0] LSB-first
UA = 0x63 # Control field [0,1,1,0,_,0,1,1] LSB-first
DM = 0x0F # Control field [1,1,1,1,_,0,0,0] LSB-first
DISC = 0x43 # Control field [0,1,0,_,0,0,1,1] LSB-first
UIH = 0xEF # Control field [1,1,1,_,1,1,1,1] LSB-first
UI = 0x03 # Control field [0,0,0,_,0,0,1,1] LSB-first

# MCC Types
RFCOMM_MCC_PN_TYPE = 0x20
Expand Down Expand Up @@ -118,7 +109,7 @@
0XBA, 0X2B, 0X59, 0XC8, 0XBD, 0X2C, 0X5E, 0XCF
])

RFCOMM_DEFAULT_WINDOW_SIZE = 16
RFCOMM_DEFAULT_WINDOW_SIZE = 7
RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000

RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1
Expand Down Expand Up @@ -183,7 +174,7 @@ def compute_fcs(buffer: bytes) -> int:
class RFCOMM_Frame:
def __init__(
self,
frame_type: int,
frame_type: FrameType,
c_r: int,
dlci: int,
p_f: int,
Expand All @@ -206,14 +197,11 @@ def __init__(
self.length = bytes([(length << 1) | 1])
self.address = (dlci << 2) | (c_r << 1) | 1
self.control = frame_type | (p_f << 4)
if frame_type == RFCOMM_UIH_FRAME:
if frame_type == FrameType.UIH:
self.fcs = compute_fcs(bytes([self.address, self.control]))
else:
self.fcs = compute_fcs(bytes([self.address, self.control]) + self.length)

def type_name(self) -> str:
return RFCOMM_FRAME_TYPE_NAMES[self.type]

@staticmethod
def parse_mcc(data) -> Tuple[int, bool, bytes]:
mcc_type = data[0] >> 2
Expand All @@ -237,32 +225,32 @@ def make_mcc(mcc_type: int, c_r: int, data: bytes) -> bytes:

@staticmethod
def sabm(c_r: int, dlci: int):
return RFCOMM_Frame(RFCOMM_SABM_FRAME, c_r, dlci, 1)
return RFCOMM_Frame(FrameType.SABM, c_r, dlci, 1)

@staticmethod
def ua(c_r: int, dlci: int):
return RFCOMM_Frame(RFCOMM_UA_FRAME, c_r, dlci, 1)
return RFCOMM_Frame(FrameType.UA, c_r, dlci, 1)

@staticmethod
def dm(c_r: int, dlci: int):
return RFCOMM_Frame(RFCOMM_DM_FRAME, c_r, dlci, 1)
return RFCOMM_Frame(FrameType.DM, c_r, dlci, 1)

@staticmethod
def disc(c_r: int, dlci: int):
return RFCOMM_Frame(RFCOMM_DISC_FRAME, c_r, dlci, 1)
return RFCOMM_Frame(FrameType.DISC, c_r, dlci, 1)

@staticmethod
def uih(c_r: int, dlci: int, information: bytes, p_f: int = 0):
return RFCOMM_Frame(
RFCOMM_UIH_FRAME, c_r, dlci, p_f, information, with_credits=(p_f == 1)
FrameType.UIH, c_r, dlci, p_f, information, with_credits=(p_f == 1)
)

@staticmethod
def from_bytes(data: bytes) -> RFCOMM_Frame:
# Extract fields
dlci = (data[0] >> 2) & 0x3F
c_r = (data[0] >> 1) & 0x01
frame_type = data[1] & 0xEF
frame_type = FrameType(data[1] & 0xEF)
p_f = (data[1] >> 4) & 0x01
length = data[2]
if length & 0x01:
Expand Down Expand Up @@ -291,7 +279,7 @@ def __bytes__(self) -> bytes:

def __str__(self) -> str:
return (
f'{color(self.type_name(), "yellow")}'
f'{color(self.type.name, "yellow")}'
f'(c/r={self.c_r},'
f'dlci={self.dlci},'
f'p/f={self.p_f},'
Expand All @@ -301,6 +289,7 @@ def __str__(self) -> str:


# -----------------------------------------------------------------------------
@dataclasses.dataclass
class RFCOMM_MCC_PN:
dlci: int
cl: int
Expand All @@ -310,24 +299,6 @@ class RFCOMM_MCC_PN:
max_retransmissions: int
window_size: int

def __init__(
self,
dlci: int,
cl: int,
priority: int,
ack_timer: int,
max_frame_size: int,
max_retransmissions: int,
window_size: int,
) -> None:
self.dlci = dlci
self.cl = cl
self.priority = priority
self.ack_timer = ack_timer
self.max_frame_size = max_frame_size
self.max_retransmissions = max_retransmissions
self.window_size = window_size

@staticmethod
def from_bytes(data: bytes) -> RFCOMM_MCC_PN:
return RFCOMM_MCC_PN(
Expand All @@ -350,23 +321,13 @@ def __bytes__(self) -> bytes:
self.max_frame_size & 0xFF,
(self.max_frame_size >> 8) & 0xFF,
self.max_retransmissions & 0xFF,
self.window_size & 0xFF,
self.window_size & 0x07,
]
)

def __str__(self) -> str:
return (
f'PN(dlci={self.dlci},'
f'cl={self.cl},'
f'priority={self.priority},'
f'ack_timer={self.ack_timer},'
f'max_frame_size={self.max_frame_size},'
f'max_retransmissions={self.max_retransmissions},'
f'window_size={self.window_size})'
)


# -----------------------------------------------------------------------------
@dataclasses.dataclass
class RFCOMM_MCC_MSC:
dlci: int
fc: int
Expand All @@ -375,16 +336,6 @@ class RFCOMM_MCC_MSC:
ic: int
dv: int

def __init__(
self, dlci: int, fc: int, rtc: int, rtr: int, ic: int, dv: int
) -> None:
self.dlci = dlci
self.fc = fc
self.rtc = rtc
self.rtr = rtr
self.ic = ic
self.dv = dv

@staticmethod
def from_bytes(data: bytes) -> RFCOMM_MCC_MSC:
return RFCOMM_MCC_MSC(
Expand All @@ -409,16 +360,6 @@ def __bytes__(self) -> bytes:
]
)

def __str__(self) -> str:
return (
f'MSC(dlci={self.dlci},'
f'fc={self.fc},'
f'rtc={self.rtc},'
f'rtr={self.rtr},'
f'ic={self.ic},'
f'dv={self.dv})'
)


# -----------------------------------------------------------------------------
class DLC(EventEmitter):
Expand Down Expand Up @@ -471,7 +412,7 @@ def send_frame(self, frame: RFCOMM_Frame) -> None:
self.multiplexer.send_frame(frame)

def on_frame(self, frame: RFCOMM_Frame) -> None:
handler = getattr(self, f'on_{frame.type_name()}_frame'.lower())
handler = getattr(self, f'on_{frame.type.name}_frame'.lower())
handler(frame)

def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None:
Expand Down Expand Up @@ -711,7 +652,7 @@ def on_pdu(self, pdu: bytes) -> None:
if frame.dlci == 0:
self.on_frame(frame)
else:
if frame.type == RFCOMM_DM_FRAME:
if frame.type == FrameType.DM:
# DM responses are for a DLCI, but since we only create the dlc when we
# receive a PN response (because we need the parameters), we handle DM
# frames at the Multiplexer level
Expand All @@ -724,7 +665,7 @@ def on_pdu(self, pdu: bytes) -> None:
dlc.on_frame(frame)

def on_frame(self, frame: RFCOMM_Frame) -> None:
handler = getattr(self, f'on_{frame.type_name()}_frame'.lower())
handler = getattr(self, f'on_{frame.type.name}_frame'.lower())
handler(frame)

def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None:
Expand Down

0 comments on commit 9bdaf7a

Please sign in to comment.