Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,60 @@ export const loadConnectorEntityFailure = createAction(
props<{ error: string }>()
);

/**
* Prompt user to confirm draining FlowFiles.
*/
export const promptDrainConnector = createAction(
`${PREFIX} Prompt Drain Connector`,
props<{ connector: ConnectorEntity }>()
);

/**
* Initiate draining of FlowFiles for a connector.
*/
export const drainConnector = createAction(`${PREFIX} Drain Connector`, props<{ connector: ConnectorEntity }>());

export const drainConnectorSuccess = createAction(
`${PREFIX} Drain Connector Success`,
props<{ connector: ConnectorEntity }>()
);

/**
* Cancel an ongoing drain operation (no confirmation needed).
*/
export const cancelConnectorDrain = createAction(
`${PREFIX} Cancel Connector Drain`,
props<{ connector: ConnectorEntity }>()
);

export const cancelConnectorDrainSuccess = createAction(
`${PREFIX} Cancel Connector Drain Success`,
props<{ connector: ConnectorEntity }>()
);

/**
* Start a connector (transition to RUNNING).
*/
export const startConnector = createAction(`${PREFIX} Start Connector`, props<{ connector: ConnectorEntity }>());

export const startConnectorSuccess = createAction(
`${PREFIX} Start Connector Success`,
props<{ connector: ConnectorEntity }>()
);

/**
* Stop a connector (transition to STOPPED).
*/
export const stopConnector = createAction(`${PREFIX} Stop Connector`, props<{ connector: ConnectorEntity }>());

export const stopConnectorSuccess = createAction(
`${PREFIX} Stop Connector Success`,
props<{ connector: ConnectorEntity }>()
);

/**
* API error during any connector action (drain, cancel drain, start, stop).
*/
export const connectorActionApiError = createAction(`${PREFIX} Connector Action Api Error`, props<{ error: string }>());

export const resetConnectorCanvasEntityState = createAction(`${PREFIX} Reset State`);
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,32 @@ import { TestBed } from '@angular/core/testing';
import { provideMockActions } from '@ngrx/effects/testing';
import { Action } from '@ngrx/store';
import { provideMockStore } from '@ngrx/store/testing';
import { firstValueFrom, of, ReplaySubject, throwError } from 'rxjs';
import { firstValueFrom, Observable, of, ReplaySubject, Subject, throwError } from 'rxjs';
import { HttpErrorResponse } from '@angular/common/http';
import { MatDialog } from '@angular/material/dialog';
import { YesNoDialog } from '@nifi/shared';
import { ConnectorCanvasEntityEffects } from './connector-canvas-entity.effects';
import { ConnectorService } from '../../service/connector.service';
import { ErrorHelper } from '../../../../service/error-helper.service';
import * as ConnectorCanvasActions from '../connector-canvas/connector-canvas.actions';
import * as ErrorActions from '../../../../state/error/error.actions';
import { ErrorContextKey } from '../../../../state/error';
import { selectConnectorCanvasEntity } from './connector-canvas-entity.selectors';
import { selectConnectorIdFromRoute } from '../connector-canvas/connector-canvas.selectors';
import {
cancelConnectorDrain,
cancelConnectorDrainSuccess,
connectorActionApiError,
drainConnector,
drainConnectorSuccess,
loadConnectorEntity,
loadConnectorEntityFailure,
loadConnectorEntitySuccess
loadConnectorEntitySuccess,
promptDrainConnector,
startConnector,
startConnectorSuccess,
stopConnector,
stopConnectorSuccess
} from './connector-canvas-entity.actions';
import { ConnectorEntity } from '@nifi/shared';
import type { Mock } from 'vitest';
Expand Down Expand Up @@ -59,19 +74,32 @@ describe('ConnectorCanvasEntityEffects', () => {

interface SetupOptions {
existingConnectorEntity?: ConnectorEntity | null;
connectorIdFromRoute?: string | null;
}

function createMockDialogRef(data: Record<string, Observable<unknown> | Subject<unknown>> = {}) {
return { componentInstance: data, afterClosed: () => new Subject<void>() };
}

async function setup(options: SetupOptions = {}) {
const actions$ = new ReplaySubject<Action>(1);

const mockConnectorService = {
getConnector: vi.fn().mockReturnValue(of(createMockConnectorEntity()))
getConnector: vi.fn().mockReturnValue(of(createMockConnectorEntity())),
drainConnector: vi.fn(),
cancelConnectorDrain: vi.fn(),
updateConnectorRunStatus: vi.fn()
};

const mockErrorHelper = {
getErrorString: vi.fn().mockReturnValue('Error message')
};

const mockDialog = {
open: vi.fn(),
closeAll: vi.fn()
};

await TestBed.configureTestingModule({
providers: [
ConnectorCanvasEntityEffects,
Expand All @@ -81,11 +109,16 @@ describe('ConnectorCanvasEntityEffects', () => {
{
selector: selectConnectorCanvasEntity,
value: options.existingConnectorEntity ?? null
},
{
selector: selectConnectorIdFromRoute,
value: options.connectorIdFromRoute ?? null
}
]
}),
{ provide: ConnectorService, useValue: mockConnectorService },
{ provide: ErrorHelper, useValue: mockErrorHelper }
{ provide: ErrorHelper, useValue: mockErrorHelper },
{ provide: MatDialog, useValue: mockDialog }
]
}).compileComponents();

Expand All @@ -95,7 +128,8 @@ describe('ConnectorCanvasEntityEffects', () => {
effects,
actions$,
mockConnectorService,
mockErrorHelper
mockErrorHelper,
mockDialog
};
}

Expand Down Expand Up @@ -184,4 +218,226 @@ describe('ConnectorCanvasEntityEffects', () => {
expect(mockErrorHelper.getErrorString).toHaveBeenCalledWith(errorResponse);
});
});

