Skip to content

Commit

Permalink
Add bulk querying
Browse files Browse the repository at this point in the history
  • Loading branch information
frankkessler committed Sep 23, 2016
1 parent 258f80f commit 82defc6
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 12 deletions.
62 changes: 50 additions & 12 deletions src/Bulk.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,27 @@ public function runBatch($operation, $objectType, $data, $options = [])
'batchTimeout' => 600,
'contentType' => 'JSON',
'pollIntervalSeconds' => 5,
'isBatchedResult' => false,
];

$options = array_replace($defaults, $options);

if($operation == 'query'){
$options['isBatchedResult'] = true;
}

$job = $this->createJob($operation, $objectType, $options['externalIdFieldName'], $options['contentType']);

if ($job->id) {
$totalNumberOfBatches = ceil(count($data) / $options['batchSize']);
//if data is array, we can split it into batches
if(is_array($data)) {
$totalNumberOfBatches = ceil(count($data) / $options['batchSize']);

for ($i = 1; $i <= $totalNumberOfBatches; $i++) {
$batches[] = $this->addBatch($job->id, array_splice($data, ($i - 1) * $options['batchSize'], $options['batchSize']));
for ($i = 1; $i <= $totalNumberOfBatches; $i++) {
$batches[] = $this->addBatch($job->id, array_splice($data, ($i - 1) * $options['batchSize'], $options['batchSize']));
}
}else{ //probably a string query so run in onee batch
$batches[] = $this->addBatch($job->id, $data);
}
} else {
$this->log('error', 'Job Failed: '.json_encode($job->toArrayAll()));
Expand All @@ -72,7 +82,7 @@ public function runBatch($operation, $objectType, $data, $options = [])

$batch = $this->batchDetails($job->id, $batch->id);
if (in_array($batch->state, ['Completed', 'Failed', 'Not Processed'])) {
$batchResult = $this->batchResult($job->id, $batch->id);
$batchResult = $this->batchResult($job->id, $batch->id, $options['isBatchedResult']);
$batch->records = $batchResult->records;
$batches_finished[] = $batch->id;
}
Expand All @@ -89,7 +99,7 @@ public function runBatch($operation, $objectType, $data, $options = [])
$time = time();
}

$job = $this->jobDetails($job->id);
$job = $this->closeJob($job->id);
$job->batches = $batches;

return $job;
Expand Down Expand Up @@ -183,11 +193,20 @@ public function addBatch($jobId, $data)

$url = '/services/async/'.SalesforceConfig::get('salesforce.api.version').'/job/'.$jobId.'/batch';

$result = $this->call_api('post', $url, [
'body' => json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_NUMERIC_CHECK),
'headers' => [
$headers = [];
//json_encode any arrays to send over to bulk api
if(is_array($data)){
$body = json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES | JSON_NUMERIC_CHECK);
$headers = [
'Content-type' => 'application/json',
],
];
}else{
$body = $data;
}

$result = $this->call_api('post', $url, [
'body' => $body,
'headers' => $headers,
]);

if ($result && is_array($result)) {
Expand Down Expand Up @@ -224,7 +243,7 @@ public function batchDetails($jobId, $batchId)
*
* @return BulkBatchResultResponse
*/
public function batchResult($jobId, $batchId)
public function batchResult($jobId, $batchId, $isBatchedResult=false, $resultId=null)
{
if (!$jobId || !$batchId) {
//throw exception
Expand All @@ -233,16 +252,35 @@ public function batchResult($jobId, $batchId)

$url = '/services/async/'.SalesforceConfig::get('salesforce.api.version').'/job/'.$jobId.'/batch/'.$batchId.'/result';

//if this is a query result, the main result page will have an array of result ids to follow for hte query results
if($resultId){
$url = $url.'/'.$resultId;
}

$result = $this->call_api('get', $url);

if ($result && is_array($result)) {
//maximum amount of batch records allowed it 10,000

//initialize array for records to be used later
if(!isset($result['records']) || !is_array($result['records'])){
$result['records'] = [];
}

//maximum amount of batch records allowed is 10,000
for ($i = 0; $i < 10000; $i++) {
//skip processing for the rest of the records if they don't exist
if (!isset($result[$i])) {
break;
}
$result['records'][$i] = $result[$i];

//batched results return a list of result ids that need to be processed to get the actual data
if($isBatchedResult){
$batchResult = $this->batchResult($jobId, $batchId, false, $result[$i]);
$result['records'] = array_merge($result['records'], $batchResult->records);
}else{
$result['records'][$i] = $result[$i];
}

unset($result[$i]);
}

Expand Down
70 changes: 70 additions & 0 deletions tests/BulkTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,47 @@ public function testRunBatch()
}
}

public function testRunQueryBatch()
{
// Create a mock and queue two responses.
$mock = new MockHandler([
new Response(200, [], json_encode($this->jobArray())),
new Response(200, [], json_encode($this->batchArray(['state' => 'Queued']))),
new Response(200, [], json_encode($this->batchArray())),
new Response(200, [], json_encode($this->dataQueryResultArray())),
new Response(200, [], json_encode($this->dataQueryDataResultArray())),
new Response(200, [], json_encode($this->jobArray())),
]);

$handler = HandlerStack::create($mock);

$salesforce = new \Frankkessler\Salesforce\Salesforce([
'handler' => $handler,
'salesforce.oauth.access_token' => 'TEST',
'salesforce.oauth.refresh_token' => 'TEST',
]);

$jobId = '750D00000004SkVIAU';
$batchId = '750D00000004SkGIAU';
$firstAccountId = '0014000001iM8r3AAC';

$operation = 'query';
$objectType = 'Account';
$data = $this->dataArray();

$job = $salesforce->bulk()->runBatch($operation, $objectType, $data);

$this->assertEquals($jobId, $job->id);

foreach ($job->batches as $batch) {
$this->assertEquals($batchId, $batch->id);
foreach ($batch->records as $record) {
$this->assertEquals($firstAccountId, $record['Id']);
break;
}
}
}

public function jobArray($overrides = [])
{
return array_replace([
Expand Down Expand Up @@ -234,4 +275,33 @@ public function dataResultArray()
],
];
}

public function dataQueryResultArray()
{
return ['742400000022G6b'];
}

public function dataQueryDataResultArray()
{
return json_decode(
'[
{
"attributes" : {
"type" : "Account",
"url" : "/services/data/v36.0/sobjects/Account/0014000001iM8r3AAC"
},
"Id" : "0014000001iM8r3AAC",
"Name" : "Greatest Bank"
}, {
"attributes" : {
"type" : "Account",
"url" : "/services/data/v36.0/sobjects/Account/0014000001iM8S5AAK"
},
"Id" : "0014000001iM8S5AAK",
"Name" : "Regions Insurance"
}
]',
true
);
}
}

0 comments on commit 82defc6

Please sign in to comment.