Support multi-file datasets

Paul Prescod committed May 19, 2022
1 parent ee22b9f commit 9f98330
Showing 6 changed files with 102 additions and 27 deletions.
49 changes: 31 additions & 18 deletions examples/salesforce/LoadCompositeAPIData.apex
@@ -5,18 +5,31 @@
 
 public class LoadCompositeAPIData {
-    public void loadJson(String url) {
+
+    public void loadSingleJson(String url) {
         System.debug('Loading JSON ' + url);
         String json_records = downloadJSON(url);
         loadRecords(json_records);
         System.debug('Loaded JSON ' + url);
     }
 
-    public String downloadJSON(String url){
+    public void loadJsonSet(String set_url){
+        String json_record_sets = downloadJSON(set_url);
+        Map<String, Object> data = (Map<String, Object>)Json.deserializeUntyped(json_record_sets);
+        List<Object> tables = (List<Object>)data.get('tables');
+        for(Object table_url: tables){
+            Map<String, Object> url_obj = (Map<String, Object>) table_url;
+            String url = (String)url_obj.get('url');
+            loadSingleJson(url);
+        }
+    }
+
+    private String downloadJSON(String url){
         HttpResponse response = makeHTTPCall('GET', url, null);
         return response.getBody();
     }
 
-    public HttpResponse makeHTTPCall(String method, String url, String post_body){
+    private HttpResponse makeHTTPCall(String method, String url, String post_body){
         Http h = new Http();
         HttpRequest request = new HttpRequest();
         request.setEndpoint(url);
@@ -31,19 +44,15 @@ public class LoadCompositeAPIData {
         return h.send(request);
     }
 
-    public void loadRecords(String json_records){
+    private void loadRecords(String json_records){
         String error = null;
         String graph_url = System.URL.getSalesforceBaseUrl().toExternalForm() + '/services/data/v54.0/composite/graph';
         HttpResponse response = makeHTTPCall('POST', graph_url, json_records);
         String response_body = response.getBody();
         if(response.getStatusCode()!=200){
             error = 'Error creating objects! ' + response.getStatus() + ' ' + response_body;
         }else{
-            try{
-                error = parseResponse(response_body);
-            }catch(Exception e){
-                error = 'Error creating objects! ' + e.getMessage();
-            }
+            error = parseResponse(response_body);
         }
 
         if(error!=null){
@@ -90,13 +99,17 @@
 
-JSONSObjectLoader loader = new JSONSObjectLoader();
+LoadCompositeAPIData loader = new LoadCompositeAPIData();
 // experimentally, much more than 20 hits a cumulative
 
+// load a single Composite Graph JSON
+loader.loadSingleJson('https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json');
+
+// load a set of 3 Composite Graph JSONs
+loader.loadJsonSet('https://gist.githubusercontent.com/prescod/6f3aebafde63971d1093549c3bef4e41/raw/8885df2618ece474c196d5d94b594dd2b5961c71/csvw_metadata.json');
+System.debug('SUCCESS! SUCCESS! SUCCESS! SUCCESS! SUCCESS! SUCCESS! SUCCESS! ');
+// experimentally, much more than 15 hits a cumulative
 // maximum time allotted for callout error
-for (Integer i = 0; i < 20;) {
-    loader.LoadJson('https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json');
-    System.debug(++i);
-    loader.LoadJson('https://gist.githubusercontent.com/prescod/86a64c63fe294b8123b895b489852e6f/raw/e8ae51b7f5af5db6b5b800a3f9c40ebd0c7a1998/composite2.json');
-    System.debug(++i);
-    loader.LoadJson('https://gist.githubusercontent.com/prescod/21b4a0b9fe5cac171abb8f6a633cb39a/raw/cd7e0f1025a61946b4f923999afa546f099282fa/composite3.json');
-    System.debug(++i);
-}
+//
+// for (Integer i = 0; i < 15; i++) {
+//     loader.loadSingleJson('https://gist.githubusercontent.com/prescod/13302ecbd08fc3fe92db7d6ee4614d25/raw/c88949d2170c7c11f94958ec672ec8b972cc10d4/composite.json');
+//     System.debug(i);
+// }
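The "set" that loadJsonSet consumes is the csvw_metadata.json wrapper that the new Folder output stream (further down in this commit) writes: a small JSON document whose "tables" entries each point at one Composite Graph payload. A minimal sketch of writing such a set file by hand, mirroring the shape produced by Folder.close(); the filenames are illustrative:

import json

# Shape mirrors Folder.close() below: a csvw-style wrapper whose "tables"
# entries each carry the URL (or path) of one Composite Graph JSON file.
set_file = {
    "@context": "http://www.w3.org/ns/csvw",
    "tables": [
        {"url": "1.composite.json"},  # illustrative names
        {"url": "2.composite.json"},
    ],
}

with open("csvw_metadata.json", "w") as f:
    json.dump(set_file, f, indent=2)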
4 changes: 4 additions & 0 deletions snowfakery/api.py
@@ -251,6 +251,10 @@ def _get_output_streams(dburls, output_files, output_format, output_folder):

     if output_stream_cls.uses_folder:
         output_streams.append(output_stream_cls(output_folder))
+    elif output_folder and str(output_folder) != "." and not output_files:
+        raise exc.DataGenError(
+            "--output-folder can only be used with --output-file=<something> or --output-format=csv"
+        )
 
     if output_files:
         for f in output_files:
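The check now lives beside the output-stream construction it guards. For illustration, a hedged sketch of triggering it through the embedding API rather than the CLI; it assumes generate_data accepts keyword arguments named after the CLI flags (output_format, output_folder) and that a recipe.yml exists:

from snowfakery import generate_data
import snowfakery.data_gen_exceptions as exc

# Fine: csv output uses a folder-based stream, so the folder is consumed.
generate_data("recipe.yml", output_format="csv", output_folder="out")

try:
    # Rejected: a folder with no output files and no folder-based format.
    generate_data("recipe.yml", output_folder="out")
except exc.DataGenError as e:
    print(e)  # --output-folder can only be used with ...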
8 changes: 0 additions & 8 deletions snowfakery/cli.py
@@ -276,14 +276,6 @@ def validate_options(
"Sorry, you need to pick --dburl or --output-file "
"because they are mutually exclusive."
)
if (
output_folder
and str(output_folder) != "."
and not (output_files or output_format == "csv")
):
raise click.ClickException(
"--output-folder can only be used with --output-file=<something> or --output-format=csv"
)

if target_number and reps:
raise click.ClickException(
2 changes: 2 additions & 0 deletions snowfakery/data_generator_runtime.py
@@ -375,6 +375,8 @@ def loop_over_templates_until_finished(self, continuing):
             self.iteration_count += 1
             continuing = True
             self.globals.reset_slots()
+            # let the output stream know that the recipe was finished
+            self.output_stream.complete_recipe()
 
     def loop_over_templates_once(self, statement_list, continuing: bool):
         for statement in statement_list:
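This is the hook that lets folder-based streams split their output on recipe boundaries. A hedged sketch of the contract, with the runtime reduced to a bare loop (the real loop_over_templates_until_finished does far more); Folder comes from this commit, while the table name and row are made up:

from snowfakery.experimental.SalesforceCompositeAPIOutput import Folder

stream = Folder("out")
for i in range(3):
    # the runtime writes each recipe repetition's rows...
    stream.write_single_row("Account", {"id": i, "Name": "Acme"})
    # ...then signals the boundary, exactly as the line added above does
    stream.complete_recipe()
print(stream.close())  # flushes batches and writes out/csvw_metadata.json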
61 changes: 60 additions & 1 deletion snowfakery/experimental/SalesforceCompositeAPIOutput.py
@@ -14,11 +14,14 @@
 #
 # Note that Salesforce will complain if the dataset has more than 500 rows.
 
+# TODO: Add tests
+
 import json
 import typing as T
+import datetime
 from pathlib import Path
 
-from snowfakery.output_streams import FileOutputStream
+from snowfakery.output_streams import FileOutputStream, OutputStream
 
 
 class SalesforceCompositeAPIOutput(FileOutputStream):
@@ -62,3 +65,59 @@ def close(self, **kwargs) -> T.Optional[T.Sequence[str]]:
data = {"graphs": [{"graphId": "graph", "compositeRequest": self.rows}]}
self.write(json.dumps(data, indent=2))
return super().close()


class Folder(OutputStream):
uses_folder = True

def __init__(self, output_folder, **kwargs):
super().__init__(None, **kwargs)
self.target_path = Path(output_folder)
if not Path.exists(self.target_path):
Path.mkdir(self.target_path, exist_ok=True)
self.recipe_sets = [[]]
self.filenum = 0
self.filenames = []

def write_single_row(self, tablename: str, row: T.Dict) -> None:
self.recipe_sets[-1].append((tablename, row))

def close(self, **kwargs) -> T.Optional[T.Sequence[str]]:
self.flush_sets()
table_metadata = [{"url": str(filename)} for filename in self.filenames]
metadata = {
"@context": "http://www.w3.org/ns/csvw",
"tables": table_metadata,
}
metadata_filename = self.target_path / "csvw_metadata.json"
with open(metadata_filename, "w") as f:
json.dump(metadata, f, indent=2)
return [f"Created {self.target_path}"]

def complete_recipe(self, *args):
ready_rows = sum(len(s) for s in self.recipe_sets)
if ready_rows > 500:
self.flush_sets()
self.recipe_sets.append([])

def flush_sets(self):
sets = self.recipe_sets
batches = [[]]
for set in sets:
if len(batches[-1]) + len(set) > 500:
batches.append(set.copy())
else:
batches[-1].extend(set)
for batch in batches:
if len(batch):
self.filenum += 1
filename = Path(self.target_path) / f"{self.filenum}.composite.json"
self.save_batch(filename, batch)

def save_batch(self, filename, batch):
with open(filename, "w") as open_file, SalesforceCompositeAPIOutput(
open_file
) as out:
self.filenames.append(filename)
for tablename, row in batch:
out.write_row(tablename, row)
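The packing rule is easy to lose in the class plumbing: recipe sets are packed greedily into batches of at most 500 rows, and a single set is never split across two files (which is why complete_recipe only cuts on recipe boundaries). A standalone restatement of that rule; pack_batches and MAX_ROWS are my names, only the logic comes from flush_sets above:

from typing import Dict, List, Tuple

MAX_ROWS = 500  # the Composite Graph limit noted at the top of the file

RecipeSet = List[Tuple[str, Dict]]

def pack_batches(recipe_sets: List[RecipeSet]) -> List[RecipeSet]:
    batches: List[RecipeSet] = [[]]
    for recipe_set in recipe_sets:
        if len(batches[-1]) + len(recipe_set) > MAX_ROWS:
            batches.append(list(recipe_set))  # fresh batch; never split a set
        else:
            batches[-1].extend(recipe_set)
    return [b for b in batches if b]

# Three 300-row recipe sets yield three files, not two: 300 + 300 would
# exceed 500, so each set lands in its own batch.
sets = [[("Account", {"Name": f"Acme {i}"})] * 300 for i in range(3)]
assert [len(b) for b in pack_batches(sets)] == [300, 300, 300]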
5 changes: 5 additions & 0 deletions snowfakery/output_streams.py
@@ -136,6 +136,11 @@ def __enter__(self, *args):
     def __exit__(self, *args):
         self.close()
 
+    def complete_recipe(self, *args):
+        """Let the output stream know that a complete recipe
+        set was generated."""
+        pass
+
 
 class SmartStream:
     """Common code for managing stream/file opening/closing
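Because the base-class hook is a no-op, existing streams are untouched; a stream opts in by overriding it. A hedged sketch of a custom stream that counts recipe boundaries, mirroring how Folder calls the base initializer; the base class has more surface than shown here, so treat this as a shape rather than a drop-in:

from snowfakery.output_streams import OutputStream

class RecipeCountingStream(OutputStream):
    """Discards rows, counts completed recipes."""

    def __init__(self, **kwargs):
        super().__init__(None, **kwargs)  # mirrors Folder's call to the base class
        self.recipes_seen = 0

    def write_single_row(self, tablename, row):
        pass  # rows are not the point here

    def complete_recipe(self, *args):
        self.recipes_seen += 1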
