Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed the UCS2 encoding/decoding logic. #98

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions gsmmodem/dcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# -*- coding: utf-8 -*-

""" Data Coding Scheme related utility methods.
For details of the GSM standard, see https://en.wikipedia.org/wiki/Data_Coding_Scheme """

from enum import Enum, auto
from .pdu import decodeGsm7, unpackSeptets, decodeUcs2

class Charset(Enum):
    """Character sets that a Data Coding Scheme (DCS) value can resolve to."""

    # Text alphabets / payload kinds selected by the DCS
    GSM_7_BIT = auto()
    EIGHT_BIT_DATA = auto()
    UCS2 = auto()

    # Returned for coding groups or bit patterns the DCS table marks as reserved
    RESERVED = auto()

    # Returned when the sub-code inside a known coding group has no assigned meaning
    UNDEFINED = auto()


def dcsToCharset(dcs):
    """Return the Charset announced by a Data Coding Scheme value.

    The coding groups follow 3GPP TS 23.038 V9.1.1 (2010-02). Bits 7..4 of
    the DCS select the coding group; depending on the group, bits 3..0 (or
    only bits 3..2, or only bit 2) select the character set.

    Only the character set is reported: language indication, compression and
    message class are not interpreted (see the TODO notes below).

    :param dcs: Data Coding Scheme value as an integer (normally one octet, 0-255)
    :return: a Charset member; Charset.UNDEFINED when no coding group matches
    """
    # Character set selected by bits 3..2 in the groups that carry that field.
    charsetByBits3t2 = (
        Charset.GSM_7_BIT,
        Charset.EIGHT_BIT_DATA,
        Charset.UCS2,
        Charset.RESERVED,
    )

    bits_7t4 = dcs >> 4
    bits_3t0 = dcs & 0x0f

    if bits_7t4 <= 0b0000:
        return Charset.GSM_7_BIT
    elif bits_7t4 <= 0b0001:
        if bits_3t0 == 0b0000:
            # TODO:
            # GSM 7 bit default alphabet; message preceded by language indication.
            # The first 3 characters of the message are a two-character representation of the
            # language encoded according to ISO 639 [12], followed by a CR character. The
            # CR character is then followed by 90 characters of text.
            return Charset.GSM_7_BIT
        elif bits_3t0 == 0b0001:
            # TODO:
            # UCS2; message preceded by language indication
            # The message starts with a two GSM 7-bit default alphabet character
            # representation of the language encoded according to ISO 639 [12]. This is padded
            # to the octet boundary with two bits set to 0 and then followed by 40 characters of
            # UCS2-encoded message.
            # An MS not supporting UCS2 coding will present the two character language
            # identifier followed by improperly interpreted user data.
            return Charset.UCS2
        else:
            return Charset.UNDEFINED
    elif (bits_7t4 == 0b0010) or (bits_7t4 == 0b0011):
        return Charset.GSM_7_BIT
    elif bits_7t4 <= 0b0111:
        # TODO:
        # Bit 5, if set to 0, indicates the text is uncompressed
        # Bit 5, if set to 1, indicates the text is compressed using the compression algorithm defined in
        # 3GPP TS 23.042 [13]
        return charsetByBits3t2[bits_3t0 >> 2]
    elif bits_7t4 == 0b1000:
        return Charset.RESERVED
    elif bits_7t4 == 0b1001:
        return charsetByBits3t2[bits_3t0 >> 2]
    elif bits_7t4 <= 0b1100:
        return Charset.RESERVED
    elif bits_7t4 <= 0b1101:
        # TODO: I1 protocol message defined in 3GPP TS 24.294 [19]
        return Charset.RESERVED
    elif bits_7t4 <= 0b1110:
        # TODO: Defined by the WAP Forum [15]
        return Charset.RESERVED
    elif bits_7t4 <= 0b1111:
        # In this group bit 2 alone selects between 8-bit data and GSM 7-bit.
        if bits_3t0 & 0b0100:
            return Charset.EIGHT_BIT_DATA
        return Charset.GSM_7_BIT

    # Value does not fit in one octet, so no coding group applies; report that
    # explicitly instead of falling off the end and returning None.
    return Charset.UNDEFINED


