diff --git a/adafruit_fona/adafruit_fona.py b/adafruit_fona/adafruit_fona.py index 4de1c8d..d3e1be3 100644 --- a/adafruit_fona/adafruit_fona.py +++ b/adafruit_fona/adafruit_fona.py @@ -47,12 +47,15 @@ # pylint: disable=bad-whitespace FONA_DEFAULT_TIMEOUT_MS = 500 # TODO: Check this against arduino... -# COMMANDS +# Commands CMD_AT = b"AT" -# REPLIES +# Replies REPLY_OK = b"OK" REPLY_AT = b"AT" +# Maximum number of fona800 and fona808 sockets +FONA_MAX_SOCKETS = const(6) + # FONA Versions FONA_800_L = const(0x01) FONA_800_H = const(0x6) @@ -61,11 +64,9 @@ FONA_3G_A = const(0x4) FONA_3G_E = const(0x5) -FONA_MAX_SOCKETS = const(6) - -FONA_SMS_STORAGE_SIM = b'"SM"' -FONA_SMS_STORAGE_INTERNAL = b'"SM"' - +# FONA preferred SMS storage +FONA_SMS_STORAGE_SIM = b'"SM"' # Storage on the SIM +FONA_SMS_STORAGE_INTERNAL = b'"ME"' # Internal storage on the FONA # pylint: enable=bad-whitespace @@ -73,24 +74,26 @@ class FONA: """CircuitPython FONA module interface. :param ~busio.uart UART: FONA UART connection. - :param ~digialio RST: FONA RST Pin. + :param ~digialio RST: FONA RST pin. + :param ~digialio RI: Optional FONA Ring Interrupt (RI) pin. :param bool debug: Enable debugging output. """ - # Connection modes - TCP_MODE = const(0) - UDP_MODE = const(1) + TCP_MODE = const(0) # TCP socket + UDP_MODE = const(1) # UDP socket # pylint: disable=too-many-arguments - def __init__(self, uart, rst, debug=False): + def __init__(self, uart, rst, ri=None, debug=False): self._buf = b"" # shared buffer self._fona_type = 0 self._debug = debug self._uart = uart self._rst = rst - self._rst.switch_to_output() + self._ri = ri + if self._ri is not None: + self._ri.switch_to_input() if not self._init_fona(): raise RuntimeError("Unable to find FONA. Please check connections.") @@ -101,15 +104,14 @@ def _init_fona(self): timeout = 7000 while timeout > 0: - while self._uart.in_waiting: - if self._send_check_reply(CMD_AT, reply=REPLY_OK): - break + if self._send_check_reply(CMD_AT, reply=REPLY_OK): + break if self._send_check_reply(CMD_AT, reply=REPLY_AT): break time.sleep(0.5) timeout -= 500 - if timeout <= 0: + if timeout <= 0: # no response to AT, last ditch attempt self._send_check_reply(CMD_AT, reply=REPLY_OK) time.sleep(0.1) self._send_check_reply(CMD_AT, reply=REPLY_OK) @@ -121,9 +123,9 @@ def _init_fona(self): self._send_check_reply(b"ATE0", reply=REPLY_OK) time.sleep(0.1) + self._read_line() if not self._send_check_reply(b"ATE0", reply=REPLY_OK): return False - time.sleep(0.1) # turn on hangupitude self._send_check_reply(b"AT+CVHU=0", reply=REPLY_OK) @@ -132,8 +134,8 @@ def _init_fona(self): self._buf = b"" self._uart.reset_input_buffer() - self.uart_write(b"ATI\r\n") - self.read_line(multiline=True) + self._uart_write(b"ATI\r\n") + self._read_line(multiline=True) if self._buf.find(b"SIM808 R14") != -1: self._fona_type = FONA_808_V2 @@ -146,8 +148,8 @@ def _init_fona(self): if self._fona_type == FONA_800_L: # determine if SIM800H - self.uart_write(b"AT+GMM\r\n") - self.read_line(multiline=True) + self._uart_write(b"AT+GMM\r\n") + self._read_line(multiline=True) if self._buf.find(b"SIM800H") != -1: self._fona_type = FONA_800_H @@ -155,19 +157,17 @@ def _init_fona(self): def factory_reset(self): """Resets modem to factory configuration.""" - self.uart_write(b"ATZ\r\n") + self._uart_write(b"ATZ\r\n") if not self._expect_reply(REPLY_OK): return False return True def reset(self): - """Performs a hardware reset on the modem. - NOTE: This may take a few seconds to complete. - - """ + """Performs a hardware reset on the modem.""" if self._debug: print("* Reset FONA") + self._rst.switch_to_output() self._rst.value = True time.sleep(0.01) self._rst.value = False @@ -187,16 +187,16 @@ def iemi(self): print("FONA IEMI") self._uart.reset_input_buffer() - self.uart_write(b"AT+GSN\r\n") - self.read_line(multiline=True) + self._uart_write(b"AT+GSN\r\n") + self._read_line(multiline=True) iemi = self._buf[0:15] return iemi.decode("utf-8") @property def local_ip(self): """Returns the local IP Address, False if not set.""" - self.uart_write(b"AT+CIFSR\r\n") - self.read_line() + self._uart_write(b"AT+CIFSR\r\n") + self._read_line() try: ip_addr = self.pretty_ip(self._buf) except ValueError: @@ -208,16 +208,14 @@ def iccid(self): """Returns SIM Card's ICCID number as a string.""" if self._debug: print("ICCID") - self.uart_write(b"AT+CCID\r\n") - self.read_line(timeout=2000) # 6.2.23, 2sec max. response time + self._uart_write(b"AT+CCID\r\n") + self._read_line(timeout=2000) # 6.2.23, 2sec max. response time iccid = self._buf.decode() - self.read_line() # eat 'OK' return iccid @property def gprs(self): """Returns module's GPRS state.""" - if not self._send_parse_reply(b"AT+CGATT?", b"+CGATT: ", ":"): return False if not self._buf: @@ -233,14 +231,10 @@ def set_gprs(self, apn=None, enable=True): if enable: apn_name, apn_user, apn_pass = apn - apn_name = apn_name.encode() - apn_user = apn_user.encode() - apn_pass = apn_pass.encode() - # enable multi connection mode (3,1) if not self._send_check_reply(b"AT+CIPMUX=1", reply=REPLY_OK): return False - self.read_line() + self._read_line() # enable receive data manually (7,2) if not self._send_check_reply(b"AT+CIPRXGET=1", reply=REPLY_OK): @@ -264,33 +258,33 @@ def set_gprs(self, apn=None, enable=True): # Send command AT+SAPBR=3,1,"APN","" # where is the configured APN value. self._send_check_reply_quoted( - b'AT+SAPBR=3,1,"APN",', apn_name, REPLY_OK, 10000 + b'AT+SAPBR=3,1,"APN",', apn_name.encode(), REPLY_OK, 10000 ) # send AT+CSTT,"apn","user","pass" self._uart.reset_input_buffer() - self.uart_write(b'AT+CSTT="' + apn_name) + self._uart_write(b'AT+CSTT="' + apn_name.encode()) if apn_user is not None: - self.uart_write(b'","' + apn_user) + self._uart_write(b'","' + apn_user.encode()) if apn_pass is not None: - self.uart_write(b'","' + apn_pass) - self.uart_write(b'"\r\n') + self._uart_write(b'","' + apn_pass.encode()) + self._uart_write(b'"\r\n') if not self._get_reply(REPLY_OK): return False # Set username if not self._send_check_reply_quoted( - b'AT+SAPBR=3,1,"USER",', apn_user, REPLY_OK, 10000 + b'AT+SAPBR=3,1,"USER",', apn_user.encode(), REPLY_OK, 10000 ): return False # Set password if not self._send_check_reply_quoted( - b'AT+SAPBR=3,1,"PWD",', apn_pass, REPLY_OK, 100000 + b'AT+SAPBR=3,1,"PWD",', apn_pass.encode(), REPLY_OK, 100000 ): return False @@ -318,6 +312,7 @@ def set_gprs(self, apn=None, enable=True): @property def network_status(self): """Returns cellular network status""" + self._read_line() if self._debug: print("Network status") if not self._send_parse_reply(b"AT+CREG?", b"+CREG: ", idx=1): @@ -347,8 +342,7 @@ def rssi(self): if 2 <= reply_num <= 30: rssi = map_range(reply_num, 2, 30, -110, -54) - # read out the 'ok' - self.read_line() + self._read_line() # eat the 'ok' return rssi @property @@ -367,11 +361,7 @@ def gps(self): status = int(self._buf[10:11].decode("utf-8")) if status == 1: status = 3 # assume 3D fix - self.read_line() - elif self._fona_type == FONA_3G_A or self._fona_type == FONA_3G_E: - raise NotImplementedError( - "FONA 3G not currently supported by this library." - ) + self._read_line() else: raise NotImplementedError( "FONA 808 v1 not currently supported by this library." @@ -396,24 +386,22 @@ def gps(self, gps_on=False): if self._fona_type == FONA_808_V2: if not self._send_parse_reply(b"AT+CGPSPWR?", b"+CGPSPWR: ", ":"): return False - self.read_line() + self._read_line() if not self._send_parse_reply(b"AT+CGNSPWR?", b"+CGNSPWR: ", ":"): return False state = self._buf if gps_on and not state: - self.read_line() - if self._fona_type == FONA_808_V2: - # try GNS + self._read_line() + if self._fona_type == FONA_808_V2: # try GNS if not self._send_check_reply(b"AT+CGNSPWR=1", reply=REPLY_OK): return False else: if not self._send_parse_reply(b"AT+CGPSPWR=1", reply_data=REPLY_OK): return False else: - if self._fona_type == FONA_808_V2: - # try GNS + if self._fona_type == FONA_808_V2: # try GNS if not self._send_check_reply(b"AT+CGNSPWR=0", reply=REPLY_OK): return False if not self._send_check_reply(b"AT+CGPSPWR=0", reply=REPLY_OK): @@ -421,29 +409,6 @@ def gps(self, gps_on=False): return True - def get_host_by_name(self, hostname): - """Converts a hostname to a packed 4-byte IP address. - Returns a 4 bytearray. - :param str hostname: Destination server. - - """ - if self._debug: - print("*** Get host by name") - if isinstance(hostname, str): - hostname = bytes(hostname, "utf-8") - - if not self._send_check_reply( - b'AT+CDNSGIP="' + hostname + b'"\r\n', reply=REPLY_OK - ): - return False - - # attempt to parse a response - self.read_line() - while not self._parse_reply(b"+CDNSGIP:", idx=2): - self.read_line() - - return self._buf - def pretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name """Converts a bytearray IP address to a dotted-quad string for printing""" return "%d.%d.%d.%d" % (ip[0], ip[1], ip[2], ip[3]) @@ -473,6 +438,31 @@ def enable_sms_notification(self, enable=True): return False return True + def receive_sms(self): + """Checks for a message notification from the FONA module, + replies back with the a tuple containing (sender, message). + NOTE: This method needs to be polled consistently due to the lack + of hw-based interrupts in CircuitPython. Adding time.sleep delays + may cause missed messages if using the RI pin instead of UART. + + """ + if self._ri is not None: # poll the RI pin + if self._ri.value: + return False, False + if not self._uart.in_waiting: # otherwise, poll the UART + return False, False + + self._read_line() # parse the rcv'd URC + if not self._parse_reply(b"+CMTI: ", idx=1): + return False, False + slot = self._buf + sender, message = self.read_sms(slot) + + if not self.delete_sms(slot): # delete sms from module memory + return False, False + + return sender, message.strip() + def send_sms(self, phone_number, message): """Sends a message SMS to a phone number. :param int phone_number: Destination phone number. @@ -486,38 +476,34 @@ def send_sms(self, phone_number, message): if not self._send_check_reply(b"AT+CMGF=1", reply=REPLY_OK): return False - self.uart_write(b'AT+CMGS="+' + str(phone_number).encode() + b'"' + b"\r") - self.read_line() + self._uart_write(b'AT+CMGS="+' + str(phone_number).encode() + b'"' + b"\r") + self._read_line() - # expect > - if self._buf[0] != 62: + if self._buf[0] != 62: # expect '>' # promoting mark ('>') not found return False - self.read_line() + self._read_line() # write out message and ^z - self.uart_write((message + chr(26)).encode()) + self._uart_write((message + chr(26)).encode()) if self._fona_type == FONA_3G_A or self._fona_type == FONA_3G_E: - # eat 2x CRLF - self.read_line(200) - self.read_line(200) - # read +CMGS, wait ~10sec. - self.read_line(10000) + self._read_line(200) # eat first 'CRLF' + self._read_line(200) # eat second 'CRLF' + # read +CMGS, wait ~10sec. + self._read_line(10000) if not "+CMGS" in self._buf: return False - # read OK if not self._expect_reply(REPLY_OK): return False return True - @property def num_sms(self, sim_storage=True): """Returns the number of SMS messages stored in memory, False if none stored. - :param bool sim_storage: SMS storage on the SIM, otherwise internal storage on FONA chip. + """ if not self._send_check_reply(b"AT+CMGF=1", reply=REPLY_OK): return False @@ -531,27 +517,15 @@ def num_sms(self, sim_storage=True): ): return self._buf - self.read_line() # eat OK + self._read_line() # eat OK if self._send_parse_reply(b"AT+CPMS?", b'"SM",', idx=1): return self._buf - self.read_line() # eat OK + self._read_line() # eat OK if self._send_parse_reply(b"AT+CPMS?", b'"SM_P",', idx=1): return self._buf return False - def delete_all_sms(self): - """Deletes all SMS messages on the FONA SIM.""" - self.read_line() - if not self._send_check_reply(b"AT+CMGF=1", reply=REPLY_OK): - return False - - if not self._send_check_reply( - b'AT+CMGDA="DEL ALL"', reply=REPLY_OK, timeout=25000 - ): - return False - return True - def delete_sms(self, sms_slot): """Deletes a SMS message in the provided SMS slot. :param int sms_slot: SMS SIM or FONA memory slot number. @@ -567,6 +541,24 @@ def delete_sms(self, sms_slot): return True + def delete_all_sms(self): + """Deletes all SMS messages on the FONA SIM.""" + self._read_line() + if not self._send_check_reply(b"AT+CMGF=1", reply=REPLY_OK): + return False + + if self._fona_type == FONA_3G_A or self._fona_type == FONA_3G_E: + num_sms = self.num_sms() + for slot in range(0, num_sms): + if not self.delete_sms(slot): + return False + else: # DEL ALL on 808 + if not self._send_check_reply( + b'AT+CMGDA="DEL ALL"', reply=REPLY_OK, timeout=25000 + ): + return False + return True + def read_sms(self, sms_slot): """Reads and parses SMS messages from FONA device. Returns the SMS sender's phone number and the message contents as a tuple. @@ -578,8 +570,8 @@ def read_sms(self, sms_slot): if not self._send_check_reply(b"AT+CSDH=1", reply=REPLY_OK): return False - self.uart_write(b"AT+CMGR=" + str(sms_slot).encode() + b"\r\n") - self.read_line(1000) + self._uart_write(b"AT+CMGR=" + str(sms_slot).encode() + b"\r\n") + self._read_line(1000) resp = self._buf # get sender @@ -597,12 +589,34 @@ def read_sms(self, sms_slot): self._uart.readinto(self._buf) message = bytes(self._buf).decode() self._uart.reset_input_buffer() - self.read_line() # eat 'OK' + self._read_line() # eat 'OK' return sender, message ### Socket API (TCP, UDP) ### + def get_host_by_name(self, hostname): + """Converts a hostname to a packed 4-byte IP address. + Returns a 4 bytearray. + :param str hostname: Destination server. + + """ + self._read_line() + if self._debug: + print("*** Get host by name") + if isinstance(hostname, str): + hostname = bytes(hostname, "utf-8") + + if not self._send_check_reply( + b'AT+CDNSGIP="' + hostname + b'"\r\n', reply=REPLY_OK + ): + return False + + self._read_line() + while not self._parse_reply(b"+CDNSGIP:", idx=2): + self._read_line() + return self._buf + def get_socket(self): """Returns an avaliable socket (INITIAL or CLOSED state). @@ -610,20 +624,20 @@ def get_socket(self): if self._debug: print("*** Get socket") - self.uart_write(b"AT+CIPSTATUS\r\n") - self.read_line(100) # OK - self.read_line(100) # table header + self._uart_write(b"AT+CIPSTATUS\r\n") + self._read_line(100) # OK + self._read_line(100) # table header allocated_socket = 0 for sock in range(0, FONA_MAX_SOCKETS): # check if INITIAL state - self.read_line(100) + self._read_line(100) self._parse_reply(b"C:", idx=5) if self._buf.strip('"') == "INITIAL" or self._buf.strip('"') == "CLOSED": allocated_socket = sock break # read out the rest of the responses for _ in range(allocated_socket, FONA_MAX_SOCKETS): - self.read_line(100) + self._read_line(100) if self._debug: print("Allocated socket #%d" % allocated_socket) return allocated_socket @@ -637,8 +651,8 @@ def remote_ip(self, sock_num): sock_num < FONA_MAX_SOCKETS ), "Provided socket exceeds the maximum number of \ sockets for the FONA module." - self.uart_write(b"AT+CIPSTATUS=" + str(sock_num).encode() + b"\r\n") - self.read_line(100) + self._uart_write(b"AT+CIPSTATUS=" + str(sock_num).encode() + b"\r\n") + self._read_line(100) self._parse_reply(b"+CIPSTATUS:", idx=3) return self._buf @@ -654,10 +668,10 @@ def socket_status(self, sock_num): sockets for the FONA module." if not self._send_check_reply(b"AT+CIPSTATUS", reply=REPLY_OK, timeout=100): return False - self.read_line() + self._read_line() for state in range(0, sock_num + 1): # read "C: " for each active connection - self.read_line() + self._read_line() if state == sock_num: break self._parse_reply(b"C:", idx=5) @@ -666,7 +680,7 @@ def socket_status(self, sock_num): # eat the rest of the sockets for _ in range(sock_num, FONA_MAX_SOCKETS): - self.read_line() + self._read_line() if not "CONNECTED" in state: return False @@ -691,8 +705,8 @@ def socket_available(self, sock_num): if self._debug: print("\t {} bytes available.".format(self._buf)) - self.read_line() - self.read_line() + self._read_line() + self._read_line() return data @@ -718,37 +732,28 @@ def socket_connect(self, sock_num, dest, port, conn_mode=TCP_MODE): ), "Provided socket exceeds the maximum number of \ sockets for the FONA module." - if self._debug: - print( - "* FONA socket connect, socket={}, protocol={}, port={}, ip={}".format( - sock_num, conn_mode, port, dest - ) - ) - # Query local IP Address - self.uart_write(b"AT+CIFSR\r\n") - self.read_line() + self._uart_write(b"AT+CIFSR\r\n") + self._read_line() # Start connection - self.uart_write(b"AT+CIPSTART=" + str(sock_num).encode()) + self._uart_write(b"AT+CIPSTART=" + str(sock_num).encode()) if conn_mode == 0: - self.uart_write(b',"TCP","') + self._uart_write(b',"TCP","') else: - self.uart_write(b',"UDP","') - self.uart_write(dest.encode() + b'","' + str(port).encode() + b'"\r\n') + self._uart_write(b',"UDP","') + self._uart_write(dest.encode() + b'","' + str(port).encode() + b'"\r\n') if not self._expect_reply(REPLY_OK): return False if not self._expect_reply(b"CONNECT OK"): return False - return True - def socket_close(self, sock_num, quick_close=1): + def socket_close(self, sock_num): """Close TCP or UDP connection :param int sock_num: Desired socket number. - :param int quick_close: Quickly or slowly close the socket. Enabled by default """ if self._debug: @@ -758,11 +763,15 @@ def socket_close(self, sock_num, quick_close=1): ), "Provided socket exceeds the maximum number of \ sockets for the FONA module." - self.uart_write(b"AT+CIPCLOSE=" + str(sock_num).encode() + b",") - self.uart_write(str(quick_close).encode() + b"\r\n") - self.read_line() - if not self._parse_reply(b"CLOSE OK", idx=0): - return False + self._uart_write(b"AT+CIPCLOSE=" + str(sock_num).encode() + b"\r\n") + self._read_line(3000) + + if self._fona_type == FONA_3G_A or self._fona_type == FONA_3G_E: + if not self._expect_reply(REPLY_OK): + return False + else: + if not self._expect_reply(b"CLOSE OK"): + return False return True def socket_read(self, sock_num, length): @@ -772,6 +781,7 @@ def socket_read(self, sock_num, length): :param int length: Desired length to read. """ + self._read_line() assert ( sock_num < FONA_MAX_SOCKETS ), "Provided socket exceeds the maximum number of \ @@ -779,43 +789,39 @@ def socket_read(self, sock_num, length): if self._debug: print("* socket read") - self.uart_write(b"AT+CIPRXGET=2,") - self.uart_write(str(sock_num).encode()) - self.uart_write(b",") - self.uart_write(str(length).encode() + b"\r\n") - - self.read_line() + self._uart_write(b"AT+CIPRXGET=2," + str(sock_num).encode() + b",") + self._uart_write(str(length).encode() + b"\r\n") + self._read_line() if not self._parse_reply(b"+CIPRXGET:"): return False - self._buf = self._uart.read(length) - - return self._buf + return self._uart.read(length) - def socket_write(self, sock_num, buffer): + def socket_write(self, sock_num, buffer, timeout=3000): """Writes bytes to the socket. :param int sock_num: Desired socket number to write to. :param bytes buffer: Bytes to write to socket. + :param int timeout: Socket write timeout, in milliseconds. """ - self.read_line() + self._read_line() assert ( sock_num < FONA_MAX_SOCKETS ), "Provided socket exceeds the maximum number of \ sockets for the FONA module." self._uart.reset_input_buffer() - self.uart_write(b"AT+CIPSEND=" + str(sock_num).encode()) - self.uart_write(b"," + str(len(buffer)).encode() + b"\r\n") - self.read_line() + self._uart_write(b"AT+CIPSEND=" + str(sock_num).encode()) + self._uart_write(b"," + str(len(buffer)).encode() + b"\r\n") + self._read_line() if self._buf[0] != 62: # promoting mark ('>') not found return False - self.uart_write(buffer + b"\r\n") - self.read_line(3000) + self._uart_write(buffer + b"\r\n") + self._read_line(timeout) if "SEND OK" not in self._buf.decode(): return False @@ -824,12 +830,7 @@ def socket_write(self, sock_num, buffer): ### UART Reply/Response Helpers ### - @property - def in_waiting(self): - """Returns number of bytes available to be read in the input buffer.""" - return self._uart.in_waiting - - def uart_write(self, buffer): + def _uart_write(self, buffer): """UART ``write`` with optional debug that prints the buffer before sending. :param bytes buffer: Buffer of bytes to send to the bus. @@ -846,7 +847,7 @@ def _send_parse_reply(self, send_data, reply_data, divider=",", idx=0): :param str divider: Separator """ - self.read_line() + self._read_line() self._get_reply(send_data) if not self._parse_reply(reply_data, divider, idx): @@ -864,11 +865,11 @@ def _get_reply( self._uart.reset_input_buffer() if data is not None: - self.uart_write(data + b"\r\n") + self._uart_write(data + b"\r\n") else: - self.uart_write(prefix + suffix + b"\r\n") + self._uart_write(prefix + suffix + b"\r\n") - return self.read_line(timeout) + return self._read_line(timeout) def _parse_reply(self, reply, divider=",", idx=0): """Attempts to find reply in UART buffer, reads up to divider. @@ -894,7 +895,7 @@ def _parse_reply(self, reply, divider=",", idx=0): return True - def read_line(self, timeout=FONA_DEFAULT_TIMEOUT_MS, multiline=False): + def _read_line(self, timeout=FONA_DEFAULT_TIMEOUT_MS, multiline=False): """Reads one or multiple lines into the buffer. Optionally prints the buffer after reading. :param int timeout: Time to wait for UART serial to reply, in seconds. @@ -943,7 +944,7 @@ def _send_check_reply( :param bytes reply: Expected response from module. """ - self.read_line() + self._read_line() if send is None: if not self._get_reply(prefix=prefix, suffix=suffix, timeout=timeout): return False @@ -984,16 +985,16 @@ def _get_reply_quoted(self, prefix, suffix, timeout): """ self._uart.reset_input_buffer() - self.uart_write(prefix + b'"' + suffix + b'"\r\n') + self._uart_write(prefix + b'"' + suffix + b'"\r\n') - return self.read_line(timeout) + return self._read_line(timeout) def _expect_reply(self, reply, timeout=10000): """Reads line from FONA module and compares to reply from FONA module. :param bytes reply: Expected reply from module. """ - self.read_line(timeout) + self._read_line(timeout) if reply not in self._buf: return False return True diff --git a/adafruit_fona/adafruit_fona_gsm.py b/adafruit_fona/adafruit_fona_network.py similarity index 64% rename from adafruit_fona/adafruit_fona_gsm.py rename to adafruit_fona/adafruit_fona_network.py index 2c84cde..7b429ed 100755 --- a/adafruit_fona/adafruit_fona_gsm.py +++ b/adafruit_fona/adafruit_fona_network.py @@ -20,22 +20,25 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """ -`adafruit_fona_gsm` +`adafruit_fona_network` ================================================================================= -Interface for 2G GSM cellular modems such as the Adafruit FONA808. +Interface for connecting to and interacting with GSM and CDMA cellular networks. * Author(s): Brent Rubell """ +# Network types +NET_GSM = 0x01 +NET_CDMA = 0x02 -class GSM: - """Interface for interacting with FONA 2G GSM modems. - """ + +class CELLULAR: + """Interface for connecting to and interacting with GSM and CDMA cellular networks.""" def __init__(self, fona, apn): - """Initializes interface with 2G GSM modem. + """Initializes interface with cellular network. :param adafruit_fona fona: The Adafruit FONA module we are using. :param tuple apn: Tuple containing APN name, (optional) APN username, and APN password. @@ -43,10 +46,12 @@ def __init__(self, fona, apn): """ self._iface = fona self._apn = apn - self._gsm_connected = False + self._network_connected = False + self._network_type = NET_CDMA - # Enable GPS module - self._iface.gps = True + if not self._iface.version == 0x4 or self._iface.version == 0x5: + self._network_type = NET_GSM + self._iface.gps = True def __enter__(self): return self @@ -56,7 +61,7 @@ def __exit__(self, exception_type, exception_value, traceback): @property def imei(self): - """Returns the GSM modem's IEMI number, as a string.""" + """Returns the modem's IEMI number, as a string.""" return self._iface.iemi @property @@ -66,31 +71,31 @@ def iccid(self): @property def is_attached(self): - """Returns if the modem is attached to the network - and the GPS has a fix.""" - if self._iface.gps == 3 and self._iface.network_status == 1: - return True + """Returns if the modem is attached to the network.""" + if self._network_type == NET_GSM: + if self._iface.gps == 3 and self._iface.network_status == 1: + return True + else: # Attach CDMA network + if self._iface.ue_system_info == 1 and self._iface.network_status == 1: + return True return False @property def is_connected(self): - """Returns if attached to GSM - and an IP Addresss was obtained. - - """ - if not self._gsm_connected: + """Returns if attached to network and an IP Addresss was obtained.""" + if not self._network_connected: return False return True def connect(self): - """Connect to GSM network.""" + """Connect to cellular network.""" if self._iface.set_gprs(self._apn, True): - self._gsm_connected = True + self._network_connected = True else: # reset context for next connection attempt self._iface.set_gprs(self._apn, False) def disconnect(self): - """Disconnect from GSM network.""" + """Disconnect from cellular network.""" self._iface.set_gprs(self._apn, False) - self._gsm_connected = False + self._network_connected = False diff --git a/adafruit_fona/adafruit_fona_socket.py b/adafruit_fona/adafruit_fona_socket.py index 5fea18c..68b806c 100644 --- a/adafruit_fona/adafruit_fona_socket.py +++ b/adafruit_fona/adafruit_fona_socket.py @@ -103,7 +103,10 @@ def __init__( raise RuntimeError("Only AF_INET family supported by cellular sockets.") self._sock_type = type self._buffer = b"" - self._timeout = 0 + if hasattr(_the_interface, "tx_timeout"): + self._timeout = _the_interface.tx_timeout + else: + self._timeout = 3000 # FONA800 self._socknum = _the_interface.get_socket() SOCKETS.append(self._socknum) @@ -157,7 +160,7 @@ def send(self, data): :param bytes data: Desired data to send to the socket. """ - _the_interface.socket_write(self._socknum, data) + _the_interface.socket_write(self._socknum, data, self._timeout) gc.collect() def recv(self, bufsize=0): diff --git a/adafruit_fona/fona_3g.py b/adafruit_fona/fona_3g.py new file mode 100755 index 0000000..b1ee0a8 --- /dev/null +++ b/adafruit_fona/fona_3g.py @@ -0,0 +1,329 @@ +# The MIT License (MIT) +# +# Copyright Limor Fried/Ladyada for Adafruit Industries +# Copyright (c) 2020 Brent Rubell for Adafruit Industries +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +""" +:py:class:`~adafruit_fona.fona_3g.FONA3G` +`adafruit_fona_3g` +================================================================================ + +FONA3G cellular module instance. + +* Author(s): ladyada, Brent Rubell + +Implementation Notes +-------------------- + +**Software and Dependencies:** + +* Adafruit CircuitPython firmware for the supported boards: + https://github.com/adafruit/circuitpython/releases + +""" +from micropython import const +from .adafruit_fona import FONA, REPLY_OK + +__version__ = "0.0.0-auto.0" +__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_FONA.git" + +FONA_MAX_SOCKETS = const(10) + + +class FONA3G(FONA): + """FONA 3G module interface. + :param ~busio.uart UART: FONA UART connection. + :param ~digialio RST: FONA RST pin. + :param ~digialio RI: Optional FONA Ring Interrupt (RI) pin. + :param bool debug: Enable debugging output. + + """ + + def __init__(self, uart, rst, ri=None, debug=False): + uart.baudrate = 4800 + super(FONA3G, self).__init__(uart, rst, ri, debug) + + def set_baudrate(self, baudrate): + """Sets the FONA's UART baudrate.""" + if not self._send_check_reply( + b"AT+IPREX=" + str(baudrate).encode(), reply=REPLY_OK + ): + return False + return True + + @property + def gps(self): + """Returns True if the GPS session is active, False if it's stopped..""" + if not self._send_check_reply(b"AT+CGPS?", reply=b"+CGPS: 1,1"): + return False + return True + + @gps.setter + def gps(self, gps_on=False): + """Sets GPS module power, parses returned buffer. + :param bool gps_on: Enables the GPS module, disabled by default. + + """ + # check if GPS is already enabled + if not self._send_parse_reply(b"AT+CGPS?", b"+CGPS: "): + return False + + state = self._buf + + if gps_on and not state: + self._read_line() + if not self._send_check_reply(b"AT+CGPS=1", reply=REPLY_OK): + return False + else: + if not self._send_check_reply(b"AT+CGPS=0", reply=REPLY_OK): + return False + self._read_line(2000) # eat '+CGPS: 0' + return True + + @property + def ue_system_info(self): + """Returns True if UE system is online, otherwise False.""" + self._send_parse_reply(b"AT+CPSI?\r\n", b"+CPSI: ") + if not self._buf == "GSM" or self._buf == "WCDMA": # 5.15 + return False + return True + + @property + def local_ip(self): + """Returns the IP address of the current active socket.""" + if not self._send_parse_reply(b"AT+IPADDR", b"+IPADDR:"): + return False + return self._buf + + # pylint: disable=too-many-return-statements + def set_gprs(self, apn=None, enable=True): + """Configures and brings up GPRS. + :param bool enable: Enables or disables GPRS. + + """ + if enable: + if not self._send_check_reply(b"AT+CGATT=1", reply=REPLY_OK, timeout=10000): + return False + + if apn is not None: # Configure APN + apn_name, apn_user, apn_pass = apn + if not self._send_check_reply_quoted( + b'AT+CGSOCKCONT=1,"IP",', apn_name.encode(), REPLY_OK, 10000 + ): + return False + + if apn_user is not None: + self._uart_write(b"AT+CGAUTH=1,1,") + self._uart_write(b'"' + apn_pass.encode() + b'"') + self._uart_write(b',"' + apn_user.encode() + b'"\r\n') + + if not self._get_reply(REPLY_OK, timeout=10000): + return False + + # Enable PDP Context + if not self._send_check_reply( + b"AT+CIPMODE=1", reply=REPLY_OK, timeout=10000 + ): # Transparent mode + return False + + # Open network + if not self._send_check_reply( + b"AT+NETOPEN=,,1", reply=b"Network opened", timeout=120000 + ): + return False + self._read_line() + + if not self.local_ip: + return True + else: + # reset PDP state + if not self._send_check_reply( + b"AT+NETCLOSE", reply=b"Network closed", timeout=20000 + ): + return False + return True + + ### Socket API (TCP, UDP) ### + + @property + def tx_timeout(self): + """Returns CIPSEND timeout, in milliseconds.""" + self._read_line() + if not self._send_parse_reply(b"AT+CIPTIMEOUT?", b"+CIPTIMEOUT:", idx=2): + return False + return True + + @tx_timeout.setter + def tx_timeout(self, timeout): + """Sets CIPSEND timeout.""" + self._read_line() + if not self._send_check_reply( + b"AT+CIPTIMEOUT=" + str(timeout).encode(), reply=REPLY_OK + ): + return False + return True + + def get_host_by_name(self, hostname): + """Converts a hostname to a 4-byte IP address. + :param str hostname: Domain name. + """ + self._read_line() + if self._debug: + print("*** Get host by name") + if isinstance(hostname, str): + hostname = bytes(hostname, "utf-8") + + self._uart_write(b'AT+CDNSGIP="' + hostname + b'"\r\n') + self._read_line(10000) # Read the +CDNSGIP, takes a while + + if not self._parse_reply(b"+CDNSGIP: ", idx=2): + return False + return self._buf + + def get_socket(self): + """Returns an unused socket.""" + if self._debug: + print("*** Get socket") + + self._read_line() + self._uart_write(b"AT+CIPOPEN?\r\n") # Query which sockets are busy + + socket = 0 + for socket in range(0, FONA_MAX_SOCKETS): + self._read_line(120000) + try: # SIMCOM5320 lacks a socket connection status, this is a workaround + self._parse_reply(b"+CIPOPEN: ", idx=1) + except IndexError: + break + + for _ in range(socket, FONA_MAX_SOCKETS): + self._read_line() # eat the rest of '+CIPOPEN' responses + + if self._debug: + print("Allocated socket #%d" % socket) + return socket + + def socket_connect(self, sock_num, dest, port, conn_mode=0): + """Connects to a destination IP address or hostname. + By default, we use conn_mode TCP_MODE but we may also use UDP_MODE. + :param int sock_num: Desired socket number + :param str dest: Destination dest address. + :param int port: Destination dest port. + :param int conn_mode: Connection mode (TCP/UDP) + + """ + if self._debug: + print( + "*** Socket connect, protocol={}, port={}, ip={}".format( + conn_mode, port, dest + ) + ) + + self._uart.reset_input_buffer() + assert ( + sock_num < FONA_MAX_SOCKETS + ), "Provided socket exceeds the maximum number of \ + sockets for the FONA module." + self._send_check_reply(b"AT+CIPHEAD=0", reply=REPLY_OK) # do not show ip header + self._send_check_reply( + b"AT+CIPSRIP=0", reply=REPLY_OK + ) # do not show remote ip/port + self._send_check_reply(b"AT+CIPRXGET=1", reply=REPLY_OK) # manually get data + + self._uart_write(b"AT+CIPOPEN=" + str(sock_num).encode()) + if conn_mode == 0: + self._uart_write(b',"TCP","') + else: + self._uart_write(b',"UDP","') + self._uart_write(dest.encode() + b'",' + str(port).encode() + b"\r\n") + + if not self._expect_reply(b"Connect ok"): + return False + return True + + def remote_ip(self, sock_num): + """Returns the IP address of sender.""" + self._read_line() + assert ( + sock_num < FONA_MAX_SOCKETS + ), "Provided socket exceeds the maximum number of \ + sockets for the FONA module." + + self._uart_write(b"AT+CIPOPEN?\r\n") + for _ in range(0, sock_num + 1): + self._read_line() + self._parse_reply(b"+CIPOPEN:", idx=2) + ip_addr = self._buf + + for _ in range(sock_num, FONA_MAX_SOCKETS): + self._read_line() # eat the rest of '+CIPOPEN' responses + return ip_addr + + def socket_write(self, sock_num, buffer, timeout=120000): + """Writes bytes to the socket. + :param int sock_num: Desired socket number to write to. + :param bytes buffer: Bytes to write to socket. + :param int timeout: Socket write timeout, in milliseconds. Defaults to 120000ms. + + """ + self._read_line() + assert ( + sock_num < FONA_MAX_SOCKETS + ), "Provided socket exceeds the maximum number of \ + sockets for the FONA module." + + self._uart.reset_input_buffer() + + self._uart_write( + b"AT+CIPSEND=" + + str(sock_num).encode() + + b"," + + str(len(buffer)).encode() + + b"\r\n" + ) + self._read_line() + if self._buf[0] != 62: + # promoting mark ('>') not found + return False + + self._uart_write(buffer + b"\r\n") + self._read_line() # eat 'OK' + + self._read_line(3000) # expect +CIPSEND: rx,tx + if not self._parse_reply(b"+CIPSEND:", idx=1): + return False + if not self._buf == len(buffer): # assert data sent == buffer size + return False + + self._read_line(timeout) + if "Send ok" not in self._buf.decode(): + return False + return True + + def socket_status(self, sock_num): + """Returns True if socket is connected, False otherwise. + :param int sock_num: Desired socket number. + + """ + if not self._send_parse_reply(b"AT+CIPCLOSE?", b"+CIPCLOSE:", idx=sock_num): + return False + if not self._buf == 1: + return False + return True diff --git a/examples/fona_aio_post.py b/examples/fona_aio_post.py old mode 100755 new mode 100644 index 1aa23cc..ee984d7 --- a/examples/fona_aio_post.py +++ b/examples/fona_aio_post.py @@ -1,9 +1,11 @@ +# pylint: disable=unused-import import time import board import busio import digitalio from adafruit_fona.adafruit_fona import FONA -from adafruit_fona.adafruit_fona_gsm import GSM +from adafruit_fona.fona_3g import FONA3G +import adafruit_fona.adafruit_fona_network as network import adafruit_fona.adafruit_fona_socket as cellular_socket import adafruit_requests as requests @@ -14,26 +16,31 @@ print("GPRS secrets are kept in secrets.py, please add them there!") raise -# Create a serial connection for the FONA connection using 4800 baud. -# These are the defaults you should use for the FONA Shield. -# For other boards set RX = GPS module TX, and TX = GPS module RX pins. -uart = busio.UART(board.TX, board.RX, baudrate=4800) +# Create a serial connection for the FONA +uart = busio.UART(board.TX, board.RX) rst = digitalio.DigitalInOut(board.D4) -# Initialize FONA module (this may take a few seconds) +# Use this for FONA800 and FONA808 fona = FONA(uart, rst) -# initialize gsm -gsm = GSM(fona, (secrets["apn"], secrets["apn_username"], secrets["apn_password"])) +# Use this for FONA3G +# fona = FONA3G(uart, rst) -while not gsm.is_attached: +# Initialize cellular data network +network = network.CELLULAR( + fona, (secrets["apn"], secrets["apn_username"], secrets["apn_password"]) +) + +while not network.is_attached: print("Attaching to network...") time.sleep(0.5) +print("Attached!") -while not gsm.is_connected: +while not network.is_connected: print("Connecting to network...") - gsm.connect() - time.sleep(5) + network.connect() + time.sleep(0.5) +print("Network Connected!") # Initialize a requests object with a socket and cellular interface requests.set_socket(cellular_socket, fona) diff --git a/examples/fona_cheerlights.py b/examples/fona_cheerlights.py index 8ed164d..cbe0075 100755 --- a/examples/fona_cheerlights.py +++ b/examples/fona_cheerlights.py @@ -1,9 +1,11 @@ +# pylint: disable=unused-import import time import board import busio import digitalio from adafruit_fona.adafruit_fona import FONA -from adafruit_fona.adafruit_fona_gsm import GSM +from adafruit_fona.fona_3g import FONA3G +import adafruit_fona.adafruit_fona_network as network import adafruit_fona.adafruit_fona_socket as cellular_socket import adafruit_requests as requests @@ -18,26 +20,31 @@ print("GPRS secrets are kept in secrets.py, please add them there!") raise -# Create a serial connection for the FONA connection using 4800 baud. -# These are the defaults you should use for the FONA Shield. -# For other boards set RX = GPS module TX, and TX = GPS module RX pins. -uart = busio.UART(board.TX, board.RX, baudrate=4800) +# Create a serial connection for the FONA +uart = busio.UART(board.TX, board.RX) rst = digitalio.DigitalInOut(board.D4) -# Initialize FONA module (this may take a few seconds) +# Use this for FONA800 and FONA808 fona = FONA(uart, rst) -# initialize gsm -gsm = GSM(fona, (secrets["apn"], secrets["apn_username"], secrets["apn_password"])) +# Use this for FONA3G +# fona = FONA3G(uart, rst) -while not gsm.is_attached: +# Initialize cellular data network +network = network.CELLULAR( + fona, (secrets["apn"], secrets["apn_username"], secrets["apn_password"]) +) + +while not network.is_attached: print("Attaching to network...") time.sleep(0.5) +print("Attached!") -while not gsm.is_connected: +while not network.is_connected: print("Connecting to network...") - gsm.connect() - time.sleep(5) + network.connect() + time.sleep(0.5) +print("Network Connected!") # Initialize a requests object with a socket and cellular interface requests.set_socket(cellular_socket, fona) diff --git a/examples/fona_simpletest.py b/examples/fona_simpletest.py index 8eb5e1d..aa84879 100644 --- a/examples/fona_simpletest.py +++ b/examples/fona_simpletest.py @@ -1,13 +1,15 @@ +# pylint: disable=unused-import import time import board import busio import digitalio from adafruit_fona.adafruit_fona import FONA -from adafruit_fona.adafruit_fona_gsm import GSM +from adafruit_fona.fona_3g import FONA3G +import adafruit_fona.adafruit_fona_network as network import adafruit_fona.adafruit_fona_socket as cellular_socket import adafruit_requests as requests -print("FONA WebClient Test") +print("FONA Webclient Test") TEXT_URL = "http://wifitest.adafruit.com/testwifi/index.html" JSON_URL = "http://api.coindesk.com/v1/bpi/currentprice/USD.json" @@ -19,26 +21,31 @@ print("GPRS secrets are kept in secrets.py, please add them there!") raise -# Create a serial connection for the FONA connection using 4800 baud. -# These are the defaults you should use for the FONA Shield. -# For other boards set RX = GPS module TX, and TX = GPS module RX pins. -uart = busio.UART(board.TX, board.RX, baudrate=4800) +# Create a serial connection for the FONA connection +uart = busio.UART(board.TX, board.RX) rst = digitalio.DigitalInOut(board.D4) -# Initialize FONA module (this may take a few seconds) +# Use this for FONA800 and FONA808 fona = FONA(uart, rst) -# initialize gsm -gsm = GSM(fona, (secrets["apn"], secrets["apn_username"], secrets["apn_password"])) +# Use this for FONA3G +# fona = FONA3G(uart, rst) -while not gsm.is_attached: +# Initialize cellular data network +network = network.CELLULAR( + fona, (secrets["apn"], secrets["apn_username"], secrets["apn_password"]) +) + +while not network.is_attached: print("Attaching to network...") time.sleep(0.5) +print("Attached!") -while not gsm.is_connected: +while not network.is_connected: print("Connecting to network...") - gsm.connect() - time.sleep(5) + network.connect() + time.sleep(0.5) +print("Network Connected!") print("My IP address is:", fona.local_ip) print("IP lookup adafruit.com: %s" % fona.get_host_by_name("adafruit.com")) diff --git a/examples/fona_sms.py b/examples/fona_sms.py index 3dba7c3..c1e56d1 100755 --- a/examples/fona_sms.py +++ b/examples/fona_sms.py @@ -1,8 +1,10 @@ +# pylint: disable=unused-import import time import board import busio import digitalio -from adafruit_fona.adafruit_fona import FONA +from adafruit_fona.adafruit_fona import FONA, FONA_3G_A, FONA_3G_E +from adafruit_fona.fona_3g import FONA3G print("FONA SMS") @@ -10,10 +12,13 @@ uart = busio.UART(board.TX, board.RX) rst = digitalio.DigitalInOut(board.D4) -# Initialize FONA module (this may take a few seconds) +# Use this for FONA800 and FONA808 fona = FONA(uart, rst) -# Initialize Network +# Use this for FONA3G +# fona = FONA3G(uart, rst) + +# Initialize network while fona.network_status != 1: print("Connecting to network...") time.sleep(1) @@ -26,10 +31,16 @@ raise RuntimeError("FONA did not successfully send SMS") print("SMS Sent!") -# Ask the FONA how many SMS message it has -num_sms = fona.num_sms +# Ask the FONA how many SMS message it has stored +num_sms = fona.num_sms() print("%d SMS's on SIM Card" % num_sms) -# Read out all the SMS messages on the FONA's SIM -for slot in range(1, num_sms): +# FONA3G SMS memory slots start at 0 +if fona.version == FONA_3G_A or fona.version == FONA_3G_E: + sms_idx = 0 +else: # FONA800 and FONA808 SMS slots start at 1 + sms_idx = 1 + +# Read num_sms messages from the FONA +for slot in range(sms_idx, num_sms): print(fona.read_sms(slot)) diff --git a/examples/fona_sms_response.py b/examples/fona_sms_response.py index 6c3078a..c94c4c2 100755 --- a/examples/fona_sms_response.py +++ b/examples/fona_sms_response.py @@ -1,17 +1,24 @@ +# pylint: disable=unused-import import time import board import busio import digitalio from adafruit_fona.adafruit_fona import FONA +from adafruit_fona.fona_3g import FONA3G print("FONA SMS Response") # Create a serial connection for the FONA connection uart = busio.UART(board.TX, board.RX) rst = digitalio.DigitalInOut(board.D4) +# Ring Indicator (RI) interrupt pin +ri = digitalio.DigitalInOut(board.D5) -# Initialize FONA module (this may take a few seconds) -fona = FONA(uart, rst, debug=True) +# Use this for FONA800 and FONA808 +fona = FONA(uart, rst, ri) + +# Use this for FONA3G +# fona = FONA3G(uart, rst, ri) # Initialize Network while fona.network_status != 1: @@ -23,31 +30,15 @@ # Enable FONA SMS notification fona.enable_sms_notification = True -# store incoming notification info -notification_buf = bytearray(64) - -print("FONA Ready!") +print("FONA Ready!\nWaiting for SMS...") while True: - if fona.in_waiting: # data is available from FONA - notification_buf = fona.read_line()[1] - # Split out the sms notification slot num. - notification_buf = notification_buf.decode() - sms_slot = notification_buf.split(",")[1] + sender, message = fona.receive_sms() - print("NEW SMS!\n\t Slot: ", sms_slot) - - # Get sms message and address - sender, message = fona.read_sms(sms_slot) - print("FROM: ", sender) - print("MSG: ", message) + if message: + print("Incoming SMS from {}: {}".format(sender, message)) # Reply back! print("Sending response...") if not fona.send_sms(int(sender), "Hey, I got your text!"): print("SMS Send Failed") print("SMS Sent!") - - # Delete the original message - if not fona.delete_sms(sms_slot): - print("Could not delete SMS in slot", sms_slot) - print("OK!")