Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add messaging send_all and send_multicast functions #283

Merged
merged 8 commits into from May 13, 2019
Merged
Changes from 6 commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -54,6 +54,33 @@ def __init__(self, data=None, notification=None, android=None, webpush=None, apn
self.condition = condition


class MulticastMessage(object):
"""A message that can be sent to multiple tokens via Firebase Cloud Messaging.
Contains payload information as well as recipient information. In particular, the message must
contain exactly one of token, topic or condition fields.
Args:
tokens: A list of registration token of the device to which the message should be sent.
data: A dictionary of data fields (optional). All keys and values in the dictionary must be
strings.
notification: An instance of ``messaging.Notification`` (optional).
android: An instance of ``messaging.AndroidConfig`` (optional).
webpush: An instance of ``messaging.WebpushConfig`` (optional).
apns: An instance of ``messaging.ApnsConfig`` (optional).
"""
def __init__(self, tokens, data=None, notification=None, android=None, webpush=None, apns=None):
_Validators.check_string_list('MulticastMessage.tokens', tokens)
if len(tokens) > 100:
raise ValueError('MulticastMessage.tokens must not contain more than 100 tokens.')
self.tokens = tokens
self.data = data
self.notification = notification
self.android = android
self.webpush = webpush
self.apns = apns


class Notification(object):
"""A notification that can be included in a message.
@@ -150,7 +177,7 @@ class WebpushConfig(object):
data: A dictionary of data fields (optional). All keys and values in the dictionary must be
strings. When specified, overrides any data fields set via ``Message.data``.
notification: A ``messaging.WebpushNotification`` to be included in the message (optional).
fcm_options: A ``messaging.WebpushFcmOptions`` instance to be included in the messsage
fcm_options: A ``messaging.WebpushFcmOptions`` instance to be included in the message
This conversation was marked as resolved by ZachOrr

This comment has been minimized.

Copy link
@hiranya911

hiranya911 May 3, 2019

Member

Good catch 👍

(optional).
.. _Webpush Specification: https://tools.ietf.org/html/rfc8030#section-5
@@ -14,9 +14,14 @@

"""Firebase Cloud Messaging module."""

import json
import requests
import six

import googleapiclient
from googleapiclient import http
from googleapiclient import _auth

import firebase_admin
from firebase_admin import _http_client
from firebase_admin import _messaging_utils
@@ -34,17 +39,22 @@
'ApiCallError',
'Aps',
'ApsAlert',
'BatchResponse',
'CriticalSound',
'ErrorInfo',
'Message',
'MulticastMessage',
'Notification',
'SendResponse',
'TopicManagementResponse',
'WebpushConfig',
'WebpushFcmOptions',
'WebpushNotification',
'WebpushNotificationAction',

'send',
'send_all',
'send_multicast',
'subscribe_to_topic',
'unsubscribe_from_topic',
]
@@ -58,6 +68,7 @@
ApsAlert = _messaging_utils.ApsAlert
CriticalSound = _messaging_utils.CriticalSound
Message = _messaging_utils.Message
MulticastMessage = _messaging_utils.MulticastMessage
Notification = _messaging_utils.Notification
WebpushConfig = _messaging_utils.WebpushConfig
WebpushFcmOptions = _messaging_utils.WebpushFcmOptions
@@ -88,6 +99,56 @@ def send(message, dry_run=False, app=None):
"""
return _get_messaging_service(app).send(message, dry_run)

def send_all(messages, dry_run=False, app=None):
"""Sends the given list of messages via Firebase Cloud Messaging as a single batch.
If the ``dry_run`` mode is enabled, the message will not be actually delivered to the
recipients. Instead FCM performs all the usual validations, and emulates the send operation.
Args:
messages: A list of ``messaging.Message`` instances.
dry_run: A boolean indicating whether to run the operation in dry run mode (optional).
app: An App instance (optional).
Returns:
BatchResponse: A ``messaging.BatchResponse`` instance.
Raises:
ApiCallError: If an error occurs while sending the message to FCM service.
ValueError: If the input arguments are invalid.
"""
return _get_messaging_service(app).send_all(messages, dry_run)

def send_multicast(multicast_message, dry_run=False, app=None):
"""Sends the given mutlicast message to all tokens via Firebase Cloud Messaging (FCM).
If the ``dry_run`` mode is enabled, the message will not be actually delivered to the
recipients. Instead FCM performs all the usual validations, and emulates the send operation.
Args:
multicast_message: An instance of ``messaging.MulticastMessage``.
dry_run: A boolean indicating whether to run the operation in dry run mode (optional).
app: An App instance (optional).
Returns:
BatchResponse: A ``messaging.BatchResponse`` instance.
Raises:
ApiCallError: If an error occurs while sending the message to FCM service.
ValueError: If the input arguments are invalid.
"""
if not isinstance(multicast_message, MulticastMessage):
raise ValueError('Message must be an instance of messaging.MulticastMessage class.')
messages = [Message(
data=multicast_message.data,
notification=multicast_message.notification,
android=multicast_message.android,
webpush=multicast_message.webpush,
apns=multicast_message.apns,
token=token
) for token in multicast_message.tokens]
return _get_messaging_service(app).send_all(messages, dry_run)

def subscribe_to_topic(tokens, topic, app=None):
"""Subscribes a list of registration tokens to an FCM topic.
@@ -192,10 +253,57 @@ def __init__(self, code, message, detail=None):
self.detail = detail


class BatchResponse(object):
"""The response received from a batch request to the FCM API."""

def __init__(self, responses):
self._responses = responses
self._success_count = len([resp for resp in responses if resp.success])

@property
def responses(self):
"""A list of ``messaging.SendResponse`` objects (possibly empty)."""
return self._responses

@property
def success_count(self):
return self._success_count

@property
def failure_count(self):
return len(self.responses) - self.success_count


class SendResponse(object):
"""The response received from an individual batched request to the FCM API."""

def __init__(self, resp, exception):
self._exception = exception
self._message_id = None
if resp:
self._message_id = resp.get('name', None)

@property
def message_id(self):
"""A message ID string that uniquely identifies the sent the message."""
return self._message_id

@property
def success(self):
"""A boolean indicating if the request was successful."""
return self._message_id is not None and not self._exception

@property
def exception(self):
"""A ApiCallError if an error occurs while sending the message to FCM service."""
return self._exception


class _MessagingService(object):
"""Service class that implements Firebase Cloud Messaging (FCM) functionality."""

FCM_URL = 'https://fcm.googleapis.com/v1/projects/{0}/messages:send'
FCM_BATCH_URL = 'https://fcm.googleapis.com/batch'
IID_URL = 'https://iid.googleapis.com'
IID_HEADERS = {'access_token_auth': 'true'}
JSON_ENCODER = _messaging_utils.MessageEncoder()
@@ -234,9 +342,13 @@ def __init__(self, app):
'projectId option, or use service account credentials. Alternatively, set the '
'GOOGLE_CLOUD_PROJECT environment variable.')
self._fcm_url = _MessagingService.FCM_URL.format(project_id)
self._fcm_headers = {
'X-GOOG-API-FORMAT-VERSION': '2',
'X-FIREBASE-CLIENT': 'fire-admin-python/{0}'.format(firebase_admin.__version__),
}
self._client = _http_client.JsonHttpClient(credential=app.credential.get_credential())
self._timeout = app.options.get('httpTimeout')
self._client_version = 'fire-admin-python/{0}'.format(firebase_admin.__version__)
self._transport = _auth.authorized_http(app.credential.get_credential())

@classmethod
def encode_message(cls, message):
@@ -245,16 +357,15 @@ def encode_message(cls, message):
return cls.JSON_ENCODER.default(message)

def send(self, message, dry_run=False):
data = {'message': _MessagingService.encode_message(message)}
if dry_run:
data['validate_only'] = True
data = self._message_data(message, dry_run)
try:
headers = {
'X-GOOG-API-FORMAT-VERSION': '2',
'X-FIREBASE-CLIENT': self._client_version,
}
resp = self._client.body(
'post', url=self._fcm_url, headers=headers, json=data, timeout=self._timeout)
'post',
url=self._fcm_url,
headers=self._fcm_headers,
json=data,
timeout=self._timeout
)
except requests.exceptions.RequestException as error:
if error.response is not None:
self._handle_fcm_error(error)
@@ -264,6 +375,42 @@ def send(self, message, dry_run=False):
else:
return resp['name']

def send_all(self, messages, dry_run=False):
"""Sends the given messages to FCM via the batch API."""
if not isinstance(messages, list):
raise ValueError('Messages must be an list of messaging.Message instances.')
if len(messages) > 100:
raise ValueError('send_all messages must not contain more than 100 messages.')

responses = []

def batch_callback(_, response, error):
exception = None
if error:
This conversation was marked as resolved by ZachOrr

This comment has been minimized.

Copy link
@hiranya911

hiranya911 May 6, 2019

Member

This doesn't look right. I'd except error here to be the ApiCallError raised in postproc.

exception = self._parse_batch_error(error)
send_response = SendResponse(response, exception)
responses.append(send_response)

batch = http.BatchHttpRequest(batch_callback, _MessagingService.FCM_BATCH_URL)
for message in messages:
body = json.dumps(self._message_data(message, dry_run))
req = http.HttpRequest(
http=self._transport,
postproc=self._postproc,
uri=self._fcm_url,
method='POST',
body=body,
headers=self._fcm_headers
)
batch.add(req)

try:
batch.execute()
except googleapiclient.http.HttpError as error:
raise self._parse_batch_error(error)
else:
return BatchResponse(responses)

def make_topic_management_request(self, tokens, topic, operation):
"""Invokes the IID service for topic management functionality."""
if isinstance(tokens, six.string_types):
@@ -299,6 +446,26 @@ def make_topic_management_request(self, tokens, topic, operation):
else:
return TopicManagementResponse(resp)

def _message_data(self, message, dry_run):
data = {'message': _MessagingService.encode_message(message)}
if dry_run:
data['validate_only'] = True
return data

def _postproc(self, resp, body):
"""Handle response from batch API request."""
if resp.status is not 200:
data = {}
try:
parsed_body = json.loads(body.decode())
if isinstance(parsed_body, dict):
data = parsed_body
except ValueError:
pass
code, msg = _MessagingService._parse_fcm_error(data, body, resp.status)
raise ApiCallError(code, msg)
This conversation was marked as resolved by ZachOrr

This comment has been minimized.

Copy link
@hiranya911

hiranya911 May 6, 2019

Member

I think I understand why this is not working properly. I found the following code in the googleapiclient:

     try:
        if resp.status >= 300:
          raise HttpError(resp, content, uri=request.uri)
        response = request.postproc(resp, content)
      except HttpError as e:
        exception = e

So we need to raise a googleapiclient.errors.HttpError from our postproc. Then we can get the content from it, and turn it into an ApiCallError in our callback.

This comment has been minimized.

Copy link
@ZachOrr

ZachOrr May 7, 2019

Author Contributor

This is my confusion - if we get a >= 300 from whatever our API call returns, we don't hit our postproc function. We catch the googleapiclient.errors.HttpError https://github.com/firebase/firebase-admin-python/pull/283/files#diff-3101917f5d55cf93592ea47ca59a7015R409

Our postproc appears to only be called when we get < 300 status code. Which was my question earlier about "when would we get a non-200 and non-error response code from FCM". It feels like we can just throw a generic error in our postproc if it's non-200, right? Other cases should already be handled?

This comment has been minimized.

Copy link
@hiranya911

hiranya911 May 8, 2019

Member

FCM mainly responds with 200 and 4xx/5xx responses. In case of 4xx/5xx the above code will raise an HTTPError, which we can catch in our callback, and convert into an ApiCallError. I believe you're already doing this.

So that brings us to postproc. You're right that postproc will only execute for 2xx responses. AFAIK FCM only returns 200 in this range. So we can relax our logic a little bit as follows:

def _postproc(self, resp, body):
        """Handle response from batch API request."""
        # This only gets called for 2xx responses.
        return json.loads(body.decode())

This essentially ignores the success response code, and treats any 2xx response as a success. But I think that's ok for FCM.

return json.loads(body.decode())

def _handle_fcm_error(self, error):
"""Handles errors received from the FCM API."""
data = {}
@@ -309,20 +476,8 @@ def _handle_fcm_error(self, error):
except ValueError:
pass

error_dict = data.get('error', {})
server_code = None
for detail in error_dict.get('details', []):
if detail.get('@type') == 'type.googleapis.com/google.firebase.fcm.v1.FcmError':
server_code = detail.get('errorCode')
break
if not server_code:
server_code = error_dict.get('status')
code = _MessagingService.FCM_ERROR_CODES.get(server_code, _MessagingService.UNKNOWN_ERROR)

msg = error_dict.get('message')
if not msg:
msg = 'Unexpected HTTP response with status: {0}; body: {1}'.format(
error.response.status_code, error.response.content.decode())
code, msg = _MessagingService._parse_fcm_error(
data, error.response.content, error.response.status_code)
raise ApiCallError(code, msg, error)

def _handle_iid_error(self, error):
@@ -342,3 +497,39 @@ def _handle_iid_error(self, error):
msg = 'Unexpected HTTP response with status: {0}; body: {1}'.format(
error.response.status_code, error.response.content.decode())
raise ApiCallError(code, msg, error)

def _parse_batch_error(self, error):
"""Parses a googleapiclient.http.HttpError content in to an ApiCallError."""
if error.content is None:
msg = 'Failed to call messaging API: {0}'.format(error)
return ApiCallError(self.INTERNAL_ERROR, msg, error)

data = {}
try:
parsed_body = json.loads(error.content.decode())
if isinstance(parsed_body, dict):
data = parsed_body
except ValueError:
pass

code, msg = _MessagingService._parse_fcm_error(data, error.content, error.resp.status)
return ApiCallError(code, msg, error)

@classmethod
def _parse_fcm_error(cls, data, content, status_code):
"""Parses an error response from the FCM API to a ApiCallError."""
error_dict = data.get('error', {})
server_code = None
for detail in error_dict.get('details', []):
if detail.get('@type') == 'type.googleapis.com/google.firebase.fcm.v1.FcmError':
server_code = detail.get('errorCode')
break
if not server_code:
server_code = error_dict.get('status')
code = _MessagingService.FCM_ERROR_CODES.get(server_code, _MessagingService.UNKNOWN_ERROR)

msg = error_dict.get('message')
if not msg:
msg = 'Unexpected HTTP response with status: {0}; body: {1}'.format(
status_code, content.decode())
return code, msg
@@ -6,6 +6,7 @@ tox >= 3.6.0

cachecontrol >= 0.12.4
google-api-core[grpc] >= 1.7.0, < 2.0.0dev; platform.python_implementation != 'PyPy'
google-api-python-client >= 1.7.8
This conversation was marked as resolved by ZachOrr

This comment has been minimized.

Copy link
@hiranya911

hiranya911 May 2, 2019

Member

Might want to also add this to setup.py.

google-cloud-firestore >= 0.31.0; platform.python_implementation != 'PyPy'
google-cloud-storage >= 1.13.0
six >= 1.6.1
@@ -39,6 +39,7 @@
install_requires = [
'cachecontrol>=0.12.4',
'google-api-core[grpc] >= 1.7.0, < 2.0.0dev; platform.python_implementation != "PyPy"',
'google-api-python-client >= 1.7.8',
'google-cloud-firestore>=0.31.0; platform.python_implementation != "PyPy"',
'google-cloud-storage>=1.13.0',
'six>=1.6.1'
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.