Skip to content

Commit

Permalink
Merge pull request #1117 from apache/SP-1114
Browse files Browse the repository at this point in the history
[#1114] Adapters can now be added without starting them
  • Loading branch information
tenthe committed Jan 19, 2023
2 parents f582f86 + 6193611 commit c1ac05a
Show file tree
Hide file tree
Showing 13 changed files with 225 additions and 109 deletions.
Expand Up @@ -29,19 +29,18 @@
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.adapter.AdapterStreamDescription;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.util.ElementIdGenerator;
import org.apache.streampipes.resource.management.AdapterResourceManager;
import org.apache.streampipes.resource.management.DataStreamResourceManager;
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.couchdb.impl.AdapterInstanceStorageImpl;

import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URISyntaxException;
import java.util.List;
import java.util.UUID;

/**
* This class is responsible for managing all the adapter instances which are executed on worker nodes
Expand Down Expand Up @@ -74,10 +73,8 @@ public String addAdapter(AdapterDescription ad,
throws AdapterException {

// Create elementId for adapter
// TODO centralized provisioning of element id
String dataStreamElementId = "urn:streampipes.apache.org:eventstream:" + RandomStringUtils.randomAlphabetic(6);
String uuid = UUID.randomUUID().toString();
ad.setElementId(ad.getElementId() + ":" + uuid);
String dataStreamElementId = ElementIdGenerator.makeElementId(SpDataStream.class);
ad.setElementId(ElementIdGenerator.makeElementId(ad));
ad.setCreatedAt(System.currentTimeMillis());
ad.setCorrespondingDataStreamElementId(dataStreamElementId);

Expand All @@ -87,18 +84,13 @@ public String addAdapter(AdapterDescription ad,

String elementId = this.adapterResourceManager.encryptAndCreate(ad);

// start when stream adapter
if (ad instanceof AdapterStreamDescription) {
startStreamAdapter(elementId);
}

// Create stream
SpDataStream storedDescription = new SourcesManagement().createAdapterDataStream(ad, dataStreamElementId);
storedDescription.setCorrespondingAdapterId(elementId);
installDataSource(storedDescription, principalSid, true);
LOG.info("Install source (source URL: {} in backend", ad.getElementId());

return storedDescription.getElementId();
return ad.getElementId();
}

public void updateAdapter(AdapterDescription ad,
Expand Down Expand Up @@ -164,17 +156,6 @@ public List<AdapterDescription> getAllAdapterInstances() throws AdapterException
return allAdapters;
}

public List<AdapterDescription> getAllAdapterDescriptions() throws AdapterException {

List<AdapterDescription> allAdapters = adapterInstanceStorage.getAllAdapters();

if (allAdapters == null) {
throw new AdapterException("Could not get all adapters");
}

return allAdapters;
}

public void stopStreamAdapter(String elementId) throws AdapterException {
AdapterDescription ad = adapterInstanceStorage.getAdapter(elementId);

Expand Down Expand Up @@ -212,11 +193,11 @@ public void startStreamAdapter(String elementId) throws AdapterException {
}
}

private void updateDataSource(AdapterDescription ad) throws AdapterException {
private void updateDataSource(AdapterDescription ad) {
// get data source
SpDataStream dataStream = this.dataStreamResourceManager.find(ad.getCorrespondingDataStreamElementId());

dataStream = SourcesManagement.updateDataStream(ad, dataStream);
SourcesManagement.updateDataStream(ad, dataStream);

// Update data source in database
this.dataStreamResourceManager.update(dataStream);
Expand Down
Expand Up @@ -86,8 +86,8 @@ public void run(IAdapterPipeline adapterPipeline) throws AdapterException {
String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema());


executor = Executors.newScheduledThreadPool(1);
var eventProcessor = new LocalEventProcessor(adapterPipeline, timestampKey);
executor = Executors.newScheduledThreadPool(1);

executor.scheduleAtFixedRate(() -> {
try (InputStream dataInputStream = getDataFromEndpoint()) {
Expand Down Expand Up @@ -132,7 +132,8 @@ public Boolean emit(byte[] event) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
logger.info("File stream adapter was stopped, the current replay is interuppted", e);
return false;
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions ui/cypress/support/utils/connect/ConnectBtns.ts
Expand Up @@ -47,8 +47,7 @@ export class ConnectBtns {
.parent();
}

// =================================================================
// Format button options
// ===================== Format button options ==========================
public static json() {
return cy.dataCy('connect-select-json-formats');
}
Expand Down Expand Up @@ -82,4 +81,15 @@ export class ConnectBtns {
}

// =================================================================

// ===================== Adapter settings btns ==========================
public static adapterSettingsStartAdapter() {
return cy.dataCy('adapter-settings-start-adapter-btn');
}

public static startAdapterNowCheckbox() {
return cy.dataCy('start-adapter-now-checkbox');
}

// ========================================================================
}
55 changes: 46 additions & 9 deletions ui/cypress/support/utils/connect/ConnectUtils.ts
Expand Up @@ -30,6 +30,8 @@ import { ConnectBtns } from './ConnectBtns';
export class ConnectUtils {
public static testSpecificStreamAdapter(
adapterConfiguration: SpecificAdapterInput,
starting = true,
successElement = 'sp-connect-adapter-live-preview',
) {
ConnectUtils.goToConnect();

Expand All @@ -53,7 +55,11 @@ export class ConnectUtils {

ConnectEventSchemaUtils.finishEventSchemaConfiguration();

ConnectUtils.startStreamAdapter(adapterConfiguration);
ConnectUtils.startStreamAdapter(
adapterConfiguration,
starting,
successElement,
);
}

public static testGenericStreamAdapter(
Expand Down Expand Up @@ -188,21 +194,27 @@ export class ConnectUtils {
cy.get('#event-schema-next-button').click();
}

public static startStreamAdapter(adapterInput: AdapterInput) {
ConnectUtils.startAdapter(
adapterInput,
'sp-connect-adapter-live-preview',
);
public static startStreamAdapter(
adapterInput: AdapterInput,
starting = true,
successElement = 'sp-connect-adapter-live-preview',
) {
ConnectUtils.startAdapter(adapterInput, successElement, starting);
}

public static startSetAdapter(adapterInput: AdapterInput) {
ConnectUtils.startAdapter(
adapterInput,
'sp-connect-adapter-set-success',
true,
);
}

public static startAdapter(adapterInput: AdapterInput, successElement) {
public static startAdapter(
adapterInput: AdapterInput,
successElement: string,
starting: boolean,
) {
// Set adapter name
cy.dataCy('sp-adapter-name').type(adapterInput.adapterName);

Expand All @@ -215,8 +227,13 @@ export class ConnectUtils {
.click();
}

// Start adapter
cy.get('#button-startAdapter').click();
// Deselect auto start of adapter
if (!starting) {
ConnectBtns.startAdapterNowCheckbox().parent().click();
}

ConnectBtns.adapterSettingsStartAdapter().click();

cy.dataCy(successElement, { timeout: 60000 }).should('be.visible');

this.closeAdapterPreview();
Expand Down Expand Up @@ -269,6 +286,26 @@ export class ConnectUtils {
return adapterConfiguration;
}

public static startAndValidateAdapter(amountOfProperties: number) {
ConnectBtns.startAdapter().should('not.be.disabled');

ConnectBtns.startAdapter().click();

// View data
ConnectBtns.infoAdapter().click();
cy.get('div').contains('Values').parent().click();

// Validate resulting event
cy.dataCy('sp-connect-adapter-live-preview', { timeout: 10000 }).should(
'be.visible',
);

// validate that three event properties
cy.get('.preview-row', { timeout: 10000 })
.its('length')
.should('eq', amountOfProperties);
}

public static tearDownPreprocessingRuleTest(
adapterConfiguration: AdapterInput,
expectedFile: string,
Expand Down
47 changes: 47 additions & 0 deletions ui/cypress/tests/adapter/createAdapterWithoutStarting.spec.ts
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { ConnectUtils } from '../../support/utils/connect/ConnectUtils';
import { SpecificAdapterBuilder } from '../../support/builder/SpecificAdapterBuilder';

describe('Creates a new adapter without starting it', () => {
beforeEach('Setup Test', () => {
cy.initStreamPipesTest();
});

it('Perform Test', () => {
const adapterInput = SpecificAdapterBuilder.create(
'Machine_Data_Simulator',
)
.setName('Machine Data Simulator Test')
.addInput('input', 'wait-time-ms', '1000')
.build();

ConnectUtils.testSpecificStreamAdapter(
adapterInput,
false,
'sp-connect-adapter-not-started-success',
);

ConnectUtils.startAndValidateAdapter(7);

ConnectUtils.closeAdapterPreview();

ConnectUtils.deleteAdapter();
});
});
18 changes: 1 addition & 17 deletions ui/cypress/tests/adapter/editAdapter.smoke.spec.ts
Expand Up @@ -68,23 +68,7 @@ describe('Test Edit Adapter', () => {

ConnectUtils.closeAdapterPreview();

// Start Adapter
ConnectBtns.startAdapter().should('not.be.disabled');
ConnectBtns.startAdapter().click();

// View data
ConnectBtns.infoAdapter().click();
cy.get('div').contains('Values').parent().click();

// Validate resulting event
cy.dataCy('sp-connect-adapter-live-preview', { timeout: 10000 }).should(
'be.visible',
);

// validate that three event properties
cy.get('.preview-row', { timeout: 10000 })
.its('length')
.should('eq', 3);
ConnectUtils.startAndValidateAdapter(3);

// Validate that name of adapter and data source
cy.dataCy('adapter-name').contains(newAdapterName);
Expand Down
Expand Up @@ -82,8 +82,12 @@ export class AdapterService {
}

startAdapter(adapter: AdapterDescriptionUnion): Observable<Message> {
return this.startAdapterByElementId(adapter.elementId);
}

startAdapterByElementId(elementId: string): Observable<Message> {
return this.http
.post(this.adapterMasterUrl + adapter.elementId + '/start', {})
.post(this.adapterMasterUrl + elementId + '/start', {})
.pipe(map(response => Message.fromData(response as any)));
}

Expand Down
Expand Up @@ -26,6 +26,7 @@
<div fxLayoutAlign="start start" class="mr-15">
<mat-checkbox
(change)="optionSelectedEmitter.emit($event.checked)"
[checked]="isChecked"
[attr.data-cy]="dataCy"
class="large-checkbox"
>
Expand Down
Expand Up @@ -16,7 +16,7 @@
*
*/

import { Component, EventEmitter, Input, OnInit, Output } from '@angular/core';
import { Component, EventEmitter, Input, Output } from '@angular/core';

@Component({
selector: 'sp-adapter-options-panel',
Expand All @@ -36,6 +36,9 @@ export class SpAdapterOptionsPanelComponent {
@Input()
dataCy: string;

@Input()
isChecked = false;

@Output()
optionSelectedEmitter: EventEmitter<boolean> = new EventEmitter<boolean>();
}
Expand Up @@ -47,6 +47,16 @@
</sp-basic-inner-panel>

<div fxFlex="100" fxLayout="column">
<sp-adapter-options-panel
optionTitle="Run adapter"
optionDescription="Start adapter now"
optionIcon="play_arrow"
dataCy="start-adapter-now-checkbox"
[isChecked]="startAdapterNow"
(optionSelectedEmitter)="startAdapterNow = $event"
>
</sp-adapter-options-panel>

<sp-adapter-options-panel
optionTitle="Remove Duplicates"
optionDescription="Avoid duplicated events within a certain time interval"
Expand Down Expand Up @@ -150,7 +160,7 @@
*ngIf="!isEditMode"
[disabled]="!startAdapterSettingsFormValid"
mat-raised-button
id="button-startAdapter"
data-cy="adapter-settings-start-adapter-btn"
color="accent"
(click)="startAdapter()"
mat-button
Expand Down
Expand Up @@ -88,6 +88,8 @@ export class StartAdapterConfigurationComponent implements OnInit {
saveInDataLake = false;
dataLakeTimestampField: string;

startAdapterNow = true;

constructor(
private dialogService: DialogService,
private shepherdService: ShepherdService,
Expand Down Expand Up @@ -172,6 +174,7 @@ export class StartAdapterConfigurationComponent implements OnInit {
saveInDataLake: this.saveInDataLake,
dataLakeTimestampField: this.dataLakeTimestampField,
editMode: false,
startAdapterNow: this.startAdapterNow,
},
});

Expand Down

0 comments on commit c1ac05a

Please sign in to comment.