diff --git a/examples/salesforce/LoadCompositeAPIData.apex b/examples/salesforce/LoadCompositeAPIData.apex index 29d19db4..8913169c 100644 --- a/examples/salesforce/LoadCompositeAPIData.apex +++ b/examples/salesforce/LoadCompositeAPIData.apex @@ -6,21 +6,73 @@ public class LoadCompositeAPIData { - public void loadSingleJson(String url) { + // Load one of three JSON formats. + // + // 1. One with a top-level key called "tables" which links to other + // composite graph payload jsons, like this: + // + // https://gist.githubusercontent.com/prescod/6f3aebafde63971d1093549c3bef4e41/raw/8885df2618ece474c196d5d94b594dd2b5961c71/csvw_metadata.json + // + // Create the files with snowfakery.experimental.SalesforceCompositeAPIOutput.Folder + // + // 2. One with a top-level key called "data" which embeds composite graph + // payloads as strings. Like this: + // + // https://gist.githubusercontent.com/prescod/6220fa27d8493be954be949c9f57f2b2/raw/b603a4d11ef20f0a30e79260322baa52f969068d/out.bundle.json + // + // Create the files with snowfakery.experimental.SalesforceCompositeAPIOutput.Bundle + // + // 3. One which is just a single composite graph payload like this: + // + // https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json + // + // Which is recognizable by its top-level "graphs" key. 
+ // + // Create the files with snowfakery.experimental.SalesforceCompositeAPIOutput + + public void loadJsonSet(String set_url){ + String json_record_sets = downloadJSON(set_url); + Map data = (Map)Json.deserializeUntyped(json_record_sets); + List tables = (List)data.get('tables'); + if(tables != null){ + loadDistributedJsonSet(tables); + return; + } + + List graph_jsons = (List)data.get('data'); + if(graph_jsons != null){ + loadBundledJsonSet(graph_jsons); + return; + } + + List graphs = (List)data.get('graphs'); + if(graphs != null){ + loadRecords(json_record_sets); + return; + } + + } + + // optimized method for a single composite graph (<500 records) + // This method doesn't parse the JSON to see what's in it. + public void loadSingleJsonGraphPayload(String url) { System.debug('Loading JSON ' + url); String json_records = downloadJSON(url); loadRecords(json_records); System.debug('Loaded JSON ' + url); } - public void loadJsonSet(String set_url){ - String json_record_sets = downloadJSON(set_url); - Map data = (Map)Json.deserializeUntyped(json_record_sets); - List tables = (List)data.get('tables'); + public void loadDistributedJsonSet(List tables){ for(Object table_url: tables){ Map url_obj = (Map) table_url; String url = (String)url_obj.get('url'); - loadSingleJson(url); + loadSingleJsonGraphPayload(url); + } + } + + public void loadBundledJsonSet(List graph_jsons){ + for(Object graph_json: graph_jsons){ + loadRecords((String)graph_json); } } @@ -41,6 +93,7 @@ public class LoadCompositeAPIData { request.setHeader('Authorization', 'OAuth ' + UserInfo.getSessionId()); request.setTimeout(120000); + System.debug(url); return h.send(request); } @@ -57,8 +110,8 @@ public class LoadCompositeAPIData { if(error!=null){ System.debug('Error: ' + error); - System.debug('DOWNLOADED Data'); - System.debug(response_body); + // System.debug('DOWNLOADED Data'); + // System.debug(response_body); CalloutException e = new CalloutException( error); throw e; } @@ -97,19 +150,25 @@ 
public class LoadCompositeAPIData { } } -JSONSObjectLoader loader = new JSONSObjectLoader(); LoadCompositeAPIData loader = new LoadCompositeAPIData(); -// load a single Composite Graph JSON -loader.loadSingleJson('https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json'); +// load a single Composite Graph JSON Payload +loader.loadSingleJsonGraphPayload('https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json'); + +// load another single one slightly less efficiently +loader.loadJsonSet('https://gist.githubusercontent.com/prescod/ffa992a7218906ab0dcf160b5d755259/raw/f9d40587a2ba9b04275241723637ed571bd55617/Graph%2520Gist'); -// load a set of 3 Composite Graph JSONs +// load a set of 3 Composite Graph JSONs in a distributed set loader.loadJsonSet('https://gist.githubusercontent.com/prescod/6f3aebafde63971d1093549c3bef4e41/raw/8885df2618ece474c196d5d94b594dd2b5961c71/csvw_metadata.json'); + +// load a single-file bundle of 3 Composite Graph JSONs +loader.loadJsonSet('https://gist.githubusercontent.com/prescod/6220fa27d8493be954be949c9f57f2b2/raw/b603a4d11ef20f0a30e79260322baa52f969068d/out.bundle.json'); + System.debug('SUCCESS! SUCCESS! SUCCESS! SUCCESS! SUCCESS! SUCCESS! SUCCESS! 
'); // experimentally, much more than 15 hits a cumulative // maximum time allotted for callout error // // for (Integer i = 0; i < 15; i++) { -// loader.loadSingleJson('https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json'); +// loader.loadSingleJsonGraphPayload('https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json'); // System.debug(i); // } diff --git a/snowfakery/experimental/SalesforceCompositeAPIOutput.py b/snowfakery/experimental/SalesforceCompositeAPIOutput.py index bdba4a95..62da3d92 100644 --- a/snowfakery/experimental/SalesforceCompositeAPIOutput.py +++ b/snowfakery/experimental/SalesforceCompositeAPIOutput.py @@ -20,6 +20,7 @@ import typing as T import datetime from pathlib import Path +from tempfile import TemporaryDirectory from snowfakery.output_streams import FileOutputStream, OutputStream @@ -62,6 +63,8 @@ def flatten( return "@{%s.id}" % target_reference def close(self, **kwargs) -> T.Optional[T.Sequence[str]]: + # NOTE: Could improve loading performance by breaking graphs up + # to allow server-side parallelization, but I'd risk locking issues data = {"graphs": [{"graphId": "graph", "compositeRequest": self.rows}]} self.write(json.dumps(data, indent=2)) return super().close() @@ -76,14 +79,19 @@ def __init__(self, output_folder, **kwargs): if not Path.exists(self.target_path): Path.mkdir(self.target_path, exist_ok=True) self.recipe_sets = [[]] + self.current_batch = [] self.filenum = 0 self.filenames = [] + def write_row(self, tablename: str, row_with_references: T.Dict) -> None: + self.recipe_sets[-1].append((tablename, row_with_references)) + def write_single_row(self, tablename: str, row: T.Dict) -> None: - self.recipe_sets[-1].append((tablename, row)) + assert 0, "Shouldn't be called. 
write_row should be called instead" def close(self, **kwargs) -> T.Optional[T.Sequence[str]]: self.flush_sets() + self.flush_batch() table_metadata = [{"url": str(filename)} for filename in self.filenames] metadata = { "@context": "http://www.w3.org/ns/csvw", @@ -95,29 +103,55 @@ def close(self, **kwargs) -> T.Optional[T.Sequence[str]]: return [f"Created {self.target_path}"] def complete_recipe(self, *args): - ready_rows = sum(len(s) for s in self.recipe_sets) - if ready_rows > 500: - self.flush_sets() + self.flush_sets() self.recipe_sets.append([]) def flush_sets(self): - sets = self.recipe_sets - batches = [[]] - for set in sets: - if len(batches[-1]) + len(set) > 500: - batches.append(set.copy()) - else: - batches[-1].extend(set) - for batch in batches: - if len(batch): - self.filenum += 1 - filename = Path(self.target_path) / f"{self.filenum}.composite.json" - self.save_batch(filename, batch) - - def save_batch(self, filename, batch): + while self.recipe_sets: + next_set = self.recipe_sets.pop(0) + if len(self.current_batch) + len(next_set) > 500: + self.flush_batch() + self.current_batch.extend(next_set) + + def flush_batch(self): + self.filenum += 1 + filename = Path(self.target_path) / f"{self.filenum}.composite.json" + with open(filename, "w") as open_file, SalesforceCompositeAPIOutput( open_file ) as out: self.filenames.append(filename) - for tablename, row in batch: + print(len(self.current_batch)) + for tablename, row in self.current_batch: out.write_row(tablename, row) + + self.current_batch = [] + + +class Bundle(FileOutputStream): + def __init__(self, file, **kwargs): + super().__init__(file, **kwargs) + self.tempdir = TemporaryDirectory() + self.folder_os = Folder(self.tempdir.name) + + def write_row(self, tablename: str, row_with_references: T.Dict) -> None: + self.folder_os.write_row(tablename, row_with_references) + + def write_single_row(self, tablename: str, row: T.Dict) -> None: + assert 0, "Shouldn't be called. 
write_row should be called instead" + + def complete_recipe(self, *args): + self.folder_os.complete_recipe() + + def close(self): + self.folder_os.close() + data = self.organize_bundle() + self.write(json.dumps(data, indent=2)) + self.tempdir.cleanup() + return super().close() + + def organize_bundle(self): + files = Path(self.tempdir.name).glob("*.composite.json") + data = [file.read_text() for file in files] + assert data + return {"bundle_format": 1, "data": data}