describe('promptDrainConnector$', () => {
it('should open a confirmation dialog and dispatch drainConnector on confirmation', () =>
new Promise<void>((resolve) => {
setup().then(({ effects, actions$, mockDialog }) => {
const mockDialogRef = createMockDialogRef({ yes: of(true) });
mockDialog.open.mockReturnValue(mockDialogRef);
const store = (effects as unknown as { store: { dispatch: (a: Action) => void } }).store;
const dispatchSpy = vi.spyOn(store, 'dispatch');

const entity = createMockConnectorEntity();
actions$.next(promptDrainConnector({ connector: entity }));

effects.promptDrainConnector$.subscribe(() => {
expect(mockDialog.open).toHaveBeenCalledWith(
YesNoDialog,
expect.objectContaining({
data: expect.objectContaining({
title: 'Drain Connector',
message: expect.stringContaining(
"Are you sure you want to drain connector 'Test Connector'"
)
})
})
);
expect(dispatchSpy).toHaveBeenCalledWith(drainConnector({ connector: entity }));
resolve();
});
});
}));
});

describe('drainConnector$', () => {
it('should dispatch drainConnectorSuccess when drain succeeds', async () => {
const refreshed = createMockConnectorEntity();
const { effects, actions$, mockConnectorService } = await setup();
(mockConnectorService.drainConnector as Mock).mockReturnValue(of(refreshed));

actions$.next(drainConnector({ connector: createMockConnectorEntity() }));

const result = await firstValueFrom(effects.drainConnector$);
expect(result).toEqual(drainConnectorSuccess({ connector: refreshed }));
});

it('should dispatch connectorActionApiError when drain errors', async () => {
const errorResponse = new HttpErrorResponse({ error: 'Error', status: 500, statusText: 'ISE' });
const { effects, actions$, mockConnectorService } = await setup();
(mockConnectorService.drainConnector as Mock).mockReturnValue(throwError(() => errorResponse));

actions$.next(drainConnector({ connector: createMockConnectorEntity() }));

const result = await firstValueFrom(effects.drainConnector$);
expect(result).toEqual(connectorActionApiError({ error: 'Error message' }));
});
});

describe('cancelConnectorDrain$', () => {
it('should dispatch cancelConnectorDrainSuccess when cancel succeeds', async () => {
const refreshed = createMockConnectorEntity();
const { effects, actions$, mockConnectorService } = await setup();
(mockConnectorService.cancelConnectorDrain as Mock).mockReturnValue(of(refreshed));

actions$.next(cancelConnectorDrain({ connector: createMockConnectorEntity() }));

const result = await firstValueFrom(effects.cancelConnectorDrain$);
expect(result).toEqual(cancelConnectorDrainSuccess({ connector: refreshed }));
});

it('should dispatch connectorActionApiError when cancel errors', async () => {
const errorResponse = new HttpErrorResponse({ error: 'Error', status: 500, statusText: 'ISE' });
const { effects, actions$, mockConnectorService } = await setup();
(mockConnectorService.cancelConnectorDrain as Mock).mockReturnValue(throwError(() => errorResponse));

actions$.next(cancelConnectorDrain({ connector: createMockConnectorEntity() }));

const result = await firstValueFrom(effects.cancelConnectorDrain$);
expect(result).toEqual(connectorActionApiError({ error: 'Error message' }));
});
});

describe('startConnector$', () => {
it('should dispatch startConnectorSuccess when start succeeds', async () => {
const refreshed = createMockConnectorEntity();
const { effects, actions$, mockConnectorService } = await setup();
(mockConnectorService.updateConnectorRunStatus as Mock).mockReturnValue(of(refreshed));

const request = createMockConnectorEntity();
actions$.next(startConnector({ connector: request }));

const result = await firstValueFrom(effects.startConnector$);
expect(result).toEqual(startConnectorSuccess({ connector: refreshed }));
expect(mockConnectorService.updateConnectorRunStatus).toHaveBeenCalledWith(request, 'RUNNING');
});

it('should dispatch connectorActionApiError when start errors', async () => {
const errorResponse = new HttpErrorResponse({ error: 'Error', status: 500, statusText: 'ISE' });
const { effects, actions$, mockConnectorService } = await setup();
(mockConnectorService.updateConnectorRunStatus as Mock).mockReturnValue(throwError(() => errorResponse));

actions$.next(startConnector({ connector: createMockConnectorEntity() }));

const result = await firstValueFrom(effects.startConnector$);
expect(result).toEqual(connectorActionApiError({ error: 'Error message' }));
});
});