def decodeWithDcs(data, dcs, logger=None):
    """Decode hex-encoded data according to its Data Coding Scheme.

    :param data: payload as a string of hexadecimal digits
    :param dcs: Data Coding Scheme value (integer) describing the payload
    :param logger: optional logger used for debug output; nothing is logged when None
    :return: the decoded text for GSM 7-bit and UCS2 payloads; the unmodified
        ``data`` string for 8-bit data and for reserved/undefined schemes
    :raises ValueError: if ``data`` is not valid hexadecimal (raised by bytes.fromhex)
    """
    charset = dcsToCharset(dcs)

    # The logger is optional, so every use of it has to be guarded.
    if logger is not None:
        logger.debug('dcs = ' + str(dcs))
        logger.debug('charset = ' + str(charset))
        logger.debug('data = ' + str(data))

    dataBytes = bytes.fromhex(data)

    if charset == Charset.GSM_7_BIT:
        return decodeGsm7(unpackSeptets(dataBytes))
    if charset == Charset.UCS2:
        return decodeUcs2(iter(dataBytes), len(dataBytes))
    if charset == Charset.EIGHT_BIT_DATA:
        return data

    # RESERVED / UNDEFINED (or anything else): fall back to handing the data
    # back untouched, exactly as for 8-bit data.
    if logger is not None:
        logger.debug('decodeWithDcs(): Unable to determine the encoding from the DCS data, returning the data as if it is 8-bit data anyway.')
    return data
38 changes: 32 additions & 6 deletions gsmmodem/modem.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@

from .serial_comms import SerialComms
from .exceptions import CommandError, InvalidStateException, CmeError, CmsError, InterruptedException, TimeoutException, PinRequiredError, IncorrectPinError, SmscNumberUnknownError
from .pdu import encodeSmsSubmitPdu, decodeSmsPdu, encodeGsm7, encodeTextMode
from .pdu import encodeSmsSubmitPdu, decodeSmsPdu, encodeGsm7, encodeTextMode, packSeptets
from .util import SimpleOffsetTzInfo, lineStartingWith, allLinesMatchingPattern, parseTextModeTimeStr, removeAtPrefix
from .dcs import decodeWithDcs

