Skip to content

Commit

Permalink
Intersection from Hybrid to AtMostOne
Browse files Browse the repository at this point in the history
  • Loading branch information
pierremotard committed Jun 18, 2021
1 parent 242cbc5 commit 01005f1
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2135,9 +2135,9 @@ private static BuiltinFunction createBuiltinFunction(
"intersect"
),
"item*",
"object+",
"object",
ObjectIntersectFunctionIterator.class,
BuiltinFunction.BuiltinFunctionExecutionMode.INHERIT_FROM_FIRST_ARGUMENT
BuiltinFunction.BuiltinFunctionExecutionMode.LOCAL
);
/**
* function that projects objects by filtering their pairs and leaves non-objects intact
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@
import org.rumbledb.api.Item;
import org.rumbledb.context.DynamicContext;
import org.rumbledb.exceptions.ExceptionMetadata;
import org.rumbledb.exceptions.IteratorFlowException;
import org.rumbledb.expressions.ExecutionMode;
import org.rumbledb.items.ItemFactory;
import org.rumbledb.runtime.HybridRuntimeIterator;
import org.rumbledb.runtime.AtMostOneItemLocalRuntimeIterator;
import org.rumbledb.runtime.RuntimeIterator;

import sparksoniq.spark.SparkSessionManager;
Expand All @@ -41,91 +40,65 @@
import java.util.LinkedHashMap;
import java.util.List;

public class ObjectIntersectFunctionIterator extends HybridRuntimeIterator {
public class ObjectIntersectFunctionIterator extends AtMostOneItemLocalRuntimeIterator {
/**
*
*/
private static final long serialVersionUID = 1L;
private RuntimeIterator iterator;

public ObjectIntersectFunctionIterator(
List<RuntimeIterator> arguments,
List<RuntimeIterator> children,
ExecutionMode executionMode,
ExceptionMetadata iteratorMetadata
) {
super(arguments, executionMode, iteratorMetadata);
this.iterator = arguments.get(0);
super(children, executionMode, iteratorMetadata);
}

@Override
public Item nextLocal() {
if (this.hasNext) {
List<Item> items = this.iterator.materialize(this.currentDynamicContextForLocalExecution);
LinkedHashMap<String, List<Item>> keyValuePairs = new LinkedHashMap<>();
boolean firstItem = true;
for (Item item : items) {
// ignore non-object items
if (item.isObject()) {
if (firstItem) {
// add all key-value pairs of the first item
for (String key : item.getKeys()) {
public Item materializeFirstItemOrNull(DynamicContext context) {
List<Item> items = this.children.get(0).materialize(context);
LinkedHashMap<String, List<Item>> keyValuePairs = new LinkedHashMap<>();
boolean firstItem = true;
for (Item item : items) {
// ignore non-object items
if (item.isObject()) {
if (firstItem) {
// add all key-value pairs of the first item
for (String key : item.getKeys()) {
Item value = item.getItemByKey(key);
ArrayList<Item> valueList = new ArrayList<>();
valueList.add(value);
keyValuePairs.put(key, valueList);
}
firstItem = false;
} else {
// iterate over existing keys in the map of results
Iterator<String> keyIterator = keyValuePairs.keySet().iterator();
while (keyIterator.hasNext()) {
String key = keyIterator.next();
// if the new item doesn't contain the same keys
if (!item.getKeys().contains(key)) {
// remove the key from the map
keyIterator.remove();
} else {
// add the matching key's value to the list
Item value = item.getItemByKey(key);
ArrayList<Item> valueList = new ArrayList<>();
valueList.add(value);
keyValuePairs.put(key, valueList);
}
firstItem = false;
} else {
// iterate over existing keys in the map of results
Iterator<String> keyIterator = keyValuePairs.keySet().iterator();
while (keyIterator.hasNext()) {
String key = keyIterator.next();
// if the new item doesn't contain the same keys
if (!item.getKeys().contains(key)) {
// remove the key from the map
keyIterator.remove();
} else {
// add the matching key's value to the list
Item value = item.getItemByKey(key);
keyValuePairs.get(key).add(value);
}
keyValuePairs.get(key).add(value);
}
}
}
}

Item result = ItemFactory.getInstance().createObjectItem(keyValuePairs);

this.hasNext = false;
return result;
}
throw new IteratorFlowException(
RuntimeIterator.FLOW_EXCEPTION_MESSAGE + " INTERSECT function",
getMetadata()
);
}

@Override
protected void openLocal() {
}
Item result = ItemFactory.getInstance().createObjectItem(keyValuePairs);

@Override
protected void closeLocal() {
return result;
}

@Override
protected void resetLocal() {
}

@Override
protected boolean hasNextLocal() {
return this.hasNext;
}

@Override
// @Override
protected JavaRDD<Item> getRDDAux(DynamicContext context) {
// Enclose object values into arrays.
JavaRDD<Item> childRDD = this.iterator.getRDD(context);
JavaRDD<Item> childRDD = this.children.get(0).getRDD(context);
Function<Item, Item> mapTransformation = new ObjectIntersectMapClosure();
JavaRDD<Item> mapResult = childRDD.map(mapTransformation);

Expand Down

0 comments on commit 01005f1

Please sign in to comment.