Skip to content

Commit

Permalink
refactor(backend): remove storage
Browse files Browse the repository at this point in the history
Remove in memory queue observable.
Move more info about the queue to the db.
Add queue as an observable only in the ws
layer, based on the db and events.
  • Loading branch information
Sikora00 authored and MaciejSikorski committed Oct 13, 2020
1 parent 7492c70 commit 654c800
Show file tree
Hide file tree
Showing 53 changed files with 236 additions and 427 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
- name: npm install
run: docker exec sdj-e2e-backend bash -c "npm ci"
- name: affected:e2e
run: docker exec sdj-e2e-backend bash -c "npm run affected:e2e -- --parallel --base origin/${BASE_BRANCH}"
run: docker exec sdj-e2e-backend bash -c "npm run affected:e2e -- --base origin/${BASE_BRANCH}"
env:
DB_ROOT_PASSWORD: ${{ secrets.DB_PASSWORD }}
DB_HOST: ${{ secrets.DB_HOST }}
Expand Down
28 changes: 0 additions & 28 deletions angular.json
Original file line number Diff line number Diff line change
Expand Up @@ -482,34 +482,6 @@
}
}
},
"backend-radio-infrastructure": {
"root": "libs/backend/radio/infrastructure",
"sourceRoot": "libs/backend/radio/infrastructure/src",
"projectType": "library",
"schematics": {},
"architect": {
"lint": {
"builder": "@angular-devkit/build-angular:tslint",
"options": {
"tsConfig": [
"libs/backend/radio/infrastructure/tsconfig.lib.json",
"libs/backend/radio/infrastructure/tsconfig.spec.json"
],
"exclude": [
"**/node_modules/**",
"!libs/backend/radio/infrastructure/**/*"
]
}
},
"test": {
"builder": "@nrwl/jest:jest",
"options": {
"jestConfig": "libs/backend/radio/infrastructure/jest.config.js",
"passWithNoTests": true
}
}
}
},
"backend-shared-util-you-tube": {
"root": "libs/backend/shared/util-you-tube",
"sourceRoot": "libs/backend/shared/util-you-tube/src",
Expand Down
25 changes: 4 additions & 21 deletions apps/backend-e2e/src/integration/play-radio-queue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,7 @@ describe('Play Radio Queue', () => {
});
});

test('returns 10-sec-of-silence', (done) => {
redisSub.on('message', (channel, message) => {
expect(channel).toEqual(channelId);
expect(message).toEqual('10-sec-of-silence');
done();
});
redisPub.publish('getNext', channelId);
});

