Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 80 additions & 76 deletions src/amplitude-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,21 +167,8 @@ AmplitudeClient.prototype.init = function init(apiKey, opt_userId, opt_config, o

// load unsent events and identifies before any attempt to log new ones
if (this.options.saveEvents) {
// validate event properties for unsent events
for (let i = 0; i < this._unsentEvents.length; i++) {
var eventProperties = this._unsentEvents[i].event_properties;
var groups = this._unsentEvents[i].groups;
this._unsentEvents[i].event_properties = utils.validateProperties(eventProperties);
this._unsentEvents[i].groups = utils.validateGroups(groups);
}

// validate user properties for unsent identifys
for (let j = 0; j < this._unsentIdentifys.length; j++) {
var userProperties = this._unsentIdentifys[j].user_properties;
var identifyGroups = this._unsentIdentifys[j].groups;
this._unsentIdentifys[j].user_properties = utils.validateProperties(userProperties);
this._unsentIdentifys[j].groups = utils.validateGroups(identifyGroups);
}
_validateUnsentEventQueue(this._unsentEvents);
_validateUnsentEventQueue(this._unsentIdentifys);
}

this._lastEventTime = now;
Expand Down Expand Up @@ -212,8 +199,8 @@ AmplitudeClient.prototype.init = function init(apiKey, opt_userId, opt_config, o
}
}
if (this.options.saveEvents) {
this._unsentEvents = this._parseSavedUnsentEventsString(values[1]).concat(this._unsentEvents);
this._unsentIdentifys = this._parseSavedUnsentEventsString(values[2]).concat(this._unsentIdentifys);
this._unsentEvents = this._parseSavedUnsentEventsString(values[1]).map(event => ({event})).concat(this._unsentEvents);
this._unsentIdentifys = this._parseSavedUnsentEventsString(values[2]).map(event => ({event})).concat(this._unsentIdentifys);
}
if (DeviceInfo) {
Promise.all([
Expand Down Expand Up @@ -247,8 +234,8 @@ AmplitudeClient.prototype.init = function init(apiKey, opt_userId, opt_config, o
});
} else {
if (this.options.saveEvents) {
this._unsentEvents = this._loadSavedUnsentEvents(this.options.unsentKey).concat(this._unsentEvents);
this._unsentIdentifys = this._loadSavedUnsentEvents(this.options.unsentIdentifyKey).concat(this._unsentIdentifys);
this._unsentEvents = this._loadSavedUnsentEvents(this.options.unsentKey).map(event => ({event})).concat(this._unsentEvents);
this._unsentIdentifys = this._loadSavedUnsentEvents(this.options.unsentIdentifyKey).map(event => ({event})).concat(this._unsentIdentifys);
}
initFromStorage();
this.runQueuedFunctions();
Expand All @@ -262,6 +249,19 @@ AmplitudeClient.prototype.init = function init(apiKey, opt_userId, opt_config, o
}
};

// validate properties for unsent events
const _validateUnsentEventQueue = (queue) => {
for (let i = 0; i < queue.length; i++) {
const userProperties = queue[i].event.user_properties;
const eventProperties = queue[i].event.event_properties;
const groups = queue[i].event.groups;

queue[i].event.user_properties = utils.validateProperties(userProperties);
queue[i].event.event_properties = utils.validateProperties(eventProperties);
queue[i].event.groups = utils.validateGroups(groups);
}
};

/**
* @private
*/
Expand Down Expand Up @@ -486,20 +486,20 @@ AmplitudeClient.prototype._unsentCount = function _unsentCount() {
* Send events if ready. Returns true if events are sent.
* @private
*/
AmplitudeClient.prototype._sendEventsIfReady = function _sendEventsIfReady(callback) {
AmplitudeClient.prototype._sendEventsIfReady = function _sendEventsIfReady() {
if (this._unsentCount() === 0) {
return false;
}

// if batching disabled, send any unsent events immediately
if (!this.options.batchEvents) {
this.sendEvents(callback);
this.sendEvents();
return true;
}

// if batching enabled, check if min threshold met for batch size
if (this._unsentCount() >= this.options.eventUploadThreshold) {
this.sendEvents(callback);
this.sendEvents();
return true;
}

Expand Down Expand Up @@ -739,18 +739,22 @@ AmplitudeClient.prototype._saveReferrer = function _saveReferrer(referrer) {
*/
AmplitudeClient.prototype.saveEvents = function saveEvents() {
try {
const serializedUnsentEvents = JSON.stringify(this._unsentEvents.map(({event}) => event));

if (AsyncStorage) {
AsyncStorage.setItem(this.options.unsentKey + this._storageSuffix, JSON.stringify(this._unsentEvents));
AsyncStorage.setItem(this.options.unsentKey + this._storageSuffix, serializedUnsentEvents);
} else {
this._setInStorage(localStorage, this.options.unsentKey, JSON.stringify(this._unsentEvents));
this._setInStorage(localStorage, this.options.unsentKey, serializedUnsentEvents);
}
} catch (e) {}

try {
const serializedIdentifys = JSON.stringify(this._unsentIdentifys.map(unsentIdentify => unsentIdentify.event));

if (AsyncStorage) {
AsyncStorage.setItem(this.options.unsentIdentifyKey + this._storageSuffix, JSON.stringify(this._unsentIdentifys));
AsyncStorage.setItem(this.options.unsentIdentifyKey + this._storageSuffix, serializedIdentifys);
} else {
this._setInStorage(localStorage, this.options.unsentIdentifyKey, JSON.stringify(this._unsentIdentifys));
this._setInStorage(localStorage, this.options.unsentIdentifyKey, serializedIdentifys);
}
} catch (e) {}
};
Expand Down Expand Up @@ -1183,20 +1187,18 @@ AmplitudeClient.prototype._logEvent = function _logEvent(eventType, eventPropert
};

if (eventType === Constants.IDENTIFY_EVENT || eventType === Constants.GROUP_IDENTIFY_EVENT) {
this._unsentIdentifys.push(event);
this._unsentIdentifys.push({event, callback});
this._limitEventsQueued(this._unsentIdentifys);
} else {
this._unsentEvents.push(event);
this._unsentEvents.push({event, callback});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

this._limitEventsQueued(this._unsentEvents);
}

if (this.options.saveEvents) {
this.saveEvents();
}

if (!this._sendEventsIfReady(callback) && type(callback) === 'function') {
callback(0, 'No request sent', {reason: 'No events to send or upload queued'});
}
this._sendEventsIfReady(callback);

return eventId;
} catch (e) {
Expand Down Expand Up @@ -1400,25 +1402,31 @@ if (BUILD_COMPAT_2_0) {
* Remove events in storage with event ids up to and including maxEventId.
* @private
*/
AmplitudeClient.prototype.removeEvents = function removeEvents(maxEventId, maxIdentifyId) {
_removeEvents(this, '_unsentEvents', maxEventId);
_removeEvents(this, '_unsentIdentifys', maxIdentifyId);
AmplitudeClient.prototype.removeEvents = function removeEvents(maxEventId, maxIdentifyId, status, response) {
_removeEvents(this, '_unsentEvents', maxEventId, status, response);
_removeEvents(this, '_unsentIdentifys', maxIdentifyId, status, response);
};

/**
* Helper function to remove events up to maxId from a single queue.
* Does a true filter in case events get out of order or old events are removed.
* @private
*/
var _removeEvents = function _removeEvents(scope, eventQueue, maxId) {
var _removeEvents = function _removeEvents(scope, eventQueue, maxId, status, response) {
if (maxId < 0) {
return;
}

var filteredEvents = [];
for (var i = 0; i < scope[eventQueue].length || 0; i++) {
if (scope[eventQueue][i].event_id > maxId) {
filteredEvents.push(scope[eventQueue][i]);
const unsentEvent = scope[eventQueue][i];

if (unsentEvent.event.event_id > maxId) {
filteredEvents.push(unsentEvent);
} else {
if (unsentEvent.callback) {
unsentEvent.callback(status, response);
}
}
}
scope[eventQueue] = filteredEvents;
Expand All @@ -1428,32 +1436,26 @@ var _removeEvents = function _removeEvents(scope, eventQueue, maxId) {
* Send unsent events. Note: this is called automatically after events are logged if option batchEvents is false.
* If batchEvents is true, then events are only sent when batch criterias are met.
* @private
* @param {Amplitude~eventCallback} callback - (optional) callback to run after events are sent.
* Note the server response code and response body are passed to the callback as input arguments.
*/
AmplitudeClient.prototype.sendEvents = function sendEvents(callback) {
AmplitudeClient.prototype.sendEvents = function sendEvents() {
if (!this._apiKeySet('sendEvents()')) {
if (type(callback) === 'function') {
callback(0, 'No request sent', {reason: 'API key not set'});
}
this.removeEvents(Infinity, Infinity, 0, 'No request sent', {reason: 'API key not set'});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this work? maybe i'm mistaken, but doesn't this break on line 1421 above?
where scope = eventQueue = Infinity and scope[eventQueue] will be undefined

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's confusing. There is a removeEvents function bound to this and there's a _removeEvents function. I'm calling the former which invokes the latter with scope.

return;
}

if (this.options.optOut) {
if (type(callback) === 'function') {
callback(0, 'No request sent', {reason: 'optOut is set to true'});
}
this.removeEvents(Infinity, Infinity, 0, 'No request sent', {reason: 'Opt out is set to true'});
return;
}

// How is it possible to get into this state?
if (this._unsentCount() === 0) {
if (type(callback) === 'function') {
callback(0, 'No request sent', {reason: 'No events to send'});
}
return;
}

// We only make one request at a time. sendEvents will be invoked again once
// the last request completes.
if (this._sending) {
if (type(callback) === 'function') {
callback(0, 'No request sent', {reason: 'Request already in progress. Events will be sent once this request is complete'});
}
return;
}

Expand All @@ -1466,7 +1468,7 @@ AmplitudeClient.prototype.sendEvents = function sendEvents(callback) {
var mergedEvents = this._mergeEventsAndIdentifys(numEvents);
var maxEventId = mergedEvents.maxEventId;
var maxIdentifyId = mergedEvents.maxIdentifyId;
var events = JSON.stringify(mergedEvents.eventsToSend);
var events = JSON.stringify(mergedEvents.eventsToSend.map(({event}) => event));
var uploadTime = new Date().getTime();

var data = {
Expand All @@ -1482,33 +1484,35 @@ AmplitudeClient.prototype.sendEvents = function sendEvents(callback) {
scope._sending = false;
try {
if (status === 200 && response === 'success') {
scope.removeEvents(maxEventId, maxIdentifyId);
scope.removeEvents(maxEventId, maxIdentifyId, status, response);

// Update the event cache after the removal of sent events.
if (scope.options.saveEvents) {
scope.saveEvents();
}

// Send more events if any queued during previous send.
if (!scope._sendEventsIfReady(callback) && type(callback) === 'function') {
callback(status, response);
}
scope._sendEventsIfReady();

// handle payload too large
} else if (status === 413) {
// utils.log('request too large');
// Can't even get this one massive event through. Drop it, even if it is an identify.
if (scope.options.uploadBatchSize === 1) {
scope.removeEvents(maxEventId, maxIdentifyId);
scope.removeEvents(maxEventId, maxIdentifyId, status, response);
}

// The server complained about the length of the request. Backoff and try again.
scope.options.uploadBatchSize = Math.ceil(numEvents / 2);
scope.sendEvents(callback);
scope.sendEvents();

} else if (type(callback) === 'function') { // If server turns something like a 400
callback(status, response);
}
// else {
// all the events are still queued, and will be retried when the next
// event is sent In the interest of debugging, it would be nice to have
// something like an event emitter for a better debugging experince
// here.
// }
} catch (e) {
// utils.log('failed upload');
}
Expand All @@ -1528,9 +1532,9 @@ AmplitudeClient.prototype._mergeEventsAndIdentifys = function _mergeEventsAndIde
var maxIdentifyId = -1;

while (eventsToSend.length < numEvents) {
var event;
var noIdentifys = identifyIndex >= this._unsentIdentifys.length;
var noEvents = eventIndex >= this._unsentEvents.length;
let unsentEvent;
let noIdentifys = identifyIndex >= this._unsentIdentifys.length;
let noEvents = eventIndex >= this._unsentEvents.length;

// case 0: no events or identifys left
// note this should not happen, this means we have less events and identifys than expected
Expand All @@ -1541,29 +1545,29 @@ AmplitudeClient.prototype._mergeEventsAndIdentifys = function _mergeEventsAndIde

// case 1: no identifys - grab from events
else if (noIdentifys) {
event = this._unsentEvents[eventIndex++];
maxEventId = event.event_id;
unsentEvent = this._unsentEvents[eventIndex++];
maxEventId = unsentEvent.event.event_id;

// case 2: no events - grab from identifys
} else if (noEvents) {
event = this._unsentIdentifys[identifyIndex++];
maxIdentifyId = event.event_id;
unsentEvent = this._unsentIdentifys[identifyIndex++];
maxIdentifyId = unsentEvent.event.event_id;

// case 3: need to compare sequence numbers
} else {
// events logged before v2.5.0 won't have a sequence number, put those first
if (!('sequence_number' in this._unsentEvents[eventIndex]) ||
this._unsentEvents[eventIndex].sequence_number <
this._unsentIdentifys[identifyIndex].sequence_number) {
event = this._unsentEvents[eventIndex++];
maxEventId = event.event_id;
if (!('sequence_number' in this._unsentEvents[eventIndex].event) ||
this._unsentEvents[eventIndex].event.sequence_number <
this._unsentIdentifys[identifyIndex].event.sequence_number) {
unsentEvent = this._unsentEvents[eventIndex++];
maxEventId = unsentEvent.event.event_id;
} else {
event = this._unsentIdentifys[identifyIndex++];
maxIdentifyId = event.event_id;
unsentEvent = this._unsentIdentifys[identifyIndex++];
maxIdentifyId = unsentEvent.event.event_id;
}
}

eventsToSend.push(event);
eventsToSend.push(unsentEvent);
}

return {
Expand Down
Loading