Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { JSONSchema7Definition } from "json-schema";
import { TestBed } from "@angular/core/testing";
import { WorkflowCompilingService } from "./workflow-compiling.service";
import { WorkflowActionService } from "../workflow-graph/model/workflow-action.service";
import { DynamicSchemaService } from "../dynamic-schema/dynamic-schema.service";
import { ValidationWorkflowService } from "../validation/validation-workflow.service";
import { OperatorMetadataService } from "../operator-metadata/operator-metadata.service";
import { StubOperatorMetadataService } from "../operator-metadata/stub-operator-metadata.service";
import { JointUIService } from "../joint-ui/joint-ui.service";
import { WorkflowUtilService } from "../workflow-graph/util/workflow-util.service";
import { UndoRedoService } from "../undo-redo/undo-redo.service";
import { mockPoint, mockScanPredicate } from "../workflow-graph/model/mock-workflow-data";
import { serializePortIdentity } from "../../../common/util/port-identity-serde";
import { commonTestImports, commonTestProviders } from "../../../common/testing/test-utils";

describe("WorkflowCompilingService.dropInvalidAttributeValues", () => {
  /**
   * Builds a fresh schema shaped like the Aggregate operator after schema propagation has
   * injected the valid input attribute names; "col_y" is the only real attribute on the input.
   */
  function aggregateSchema(): JSONSchema7Definition {
    const attributeField = { type: "string", autofill: "attributeName", enum: ["col_y"] };
    const schema = {
      type: "object",
      properties: {
        groupByKeys: {
          type: "array",
          autofill: "attributeNameList",
          items: { type: "string", enum: ["col_y", ""] },
        },
        aggregations: {
          type: "array",
          items: {
            type: "object",
            properties: {
              attribute: attributeField,
              aggFunction: { type: "string" },
              resultAttribute: { type: "string" },
            },
          },
        },
      },
    };
    return schema as unknown as JSONSchema7Definition;
  }

  it("drops list entries and resets single attributes that are no longer valid", () => {
    const staleProperties = {
      groupByKeys: ["col_x", "col_y"],
      aggregations: [{ attribute: "col_x", aggFunction: "sum", resultAttribute: "r" }],
    };

    const result = WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), staleProperties);
    const firstAggregation = result.value.aggregations[0];

    expect(result.changed).toBe(true);
    expect(result.value.groupByKeys).toEqual(["col_y"]);
    expect(firstAggregation.attribute).toBe("");
    // fields without an attribute annotation must survive the cleanup
    expect(firstAggregation.aggFunction).toBe("sum");
    expect(firstAggregation.resultAttribute).toBe("r");
    // the caller's object must be left exactly as it was
    expect(staleProperties.groupByKeys).toEqual(["col_x", "col_y"]);
    expect(staleProperties.aggregations[0].attribute).toBe("col_x");
  });

  it("reports no change when all attribute references are valid", () => {
    const validProperties = {
      groupByKeys: ["col_y"],
      aggregations: [{ attribute: "col_y", aggFunction: "sum", resultAttribute: "r" }],
    };

    const result = WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), validProperties);

    expect(result.changed).toBe(false);
    // same reference: no copy is made when nothing needs cleaning
    expect(result.value).toBe(validProperties);
  });

  it("makes no change when the input schema (enum) is unknown", () => {
    // identical shape to the aggregate schema, but without any `enum` of valid attribute names
    const schemaWithoutEnum: JSONSchema7Definition = {
      type: "object",
      properties: {
        groupByKeys: {
          type: "array",
          autofill: "attributeNameList",
          items: { type: "string" },
        },
        aggregations: {
          type: "array",
          items: {
            type: "object",
            properties: {
              attribute: { type: "string", autofill: "attributeName" },
            },
          },
        },
      },
    } as unknown as JSONSchema7Definition;

    const unverifiableProperties = {
      groupByKeys: ["col_x"],
      aggregations: [{ attribute: "col_x" }],
    };

    const result = WorkflowCompilingService.dropInvalidAttributeValues(schemaWithoutEnum, unverifiableProperties);

    expect(result.changed).toBe(false);
    expect(result.value).toBe(unverifiableProperties);
  });

  it("returns the value unchanged for non-object schemas or nullish values", () => {
    // a boolean schema (e.g. `additionalProperties: true`) carries nothing to walk
    const booleanSchemaResult = WorkflowCompilingService.dropInvalidAttributeValues(true, { a: 1 });
    expect(booleanSchemaResult).toEqual({ value: { a: 1 }, changed: false });

    // null and undefined values are handed back as-is without being walked
    for (const nullish of [null, undefined]) {
      const nullishResult = WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), nullish);
      expect(nullishResult).toEqual({ value: nullish, changed: false });
    }
  });

  it("skips schema properties that are absent from the value object", () => {
    // neither `groupByKeys` nor `aggregations` from the schema is present on the value
    const unrelatedProperties = { unrelated: "keep-me" };

    const result = WorkflowCompilingService.dropInvalidAttributeValues(aggregateSchema(), unrelatedProperties);

    expect(result.changed).toBe(false);
    expect(result.value).toBe(unrelatedProperties);
  });
});