test('returns emits playRadio on second silence', (done) => {
test('returns 10-sec-of-silence and emits playRadio', (done) => {
Promise.all([
new Promise((resolve) => {
redisSub.on('message', (channel, message) => {
Expand Down Expand Up @@ -132,23 +123,15 @@ describe('Play Radio Queue', () => {
redisSub.once('message', (channel, message) => {
expect(channel).toEqual(channelId);
expect(message).toEqual('_D1rrdFcj1U');
resolve();
// @ToDo play is an event so it is possible that it emits by web socket before the track will be updated and the next test will fall
setTimeout(() => resolve(), 100);
});
}),
]).then(() => done());
redisPub.publish('getNext', channelId);
});

test('returns silence when queue is empty', (done) => {
redisSub.once('message', (channel, message) => {
expect(channel).toEqual(channelId);
expect(message).toEqual('10-sec-of-silence');
done();
});
redisPub.publish('getNext', channelId);
});

test('returns silence and emits playRadio on second silence playing', (done) => {
test('returns silence and emits playRadio when queue is empty', (done) => {
Promise.all([
new Promise((resolve) =>
socket.once(WebSocketEvents.playRadio, resolve)
Expand Down
5 changes: 0 additions & 5 deletions libs/backend/radio/core/application-services/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@ export * from './lib/commands/set-channel-default-stream/set-channel-default-str
export * from './lib/commands/thumb-down/thumb-down.command';
export * from './lib/commands/thumb-up/thumb-up.command';
export * from './lib/dtos/track-data.dto';
export * from './lib/events/play-queued-track/play-queued-track.event';
export * from './lib/events/play-silence/play-silence.event';
export * from './lib/events/pozdro/pozdro.event';
export * from './lib/events/queued-track-skiepped/queued-track-skipped.event';
export * from './lib/ports/track-data.service';
export * from './lib/ports/track.service';
export * from './lib/ports/store.port';
export * from './lib/queries/get-channels/get-channels.query';
export * from './lib/queries/get-channels/get-channels.read-model';
export * from './lib/radio.facade';
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import { ThumbUpHandler } from './commands/thumb-up/thumb-up.handler';
import { ChannelWillStartHandler } from './events/channel-will-start/channel-will-start.handler';
import { ChannelWillStopHandler } from './events/channel-will-stop/channel-will-stop.handler';
import { PlayQueuedTrackHandler } from './events/play-queued-track/play-queued-track.handler';
import { PlaySilenceHandler } from './events/play-silence/play-silence.handler';
import { SongVotedNegativelyHandler } from './events/song-voted-negatively/song-voted-negatively.handler';
import { GetChannelsHandler } from './queries/get-channels/get-channels.handler';
import { RadioFacade } from './radio.facade';
Expand Down Expand Up @@ -47,7 +46,6 @@ export const EventHandlers = [
ChannelWillStartHandler,
ChannelWillStopHandler,
PlayQueuedTrackHandler,
PlaySilenceHandler,
SongVotedNegativelyHandler,
];

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { QueuedTrackRepositoryInterface } from '@sdj/backend/radio/core/domain';
import { Store } from '../../ports/store.port';
import { DeleteQueuedTrackCommand } from './delete-queued-track.command';

@CommandHandler(DeleteQueuedTrackCommand)
export class DeleteQueuedTrackHandler
implements ICommandHandler<DeleteQueuedTrackCommand> {
constructor(
private readonly queuedTrackRepository: QueuedTrackRepositoryInterface,
private readonly storageService: Store
private readonly queuedTrackRepository: QueuedTrackRepositoryInterface
) {}

async execute(command: DeleteQueuedTrackCommand): Promise<void> {
const queuedTrack = await this.queuedTrackRepository.findOneOrFail(
command.queuedTrackId
);
await this.storageService.removeFromQueue(queuedTrack);
await this.queuedTrackRepository.remove(queuedTrack);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs';

import { PlayQueuedTrackEvent } from '../../events/play-queued-track/play-queued-track.event';
import { PlayQueuedTrackEvent } from '@sdj/backend/radio/core/domain';
import { RadioFacade } from '../../radio.facade';
import { DownloadTrackCommand } from '../download-track/download-track.command';
import { DownloadAndPlayCommand } from './download-and-play.command';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EventBus } from '@nestjs/cqrs';
import { EventPublisher } from '@nestjs/cqrs';
import { Test, TestingModule } from '@nestjs/testing';
import {
ChannelRepositoryInterface,
Expand All @@ -17,15 +17,16 @@ describe('PlayNextTrackOrSilenceHandler', () => {
let radioFacade: Mocked<RadioFacade>;
let queuedTrackRepository: Mocked<QueuedTrackRepositoryInterface>;
let trackRepository: Mocked<TrackRepositoryInterface>;
let publisher: Mocked<EventPublisher>;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
PlayNextTrackOrSilenceHandler,
{ provide: RadioFacade, useValue: createSpyObj(RadioFacade) },
{
provide: EventBus,
useValue: createSpyObj(EventBus),
provide: EventPublisher,
useValue: createSpyObj(EventPublisher),
},
{
provide: ChannelRepositoryInterface,
Expand All @@ -45,6 +46,7 @@ describe('PlayNextTrackOrSilenceHandler', () => {
channelRepository = module.get(ChannelRepositoryInterface);
queuedTrackRepository = module.get(QueuedTrackRepositoryInterface);
radioFacade = module.get(RadioFacade);
publisher = module.get(EventPublisher);
service = module.get<PlayNextTrackOrSilenceHandler>(
PlayNextTrackOrSilenceHandler
);
Expand All @@ -58,6 +60,7 @@ describe('PlayNextTrackOrSilenceHandler', () => {
test('#execute triggers download an play track if is in queue', async () => {
channelRepository.findOrCreate = jest.fn();
channelRepository.findOrCreate.mockResolvedValue({ id: '1234' } as any);
publisher.mergeObjectContext.mockImplementation((x) => x);

radioFacade.downloadAndPlay.mockResolvedValue({});

Expand All @@ -77,6 +80,7 @@ describe('PlayNextTrackOrSilenceHandler', () => {
queuedTrackRepository.getNextSongInQueue.mockResolvedValue(null);
queuedTrackRepository.findOneOrFail = jest.fn();
queuedTrackRepository.findOneOrFail.mockResolvedValue({ id: 2 } as any);
publisher.mergeObjectContext.mockImplementation((x) => x);

appConfig.trackLengthToStartOwnRadio = 40;
trackRepository.countTracks = jest.fn();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs';
import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs';
import {
ChannelRepositoryInterface,
QueuedTrack,
QueuedTrackRepositoryInterface,
TrackRepositoryInterface,
} from '@sdj/backend/radio/core/domain';
import { appConfig } from '@sdj/backend/shared/domain';
import { PlaySilenceEvent } from '../../events/play-silence/play-silence.event';
import { RadioFacade } from '../../radio.facade';
import { DeleteQueuedTrackCommand } from '../delete-queued-track/delete-queued-track.command';
import { DownloadAndPlayCommand } from '../download-and-play/download-and-play.command';
Expand All @@ -17,16 +16,16 @@ import { PlayNextTrackOrSilenceCommand } from './play-next-track-or-silence.comm
export class PlayNextTrackOrSilenceHandler
implements ICommandHandler<PlayNextTrackOrSilenceCommand> {
constructor(
private eventBus: EventBus,
private channelRepository: ChannelRepositoryInterface,
private publisher: EventPublisher,
private radioFacade: RadioFacade,
private queuedTrackRepository: QueuedTrackRepositoryInterface,
private trackRepository: TrackRepositoryInterface
) {}

async execute(command: PlayNextTrackOrSilenceCommand): Promise<any> {
const channel = await this.channelRepository.findOrCreate(
command.channelId
const channel = this.publisher.mergeObjectContext(
await this.channelRepository.findOrCreate(command.channelId)
);
const queuedTrack = await this.getNextTrack(channel.id);
if (queuedTrack) {
Expand All @@ -41,7 +40,9 @@ export class PlayNextTrackOrSilenceHandler
await this.execute(command);
}
} else {
this.eventBus.publish(new PlaySilenceEvent(channel.id));
channel.playSilence();
channel.commit();
await this.channelRepository.save(channel);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs';
import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs';
import {
ChannelRepositoryInterface,
QueuedTrack,
QueuedTrackRepositoryInterface,
RadioAggregate,
TrackRepositoryInterface,
} from '@sdj/backend/radio/core/domain';
import { appConfig } from '@sdj/backend/shared/domain';
import { Store } from '../../ports/store.port';

import { QueueTrackCommand } from './queue-track.command';

@CommandHandler(QueueTrackCommand)
export class QueueTrackHandler implements ICommandHandler<QueueTrackCommand> {
constructor(
private readonly storageService: Store,
private channelRepository: ChannelRepositoryInterface,
private queuedTrackRepository: QueuedTrackRepositoryInterface,
private readonly trackRepository: TrackRepositoryInterface
private readonly trackRepository: TrackRepositoryInterface,
private publisher: EventPublisher
) {}

async execute(command: QueueTrackCommand): Promise<QueuedTrack> {
Expand All @@ -39,15 +39,16 @@ export class QueueTrackHandler implements ICommandHandler<QueueTrackCommand> {
);
}
}
let queuedTrack = new QueuedTrack(
const radio = this.publisher.mergeObjectContext(new RadioAggregate());
let queuedTrack = radio.queueTrack(
track,
channel,
command.randomized,
command.addedBy
);

queuedTrack = await this.queuedTrackRepository.save(queuedTrack);
await this.storageService.addToQueue(queuedTrack);
radio.commit();
return queuedTrack;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs';
import {
QueuedTrackRepositoryInterface,
QueuedTrackSkippedEvent,
TrackRepositoryInterface,
} from '@sdj/backend/radio/core/domain';
import { HostService } from '@sdj/backend/shared/application-services';
import { QueuedTrackSkippedEvent } from '../../events/queued-track-skiepped/queued-track-skipped.event';
import { SkipQueuedTrackCommand } from './skip-queued-track.command';

@CommandHandler(SkipQueuedTrackCommand)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs';
import {
QueuedTrackRepositoryInterface,
SongVotedNegativelyEvent,
User,
UserRepositoryInterface,
Vote,
VoteRepositoryInterface,
} from '@sdj/backend/radio/core/domain';
import { SongVotedNegativelyEvent } from '../../events/song-voted-negatively/song-voted-negatively.event';
import { ThumbDownCommand } from './thumb-down.command';

@CommandHandler(ThumbDownCommand)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,30 @@
import { EventsHandler, IEventHandler } from '@nestjs/cqrs';
import { EventPublisher, EventsHandler, IEventHandler } from '@nestjs/cqrs';
import {
QueuedTrack,
ChannelRepositoryInterface,
PlayQueuedTrackEvent,
QueuedTrackRepositoryInterface,
} from '@sdj/backend/radio/core/domain';
import { Store } from '../../ports/store.port';
import { PlayQueuedTrackEvent } from './play-queued-track.event';

@EventsHandler(PlayQueuedTrackEvent)
export class PlayQueuedTrackHandler
implements IEventHandler<PlayQueuedTrackEvent> {
constructor(
private readonly storageService: Store,
private queuedTrackRepository: QueuedTrackRepositoryInterface
private queuedTrackRepository: QueuedTrackRepositoryInterface,
private channelRepository: ChannelRepositoryInterface,
private publisher: EventPublisher
) {}

async handle(event: PlayQueuedTrackEvent): Promise<unknown> {
async handle(event: PlayQueuedTrackEvent): Promise<void> {
const queuedTrack = await this.queuedTrackRepository.findOneOrFail(
event.queuedTrackId
);
const channelId = queuedTrack.playedIn.id;
const prevTrack = await this.storageService.getCurrentTrack(channelId);
if (prevTrack) {
await this.storageService.removeFromQueue(prevTrack);
}
await this.storageService.setCurrentTrack(channelId, queuedTrack);
await this.updateQueuedTrackPlayedAt(queuedTrack);
return this.storageService.setSilenceCount(channelId, 0);
}

updateQueuedTrackPlayedAt(
queuedTrack: QueuedTrack,
playedAt?: Date
): Promise<QueuedTrack> {
queuedTrack.playedAt = playedAt || new Date();
return this.queuedTrackRepository.save(queuedTrack);
const channel = this.publisher.mergeObjectContext(
await this.channelRepository.findOrFail(channelId)
);
channel.play(queuedTrack);
await this.channelRepository.save(channel);
await this.queuedTrackRepository.save(queuedTrack);
channel.commit();
}
}

This file was deleted.

0 comments on commit 654c800

Please sign in to comment.