#from . import compat # For Python 2.6 compatibility
from gsmmodem.util import lineMatching
Expand Down Expand Up @@ -174,6 +175,7 @@ def __init__(self, port, baudrate=115200, incomingCallCallbackFunc=None, smsRece
self._pollCallStatusRegex = None # Regular expression used when polling outgoing call status
self._writeWait = 0 # Time (in seconds to wait after writing a command (adjusted when 515 errors are detected)
self._smsTextMode = False # Storage variable for the smsTextMode property
self._ussdTextMode = False # Storage variable for the ussdTextMode property
self._gsmBusy = 0 # Storage variable for the GSMBUSY property
self._smscNumber = None # Default SMSC number
self._smsRef = 0 # Sent SMS reference counter
Expand Down Expand Up @@ -248,8 +250,8 @@ def connect(self, pin=None, waitingForModemToStartInSeconds=0):
# Huawei modems use ^DTMF to send DTMF tones
callUpdateTableHint = 1 # Huawei
if '^USSDMODE' in commands:
# Enable Huawei text-mode USSD
self.write('AT^USSDMODE=0', parseError=False)
# Disable Huawei text-mode USSD. It doesn't work with unicode responses.
self.write('AT^USSDMODE=1', parseError=False)
if '+WIND' in commands:
callUpdateTableHint = 2 # Wavecom
enableWind = True
Expand Down Expand Up @@ -605,6 +607,19 @@ def smsTextMode(self, textMode):
self._smsTextMode = textMode
self._compileSmsRegexes()

@property
def ussdTextMode(self):
""" :return: True if the modem is set to use text mode for USSD, False if it is set to use PDU mode """
# Reads the cached flag only; nothing is sent to the modem here.
return self._ussdTextMode
@ussdTextMode.setter
def ussdTextMode(self, textMode):
""" Set to True for the modem to use text mode for USSD, or False for it to use PDU mode """
# Acts only when the requested mode differs from the cached one and the
# modem lists ^USSDMODE among its supported commands.
if textMode != self._ussdTextMode and '^USSDMODE' in self.supportedCommands:
if self.alive:
# Note the inversion: ^USSDMODE is sent 0 for text mode and 1 for PDU mode.
self.write('AT^USSDMODE={0}'.format(0 if textMode else 1))
# The character set is switched alongside: IRA for text mode, UCS2 otherwise.
self.write('AT+CSCS="{0}"'.format('IRA' if textMode else 'UCS2'))
# NOTE(review): indentation is not visible in this view, so it is unclear
# which of the checks above guard this assignment - confirm against the file.
self._ussdTextMode = textMode

@property
def smsSupportedEncoding(self):
"""
Expand Down Expand Up @@ -965,6 +980,8 @@ def sendUssd(self, ussdString, responseTimeout=15):
:rtype: gsmmodem.modem.Ussd
"""
self._ussdSessionEvent = threading.Event()
if not self.ussdTextMode:
ussdString = packSeptets(encodeGsm7(ussdString)).hex().upper()
try:
cusdResponse = self.write('AT+CUSD=1,"{0}",15'.format(ussdString), timeout=responseTimeout) # Should respond with "OK"
except Exception:
Expand Down Expand Up @@ -1532,18 +1549,27 @@ def _parseCusdResponse(self, lines):
self.log.debug('Multiple +CUSD responses received; filtering...')
# Some modems issue a non-standard "extra" +CUSD notification for releasing the session
for cusdMatch in cusdMatches:
if self.ussdTextMode:
m = cusdMatch.group(2)
else:
m = decodeWithDcs(cusdMatch.group(2), int(cusdMatch.group(3)), self.log)
if cusdMatch.group(1) == '2':
# Set the session to inactive, but ignore the message
self.log.debug('Ignoring "session release" message: %s', cusdMatch.group(2))
self.log.debug('Ignoring "session release" message: %s', m)
sessionActive = False
else:
# Not a "session release" message
message = cusdMatch.group(2)
message = m
if sessionActive and cusdMatch.group(1) != '1':
sessionActive = False
else:
sessionActive = cusdMatches[0].group(1) == '1'
message = cusdMatches[0].group(2)
if self.ussdTextMode:
message = cusdMatches[0].group(2)
else:
message = decodeWithDcs(cusdMatches[0].group(2),
int(cusdMatches[0].group(3)),
self.log)
return Ussd(self, sessionActive, message)

def _placeHolderCallback(self, *args):
Expand Down
19 changes: 6 additions & 13 deletions gsmmodem/pdu.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# -*- coding: utf8 -*-
# -*- coding: utf-8 -*-

""" SMS PDU encoding methods """

Expand Down Expand Up @@ -904,16 +904,14 @@ def unpackSeptets(septets, numberOfSeptets=None, prevOctet=None, shift=7):

def decodeUcs2(byteIter, numBytes):
""" Decodes UCS2-encoded text from the specified byte iterator, up to a maximum of numBytes """
userData = []
i = 0
userData = bytearray()
try:
while i < numBytes:
userData.append(unichr((next(byteIter) << 8) | next(byteIter)))
i += 2
for i in range(numBytes):
userData.append(next(byteIter))
except StopIteration:
# Not enough bytes in iterator to reach numBytes; return what we have
pass
return ''.join(userData)
return userData.decode('utf-16-be')

def encodeUcs2(text):
""" UCS2 text encoding algorithm
Expand All @@ -925,12 +923,7 @@ def encodeUcs2(text):
:return: A bytearray containing the string encoded in UCS2 encoding
:rtype: bytearray
"""
result = bytearray()

for b in map(ord, text):
result.append(b >> 8)
result.append(b & 0xFF)
return result
return text.encode('utf-16-be')

def divideTextUcs2(plainText):
""" UCS-2 message dividing algorithm
Expand Down