describe('stopConnector$', () => {
it('should dispatch stopConnectorSuccess when stop succeeds', async () => {
const refreshed = createMockConnectorEntity();
const { effects, actions$, mockConnectorService } = await setup();
(mockConnectorService.updateConnectorRunStatus as Mock).mockReturnValue(of(refreshed));

const request = createMockConnectorEntity();
actions$.next(stopConnector({ connector: request }));

const result = await firstValueFrom(effects.stopConnector$);
expect(result).toEqual(stopConnectorSuccess({ connector: refreshed }));
expect(mockConnectorService.updateConnectorRunStatus).toHaveBeenCalledWith(request, 'STOPPED');
});

it('should dispatch connectorActionApiError when stop errors', async () => {
const errorResponse = new HttpErrorResponse({ error: 'Error', status: 500, statusText: 'ISE' });
const { effects, actions$, mockConnectorService } = await setup();
(mockConnectorService.updateConnectorRunStatus as Mock).mockReturnValue(throwError(() => errorResponse));

actions$.next(stopConnector({ connector: createMockConnectorEntity() }));

const result = await firstValueFrom(effects.stopConnector$);
expect(result).toEqual(connectorActionApiError({ error: 'Error message' }));
});
});

describe('refreshAfterAction$', () => {
it('should dispatch loadConnectorEntity on drainConnectorSuccess when connectorId is in route', async () => {
const { effects, actions$ } = await setup({ connectorIdFromRoute: 'connector-789' });

actions$.next(drainConnectorSuccess({ connector: createMockConnectorEntity() }));

const result = await firstValueFrom(effects.refreshAfterAction$);
expect(result).toEqual(loadConnectorEntity({ connectorId: 'connector-789' }));
});

it('should dispatch loadConnectorEntity on cancelConnectorDrainSuccess when connectorId is in route', async () => {
const { effects, actions$ } = await setup({ connectorIdFromRoute: 'connector-789' });

actions$.next(cancelConnectorDrainSuccess({ connector: createMockConnectorEntity() }));

const result = await firstValueFrom(effects.refreshAfterAction$);
expect(result).toEqual(loadConnectorEntity({ connectorId: 'connector-789' }));
});

it('should dispatch loadConnectorEntity on startConnectorSuccess when connectorId is in route', async () => {
const { effects, actions$ } = await setup({ connectorIdFromRoute: 'connector-789' });

actions$.next(startConnectorSuccess({ connector: createMockConnectorEntity() }));

const result = await firstValueFrom(effects.refreshAfterAction$);
expect(result).toEqual(loadConnectorEntity({ connectorId: 'connector-789' }));
});

it('should dispatch loadConnectorEntity on stopConnectorSuccess when connectorId is in route', async () => {
const { effects, actions$ } = await setup({ connectorIdFromRoute: 'connector-789' });

actions$.next(stopConnectorSuccess({ connector: createMockConnectorEntity() }));

const result = await firstValueFrom(effects.refreshAfterAction$);
expect(result).toEqual(loadConnectorEntity({ connectorId: 'connector-789' }));
});

it('should not dispatch when no connectorId in route', () =>
new Promise<void>((resolve) => {
setup({ connectorIdFromRoute: null }).then(({ effects, actions$ }) => {
let emissionCount = 0;
const subscription = effects.refreshAfterAction$.subscribe(() => {
emissionCount++;
});

actions$.next(drainConnectorSuccess({ connector: createMockConnectorEntity() }));

queueMicrotask(() => {
expect(emissionCount).toBe(0);
subscription.unsubscribe();
resolve();
});
});
}));
});

describe('connectorActionApiError$', () => {
it('should dispatch addBannerError with CONNECTOR_CANVAS context', async () => {
const { effects, actions$ } = await setup();

actions$.next(connectorActionApiError({ error: 'boom' }));

const result = await firstValueFrom(effects.connectorActionApiError$);
expect(result).toEqual(
ErrorActions.addBannerError({
errorContext: { errors: ['boom'], context: ErrorContextKey.CONNECTOR_CANVAS }
})
);
});
});

describe('clearCanvasErrorsOnAction$', () => {
const actionFactories = [
{ name: 'drainConnector', factory: drainConnector },
{ name: 'cancelConnectorDrain', factory: cancelConnectorDrain },
{ name: 'startConnector', factory: startConnector },
{ name: 'stopConnector', factory: stopConnector }
];

actionFactories.forEach(({ name, factory }) => {
it(`should dispatch clearBannerErrors on ${name}`, async () => {
const { effects, actions$ } = await setup();

actions$.next(factory({ connector: createMockConnectorEntity() }));

const result = await firstValueFrom(effects.clearCanvasErrorsOnAction$);
expect(result).toEqual(ErrorActions.clearBannerErrors({ context: ErrorContextKey.CONNECTOR_CANVAS }));
});
});
});
});
Loading
Loading