describe("WorkflowCompilingService schema propagation property cleanup", () => {
  let service: WorkflowCompilingService;
  let workflowActionService: WorkflowActionService;
  let dynamicSchemaService: DynamicSchemaService;

  /** Replaces only the json schema of the operator's current dynamic schema. */
  const overrideJsonSchema = (operatorID: string, jsonSchema: object): void => {
    const baseSchema = dynamicSchemaService.getDynamicSchema(operatorID);
    dynamicSchemaService.setDynamicSchema(operatorID, { ...baseSchema, jsonSchema: jsonSchema as any });
  };

  /** Stubs the propagated input schema so that external port 0 exposes only "col_y". */
  const stubInputSchemaWithOnlyColY = (): void => {
    const portKey = serializePortIdentity({ id: 0, internal: false });
    vi.spyOn(service, "getOperatorInputSchemaMap").mockReturnValue({
      [portKey]: [{ attributeName: "col_y", attributeType: "string" }],
    } as any);
  };

  /** Reads back the operator's current properties from the Texera graph. */
  const currentProperties = (operatorID: string) =>
    workflowActionService.getTexeraGraph().getOperator(operatorID).operatorProperties;

  beforeEach(() => {
    const providers = [
      { provide: OperatorMetadataService, useClass: StubOperatorMetadataService },
      JointUIService,
      WorkflowActionService,
      WorkflowUtilService,
      UndoRedoService,
      DynamicSchemaService,
      ValidationWorkflowService,
      WorkflowCompilingService,
      ...commonTestProviders,
    ];
    TestBed.configureTestingModule({ imports: [...commonTestImports], providers });

    service = TestBed.inject(WorkflowCompilingService);
    workflowActionService = TestBed.inject(WorkflowActionService);
    dynamicSchemaService = TestBed.inject(DynamicSchemaService);
  });

  it("drops operator property values that the propagated input schema no longer supports", () => {
    const operatorID = mockScanPredicate.operatorID;
    workflowActionService.addOperator(mockScanPredicate, mockPoint);

    // attach attribute-autofill properties that are bound to input port 0
    overrideJsonSchema(operatorID, {
      type: "object",
      properties: {
        groupByKeys: {
          type: "array",
          autofill: "attributeNameList",
          autofillAttributeOnPort: 0,
          items: { type: "string" },
        },
        attribute: { type: "string", autofill: "attributeName", autofillAttributeOnPort: 0 },
      },
    });

    // "col_x" is stale: the new input does not provide that column
    workflowActionService.setOperatorProperty(operatorID, { groupByKeys: ["col_x", "col_y"], attribute: "col_x" });

    stubInputSchemaWithOnlyColY();

    // call the private handler directly; in production a compile response triggers it
    (service as any).applySchemaPropagationResult();

    const cleaned = currentProperties(operatorID);
    expect(cleaned.groupByKeys).toEqual(["col_y"]);
    expect(cleaned.attribute).toBe("");
  });

  it("leaves valid property values untouched", () => {
    const operatorID = mockScanPredicate.operatorID;
    workflowActionService.addOperator(mockScanPredicate, mockPoint);

    overrideJsonSchema(operatorID, {
      type: "object",
      properties: {
        attribute: { type: "string", autofill: "attributeName", autofillAttributeOnPort: 0 },
      },
    });

    workflowActionService.setOperatorProperty(operatorID, { attribute: "col_y" });

    stubInputSchemaWithOnlyColY();

    // spy only after the initial setup so that just the propagation pass is observed
    const setSpy = vi.spyOn(workflowActionService, "setOperatorProperty");
    (service as any).applySchemaPropagationResult();

    expect(setSpy).not.toHaveBeenCalled();
    expect(currentProperties(operatorID).attribute).toBe("col_y");
  });
});
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import { HttpClient, HttpHeaders } from "@angular/common/http";
import { Injectable } from "@angular/core";
import { JSONSchema7Definition } from "json-schema";
import { EMPTY, merge, Observable, ReplaySubject } from "rxjs";
import { CustomJSONSchema7 } from "src/app/workspace/types/custom-json-schema.interface";
import { AppSettings } from "../../../common/app-setting";
Expand Down Expand Up @@ -190,6 +191,19 @@ export class WorkflowCompilingService {
let newDynamicSchema: OperatorSchema;
if (inputSchema) {
newDynamicSchema = WorkflowCompilingService.setOperatorInputAttrs(currentDynamicSchema, inputSchema);

// Now that the list of input attributes is known, drop any operator property
// values that reference attributes which no longer exist in the input schema (e.g. a copy-pasted
// operator wired to a different upstream, or an operator re-wired to a new source). Otherwise
// these old references cause a compile error that survives even after clearing properties.
const operator = this.workflowActionService.getTexeraGraph().getOperator(operatorID);
const { value: cleanedProperties, changed } = WorkflowCompilingService.dropInvalidAttributeValues(
newDynamicSchema.jsonSchema,
operator.operatorProperties
);
if (changed) {
this.workflowActionService.setOperatorProperty(operatorID, cleanedProperties);
}
} else {
// otherwise, the input attributes of the operator are unknown
// if the operator is not a source operator, restore its original schema of input attributes
Expand Down Expand Up @@ -392,6 +406,75 @@ export class WorkflowCompilingService {
};
}

/**
 * Walks an operator's property values alongside its json schema and drops any
 * value that references an input attribute which is no longer valid.
 *
 * Only properties marked with an `autofill` annotation are affected, and only when the schema carries an
 * `enum` of valid attribute names (i.e. the input schema is known). Two cases are handled:
 * - `attributeName`: a single column name. Reset to "" if it is a non-empty string that is not in the enum.
 *   An already-empty "" is left alone and is NOT reported as a change, so running the cleanup again on
 *   its own output reports `changed: false`.
 * - `attributeNameList`: a list of column names. Entries that are not in the item enum are filtered out.
 *
 * Nested object `properties` and single-schema array `items` are walked recursively; tuple-style `items`
 * (an array of schemas) are not walked. The input `value` is never mutated: copies are made only along
 * the path that actually changed, and the original reference is returned when nothing changed.
 *
 * @param schema json schema (or boolean / undefined schema) describing `value`
 * @param value the property value to clean; `null` / `undefined` are returned untouched
 * @returns the (possibly new) value and whether anything actually changed
 */
public static dropInvalidAttributeValues(
  schema: JSONSchema7Definition | undefined,
  value: any
): { value: any; changed: boolean } {
  // boolean / missing schemas and nullish values carry nothing to validate against
  if (typeof schema !== "object" || schema === null || value === undefined || value === null) {
    return { value, changed: false };
  }
  const s = schema as CustomJSONSchema7;

  if (s.autofill === "attributeNameList") {
    const itemEnum = (s.items as CustomJSONSchema7 | undefined)?.enum;
    if (Array.isArray(value) && Array.isArray(itemEnum)) {
      const filtered = value.filter(v => itemEnum.includes(v));
      return { value: filtered, changed: filtered.length !== value.length };
    }
    // without an enum the valid attribute names are unknown: leave the list as it is
    return { value, changed: false };
  }

  if (s.autofill === "attributeName") {
    // "" is the cleared state this method itself produces. Treating it as a stale reference would
    // report a change on every pass ("" -> "") and make the caller rewrite the properties each time.
    const isStaleReference =
      Array.isArray(s.enum) && typeof value === "string" && value !== "" && !s.enum.includes(value);
    return isStaleReference ? { value: "", changed: true } : { value, changed: false };
  }

  // recurse into object properties
  if (s.properties && typeof value === "object" && !Array.isArray(value)) {
    let changed = false;
    const newValue = { ...value };
    for (const [key, propSchema] of Object.entries(s.properties)) {
      // schema properties that are absent from the value are skipped, never materialized
      if (!(key in newValue)) {
        continue;
      }
      const res = WorkflowCompilingService.dropInvalidAttributeValues(propSchema, newValue[key]);
      if (res.changed) {
        newValue[key] = res.value;
        changed = true;
      }
    }
    // hand back the original reference when nothing changed so callers can rely on identity
    return { value: changed ? newValue : value, changed };
  }

  // recurse into array items (only when items is a single schema, not a tuple schema)
  if (s.items && !Array.isArray(s.items) && Array.isArray(value)) {
    const itemSchema = s.items as JSONSchema7Definition;
    let changed = false;
    const newArr = value.map(item => {
      const res = WorkflowCompilingService.dropInvalidAttributeValues(itemSchema, item);
      if (res.changed) {
        changed = true;
        return res.value;
      }
      return item;
    });
    return { value: changed ? newArr : value, changed };
  }

  return { value, changed: false };
}

public static restoreOperatorInputAttrs(operatorSchema: OperatorSchema): OperatorSchema {
let newJsonSchema = operatorSchema.jsonSchema;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ export class DynamicSchemaService {
/**
* Sets the dynamic schema of an operator. If the new schema is different, also emit dynamic schema changed event.
*
* The new dynamic schema is validated against the current operator properties.
* If the changed new dynamic schema invalidates some property, then the invalid properties fields will be dropped.
*
* Note: dropping operator property values that the new schema invalidates (e.g. attribute references that no
* longer exist after schema propagation) is handled by WorkflowCompilingService.dropInvalidAttributeValues,
* which has access to the propagated input attributes.
*/
public setDynamicSchema(operatorID: string, dynamicSchema: OperatorSchema): void {
const currentDynamicSchema = this.dynamicSchemaMap.get(operatorID);
Expand Down
Loading