Skip to content

Commit

Permalink
Merge pull request #58 from heroku/batch-list
Browse files Browse the repository at this point in the history
Support batch list and batch request API
  • Loading branch information
lambacck committed Oct 24, 2017
2 parents 2a71365 + 66dc5f4 commit e106ada
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 5 deletions.
2 changes: 1 addition & 1 deletion salesforce_bulk/bulk_states.py
@@ -1,6 +1,6 @@
ABORTED = 'Aborted'
FAILED = 'Failed'
NOT_PROCESSED = 'Not Processed'
NOT_PROCESSED = 'NotProcessed'
COMPLETED = 'Completed'

ERROR_STATES = (
Expand Down
37 changes: 34 additions & 3 deletions salesforce_bulk/salesforce_bulk.py
Expand Up @@ -55,17 +55,18 @@ def __reduce__(self):

class BulkBatchFailed(BulkApiError):
# Error describing a failed batch: records the job id, batch id, the
# state message and (optionally) the batch state on the exception instance.

# NOTE(review): this is a rendered diff, so the two consecutive signatures
# below are presumably the removed line followed by its replacement; the
# second one adds the optional ``state`` argument (defaults to None).
def __init__(self, job_id, batch_id, state_message):
def __init__(self, job_id, batch_id, state_message, state=None):
self.job_id = job_id
self.batch_id = batch_id
self.state_message = state_message
self.state = state

# Human-readable summary passed on to the base exception class.
message = 'Batch {0} of job {1} failed: {2}'.format(batch_id, job_id,
state_message)
super(BulkBatchFailed, self).__init__(message)

# Pickle support: return the class together with the constructor arguments
# so the exception can be rebuilt through its custom __init__ signature.
# NOTE(review): as above, the second return is presumably the replacement
# line; it additionally carries ``self.state``.
def __reduce__(self):
return BulkBatchFailed, (self.job_id, self.batch_id, self.state_message)
return BulkBatchFailed, (self.job_id, self.batch_id, self.state_message, self.state)


job_to_http_content_type = {
Expand Down Expand Up @@ -211,6 +212,26 @@ def check_status(self, resp):
msg = "Bulk API HTTP Error result: {0}".format(resp.text)
self.raise_error(msg, resp.status_code)

def get_batch_list(self, job_id):
    """Return the batch information for every batch of ``job_id``.

    Sends a GET to the job's ``/batch`` resource, runs the response
    through ``check_status`` and then through ``parse_response``.  When
    the parsed payload is a dict, its ``'batchInfo'`` entry is returned;
    any other parsed payload is handed back unchanged.
    """
    batch_url = self.endpoint + "/job/{}/batch".format(job_id)
    response = requests.get(batch_url, headers=self.headers())
    self.check_status(response)

    parsed = self.parse_response(response)
    # A dict payload wraps the list under 'batchInfo'; unwrap it.
    return parsed['batchInfo'] if isinstance(parsed, dict) else parsed

def get_query_batch_request(self, batch_id, job_id=None):
    """Fetch the request that was sent for a batch and return it as text.

    Note: this should only be used for query batches.  When ``job_id``
    is falsy it is resolved with ``lookup_job_id(batch_id)``.  The
    response is passed through ``check_status`` before its text is
    returned.
    """
    job_id = job_id or self.lookup_job_id(batch_id)

    request_url = self.endpoint + "/job/{}/batch/{}/request".format(job_id, batch_id)
    response = requests.get(request_url, headers=self.headers())
    self.check_status(response)
    return response.text

def close_job(self, job_id):
doc = self.create_close_job_doc()
url = self.endpoint + "/job/%s" % job_id
Expand Down Expand Up @@ -361,6 +382,16 @@ def parse_response(self, resp):
return resp.json()

tree = ET.fromstring(resp.content)
if nsclean.sub("", tree.tag) == 'batchInfoList':
results = []
for subtree in tree:
result = {}
results.append(result)
for child in subtree:
result[nsclean.sub("", child.tag)] = child.text

return results

result = {}
for child in tree:
result[nsclean.sub("", child.tag)] = child.text
Expand Down Expand Up @@ -394,7 +425,7 @@ def is_batch_done(self, batch_id, job_id=None):
batch_state = self.batch_state(batch_id, job_id=job_id, reload=True)
if batch_state in bulk_states.ERROR_STATES:
status = self.batch_status(batch_id, job_id)
raise BulkBatchFailed(job_id, batch_id, status['stateMessage'])
raise BulkBatchFailed(job_id, batch_id, status.get('stateMessage'), batch_state)
return batch_state == bulk_states.COMPLETED

# Wait for the given batch to complete, waiting at most timeout seconds
Expand Down
57 changes: 56 additions & 1 deletion salesforce_bulk/tests/test_salesforce_bulk.py
Expand Up @@ -23,7 +23,7 @@
import unicodecsv

from salesforce_bulk import SalesforceBulk, BulkApiError, UploadResult
from salesforce_bulk import CsvDictsAdapter
from salesforce_bulk import CsvDictsAdapter, bulk_states
from salesforce_bulk.salesforce_bulk import BulkJobAborted, BulkBatchFailed

nsclean = re.compile('{.*}')
Expand Down Expand Up @@ -288,6 +288,61 @@ def test_query(self):
['Email', 'Id', 'Name']
)

def test_query_pk_chunk(self):
    """Run a query job with ``pk_chunking=True`` and verify its results.

    Steps:
      1. Create the query job and submit the query as a batch.
      2. Poll that batch; a ``BulkBatchFailed`` whose state is
         ``bulk_states.NOT_PROCESSED`` is tolerated, any other failure
         is re-raised.
      3. List the job's batches, keep those that are not
         ``NOT_PROCESSED``, and check that each one's stored request
         starts with the submitted query.
      4. Poll the remaining batches, collect all their results and
         check that rows came back with the expected columns.

    Each polling loop gives up after 20 attempts, 2 seconds apart.
    """
    bulk = SalesforceBulk(self.sessionId, self.endpoint)
    self.bulk = bulk

    job_id = bulk.create_query_job("Contact", contentType=self.contentType, pk_chunking=True)
    self.jobs.append(job_id)
    # Raw string: "\w" in a plain literal is an invalid escape sequence.
    self.assertIsNotNone(re.match(r"\w+", job_id))

    query = "Select Id,Name,Email from Contact"
    batch_id = bulk.query(job_id, query)
    self.assertIsNotNone(re.match(r"\w+", batch_id))

    try:
        i = 0
        while not bulk.is_batch_done(batch_id):
            print("Job not done yet...")
            print(bulk.batch_status(batch_id))
            time.sleep(2)
            i += 1
            if i == 20:
                raise Exception(
                    "Timed out waiting for batch {0} of job {1}".format(batch_id, job_id))
    except BulkBatchFailed as e:
        # The submitted batch ending up NotProcessed is accepted here;
        # every other failure state is a genuine error.
        if e.state != bulk_states.NOT_PROCESSED:
            raise

    batches = bulk.get_batch_list(job_id)
    print(batches)
    batch_ids = [x['id'] for x in batches if x['state'] != bulk_states.NOT_PROCESSED]
    # Named batch_requests so the ``requests`` library name is not shadowed.
    batch_requests = [bulk.get_query_batch_request(x, job_id) for x in batch_ids]
    print(batch_requests)
    for request in batch_requests:
        self.assertTrue(request.startswith(query))

    all_results = []

    i = 0
    while not all(bulk.is_batch_done(j, job_id) for j in batch_ids):
        print("Job not done yet...")
        print(bulk.batch_status(batch_id, job_id))
        time.sleep(2)
        i += 1
        if i == 20:
            raise Exception(
                "Timed out waiting for the batches of job {0}".format(job_id))

    # Separate loop variable so the submitted batch id is not rebound.
    for chunk_batch_id in batch_ids:
        results = bulk.get_all_results_for_query_batch(chunk_batch_id, job_id)
        for result in results:
            all_results.extend(self.parse_results(result))

    self.assertTrue(len(all_results) > 0)
    self.assertEqual(
        sorted(all_results[0].keys()),
        ['Email', 'Id', 'Name']
    )

def test_upload(self):
bulk = SalesforceBulk(self.sessionId, self.endpoint)
self.bulk = bulk
Expand Down

0 comments on commit e106ada

Please sign in to comment.