Skip to content

Commit

Permalink
Add BatchProcessor
Browse files Browse the repository at this point in the history
Summary:
The BatchProcessor provides functions to enable batch sending
Conversions API event requests.

Reviewed By: josejia

Differential Revision: D23771748

fbshipit-source-id: 4164f3fd
  • Loading branch information
marksliva authored and facebook-github-bot committed Sep 23, 2020
1 parent c2a0d7e commit 8bf5037
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -9,6 +9,7 @@ All notable changes to this project will be documented in this file.

### Added
- Added HttpServiceInterface to enable the default request object to be overridden by a user-defined HTTP Request Service object. Available for Conversions API create event requests.
- Added batching support to Conversions API. Create batched event requests by using BatchProcessor.

## v8.0.0

Expand Down
10 changes: 9 additions & 1 deletion gulpfile.js
Expand Up @@ -43,7 +43,15 @@ function roll(format, entry, name, entry_dir, dest_dir) {
babel(
babelrc.default({
config: {
presets: [['es2015', {modules: false}]],
presets: [
[
'babel-preset-env',
{
'targets': 'defaults',
'exclude': ['transform-regenerator']
}
]
],
plugins: [
'external-helpers',
'syntax-flow',
Expand Down
1 change: 1 addition & 0 deletions package.json
Expand Up @@ -44,6 +44,7 @@
"babel-plugin-async-to-promises": "^1.0.5",
"babel-plugin-external-helpers": "~6.8.0",
"babel-plugin-transform-flow-strip-types": "^6.22.0",
"babel-preset-env": "^1.7.0",
"babel-preset-es2015": "^6.24.1",
"babel-preset-flow": "^6.23.0",
"babel-preset-stage-2": "^6.24.1",
Expand Down
1 change: 1 addition & 0 deletions src/bundle.es6
Expand Up @@ -21,6 +21,7 @@ export { default as DeliveryCategory } from './../src/objects/serverside/deliver
export { default as HttpMethod } from './../src/objects/serverside/http-method';
export { default as HttpServiceClientConfig } from './../src/objects/serverside/http-service-client-config';
export { default as HttpServiceInterface } from './../src/objects/serverside/http-service-interface';
export { default as BatchProcessor } from './../src/objects/serverside/batch-processor';

export { default as Ad } from './../src/objects/ad';
export { default as AdAccount } from './../src/objects/ad-account';
Expand Down
75 changes: 75 additions & 0 deletions src/objects/serverside/batch-processor.js
@@ -0,0 +1,75 @@
/**
* Copyright (c) 2017-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the license found in the
* LICENSE file in the root directory of this source tree.
* @flow
*/

import ServerEvent from './server-event';
import EventRequest from './event-request';
import EventResponse from './event-response';

export default class BatchProcessor {
_batch_size: number
_concurrent_requests: number

constructor(batch_size: number, concurrent_requests: number) {
this._batch_size = batch_size;
this._concurrent_requests = concurrent_requests;
}

*processEventRequestsGenerator(event_requests: Array<EventRequest>): Generator<Array<Promise<EventResponse>>,void,EventRequest> {
let start = 0;
let end = this._concurrent_requests;
let requests = event_requests.slice(start, end);
while (requests.length > 0) {
yield requests.map(request => request.execute());
start = end;
end += this._concurrent_requests;
requests = event_requests.slice(start, end);
}
return;
}

async processEventRequests(event_requests: Array<EventRequest>) {
const generator = this.processEventRequestsGenerator(event_requests);
while (true) {
const batch = generator.next().value;
if (!batch || batch.length === 0) {
generator.return();
return;
}
await Promise.all(batch);
}
}

*processEventsGenerator(event_request_to_clone: EventRequest, all_events: Array<ServerEvent>): Generator<Array<Promise<EventResponse>>,void,EventRequest> {
let index = 0;
while (index < all_events.length) {
let event_requests = [];
while (index < all_events.length && event_requests.length < this._concurrent_requests) {
const event_request = event_request_to_clone.cloneWithoutEvents();
const events = all_events.slice(index, index + this._batch_size);
event_request.setEvents(events);
event_requests.push(event_request);
index += this._batch_size;
}
yield event_requests.map(request => request.execute());
}
return;
}

async processEvents(event_request_to_clone: EventRequest, all_events: Array<ServerEvent>) {
const generator = this.processEventsGenerator(event_request_to_clone, all_events);
while (true) {
const batch = generator.next().value;
if (!batch || batch.length === 0) {
generator.return();
return;
}
await Promise.all(batch);
}
}
}
21 changes: 18 additions & 3 deletions src/objects/serverside/event-request.js
Expand Up @@ -49,19 +49,19 @@ export default class EventRequest {
*/
constructor(access_token: string, pixel_id: string, events: Array<ServerEvent> = [],
partner_agent: ?string = null, test_event_code: ?string = null,
namespace_id: string, upload_id: string, upload_tag: string, upload_source: string,
namespace_id: ?string = null, upload_id: ?string = null,
upload_tag: ?string = null, upload_source: ?string = null,
debug_mode_flag: bool = false, http_service: ?HttpServiceInterface = null ) {

this._access_token = access_token;
this._pixel_id = pixel_id;
this._events = events;
this._partner_agent = partner_agent;
this._test_event_code = test_event_code;
this._debug_mode = debug_mode_flag;
this._namespace_id = namespace_id;
this._upload_id = upload_id;
this._upload_tag = upload_tag;
this._upload_source = upload_source;
this._debug_mode = debug_mode_flag;

this._http_service = http_service;
this._api = FacebookAdsApi.init(this._access_token);
Expand Down Expand Up @@ -417,4 +417,19 @@ export default class EventRequest {
response._data['num_processed_entries']);
});
}

cloneWithoutEvents(): EventRequest {
return new EventRequest(
this._access_token,
this._pixel_id,
[],
this._partner_agent,
this._test_event_code,
this._namespace_id,
this._upload_id,
this._upload_tag,
this._upload_source,
this._debug_mode,
);
}
}
137 changes: 137 additions & 0 deletions tests/batch-processor_tests.js
@@ -0,0 +1,137 @@
/**
* Copyright (c) 2017-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the license found in the
* LICENSE file in the root directory of this source tree.
*/

'use strict';
const {BatchProcessor, Content, CustomData, DeliveryCategory} = require('facebook-nodejs-business-sdk');
const {describe} = require('mocha');
const {expect} = require('chai');


describe('BatchProcessor', function() {
it('processEventRequests processes all event requests', async function() {
const event_requests = getEventRequests(5);
const batchProcessor = new BatchProcessor(2, 2);
await batchProcessor.processEventRequests(event_requests);

event_requests.forEach(event_request => expect(event_request.called_execute_count).to.equal(1));
});

it('processEventRequestsGenerator returns event request promises in batches', async function() {
const event_requests = getEventRequests(5);
const batchProcessor = new BatchProcessor(2, 2);
let iterations = 0;
let promises = [];
const generator = batchProcessor.processEventRequestsGenerator(event_requests);
let batch;
while(true) {
batch = generator.next().value;
if (!batch || batch.length === 0) {
generator.return();
break;
}
promises.push(batch);
iterations += 1;
}
await Promise.all(promises);

event_requests.forEach(event_request => expect(event_request.called_execute_count).to.equal(1));
expect(iterations).to.equal(3);
});

it('processEventsGenerator returns event request promises in batches', async function() {
const event_request = new EventRequestMock();
const events = getEvents(19);
const batchProcessor = new BatchProcessor(2, 2);
let iterations = 0;
let promises = [];
const generator = batchProcessor.processEventsGenerator(event_request, events);
let batch;
while(true) {
batch = generator.next().value;
if (!batch || batch.length === 0) {
generator.return();
break;
}
promises.push(batch);
iterations += 1;
}
await Promise.all(promises);

expect(iterations).to.equal(5);
});

it('processEvents processes all events', async function() {
const event_request = new EventRequestMock();
const events = getEvents(11);
const batchProcessor = new BatchProcessor(2, 3);
await batchProcessor.processEvents(event_request, events);

expect(event_request.cloned_event_requests.length).to.equal(6);
event_request.cloned_event_requests.forEach(event_request => {
expect(event_request.called_execute_count).to.equal(1)
});
const event_batches = event_request.cloned_event_requests.map(request => request.set_events);
expect(event_batches).to.deep.equal([
[events.slice(0, 2)],
[events.slice(2, 4)],
[events.slice(4, 6)],
[events.slice(6, 8)],
[events.slice(8, 10)],
[events.slice(10, 12)],
]);
});
});


// Test helpers
class EventRequestMock {
constructor() {
this.called_execute_count = 0;
this.set_events = []
this.cloned_event_requests = []
}

execute() {
return new Promise((resolve, reject) => {
this.called_execute_count += 1;
return resolve();
});
}

cloneWithoutEvents() {
const cloned_event_request = new EventRequestMock();
this.cloned_event_requests.push(cloned_event_request);
return cloned_event_request;
}

setEvents(events) {
this.set_events.push(events);
}
}

class EventMock {
constructor(num) {
this.num = num;
}
}

function getEventRequests(num) {
let events = [];
for (let i = 0 ; i < num; i++) {
events.push(new EventRequestMock());
}
return events;
}

function getEvents(num) {
let events = [];
for (let i = 0 ; i < num; i++) {
events.push(new EventMock(i));
}
return events;
}
30 changes: 30 additions & 0 deletions tests/event-request_tests.js
Expand Up @@ -7,7 +7,9 @@
*/

'use strict';

const {
Event,
EventRequest,
EventResponse,
FacebookAdsApi,
Expand Down Expand Up @@ -157,4 +159,32 @@ describe('EventRequest', function() {
expect(actual_params).to.deep.equal(expected_params);
});
});

it('cloneWithoutEvents clones the EventRequest object without the events', async function() {
const event_request = new EventRequest(
'access_token-1',
'pixel_id-2',
[new Event()],
'partner_agent-3',
'test_event_code-4',
'namespace_id-5',
'upload_id-6',
'upload_tag-7',
'upload_source-8',
true
);
const cloned = event_request.cloneWithoutEvents();

expect(cloned.access_token).to.equal(event_request.access_token);
expect(cloned.pixel_id).to.equal(event_request.pixel_id);
expect(cloned.events).to.deep.equal([]);
expect(cloned.partner_agent).to.equal(event_request.partner_agent);
expect(cloned.test_event_code).to.equal(event_request.test_event_code);
expect(cloned.namespace_id).to.equal(event_request.namespace_id);
expect(cloned.upload_id).to.equal(event_request.upload_id);
expect(cloned.upload_tag).to.equal(event_request.upload_tag);
expect(cloned.upload_source).to.equal(event_request.upload_source);
expect(cloned.debug_mode).to.equal(event_request.debug_mode);
expect(cloned === event_request).to.equal(false);
});
});

0 comments on commit 8bf5037

Please sign in to comment.