In [1]:
import os

import asn1tools
from fastcore.xtras import Path

# Directory holding the sample CDR files and the ASN.1 schema.
# Set TELEPARSER_DATA_DIR to point at another checkout; the fallback is the
# original hard-coded location, so existing runs behave exactly as before.
folder = Path(
    os.environ.get("TELEPARSER_DATA_DIR", "/home/rsilva/repositorios/teleparser/data")
)
folder.ls()

(#6) [Path('/home/rsilva/repositorios/teleparser/data/claro_volte'),Path('/home/rsilva/repositorios/teleparser/data/Exemplo-ECSCF-CDR.TXT'),Path('/home/rsilva/repositorios/teleparser/data/Exemplo-IBCF - CDR.TXT'),Path('/home/rsilva/repositorios/teleparser/data/IPBSA1ECSCF.202503181503566470'),Path('/home/rsilva/repositorios/teleparser/data/IPBSA1ECSCF.202503181727526053_NOK'),Path('/home/rsilva/repositorios/teleparser/data/SBC_TIM.asn')]

In [None]:
from collections import namedtuple
from dataclasses import dataclass
from typing import Optional, Tuple, Callable
from io import BufferedReader, BytesIO
from tqdm.auto import tqdm  # Use standard tqdm for compatibility with nesting
# Basic ASN.1 Reference
# https://luca.ntop.org/Teaching/Appunti/asn1.html


CLASS_SHIFT = 6
TWO_BIT_MASK = 3
ENCODE_SHIFT = 5
MODULO_2 = 1
CLASSNUM_MASK = 31
MASK_BIT7 = 127
SHIFT_7 = 7
HIGH_CLASS_NUM = 31
MASK_BIT8 = 128
SHIFT_8 = 8


# BerClass
UNIVERSAL = 0
APPLICATION = 1
CONTEXT = 2
PRIVATE = 3


EOC = {} #(0, False, 0, 0), (2, True, 1, 0)}


BerTag = namedtuple("BerTag", ["tag_class", "constructed", "number"])


@dataclass
class BerDecoder:
    """Basic Encoding Rules decoder"""

    buffer_manager: Optional[BufferedReader] = None

    @staticmethod
    def decode_tag(tag_bytes: bytes):
        first_byte = tag_bytes[0]
        tag_class = (first_byte >> CLASS_SHIFT) & TWO_BIT_MASK
        constructed = ((first_byte >> ENCODE_SHIFT) & MODULO_2) == 1
        number = first_byte & CLASSNUM_MASK

        if number == CLASSNUM_MASK:
            # Handle multi-byte tag
            number = 0
            for b in tag_bytes[1:]:
                number = (number << SHIFT_7) | (b & MASK_BIT7)
                if b >> SHIFT_7 == 0:
                    break

        return BerTag(tag_class, constructed, number)

    @staticmethod
    def _reached_eoc(tag: BerTag, length: int):
        return (tag.tag_class, tag.constructed, tag.number, length) in EOC

    def decode(
        self,
        stream: BufferedReader | BytesIO,
        offset: int = 0,
        depth: int = 0,
        schema: dict | None = None,
    ):
        if (tag_bytes := self._read_tag(stream)) is None:
            return None

        tag = self.decode_tag(tag_bytes)
        length, length_size = self._read_length(stream)
        offset += len(tag_bytes) + length_size

        if self._reached_eoc(tag, length) or length == 0:
            return self.decode(stream, offset, depth, schema)
        # Read value
        value = stream.read(length)
        if len(value) != length:
            raise ValueError("Unexpected end of data")
        
        return tag, length, value

    @staticmethod
    def _read_tag(stream: BufferedReader) -> Optional[bytes]:
        first_byte = stream.read(1)  # index makes a converted to int
        if not first_byte:
            return None

        tag_byte = first_byte
        if int.from_bytes(tag_byte, "big") & HIGH_CLASS_NUM == HIGH_CLASS_NUM:
            # Multi-byte tag
            while True:
                b = stream.read(1)
                if not b:
                    raise ValueError("Unexpected end of tag")
                tag_byte += b
                if int.from_bytes(b, "big") & MASK_BIT8 == 0:
                    break

        return tag_byte

    @staticmethod
    def _read_length(stream: BufferedReader) -> Tuple[int, int]:
        first_byte = stream.read(1)[0]
        # definite short form
        if first_byte >> SHIFT_7 == 0:
            return first_byte, 1

        length = 0
        length_size = first_byte & MASK_BIT7
        # indefinite form
        if length_size == 0:
            return 0, 1

        length_bytes = stream.read(length_size)
        if len(length_bytes) != length_size:
            raise ValueError("Unexpected end of length")

        #  definite long form
        #  When iterating on bytes it's already converted to int
        for b in length_bytes:
            length = (length << SHIFT_8) | b

        return length, length_size + 1

    def parse_blocks(self):
        with self.buffer_manager.open() as file_buffer:
            while (tlv := self.decode(file_buffer)) is not None:
                yield tlv

    def process(self):
        """Process the BER data and return a list of parsed blocks."""
        return list(
            tqdm(self.parse_blocks(), desc="Parsing TLVs", unit=" block", leave=False)
        )

    @property
    def transform_func(self):  # Just a placeholder for compatibility
        return None


In [13]:
# Compile the same ASN.1 schema once per codec so both decoders can be compared.
schema_file = f"{folder}/SBC_TIM.asn"
sbc_tim_der = asn1tools.compile_files(schema_file, codec="der")
sbc_tim_ber = asn1tools.compile_files(schema_file, codec="ber")


In [29]:
# List the names of the types available in the DER-compiled schema.
sbc_tim_der.types.keys()

dict_keys(['IMSRecord', 'SCSCFRecord', 'PCSCFRecord', 'ICSCFRecord', 'MRFCRecord', 'MGCFRecord', 'BGCFRecord', 'IBCFRecord', 'AGCFRecord', 'ATSRecord', 'SCP-Connection', 'VCCASRecord', 'VideoASRecord', 'ALUCTSRecord', 'MediaXRecord', 'ConferenceASRecord', 'CentrexASRecord', 'SBCRecord', 'RCSRecord', 'ECSCFRecord', 'RecordType', 'NodeAddress', 'IPAddress', 'IPBinaryAddress', 'IPTextRepresentedAddress', 'TimeStamp', 'LocalSequenceNumber', 'UnsignedInter32', 'UnsignedInter64', 'ServiceContextID', 'Service-Identifier', 'Service-Indetity', 'SubscriptionID', 'SubscriptionIDType', 'ManagementExtensions', 'ManagementExtension', 'S-CSCF-Information', 'ServiceSpecificInfo', 'AccessCorrelationID', 'ACRInterimLost', 'ApplicationServersInformation', 'CarrierSelectRouting', 'CauseForRecordClosing', 'Early-Media-Components-List', 'IMS-Charging-Identifier', 'IMSCommunicationServiceIdentifier', 'Incomplete-CDR-Indication', 'InterOperatorIdentifierlist', 'InterOperatorIdentifiers', 'InvolvedParty', 'Lis

In [6]:
# Load the whole sample file into memory as bytes.
example_1 = folder.joinpath("IPBSA1ECSCF.202503181503566470").read_bytes()

In [76]:
from teleparser.buffer import BufferManager
# `folder / name` is already a Path, so no extra Path(...) wrapper is needed.
file_path = folder / "IPBSA1ECSCF.202503181503566470"

In [77]:
# No buffer manager: the decoder is driven directly with an open file below.
ber = BerDecoder(buffer_manager=None)

In [78]:
# Print every TLV the decoder returns until it reports end of stream (None).
with open(file_path, "rb") as f:
    while True:
        tlv = ber.decode(f)
        if tlv is None:
            break
        print(tlv)

OverflowError: cannot fit 'int' into an index-sized integer

In [47]:
# Peek at a 29-byte window (indices 544 through 572) of the sample file.
example_1[544:573]

b'sip:+5567981331013@ims.tim.br'

In [49]:
# Report the index of the first byte equal to 31 (0x1F), if there is one.
for i, b in enumerate(example_1):
    if b != 31:
        continue
    print(i)
    break

138


In [68]:
# Display the raw bytes loaded from the sample file.
# NOTE(review): this renders the entire payload; consider showing a slice
# (e.g. example_1[:256]) to keep the notebook output small.
example_1

b'\x00\x00\x14\xbc\x00\x00\x002\xa2\xa29<\x10\xc09<0\xc0\x00\x00\x00\r\x00\x00\x19F\x02\n.\xb2S\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01q\xa2)\xbfF\x82\x01l\x80\x01F\x82\x06UPDATE\x83\x01\x00\xa4\x1c\x81\x1aIPBSA1-ECSCF-01.ims.tim.br\x85 sbcthLNQN0kRyHVFPeb69xXCm2yiQOE5\xa6\x1f\x80\x1dsip:+5585999583692@ims.tim.br\xa7\t\x81\x07tel:188\x89\t%\x03\x18\x14DV-\x03\x00\x8a\t%\x03\x18\x14DV-\x03\x00\x8c\t%\x03\x18\x14YV-\x03\x00\x8d\t%\x03\x18\x14YV-\x03\x00\x8f\x05\x00\xb3" \x1a\x90\x01\x04\x91\x01\x03\xb2\t\x80\x01\xff\x81\x01\x01\x82\x01\xff\x93%BSA1-PCSCF-01.194.f2ae.20250318171450\x9dE3GPP-E-UTRAN-FDD;utran-cell-id-3gpp=724048D5061D2983;network-provided\xbf\x1f\x161\x14\x80\x01\x01\x81\x0f724028203713506\x9f%\x02\x015\x9f&\x02\x015\xbf:\x14\x80\x01\x00\x81\x0f35692579-159293\x9f\x81H\x02\x03\x84\x01\xb8\xa2)\xbfF\x82\x01\xb3\x80\x01F\x82\x03BYE\x83\x01\x00\xa4\x1c\x81\x1aIPBSA1-ECSCF-01.ims.tim.br\x85AsbcthLNQeQ2MpmpN_E9sv03ZZccAA3..@acIn:aBn:cBVH:

In [61]:
# Brute-force scan: find the first offset `i` at which the remaining bytes
# decode as an ECSCFRecord with the BER codec. `sbc_ok` stays None when no
# offset decodes, so a failed scan is distinguishable from a stale result.
sbc_ok = None
for i in range(len(example_1)):
    try:
        sbc_ok = sbc_tim_ber.decode("ECSCFRecord", example_1[i:])
    except Exception:
        # Decode failures are expected at offsets that are not a record start.
        # Catching Exception (not a bare except) keeps Ctrl-C working.
        continue
    break

In [67]:
# Try to decode the bytes starting at offset `i` as a bare RecordType using
# the DER-compiled schema.
sbc_tim_der.decode("RecordType", example_1[i:])


DecodeTagError: RecordType: Expected INTEGER(RecordType) with tag '02', but got '31'. (At offset: 0)

In [13]:
# Unpack a 20-byte big-endian header that starts at offset 2 of bdata.
(
    version,        # B  : 1 unsigned byte
    msg_len_bytes,  # 3s : 3 raw bytes
    flags,          # B  : 1 unsigned byte
    cmd_bytes,      # 3s : 3 raw bytes
    app_id,         # I  : 4-byte unsigned int
    hbh_id,         # I  : 4-byte unsigned int
    e2e_id,         # I  : 4-byte unsigned int
) = struct.Struct(">B3sB3sIII").unpack(bdata[2:22])


In [14]:
# First header field unpacked from bdata[2:22] (one unsigned byte).
version

1

In [15]:
# The 3 raw length bytes come from a big-endian (">") header, so decode them
# as big-endian. The byte order is passed explicitly because the default is
# only available from Python 3.11 onwards.
int.from_bytes(msg_len_bytes, "big")

1320

In [None]:
# Everything in bdata from index 1320 onwards.
# NOTE(review): 1320 equals the message length decoded from the header, but
# that header was read starting at offset 2 (bdata[2:22]) — confirm whether
# the following data starts at 1320 or at 2 + 1320.
bdata[1320:]

In [25]:
# Human-readable summary of the flags byte.
# NOTE(review): parse_command_flags is not defined in the visible cells —
# presumably defined or imported elsewhere; confirm before a fresh-kernel run.
str(parse_command_flags(flags))

'ACR message - Flags: Proxiable, Retransmitted'

In [19]:
# Show the individual flag values.
# NOTE(review): `flags` as unpacked with format code "B" is a plain int, which
# has no `.value`; this cell presumably ran after `flags` was rebound to a
# parsed-flags object — confirm and make that step explicit.
flags.value

{'R': True,
 'P': True,
 'E': False,
 'T': True,
 'isACR': True,
 'isACA': False,
 'isProxiable': True,
 'hasProtocolError': False,
 'isRetransmitted': True}

In [9]:
# The 3 raw command bytes come from a big-endian (">") header, so decode them
# as big-endian. The byte order is passed explicitly because the default is
# only available from Python 3.11 onwards.
int.from_bytes(cmd_bytes, "big")

271

In [10]:
# Header field unpacked from bdata[2:22] as a 4-byte big-endian unsigned int.
app_id

3

In [11]:
# Header field unpacked from bdata[2:22] as a 4-byte big-endian unsigned int.
hbh_id

1020721216

In [12]:
# Last header field unpacked from bdata[2:22] (4-byte big-endian unsigned int).
e2e_id

89225398