Add JSON Bundle Feature
Paul Prescod committed May 20, 2022
1 parent 9f98330 commit aeb5bb0
Showing 2 changed files with 125 additions and 32 deletions.
85 changes: 72 additions & 13 deletions examples/salesforce/LoadCompositeAPIData.apex
@@ -6,21 +6,73 @@

public class LoadCompositeAPIData {

    public void loadSingleJson(String url) {
    // Load one of three JSON formats.
    //
    // 1. One with a top-level key called "tables" which links to other
    //    composite graph payload JSONs, like this:
    //
    //    https://gist.githubusercontent.com/prescod/6f3aebafde63971d1093549c3bef4e41/raw/8885df2618ece474c196d5d94b594dd2b5961c71/csvw_metadata.json
    //
    //    Create the files with snowfakery.experimental.SalesforceCompositeAPIOutput.Folder
    //
    // 2. One with a top-level key called "data" which embeds composite graph
    //    payloads as strings, like this:
    //
    //    https://gist.githubusercontent.com/prescod/6220fa27d8493be954be949c9f57f2b2/raw/b603a4d11ef20f0a30e79260322baa52f969068d/out.bundle.json
    //
    //    Create the files with snowfakery.experimental.SalesforceCompositeAPIOutput.Bundle
    //
    // 3. One which is just a single composite graph payload, recognizable
    //    by its top-level "graphs" key, like this:
    //
    //    https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json
    //
    //    Create the files with snowfakery.experimental.SalesforceCompositeAPIOutput
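    //
    // For orientation, the three shapes look roughly like this (sketches
    // only; real payloads carry more fields):
    //
    //   1. {"tables": [{"url": "1.composite.json"}, ...]}
    //   2. {"bundle_format": 1, "data": ["<composite graph JSON as a string>", ...]}
    //   3. {"graphs": [{"graphId": "graph", "compositeRequest": [...]}]}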

    public void loadJsonSet(String set_url){
        String json_record_sets = downloadJSON(set_url);
        Map<String, Object> data = (Map<String, Object>)Json.deserializeUntyped(json_record_sets);
        List<Object> tables = (List<Object>)data.get('tables');
        if(tables != null){
            loadDistributedJsonSet(tables);
            return;
        }

        List<Object> graph_jsons = (List<Object>)data.get('data');
        if(graph_jsons != null){
            loadBundledJsonSet(graph_jsons);
            return;
        }

        List<Object> graphs = (List<Object>)data.get('graphs');
        if(graphs != null){
            loadRecords(json_record_sets);
            return;
        }

    }

    // optimized method for a single composite graph (<500 records)
    // This method doesn't parse the JSON to see what's in it.
    public void loadSingleJsonGraphPayload(String url) {
        System.debug('Loading JSON ' + url);
        String json_records = downloadJSON(url);
        loadRecords(json_records);
        System.debug('Loaded JSON ' + url);
    }

    public void loadJsonSet(String set_url){
        String json_record_sets = downloadJSON(set_url);
        Map<String, Object> data = (Map<String, Object>)Json.deserializeUntyped(json_record_sets);
        List<Object> tables = (List<Object>)data.get('tables');
    public void loadDistributedJsonSet(List<Object> tables){
        for(Object table_url: tables){
            Map<String, Object> url_obj = (Map<String, Object>) table_url;
            String url = (String)url_obj.get('url');
            loadSingleJson(url);
            loadSingleJsonGraphPayload(url);
        }
    }

    public void loadBundledJsonSet(List<Object> graph_jsons){
        for(Object graph_json: graph_jsons){
            loadRecords((String)graph_json);
        }
    }

@@ -41,6 +93,7 @@ public class LoadCompositeAPIData {

        request.setHeader('Authorization', 'OAuth ' + UserInfo.getSessionId());
        request.setTimeout(120000);
        System.debug(url);
        return h.send(request);
    }

@@ -57,8 +110,8 @@

        if(error != null){
            System.debug('Error: ' + error);
            System.debug('DOWNLOADED Data');
            System.debug(response_body);
            // System.debug('DOWNLOADED Data');
            // System.debug(response_body);
            CalloutException e = new CalloutException(error);
            throw e;
        }
@@ -97,19 +150,25 @@
    }
}

JSONSObjectLoader loader = new JSONSObjectLoader();
LoadCompositeAPIData loader = new LoadCompositeAPIData();

// load a single Composite Graph JSON
loader.loadSingleJson('https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json');
// load a single Composite Graph JSON Payload
loader.loadSingleJsonGraphPayload('https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json');

// load another single one, slightly less efficiently
loader.loadJsonSet('https://gist.githubusercontent.com/prescod/ffa992a7218906ab0dcf160b5d755259/raw/f9d40587a2ba9b04275241723637ed571bd55617/Graph%2520Gist');

// load a set of 3 Composite Graph JSONs
// load a set of 3 Composite Graph JSONs in a distributed set
loader.loadJsonSet('https://gist.githubusercontent.com/prescod/6f3aebafde63971d1093549c3bef4e41/raw/8885df2618ece474c196d5d94b594dd2b5961c71/csvw_metadata.json');

// load a single-file bundle of 3 Composite Graph JSONs
loader.loadJsonSet('https://gist.githubusercontent.com/prescod/6220fa27d8493be954be949c9f57f2b2/raw/b603a4d11ef20f0a30e79260322baa52f969068d/out.bundle.json');

System.debug('SUCCESS! SUCCESS! SUCCESS! SUCCESS! SUCCESS! SUCCESS! SUCCESS! ');
// Experimentally, looping much more than 15 times hits the cumulative
// 'maximum time allotted for callout' limit.
//
// for (Integer i = 0; i < 15; i++) {
//     loader.loadSingleJson('https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json');
//     loader.loadSingleJsonGraphPayload('https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json');
//     System.debug(i);
// }
72 changes: 53 additions & 19 deletions snowfakery/experimental/SalesforceCompositeAPIOutput.py
@@ -20,6 +20,7 @@
import typing as T
import datetime
from pathlib import Path
from tempfile import TemporaryDirectory

from snowfakery.output_streams import FileOutputStream, OutputStream

@@ -62,6 +63,8 @@ def flatten(
        return "@{%s.id}" % target_reference

    def close(self, **kwargs) -> T.Optional[T.Sequence[str]]:
        # NOTE: Could improve loading performance by breaking graphs up
        # to allow server-side parallelization, but I'd risk locking issues
        data = {"graphs": [{"graphId": "graph", "compositeRequest": self.rows}]}
        self.write(json.dumps(data, indent=2))
        return super().close()
@@ -76,14 +79,19 @@ def __init__(self, output_folder, **kwargs):
        if not Path.exists(self.target_path):
            Path.mkdir(self.target_path, exist_ok=True)
        self.recipe_sets = [[]]
        self.current_batch = []
        self.filenum = 0
        self.filenames = []

    def write_row(self, tablename: str, row_with_references: T.Dict) -> None:
        self.recipe_sets[-1].append((tablename, row_with_references))

    def write_single_row(self, tablename: str, row: T.Dict) -> None:
        self.recipe_sets[-1].append((tablename, row))
        assert 0, "Shouldn't be called. write_row should be called instead"

    def close(self, **kwargs) -> T.Optional[T.Sequence[str]]:
        self.flush_sets()
        self.flush_batch()
        table_metadata = [{"url": str(filename)} for filename in self.filenames]
        metadata = {
            "@context": "http://www.w3.org/ns/csvw",
@@ -95,29 +103,55 @@ def close(self, **kwargs) -> T.Optional[T.Sequence[str]]:
return [f"Created {self.target_path}"]

def complete_recipe(self, *args):
ready_rows = sum(len(s) for s in self.recipe_sets)
if ready_rows > 500:
self.flush_sets()
self.flush_sets()
self.recipe_sets.append([])

    def flush_sets(self):
        sets = self.recipe_sets
        batches = [[]]
        for set in sets:
            if len(batches[-1]) + len(set) > 500:
                batches.append(set.copy())
            else:
                batches[-1].extend(set)
        for batch in batches:
            if len(batch):
                self.filenum += 1
                filename = Path(self.target_path) / f"{self.filenum}.composite.json"
                self.save_batch(filename, batch)

    def save_batch(self, filename, batch):
        while self.recipe_sets:
            next_set = self.recipe_sets.pop(0)
            if len(self.current_batch) + len(next_set) > 500:
                self.flush_batch()
            self.current_batch.extend(next_set)
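        # Sets are packed greedily, but a recipe set is never split across
        # batches, so its internal references stay within a single composite
        # graph payload (each payload stays under the 500-record graph limit
        # noted in the Apex loader).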

    def flush_batch(self):
        self.filenum += 1
        filename = Path(self.target_path) / f"{self.filenum}.composite.json"

        with open(filename, "w") as open_file, SalesforceCompositeAPIOutput(
            open_file
        ) as out:
            self.filenames.append(filename)
            for tablename, row in batch:
            print(len(self.current_batch))
            for tablename, row in self.current_batch:
                out.write_row(tablename, row)

        self.current_batch = []


class Bundle(FileOutputStream):
    def __init__(self, file, **kwargs):
        super().__init__(file, **kwargs)
        self.tempdir = TemporaryDirectory()
        self.folder_os = Folder(self.tempdir.name)
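        # Bundle reuses Folder's batching machinery: rows are staged as
        # *.composite.json files in a temporary directory, and close() then
        # inlines each file's text into a single JSON document.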

    def write_row(self, tablename: str, row_with_references: T.Dict) -> None:
        self.folder_os.write_row(tablename, row_with_references)

    def write_single_row(self, tablename: str, row: T.Dict) -> None:
        assert 0, "Shouldn't be called. write_row should be called instead"

    def complete_recipe(self, *args):
        self.folder_os.complete_recipe()

    def close(self):
        self.folder_os.close()
        data = self.organize_bundle()
        self.write(json.dumps(data, indent=2))
        self.tempdir.cleanup()
        return super().close()

    def organize_bundle(self):
        files = Path(self.tempdir.name).glob("*.composite.json")
        data = [file.read_text() for file in files]
        assert data
        return {"bundle_format": 1, "data": data}
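For a quick sanity check of a generated bundle outside Salesforce, a short
script can mirror what the Apex loader expects. This is an illustrative
sketch, not part of the commit: inspect_bundle and the sample filename are
hypothetical, while the format assumptions (a top-level "bundle_format"/"data"
pair, each entry a composite graph payload serialized as a string, at most
500 records per graph) come from the code above.

import json
from pathlib import Path

def inspect_bundle(path: str) -> None:
    # Hypothetical helper: parse a {"bundle_format": 1, "data": [...]} file
    # and summarize the composite graph payloads embedded as strings.
    bundle = json.loads(Path(path).read_text())
    assert bundle.get("bundle_format") == 1, "not a bundle file"
    for i, graph_json in enumerate(bundle["data"], start=1):
        payload = json.loads(graph_json)  # each entry is JSON-as-a-string
        for graph in payload["graphs"]:
            records = graph["compositeRequest"]
            print(f"entry {i}, graph {graph['graphId']}: {len(records)} records")
            assert len(records) <= 500, "graph exceeds the 500-record cap"

# Usage (hypothetical local copy of the bundle gist):
# inspect_bundle("out.bundle.json")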
