From 654c80087bea9c2f2065a7647c8f49ccf8c0e133 Mon Sep 17 00:00:00 2001 From: Maciej Sikorski Date: Sun, 11 Oct 2020 13:17:49 +0200 Subject: [PATCH] refactor(backend): remove storage Remove in memory queue observable. Move more info about the queue to the db. Add queue as an observable only in the ws layer, based on the db and events. --- .github/workflows/pr.yml | 2 +- angular.json | 28 ---- .../src/integration/play-radio-queue.spec.ts | 25 +--- .../core/application-services/src/index.ts | 5 - ...-radio-core-application-services.module.ts | 2 - .../delete-queued-track.handler.ts | 5 +- .../download-and-play.handler.ts | 2 +- ...play-next-track-or-silence.handler.spec.ts | 10 +- .../play-next-track-or-silence.handler.ts | 13 +- .../queue-track/queue-track.handler.ts | 13 +- .../skip-queued-track.handler.ts | 2 +- .../commands/thumb-down/thumb-down.handler.ts | 2 +- .../play-queued-track.handler.ts | 36 ++--- .../play-silence/play-silence.handler.ts | 20 --- .../song-voted-negatively.handler.ts | 2 +- .../src/lib/ports/store.port.ts | 32 ----- .../src/lib/radio.facade.ts | 3 +- libs/backend/radio/core/domain/src/index.ts | 7 + .../domain/src/lib/entities/channel.entity.ts | 26 +++- .../src/lib/entities/queued-track.entity.ts | 4 + .../lib/events}/play-queued-track.event.ts | 0 .../src/lib/events}/play-silence.event.ts | 3 + .../src/lib/events}/pozdro.event.ts | 0 .../src/lib/events/queue-changed.event.ts | 3 + .../lib/events}/queued-track-skipped.event.ts | 0 .../events}/song-voted-negatively.event.ts | 0 .../core/domain/src/lib/radio.aggregate.ts | 21 +++ .../queued-track-repository.interface.ts | 2 + .../src/lib/backend-radio-feature.module.ts | 3 - .../typeorm-queued-track.repository.ts | 47 ++++--- libs/backend/radio/infrastructure/README.md | 7 - .../radio/infrastructure/jest.config.js | 10 -- .../backend/radio/infrastructure/src/index.ts | 2 - .../backend-radio-infrastructure.module.ts | 9 -- .../infrastructure/src/lib/store.adapter.ts | 133 ------------------ .../radio/infrastructure/tsconfig.json | 16 --- .../radio/infrastructure/tsconfig.lib.json | 9 -- .../radio/infrastructure/tsconfig.spec.json | 15 -- libs/backend/radio/infrastructure/tslint.json | 7 - .../ices-play-queued-track.handler.ts | 6 +- .../play-silence/ices-play-silence.handler.ts | 2 +- .../bot/lib/commands/pozdro.slack-command.ts | 6 +- .../slack-queued-track-skipped.handler.ts | 6 +- .../ws-play-queued-track.handler.ts | 6 +- .../play-silence/ws-play-silence.handler.ts | 19 +-- .../lib/events/pozdro/ws-pozdro.handler.ts | 2 +- .../ws-queued-changed.handler.ts | 15 ++ .../ui-web-socket/src/lib/gateway/gateway.ts | 18 +-- .../ws-queue-synchronization.service.ts | 43 ++++++ .../ui-web-socket/src/lib/websocket.module.ts | 5 +- nx.json | 3 - package.json | 3 +- tsconfig.base.json | 3 - 53 files changed, 236 insertions(+), 427 deletions(-) delete mode 100644 libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.handler.ts delete mode 100644 libs/backend/radio/core/application-services/src/lib/ports/store.port.ts rename libs/backend/radio/core/{application-services/src/lib/events/play-queued-track => domain/src/lib/events}/play-queued-track.event.ts (100%) rename libs/backend/radio/core/{application-services/src/lib/events/play-silence => domain/src/lib/events}/play-silence.event.ts (78%) rename libs/backend/radio/core/{application-services/src/lib/events/pozdro => domain/src/lib/events}/pozdro.event.ts (100%) create mode 100644 libs/backend/radio/core/domain/src/lib/events/queue-changed.event.ts rename libs/backend/radio/core/{application-services/src/lib/events/queued-track-skiepped => domain/src/lib/events}/queued-track-skipped.event.ts (100%) rename libs/backend/radio/core/{application-services/src/lib/events/song-voted-negatively => domain/src/lib/events}/song-voted-negatively.event.ts (100%) create mode 100644 libs/backend/radio/core/domain/src/lib/radio.aggregate.ts delete mode 100644 libs/backend/radio/infrastructure/README.md delete mode 100644 libs/backend/radio/infrastructure/jest.config.js delete mode 100644 libs/backend/radio/infrastructure/src/index.ts delete mode 100644 libs/backend/radio/infrastructure/src/lib/backend-radio-infrastructure.module.ts delete mode 100644 libs/backend/radio/infrastructure/src/lib/store.adapter.ts delete mode 100644 libs/backend/radio/infrastructure/tsconfig.json delete mode 100644 libs/backend/radio/infrastructure/tsconfig.lib.json delete mode 100644 libs/backend/radio/infrastructure/tsconfig.spec.json delete mode 100644 libs/backend/radio/infrastructure/tslint.json create mode 100644 libs/backend/radio/ui-web-socket/src/lib/events/queue-changed/ws-queued-changed.handler.ts create mode 100644 libs/backend/radio/ui-web-socket/src/lib/services/ws-queue-synchronization.service.ts diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 08376b3..6c8f243 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -34,7 +34,7 @@ jobs: - name: npm install run: docker exec sdj-e2e-backend bash -c "npm ci" - name: affected:e2e - run: docker exec sdj-e2e-backend bash -c "npm run affected:e2e -- --parallel --base origin/${BASE_BRANCH}" + run: docker exec sdj-e2e-backend bash -c "npm run affected:e2e -- --base origin/${BASE_BRANCH}" env: DB_ROOT_PASSWORD: ${{ secrets.DB_PASSWORD }} DB_HOST: ${{ secrets.DB_HOST }} diff --git a/angular.json b/angular.json index 309ce45..a9ccc3b 100644 --- a/angular.json +++ b/angular.json @@ -482,34 +482,6 @@ } } }, - "backend-radio-infrastructure": { - "root": "libs/backend/radio/infrastructure", - "sourceRoot": "libs/backend/radio/infrastructure/src", - "projectType": "library", - "schematics": {}, - "architect": { - "lint": { - "builder": "@angular-devkit/build-angular:tslint", - "options": { - "tsConfig": [ - "libs/backend/radio/infrastructure/tsconfig.lib.json", - "libs/backend/radio/infrastructure/tsconfig.spec.json" - ], - "exclude": [ - "**/node_modules/**", - "!libs/backend/radio/infrastructure/**/*" - ] - } - }, - "test": { - "builder": "@nrwl/jest:jest", - "options": { - "jestConfig": "libs/backend/radio/infrastructure/jest.config.js", - "passWithNoTests": true - } - } - } - }, "backend-shared-util-you-tube": { "root": "libs/backend/shared/util-you-tube", "sourceRoot": "libs/backend/shared/util-you-tube/src", diff --git a/apps/backend-e2e/src/integration/play-radio-queue.spec.ts b/apps/backend-e2e/src/integration/play-radio-queue.spec.ts index 9efa190..d2f2240 100644 --- a/apps/backend-e2e/src/integration/play-radio-queue.spec.ts +++ b/apps/backend-e2e/src/integration/play-radio-queue.spec.ts @@ -56,16 +56,7 @@ describe('Play Radio Queue', () => { }); }); - test('returns 10-sec-of-silence', (done) => { - redisSub.on('message', (channel, message) => { - expect(channel).toEqual(channelId); - expect(message).toEqual('10-sec-of-silence'); - done(); - }); - redisPub.publish('getNext', channelId); - }); - - test('returns emits playRadio on second silence', (done) => { + test('returns 10-sec-of-silence and emits playRadio', (done) => { Promise.all([ new Promise((resolve) => { redisSub.on('message', (channel, message) => { @@ -132,23 +123,15 @@ describe('Play Radio Queue', () => { redisSub.once('message', (channel, message) => { expect(channel).toEqual(channelId); expect(message).toEqual('_D1rrdFcj1U'); - resolve(); + // @ToDo play is an event so it is possible that it emits by web socket before the track will be updated and the next test will fall + setTimeout(() => resolve(), 100); }); }), ]).then(() => done()); redisPub.publish('getNext', channelId); }); - test('returns silence when queue is empty', (done) => { - redisSub.once('message', (channel, message) => { - expect(channel).toEqual(channelId); - expect(message).toEqual('10-sec-of-silence'); - done(); - }); - redisPub.publish('getNext', channelId); - }); - - test('returns silence and emits playRadio on second silence playing', (done) => { + test('returns silence and emits playRadio when queue is empty', (done) => { Promise.all([ new Promise((resolve) => socket.once(WebSocketEvents.playRadio, resolve) diff --git a/libs/backend/radio/core/application-services/src/index.ts b/libs/backend/radio/core/application-services/src/index.ts index 912cc7c..1468ff3 100644 --- a/libs/backend/radio/core/application-services/src/index.ts +++ b/libs/backend/radio/core/application-services/src/index.ts @@ -15,13 +15,8 @@ export * from './lib/commands/set-channel-default-stream/set-channel-default-str export * from './lib/commands/thumb-down/thumb-down.command'; export * from './lib/commands/thumb-up/thumb-up.command'; export * from './lib/dtos/track-data.dto'; -export * from './lib/events/play-queued-track/play-queued-track.event'; -export * from './lib/events/play-silence/play-silence.event'; -export * from './lib/events/pozdro/pozdro.event'; -export * from './lib/events/queued-track-skiepped/queued-track-skipped.event'; export * from './lib/ports/track-data.service'; export * from './lib/ports/track.service'; -export * from './lib/ports/store.port'; export * from './lib/queries/get-channels/get-channels.query'; export * from './lib/queries/get-channels/get-channels.read-model'; export * from './lib/radio.facade'; diff --git a/libs/backend/radio/core/application-services/src/lib/backend-radio-core-application-services.module.ts b/libs/backend/radio/core/application-services/src/lib/backend-radio-core-application-services.module.ts index ed3f36b..d9e4553 100644 --- a/libs/backend/radio/core/application-services/src/lib/backend-radio-core-application-services.module.ts +++ b/libs/backend/radio/core/application-services/src/lib/backend-radio-core-application-services.module.ts @@ -19,7 +19,6 @@ import { ThumbUpHandler } from './commands/thumb-up/thumb-up.handler'; import { ChannelWillStartHandler } from './events/channel-will-start/channel-will-start.handler'; import { ChannelWillStopHandler } from './events/channel-will-stop/channel-will-stop.handler'; import { PlayQueuedTrackHandler } from './events/play-queued-track/play-queued-track.handler'; -import { PlaySilenceHandler } from './events/play-silence/play-silence.handler'; import { SongVotedNegativelyHandler } from './events/song-voted-negatively/song-voted-negatively.handler'; import { GetChannelsHandler } from './queries/get-channels/get-channels.handler'; import { RadioFacade } from './radio.facade'; @@ -47,7 +46,6 @@ export const EventHandlers = [ ChannelWillStartHandler, ChannelWillStopHandler, PlayQueuedTrackHandler, - PlaySilenceHandler, SongVotedNegativelyHandler, ]; diff --git a/libs/backend/radio/core/application-services/src/lib/commands/delete-queued-track/delete-queued-track.handler.ts b/libs/backend/radio/core/application-services/src/lib/commands/delete-queued-track/delete-queued-track.handler.ts index 172493d..ff8c749 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/delete-queued-track/delete-queued-track.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/delete-queued-track/delete-queued-track.handler.ts @@ -1,21 +1,18 @@ import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; import { QueuedTrackRepositoryInterface } from '@sdj/backend/radio/core/domain'; -import { Store } from '../../ports/store.port'; import { DeleteQueuedTrackCommand } from './delete-queued-track.command'; @CommandHandler(DeleteQueuedTrackCommand) export class DeleteQueuedTrackHandler implements ICommandHandler { constructor( - private readonly queuedTrackRepository: QueuedTrackRepositoryInterface, - private readonly storageService: Store + private readonly queuedTrackRepository: QueuedTrackRepositoryInterface ) {} async execute(command: DeleteQueuedTrackCommand): Promise { const queuedTrack = await this.queuedTrackRepository.findOneOrFail( command.queuedTrackId ); - await this.storageService.removeFromQueue(queuedTrack); await this.queuedTrackRepository.remove(queuedTrack); } } diff --git a/libs/backend/radio/core/application-services/src/lib/commands/download-and-play/download-and-play.handler.ts b/libs/backend/radio/core/application-services/src/lib/commands/download-and-play/download-and-play.handler.ts index 70c9445..892ac68 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/download-and-play/download-and-play.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/download-and-play/download-and-play.handler.ts @@ -1,6 +1,6 @@ import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs'; -import { PlayQueuedTrackEvent } from '../../events/play-queued-track/play-queued-track.event'; +import { PlayQueuedTrackEvent } from '@sdj/backend/radio/core/domain'; import { RadioFacade } from '../../radio.facade'; import { DownloadTrackCommand } from '../download-track/download-track.command'; import { DownloadAndPlayCommand } from './download-and-play.command'; diff --git a/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.spec.ts b/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.spec.ts index 05a785f..02a2a6f 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.spec.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.spec.ts @@ -1,4 +1,4 @@ -import { EventBus } from '@nestjs/cqrs'; +import { EventPublisher } from '@nestjs/cqrs'; import { Test, TestingModule } from '@nestjs/testing'; import { ChannelRepositoryInterface, @@ -17,6 +17,7 @@ describe('PlayNextTrackOrSilenceHandler', () => { let radioFacade: Mocked; let queuedTrackRepository: Mocked; let trackRepository: Mocked; + let publisher: Mocked; beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ @@ -24,8 +25,8 @@ describe('PlayNextTrackOrSilenceHandler', () => { PlayNextTrackOrSilenceHandler, { provide: RadioFacade, useValue: createSpyObj(RadioFacade) }, { - provide: EventBus, - useValue: createSpyObj(EventBus), + provide: EventPublisher, + useValue: createSpyObj(EventPublisher), }, { provide: ChannelRepositoryInterface, @@ -45,6 +46,7 @@ describe('PlayNextTrackOrSilenceHandler', () => { channelRepository = module.get(ChannelRepositoryInterface); queuedTrackRepository = module.get(QueuedTrackRepositoryInterface); radioFacade = module.get(RadioFacade); + publisher = module.get(EventPublisher); service = module.get( PlayNextTrackOrSilenceHandler ); @@ -58,6 +60,7 @@ describe('PlayNextTrackOrSilenceHandler', () => { test('#execute triggers download an play track if is in queue', async () => { channelRepository.findOrCreate = jest.fn(); channelRepository.findOrCreate.mockResolvedValue({ id: '1234' } as any); + publisher.mergeObjectContext.mockImplementation((x) => x); radioFacade.downloadAndPlay.mockResolvedValue({}); @@ -77,6 +80,7 @@ describe('PlayNextTrackOrSilenceHandler', () => { queuedTrackRepository.getNextSongInQueue.mockResolvedValue(null); queuedTrackRepository.findOneOrFail = jest.fn(); queuedTrackRepository.findOneOrFail.mockResolvedValue({ id: 2 } as any); + publisher.mergeObjectContext.mockImplementation((x) => x); appConfig.trackLengthToStartOwnRadio = 40; trackRepository.countTracks = jest.fn(); diff --git a/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.ts b/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.ts index 9d7e0ee..b367c95 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.ts @@ -1,4 +1,4 @@ -import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs'; +import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs'; import { ChannelRepositoryInterface, QueuedTrack, @@ -6,7 +6,6 @@ import { TrackRepositoryInterface, } from '@sdj/backend/radio/core/domain'; import { appConfig } from '@sdj/backend/shared/domain'; -import { PlaySilenceEvent } from '../../events/play-silence/play-silence.event'; import { RadioFacade } from '../../radio.facade'; import { DeleteQueuedTrackCommand } from '../delete-queued-track/delete-queued-track.command'; import { DownloadAndPlayCommand } from '../download-and-play/download-and-play.command'; @@ -17,16 +16,16 @@ import { PlayNextTrackOrSilenceCommand } from './play-next-track-or-silence.comm export class PlayNextTrackOrSilenceHandler implements ICommandHandler { constructor( - private eventBus: EventBus, private channelRepository: ChannelRepositoryInterface, + private publisher: EventPublisher, private radioFacade: RadioFacade, private queuedTrackRepository: QueuedTrackRepositoryInterface, private trackRepository: TrackRepositoryInterface ) {} async execute(command: PlayNextTrackOrSilenceCommand): Promise { - const channel = await this.channelRepository.findOrCreate( - command.channelId + const channel = this.publisher.mergeObjectContext( + await this.channelRepository.findOrCreate(command.channelId) ); const queuedTrack = await this.getNextTrack(channel.id); if (queuedTrack) { @@ -41,7 +40,9 @@ export class PlayNextTrackOrSilenceHandler await this.execute(command); } } else { - this.eventBus.publish(new PlaySilenceEvent(channel.id)); + channel.playSilence(); + channel.commit(); + await this.channelRepository.save(channel); } } diff --git a/libs/backend/radio/core/application-services/src/lib/commands/queue-track/queue-track.handler.ts b/libs/backend/radio/core/application-services/src/lib/commands/queue-track/queue-track.handler.ts index 3c2ef90..c28002c 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/queue-track/queue-track.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/queue-track/queue-track.handler.ts @@ -1,22 +1,22 @@ -import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs'; import { ChannelRepositoryInterface, QueuedTrack, QueuedTrackRepositoryInterface, + RadioAggregate, TrackRepositoryInterface, } from '@sdj/backend/radio/core/domain'; import { appConfig } from '@sdj/backend/shared/domain'; -import { Store } from '../../ports/store.port'; import { QueueTrackCommand } from './queue-track.command'; @CommandHandler(QueueTrackCommand) export class QueueTrackHandler implements ICommandHandler { constructor( - private readonly storageService: Store, private channelRepository: ChannelRepositoryInterface, private queuedTrackRepository: QueuedTrackRepositoryInterface, - private readonly trackRepository: TrackRepositoryInterface + private readonly trackRepository: TrackRepositoryInterface, + private publisher: EventPublisher ) {} async execute(command: QueueTrackCommand): Promise { @@ -39,7 +39,8 @@ export class QueueTrackHandler implements ICommandHandler { ); } } - let queuedTrack = new QueuedTrack( + const radio = this.publisher.mergeObjectContext(new RadioAggregate()); + let queuedTrack = radio.queueTrack( track, channel, command.randomized, @@ -47,7 +48,7 @@ export class QueueTrackHandler implements ICommandHandler { ); queuedTrack = await this.queuedTrackRepository.save(queuedTrack); - await this.storageService.addToQueue(queuedTrack); + radio.commit(); return queuedTrack; } } diff --git a/libs/backend/radio/core/application-services/src/lib/commands/skip-queued-track/skip-queued-track.handler.ts b/libs/backend/radio/core/application-services/src/lib/commands/skip-queued-track/skip-queued-track.handler.ts index 7bd94c0..133253e 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/skip-queued-track/skip-queued-track.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/skip-queued-track/skip-queued-track.handler.ts @@ -1,10 +1,10 @@ import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs'; import { QueuedTrackRepositoryInterface, + QueuedTrackSkippedEvent, TrackRepositoryInterface, } from '@sdj/backend/radio/core/domain'; import { HostService } from '@sdj/backend/shared/application-services'; -import { QueuedTrackSkippedEvent } from '../../events/queued-track-skiepped/queued-track-skipped.event'; import { SkipQueuedTrackCommand } from './skip-queued-track.command'; @CommandHandler(SkipQueuedTrackCommand) diff --git a/libs/backend/radio/core/application-services/src/lib/commands/thumb-down/thumb-down.handler.ts b/libs/backend/radio/core/application-services/src/lib/commands/thumb-down/thumb-down.handler.ts index 4842b10..f19bd0d 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/thumb-down/thumb-down.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/thumb-down/thumb-down.handler.ts @@ -1,12 +1,12 @@ import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs'; import { QueuedTrackRepositoryInterface, + SongVotedNegativelyEvent, User, UserRepositoryInterface, Vote, VoteRepositoryInterface, } from '@sdj/backend/radio/core/domain'; -import { SongVotedNegativelyEvent } from '../../events/song-voted-negatively/song-voted-negatively.event'; import { ThumbDownCommand } from './thumb-down.command'; @CommandHandler(ThumbDownCommand) diff --git a/libs/backend/radio/core/application-services/src/lib/events/play-queued-track/play-queued-track.handler.ts b/libs/backend/radio/core/application-services/src/lib/events/play-queued-track/play-queued-track.handler.ts index 55ffa48..eee1759 100644 --- a/libs/backend/radio/core/application-services/src/lib/events/play-queued-track/play-queued-track.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/events/play-queued-track/play-queued-track.handler.ts @@ -1,38 +1,30 @@ -import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; +import { EventPublisher, EventsHandler, IEventHandler } from '@nestjs/cqrs'; import { - QueuedTrack, + ChannelRepositoryInterface, + PlayQueuedTrackEvent, QueuedTrackRepositoryInterface, } from '@sdj/backend/radio/core/domain'; -import { Store } from '../../ports/store.port'; -import { PlayQueuedTrackEvent } from './play-queued-track.event'; @EventsHandler(PlayQueuedTrackEvent) export class PlayQueuedTrackHandler implements IEventHandler { constructor( - private readonly storageService: Store, - private queuedTrackRepository: QueuedTrackRepositoryInterface + private queuedTrackRepository: QueuedTrackRepositoryInterface, + private channelRepository: ChannelRepositoryInterface, + private publisher: EventPublisher ) {} - async handle(event: PlayQueuedTrackEvent): Promise { + async handle(event: PlayQueuedTrackEvent): Promise { const queuedTrack = await this.queuedTrackRepository.findOneOrFail( event.queuedTrackId ); const channelId = queuedTrack.playedIn.id; - const prevTrack = await this.storageService.getCurrentTrack(channelId); - if (prevTrack) { - await this.storageService.removeFromQueue(prevTrack); - } - await this.storageService.setCurrentTrack(channelId, queuedTrack); - await this.updateQueuedTrackPlayedAt(queuedTrack); - return this.storageService.setSilenceCount(channelId, 0); - } - - updateQueuedTrackPlayedAt( - queuedTrack: QueuedTrack, - playedAt?: Date - ): Promise { - queuedTrack.playedAt = playedAt || new Date(); - return this.queuedTrackRepository.save(queuedTrack); + const channel = this.publisher.mergeObjectContext( + await this.channelRepository.findOrFail(channelId) + ); + channel.play(queuedTrack); + await this.channelRepository.save(channel); + await this.queuedTrackRepository.save(queuedTrack); + channel.commit(); } } diff --git a/libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.handler.ts b/libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.handler.ts deleted file mode 100644 index b72e0f0..0000000 --- a/libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.handler.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { Store } from '../../ports/store.port'; -import { PlaySilenceEvent } from './play-silence.event'; - -@EventsHandler(PlaySilenceEvent) -export class PlaySilenceHandler implements IEventHandler { - constructor(private readonly storageService: Store) {} - - async handle(command: PlaySilenceEvent): Promise { - const channelId = command.channelId; - let count = await this.storageService.getSilenceCount(channelId); - count++; - await this.storageService.setSilenceCount(channelId, count); - const prevTrack = await this.storageService.getCurrentTrack(channelId); - if (prevTrack) { - await this.storageService.removeFromQueue(prevTrack); - } - return this.storageService.setCurrentTrack(channelId, null); - } -} diff --git a/libs/backend/radio/core/application-services/src/lib/events/song-voted-negatively/song-voted-negatively.handler.ts b/libs/backend/radio/core/application-services/src/lib/events/song-voted-negatively/song-voted-negatively.handler.ts index 6ac0b96..eb19f49 100644 --- a/libs/backend/radio/core/application-services/src/lib/events/song-voted-negatively/song-voted-negatively.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/events/song-voted-negatively/song-voted-negatively.handler.ts @@ -1,12 +1,12 @@ import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; import { QueuedTrackRepositoryInterface, + SongVotedNegativelyEvent, VoteRepositoryInterface, } from '@sdj/backend/radio/core/domain'; import { appConfig } from '@sdj/backend/shared/domain'; import { SkipQueuedTrackCommand } from '../../commands/skip-queued-track/skip-queued-track.command'; import { RadioFacade } from '../../radio.facade'; -import { SongVotedNegativelyEvent } from './song-voted-negatively.event'; @EventsHandler(SongVotedNegativelyEvent) export class SongVotedNegativelyHandler diff --git a/libs/backend/radio/core/application-services/src/lib/ports/store.port.ts b/libs/backend/radio/core/application-services/src/lib/ports/store.port.ts deleted file mode 100644 index 45df59f..0000000 --- a/libs/backend/radio/core/application-services/src/lib/ports/store.port.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { QueuedTrack } from '@sdj/backend/radio/core/domain'; -import { Observable } from 'rxjs'; - -export abstract class Store { - abstract async addToQueue(queuedTrack: QueuedTrack): Promise; - - abstract channelAppears(channelId: string): Promise; - - abstract channelDisappears(channelId: string): void; - - abstract async getCurrentTrack( - channelId: string - ): Promise; - - abstract getQueue(channelId: string): Observable; - - abstract async getSilenceCount(channelId: string): Promise; - - abstract isChannelActive(channelId: string): boolean; - - abstract async removeFromQueue(queuedTrack: QueuedTrack): Promise; - - abstract async setCurrentTrack( - channelId: string, - queuedTrack: QueuedTrack | null - ): Promise; - - abstract async setSilenceCount( - channelId: string, - value: number - ): Promise; -} diff --git a/libs/backend/radio/core/application-services/src/lib/radio.facade.ts b/libs/backend/radio/core/application-services/src/lib/radio.facade.ts index ba70ecf..616f943 100644 --- a/libs/backend/radio/core/application-services/src/lib/radio.facade.ts +++ b/libs/backend/radio/core/application-services/src/lib/radio.facade.ts @@ -1,7 +1,7 @@ import { Injectable } from '@nestjs/common'; import { CommandBus, EventBus, QueryBus } from '@nestjs/cqrs'; -import { QueuedTrack } from '@sdj/backend/radio/core/domain'; +import { PozdroEvent, QueuedTrack } from '@sdj/backend/radio/core/domain'; import { AddTrackToQueueCommand } from './commands/add-track-to-queue/add-track-to-queue.command'; import { DeleteQueuedTrackCommand } from './commands/delete-queued-track/delete-queued-track.command'; import { DownloadAndPlayCommand } from './commands/download-and-play/download-and-play.command'; @@ -16,7 +16,6 @@ import { SetChannelDefaultStreamCommand } from './commands/set-channel-default-s import { SkipQueuedTrackCommand } from './commands/skip-queued-track/skip-queued-track.command'; import { ThumbDownCommand } from './commands/thumb-down/thumb-down.command'; import { ThumbUpCommand } from './commands/thumb-up/thumb-up.command'; -import { PozdroEvent } from './events/pozdro/pozdro.event'; import { GetChannelsQuery } from './queries/get-channels/get-channels.query'; import { GetChannelsReadModel } from './queries/get-channels/get-channels.read-model'; diff --git a/libs/backend/radio/core/domain/src/index.ts b/libs/backend/radio/core/domain/src/index.ts index 9d76f78..908c4e4 100644 --- a/libs/backend/radio/core/domain/src/index.ts +++ b/libs/backend/radio/core/domain/src/index.ts @@ -7,8 +7,15 @@ export * from './lib/events/channel-started.event'; export * from './lib/events/channel-updated.event'; export * from './lib/events/channel-will-start.event'; export * from './lib/events/channel-will-stop.event'; +export * from './lib/events/play-queued-track.event'; +export * from './lib/events/play-silence.event'; +export * from './lib/events/pozdro.event'; +export * from './lib/events/queued-track-skipped.event'; +export * from './lib/events/song-voted-negatively.event'; +export * from './lib/events/queue-changed.event'; export * from './lib/events/user-joined-channel.event'; export * from './lib/events/user-leaved-channel.event'; +export * from './lib/radio.aggregate'; export * from './lib/repositories/channel-repository.interface'; export * from './lib/repositories/queued-track-repository.interface'; export * from './lib/repositories/track-repository.interface'; diff --git a/libs/backend/radio/core/domain/src/lib/entities/channel.entity.ts b/libs/backend/radio/core/domain/src/lib/entities/channel.entity.ts index d215bb6..eba7153 100644 --- a/libs/backend/radio/core/domain/src/lib/entities/channel.entity.ts +++ b/libs/backend/radio/core/domain/src/lib/entities/channel.entity.ts @@ -1,16 +1,29 @@ import { AggregateRoot } from '@nestjs/cqrs'; -import { Column, Entity, OneToMany, PrimaryColumn } from 'typeorm'; +import { + Column, + Entity, + JoinColumn, + OneToMany, + OneToOne, + PrimaryColumn, +} from 'typeorm'; +import { QueueChangedEvent } from '../..'; import { ChannelStartedEvent } from '../events/channel-started.event'; import { ChannelStoppedEvent } from '../events/channel-stopped.event'; import { ChannelUpdatedEvent } from '../events/channel-updated.event'; import { ChannelWillStartEvent } from '../events/channel-will-start.event'; import { ChannelWillStopEvent } from '../events/channel-will-stop.event'; +import { PlaySilenceEvent } from '../events/play-silence.event'; import { UserJoinedChannelEvent } from '../events/user-joined-channel.event'; import { UserLeavedChannelEvent } from '../events/user-leaved-channel.event'; import { QueuedTrack } from './queued-track.entity'; @Entity() export class Channel extends AggregateRoot { + @OneToOne((type) => QueuedTrack, (queuedTrack) => queuedTrack.playedIn) + @JoinColumn() + currentTrack: QueuedTrack; + @Column('varchar', { length: 200, nullable: true, @@ -86,4 +99,15 @@ export class Channel extends AggregateRoot { this.apply(new ChannelStoppedEvent(this.id)); this.apply(new ChannelUpdatedEvent(this.id)); } + + play(queuedTrack: QueuedTrack): void { + this.currentTrack = queuedTrack; + queuedTrack.play(); + this.apply(new QueueChangedEvent(this.id)); + } + + playSilence(): void { + this.currentTrack = null; + this.apply(new PlaySilenceEvent(this.id)); + } } diff --git a/libs/backend/radio/core/domain/src/lib/entities/queued-track.entity.ts b/libs/backend/radio/core/domain/src/lib/entities/queued-track.entity.ts index 9b1ac3e..997293f 100644 --- a/libs/backend/radio/core/domain/src/lib/entities/queued-track.entity.ts +++ b/libs/backend/radio/core/domain/src/lib/entities/queued-track.entity.ts @@ -64,4 +64,8 @@ export class QueuedTrack implements IQueuedTrack { this.track = track; this.randomized = randomized; } + + play(): void { + this.playedAt = new Date(); + } } diff --git a/libs/backend/radio/core/application-services/src/lib/events/play-queued-track/play-queued-track.event.ts b/libs/backend/radio/core/domain/src/lib/events/play-queued-track.event.ts similarity index 100% rename from libs/backend/radio/core/application-services/src/lib/events/play-queued-track/play-queued-track.event.ts rename to libs/backend/radio/core/domain/src/lib/events/play-queued-track.event.ts diff --git a/libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.event.ts b/libs/backend/radio/core/domain/src/lib/events/play-silence.event.ts similarity index 78% rename from libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.event.ts rename to libs/backend/radio/core/domain/src/lib/events/play-silence.event.ts index 8698830..31b135a 100644 --- a/libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.event.ts +++ b/libs/backend/radio/core/domain/src/lib/events/play-silence.event.ts @@ -1,5 +1,8 @@ import { ICommand } from '@nestjs/cqrs'; +/** + * @ToDo rename to play radio + */ export class PlaySilenceEvent implements ICommand { constructor(public channelId: string) {} } diff --git a/libs/backend/radio/core/application-services/src/lib/events/pozdro/pozdro.event.ts b/libs/backend/radio/core/domain/src/lib/events/pozdro.event.ts similarity index 100% rename from libs/backend/radio/core/application-services/src/lib/events/pozdro/pozdro.event.ts rename to libs/backend/radio/core/domain/src/lib/events/pozdro.event.ts diff --git a/libs/backend/radio/core/domain/src/lib/events/queue-changed.event.ts b/libs/backend/radio/core/domain/src/lib/events/queue-changed.event.ts new file mode 100644 index 0000000..f9cc4a3 --- /dev/null +++ b/libs/backend/radio/core/domain/src/lib/events/queue-changed.event.ts @@ -0,0 +1,3 @@ +export class QueueChangedEvent { + constructor(public channelId: string) {} +} diff --git a/libs/backend/radio/core/application-services/src/lib/events/queued-track-skiepped/queued-track-skipped.event.ts b/libs/backend/radio/core/domain/src/lib/events/queued-track-skipped.event.ts similarity index 100% rename from libs/backend/radio/core/application-services/src/lib/events/queued-track-skiepped/queued-track-skipped.event.ts rename to libs/backend/radio/core/domain/src/lib/events/queued-track-skipped.event.ts diff --git a/libs/backend/radio/core/application-services/src/lib/events/song-voted-negatively/song-voted-negatively.event.ts b/libs/backend/radio/core/domain/src/lib/events/song-voted-negatively.event.ts similarity index 100% rename from libs/backend/radio/core/application-services/src/lib/events/song-voted-negatively/song-voted-negatively.event.ts rename to libs/backend/radio/core/domain/src/lib/events/song-voted-negatively.event.ts diff --git a/libs/backend/radio/core/domain/src/lib/radio.aggregate.ts b/libs/backend/radio/core/domain/src/lib/radio.aggregate.ts new file mode 100644 index 0000000..866c09b --- /dev/null +++ b/libs/backend/radio/core/domain/src/lib/radio.aggregate.ts @@ -0,0 +1,21 @@ +import { AggregateRoot } from '@nestjs/cqrs'; +import { + Channel, + QueuedTrack, + Track, + User, +} from '@sdj/backend/radio/core/domain'; +import { QueueChangedEvent } from './events/queue-changed.event'; + +export class RadioAggregate extends AggregateRoot { + queueTrack( + track: Track, + channel: Channel, + randomized: boolean = false, + user?: User + ): QueuedTrack { + const queuedTrack = new QueuedTrack(track, channel, randomized, user); + this.apply(new QueueChangedEvent(channel.id)); + return queuedTrack; + } +} diff --git a/libs/backend/radio/core/domain/src/lib/repositories/queued-track-repository.interface.ts b/libs/backend/radio/core/domain/src/lib/repositories/queued-track-repository.interface.ts index 4e4e630..fb559ce 100644 --- a/libs/backend/radio/core/domain/src/lib/repositories/queued-track-repository.interface.ts +++ b/libs/backend/radio/core/domain/src/lib/repositories/queued-track-repository.interface.ts @@ -25,4 +25,6 @@ export abstract class QueuedTrackRepositoryInterface { abstract createQueryBuilder(alias: string): QueryBuilder; abstract remove(queuedTrack: QueuedTrack): Promise; + + abstract getQueue(channelId: string): Promise; } diff --git a/libs/backend/radio/feature/src/lib/backend-radio-feature.module.ts b/libs/backend/radio/feature/src/lib/backend-radio-feature.module.ts index ce395ad..ef9430e 100644 --- a/libs/backend/radio/feature/src/lib/backend-radio-feature.module.ts +++ b/libs/backend/radio/feature/src/lib/backend-radio-feature.module.ts @@ -3,7 +3,6 @@ import { BackendRadioCoreApplicationServicesModule, TrackService, } from '@sdj/backend/radio/core/application-services'; -import { BackendRadioInfrastructureModule } from '@sdj/backend/radio/infrastructure'; import { BackendRadioInfrastructureMp3GainModule } from '@sdj/backend/radio/infrastructure-mp3-gain'; import { BackendRadioInfrastructureSlackApiModule } from '@sdj/backend/radio/infrastructure-slack-api'; import { BackendRadioInfrastructureTypeormModule } from '@sdj/backend/radio/infrastructure-typeorm'; @@ -14,7 +13,6 @@ import { TrackServiceAdapter } from './adapters/track-service.adapter'; @Module({ imports: [ BackendRadioCoreApplicationServicesModule, - BackendRadioInfrastructureModule, BackendRadioInfrastructureYoutubeApiModule, BackendRadioInfrastructureMp3GainModule, BackendRadioInfrastructureTypeormModule, @@ -24,7 +22,6 @@ import { TrackServiceAdapter } from './adapters/track-service.adapter'; exports: [ TrackService, BackendRadioCoreApplicationServicesModule, - BackendRadioInfrastructureModule, BackendRadioInfrastructureYoutubeApiModule, BackendRadioInfrastructureTypeormModule, BackendRadioInfrastructureSlackApiModule, diff --git a/libs/backend/radio/infrastructure-typeorm/src/lib/repositories/typeorm-queued-track.repository.ts b/libs/backend/radio/infrastructure-typeorm/src/lib/repositories/typeorm-queued-track.repository.ts index 893732a..e3879be 100644 --- a/libs/backend/radio/infrastructure-typeorm/src/lib/repositories/typeorm-queued-track.repository.ts +++ b/libs/backend/radio/infrastructure-typeorm/src/lib/repositories/typeorm-queued-track.repository.ts @@ -1,6 +1,5 @@ -import { forwardRef, Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; -import { Store } from '@sdj/backend/radio/core/application-services'; import { QueuedTrack, QueuedTrackRepositoryInterface, @@ -10,8 +9,6 @@ import { QueryBuilder, Repository } from 'typeorm'; @Injectable() export class TypeormQueuedTrackRepository extends QueuedTrackRepositoryInterface { constructor( - @Inject(forwardRef(() => Store)) - private store: Store, @InjectRepository(QueuedTrack) private typeOrmRepository: Repository ) { @@ -53,21 +50,39 @@ export class TypeormQueuedTrackRepository extends QueuedTrackRepositoryInterface } async getCurrentTrack(channelId: string): Promise { - return this.store.getCurrentTrack(channelId); + return this.typeOrmRepository + .createQueryBuilder('queuedTrack') + .where('queuedTrack.playedIn = :channelId') + .innerJoin( + 'queuedTrack.playedIn', + 'channel', + 'queuedTrack.id = channel.currentTrack' + ) + .setParameter('channelId', channelId) + .getOne(); } getNextSongInQueue(channelId: string): Promise { - return ( - this.typeOrmRepository - .createQueryBuilder('queuedTrack') - // .addSelect('max(queuedTrack.id)') - .where('queuedTrack.playedIn = :channelId') - .leftJoinAndSelect('queuedTrack.track', 'track') - .andWhere('queuedTrack.playedAt IS NULL') - .orderBy('queuedTrack.order, queuedTrack.id', 'ASC') - .setParameter('channelId', channelId) - .getOne() - ); + return this.typeOrmRepository + .createQueryBuilder('queuedTrack') + .where('queuedTrack.playedIn = :channelId') + .leftJoinAndSelect('queuedTrack.track', 'track') + .andWhere('queuedTrack.playedAt IS NULL') + .orderBy('queuedTrack.order, queuedTrack.id', 'ASC') + .setParameter('channelId', channelId) + .getOne(); + } + + getQueue(channelId: string): Promise { + return this.typeOrmRepository + .createQueryBuilder('queuedTrack') + .where('queuedTrack.playedIn = :channelId') + .innerJoinAndSelect('queuedTrack.track', 'track') + .leftJoinAndSelect('queuedTrack.addedBy', 'addedBy ') + .andWhere('queuedTrack.playedAt IS NULL') + .orderBy('queuedTrack.createdAt') + .setParameter('channelId', channelId) + .getMany(); } remove(queuedTrack: QueuedTrack): Promise { diff --git a/libs/backend/radio/infrastructure/README.md b/libs/backend/radio/infrastructure/README.md deleted file mode 100644 index d8c4560..0000000 --- a/libs/backend/radio/infrastructure/README.md +++ /dev/null @@ -1,7 +0,0 @@ -# backend-radio-infrastructure - -This library was generated with [Nx](https://nx.dev). - -## Running unit tests - -Run `ng test backend-radio-infrastructure` to execute the unit tests via [Jest](https://jestjs.io). diff --git a/libs/backend/radio/infrastructure/jest.config.js b/libs/backend/radio/infrastructure/jest.config.js deleted file mode 100644 index 3f86d69..0000000 --- a/libs/backend/radio/infrastructure/jest.config.js +++ /dev/null @@ -1,10 +0,0 @@ -module.exports = { - name: 'backend-radio-infrastructure', - preset: '../../../../jest.config.js', - transform: { - '^.+\\.[tj]sx?$': 'ts-jest', - }, - moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx', 'html'], - coverageDirectory: '../../../../coverage/libs/backend/radio/infrastructure', - globals: { 'ts-jest': { tsConfig: '/tsconfig.spec.json' } }, -}; diff --git a/libs/backend/radio/infrastructure/src/index.ts b/libs/backend/radio/infrastructure/src/index.ts deleted file mode 100644 index 85ff23c..0000000 --- a/libs/backend/radio/infrastructure/src/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './lib/store.adapter'; -export * from './lib/backend-radio-infrastructure.module'; diff --git a/libs/backend/radio/infrastructure/src/lib/backend-radio-infrastructure.module.ts b/libs/backend/radio/infrastructure/src/lib/backend-radio-infrastructure.module.ts deleted file mode 100644 index d624c1c..0000000 --- a/libs/backend/radio/infrastructure/src/lib/backend-radio-infrastructure.module.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { Module } from '@nestjs/common'; -import { Store } from '@sdj/backend/radio/core/application-services'; -import { StoreAdapter } from './store.adapter'; - -@Module({ - providers: [StoreAdapter, { provide: Store, useExisting: StoreAdapter }], - exports: [Store], -}) -export class BackendRadioInfrastructureModule {} diff --git a/libs/backend/radio/infrastructure/src/lib/store.adapter.ts b/libs/backend/radio/infrastructure/src/lib/store.adapter.ts deleted file mode 100644 index 0b5974a..0000000 --- a/libs/backend/radio/infrastructure/src/lib/store.adapter.ts +++ /dev/null @@ -1,133 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { Store } from '@sdj/backend/radio/core/application-services'; -import { - QueuedTrack, - QueuedTrackRepositoryInterface, -} from '@sdj/backend/radio/core/domain'; -import { BehaviorSubject, Observable, of } from 'rxjs'; -import { distinctUntilChanged, filter, first, map } from 'rxjs/operators'; - -interface ChannelState { - silenceCount: number; - queue: QueuedTrack[]; - currentTrack: QueuedTrack | null; -} - -interface State { - [key: string]: ChannelState; -} - -const initialChannelState = { silenceCount: 0, queue: [], currentTrack: null }; -const initialPlaylistState = {}; - -@Injectable() -export class StoreAdapter implements Store { - get state(): State { - return this._state.getValue(); - } - - private _state: BehaviorSubject = new BehaviorSubject( - initialPlaylistState - ); - - constructor( - private readonly queuedTrackRepository: QueuedTrackRepositoryInterface - ) {} - - async addToQueue(queuedTrack: QueuedTrack): Promise { - const channelState = await this.getChannelState(queuedTrack.playedIn.id); - const isTrackAlreadyInQueue = - channelState.queue.findIndex( - (trackInQueue: QueuedTrack) => trackInQueue.id === queuedTrack.id - ) !== -1; - if (!isTrackAlreadyInQueue) { - this._state.next({ - ...this.state, - [queuedTrack.playedIn.id]: { - ...channelState, - queue: channelState.queue.concat(queuedTrack), - }, - }); - } - } - - channelAppears(channelId: string): Promise { - return (!!this.state && !!this.state[channelId] - ? of() - : this._state.pipe( - filter((state) => !!state[channelId]), - first() - ) - ).toPromise(); - } - - channelDisappears(channelId: string): void { - const state = this.state; - delete state[channelId]; - this._state.next(state); - } - - async getCurrentTrack(channelId: string): Promise { - return (await this.getChannelState(channelId)).currentTrack; - } - - getQueue(channelId: string): Observable { - return this._state.pipe( - map((state: State) => state[channelId]), - filter(Boolean), - map((state: ChannelState) => state.queue), - distinctUntilChanged() - ); - } - - async getSilenceCount(channelId: string): Promise { - await this.getChannelState(channelId); - return this.state[channelId].silenceCount; - } - - isChannelActive(channelId: string): boolean { - return !!this.state[channelId]; - } - - async setCurrentTrack( - channelId: string, - queuedTrack: QueuedTrack | null - ): Promise { - const channelState = await this.getChannelState(channelId); - this._state.next({ - ...this.state, - [channelId]: { ...channelState, currentTrack: queuedTrack }, - }); - } - - async setSilenceCount(channelId: string, value: number): Promise { - const channelState = await this.getChannelState(channelId); - this._state.next({ - ...this.state, - [channelId]: { ...channelState, silenceCount: value }, - }); - } - - async removeFromQueue(queuedTrack: QueuedTrack): Promise { - const channelState = await this.getChannelState(queuedTrack.playedIn.id); - this._state.next({ - ...this.state, - [queuedTrack.playedIn.id]: { - ...channelState, - queue: channelState.queue.filter( - (qTrack) => qTrack.id !== queuedTrack.id - ), - }, - }); - } - - private async getChannelState(channelId: string): Promise { - if (!this.state[channelId]) { - this.state[channelId] = { - ...initialChannelState, - queue: await this.queuedTrackRepository.findQueuedTracks(channelId), - }; - } - return this.state[channelId]; - } -} diff --git a/libs/backend/radio/infrastructure/tsconfig.json b/libs/backend/radio/infrastructure/tsconfig.json deleted file mode 100644 index d32f55e..0000000 --- a/libs/backend/radio/infrastructure/tsconfig.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "extends": "../../../../tsconfig.base.json", - "compilerOptions": { - "types": ["node", "jest"] - }, - "include": [], - "files": [], - "references": [ - { - "path": "./tsconfig.lib.json" - }, - { - "path": "./tsconfig.spec.json" - } - ] -} diff --git a/libs/backend/radio/infrastructure/tsconfig.lib.json b/libs/backend/radio/infrastructure/tsconfig.lib.json deleted file mode 100644 index a174cb0..0000000 --- a/libs/backend/radio/infrastructure/tsconfig.lib.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "extends": "./tsconfig.json", - "compilerOptions": { - "outDir": "../../../../dist/out-tsc", - "types": [] - }, - "exclude": ["**/*.spec.ts"], - "include": ["**/*.ts"] -} diff --git a/libs/backend/radio/infrastructure/tsconfig.spec.json b/libs/backend/radio/infrastructure/tsconfig.spec.json deleted file mode 100644 index 8119bae..0000000 --- a/libs/backend/radio/infrastructure/tsconfig.spec.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "extends": "./tsconfig.json", - "compilerOptions": { - "outDir": "../../../../dist/out-tsc", - "module": "commonjs", - "types": ["jest", "node"] - }, - "include": [ - "**/*.spec.ts", - "**/*.spec.tsx", - "**/*.spec.js", - "**/*.spec.jsx", - "**/*.d.ts" - ] -} diff --git a/libs/backend/radio/infrastructure/tslint.json b/libs/backend/radio/infrastructure/tslint.json deleted file mode 100644 index e9dbc53..0000000 --- a/libs/backend/radio/infrastructure/tslint.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "extends": "../../../../tslint.json", - "rules": {}, - "linterOptions": { - "exclude": ["!**/*"] - } -} diff --git a/libs/backend/radio/ui-redis/src/lib/events/play-queued-track/ices-play-queued-track.handler.ts b/libs/backend/radio/ui-redis/src/lib/events/play-queued-track/ices-play-queued-track.handler.ts index f0ad263..fa4d9e8 100644 --- a/libs/backend/radio/ui-redis/src/lib/events/play-queued-track/ices-play-queued-track.handler.ts +++ b/libs/backend/radio/ui-redis/src/lib/events/play-queued-track/ices-play-queued-track.handler.ts @@ -1,6 +1,8 @@ import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { PlayQueuedTrackEvent } from '@sdj/backend/radio/core/application-services'; -import { QueuedTrackRepositoryInterface } from '@sdj/backend/radio/core/domain'; +import { + PlayQueuedTrackEvent, + QueuedTrackRepositoryInterface, +} from '@sdj/backend/radio/core/domain'; import { RedisService } from '../../services/redis.service'; @EventsHandler(PlayQueuedTrackEvent) diff --git a/libs/backend/radio/ui-redis/src/lib/events/play-silence/ices-play-silence.handler.ts b/libs/backend/radio/ui-redis/src/lib/events/play-silence/ices-play-silence.handler.ts index aa0a751..6af92ab 100644 --- a/libs/backend/radio/ui-redis/src/lib/events/play-silence/ices-play-silence.handler.ts +++ b/libs/backend/radio/ui-redis/src/lib/events/play-silence/ices-play-silence.handler.ts @@ -1,5 +1,5 @@ import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { PlaySilenceEvent } from '@sdj/backend/radio/core/application-services'; +import { PlaySilenceEvent } from '@sdj/backend/radio/core/domain'; import { RedisService } from '../../services/redis.service'; @EventsHandler(PlaySilenceEvent) diff --git a/libs/backend/radio/ui-slack/src/lib/bot/lib/commands/pozdro.slack-command.ts b/libs/backend/radio/ui-slack/src/lib/bot/lib/commands/pozdro.slack-command.ts index 0022e84..c2b6e83 100644 --- a/libs/backend/radio/ui-slack/src/lib/bot/lib/commands/pozdro.slack-command.ts +++ b/libs/backend/radio/ui-slack/src/lib/bot/lib/commands/pozdro.slack-command.ts @@ -1,9 +1,9 @@ import { Injectable, Logger } from '@nestjs/common'; +import { RadioFacade } from '@sdj/backend/radio/core/application-services'; import { PozdroEvent, - RadioFacade, -} from '@sdj/backend/radio/core/application-services'; -import { UserRepositoryInterface } from '@sdj/backend/radio/core/domain'; + UserRepositoryInterface, +} from '@sdj/backend/radio/core/domain'; import { SlackCommand, SlackCommandHandler, diff --git a/libs/backend/radio/ui-slack/src/lib/bot/lib/events/queued-track-skipped/slack-queued-track-skipped.handler.ts b/libs/backend/radio/ui-slack/src/lib/bot/lib/events/queued-track-skipped/slack-queued-track-skipped.handler.ts index ece1681..d904688 100644 --- a/libs/backend/radio/ui-slack/src/lib/bot/lib/events/queued-track-skipped/slack-queued-track-skipped.handler.ts +++ b/libs/backend/radio/ui-slack/src/lib/bot/lib/events/queued-track-skipped/slack-queued-track-skipped.handler.ts @@ -1,6 +1,8 @@ import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { QueuedTrackSkippedEvent } from '@sdj/backend/radio/core/application-services'; -import { QueuedTrackRepositoryInterface } from '@sdj/backend/radio/core/domain'; +import { + QueuedTrackRepositoryInterface, + QueuedTrackSkippedEvent, +} from '@sdj/backend/radio/core/domain'; import { SlackService } from '@sikora00/nestjs-slack-bot'; @EventsHandler(QueuedTrackSkippedEvent) diff --git a/libs/backend/radio/ui-web-socket/src/lib/events/play-queued-track/ws-play-queued-track.handler.ts b/libs/backend/radio/ui-web-socket/src/lib/events/play-queued-track/ws-play-queued-track.handler.ts index 750ba66..71533f9 100644 --- a/libs/backend/radio/ui-web-socket/src/lib/events/play-queued-track/ws-play-queued-track.handler.ts +++ b/libs/backend/radio/ui-web-socket/src/lib/events/play-queued-track/ws-play-queued-track.handler.ts @@ -1,7 +1,9 @@ import { Logger } from '@nestjs/common'; import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { PlayQueuedTrackEvent } from '@sdj/backend/radio/core/application-services'; -import { QueuedTrackRepositoryInterface } from '@sdj/backend/radio/core/domain'; +import { + PlayQueuedTrackEvent, + QueuedTrackRepositoryInterface, +} from '@sdj/backend/radio/core/domain'; import { WebSocketEvents } from '@sdj/shared/domain'; import { Gateway } from '../../gateway/gateway'; diff --git a/libs/backend/radio/ui-web-socket/src/lib/events/play-silence/ws-play-silence.handler.ts b/libs/backend/radio/ui-web-socket/src/lib/events/play-silence/ws-play-silence.handler.ts index a6bad05..470184d 100644 --- a/libs/backend/radio/ui-web-socket/src/lib/events/play-silence/ws-play-silence.handler.ts +++ b/libs/backend/radio/ui-web-socket/src/lib/events/play-silence/ws-play-silence.handler.ts @@ -1,27 +1,16 @@ import { Logger } from '@nestjs/common'; import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { - PlaySilenceEvent, - Store, -} from '@sdj/backend/radio/core/application-services'; +import { PlaySilenceEvent } from '@sdj/backend/radio/core/domain'; import { WebSocketEvents } from '@sdj/shared/domain'; import { Gateway } from '../../gateway/gateway'; @EventsHandler(PlaySilenceEvent) export class WsPlaySilenceHandler implements IEventHandler { - constructor( - private logger: Logger, - private gateway: Gateway, - private readonly storageService: Store - ) {} + constructor(private logger: Logger, private gateway: Gateway) {} async handle(command: PlaySilenceEvent): Promise { const channelId = command.channelId; - let count = await this.storageService.getSilenceCount(channelId); - count++; - if (count > 1) { - this.logger.log('radio', channelId); - this.gateway.server.in(channelId).emit(WebSocketEvents.playRadio); - } + this.logger.log('radio', channelId); + this.gateway.server.in(channelId).emit(WebSocketEvents.playRadio); } } diff --git a/libs/backend/radio/ui-web-socket/src/lib/events/pozdro/ws-pozdro.handler.ts b/libs/backend/radio/ui-web-socket/src/lib/events/pozdro/ws-pozdro.handler.ts index cfe2f6b..9792b86 100644 --- a/libs/backend/radio/ui-web-socket/src/lib/events/pozdro/ws-pozdro.handler.ts +++ b/libs/backend/radio/ui-web-socket/src/lib/events/pozdro/ws-pozdro.handler.ts @@ -1,6 +1,6 @@ import { Logger } from '@nestjs/common'; import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { PozdroEvent } from '@sdj/backend/radio/core/application-services'; +import { PozdroEvent } from '@sdj/backend/radio/core/domain'; import { WebSocketEvents } from '@sdj/shared/domain'; import { Gateway } from '../../gateway/gateway'; diff --git a/libs/backend/radio/ui-web-socket/src/lib/events/queue-changed/ws-queued-changed.handler.ts b/libs/backend/radio/ui-web-socket/src/lib/events/queue-changed/ws-queued-changed.handler.ts new file mode 100644 index 0000000..367dcae --- /dev/null +++ b/libs/backend/radio/ui-web-socket/src/lib/events/queue-changed/ws-queued-changed.handler.ts @@ -0,0 +1,15 @@ +import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; +import { QueueChangedEvent } from '@sdj/backend/radio/core/domain'; +import { WsQueueSynchronizationService } from '../../services/ws-queue-synchronization.service'; + +@EventsHandler(QueueChangedEvent) +export class WsQueuedChangedHandler + implements IEventHandler { + constructor( + private queueSynchronizationService: WsQueueSynchronizationService + ) {} + + async handle(event: QueueChangedEvent): Promise { + return this.queueSynchronizationService.updateQueue(event.channelId); + } +} diff --git a/libs/backend/radio/ui-web-socket/src/lib/gateway/gateway.ts b/libs/backend/radio/ui-web-socket/src/lib/gateway/gateway.ts index c30465e..e7b4b89 100644 --- a/libs/backend/radio/ui-web-socket/src/lib/gateway/gateway.ts +++ b/libs/backend/radio/ui-web-socket/src/lib/gateway/gateway.ts @@ -9,7 +9,6 @@ import { JoinChannelCommand, LeaveChannelCommand, RadioFacade, - Store, } from '@sdj/backend/radio/core/application-services'; import { ChannelRepositoryInterface, @@ -20,6 +19,7 @@ import { WebSocketEvents } from '@sdj/shared/domain'; import { Observable, Subject } from 'rxjs'; import { map, takeUntil } from 'rxjs/operators'; import { Server, Socket } from 'socket.io'; +import { WsQueueSynchronizationService } from '../services/ws-queue-synchronization.service'; @WebSocketGateway() export class Gateway implements OnGatewayDisconnect { @@ -29,7 +29,7 @@ export class Gateway implements OnGatewayDisconnect { constructor( private readonly channelRepository: ChannelRepositoryInterface, private hostService: HostService, - private readonly storageService: Store, + private queueSynchronizationService: WsQueueSynchronizationService, private radioFacade: RadioFacade ) {} @@ -75,12 +75,14 @@ export class Gateway implements OnGatewayDisconnect { this.clientInRoomSubjects[client.id].complete(); } this.clientInRoomSubjects[client.id] = new Subject(); - return this.storageService.getQueue(JSON.parse(channel)).pipe( - takeUntil(this.clientInRoomSubjects[client.id]), - map((list) => { - return { event: WebSocketEvents.queuedTrackList, data: list }; - }) - ); + return this.queueSynchronizationService + .listenToQueue(JSON.parse(channel)) + .pipe( + takeUntil(this.clientInRoomSubjects[client.id]), + map((list) => { + return { event: WebSocketEvents.queuedTrackList, data: list }; + }) + ); } private async leaveOtherChannels( diff --git a/libs/backend/radio/ui-web-socket/src/lib/services/ws-queue-synchronization.service.ts b/libs/backend/radio/ui-web-socket/src/lib/services/ws-queue-synchronization.service.ts new file mode 100644 index 0000000..1313ae7 --- /dev/null +++ b/libs/backend/radio/ui-web-socket/src/lib/services/ws-queue-synchronization.service.ts @@ -0,0 +1,43 @@ +import { Injectable } from '@nestjs/common'; +import { + QueuedTrack, + QueuedTrackRepositoryInterface, +} from '@sdj/backend/radio/core/domain'; +import { BehaviorSubject, from, Observable, Subject } from 'rxjs'; +import { switchMap } from 'rxjs/operators'; + +@Injectable() +export class WsQueueSynchronizationService { + private readonly queues: Record> = {}; + + constructor(private queuedTrackRepository: QueuedTrackRepositoryInterface) {} + + /** + * @ToDO there is a place for a memory leak because we should remove unused subjects + * @param channelId + */ + listenToQueue(channelId: string): Observable { + let queue: Observable = this.queues[channelId]; + if (!queue) { + queue = this.addQueue(channelId); + } + + return queue; + } + + async updateQueue(channelId): Promise { + const queueSubject = this.queues[channelId]; + if (queueSubject) { + queueSubject.next(await this.queuedTrackRepository.getQueue(channelId)); + } + } + + private addQueue(channelId: string): Observable { + return from(this.queuedTrackRepository.getQueue(channelId)).pipe( + switchMap((queue) => { + this.queues[channelId] = new BehaviorSubject(queue); + return this.queues[channelId]; + }) + ); + } +} diff --git a/libs/backend/radio/ui-web-socket/src/lib/websocket.module.ts b/libs/backend/radio/ui-web-socket/src/lib/websocket.module.ts index a3e2ae9..857df47 100644 --- a/libs/backend/radio/ui-web-socket/src/lib/websocket.module.ts +++ b/libs/backend/radio/ui-web-socket/src/lib/websocket.module.ts @@ -6,8 +6,10 @@ import { WsChannelUpdatedHandler } from './events/channel-updated/ws-channel-upd import { WsPlayQueuedTrackHandler } from './events/play-queued-track/ws-play-queued-track.handler'; import { WsPlaySilenceHandler } from './events/play-silence/ws-play-silence.handler'; import { WsPozdroHandler } from './events/pozdro/ws-pozdro.handler'; +import { WsQueuedChangedHandler } from './events/queue-changed/ws-queued-changed.handler'; import { WsUserJoinedChannelHandler } from './events/user-joined-channel/ws-user-joined-channel.handler'; import { Gateway } from './gateway/gateway'; +import { WsQueueSynchronizationService } from './services/ws-queue-synchronization.service'; const EventsHandlers = [ WsChannelStartedHandler, @@ -15,12 +17,13 @@ const EventsHandlers = [ WsPlayQueuedTrackHandler, WsPlaySilenceHandler, WsPozdroHandler, + WsQueuedChangedHandler, WsUserJoinedChannelHandler, ]; @Module({ imports: [CqrsModule], - providers: [...EventsHandlers, Gateway], + providers: [...EventsHandlers, Gateway, WsQueueSynchronizationService], exports: [Gateway], }) export class WebSocketModule implements OnModuleInit { diff --git a/nx.json b/nx.json index 6618664..796aa75 100644 --- a/nx.json +++ b/nx.json @@ -45,9 +45,6 @@ "backend-radio-ui-slack": { "tags": ["platform:backend", "scope:radio", "type:ui"] }, - "backend-radio-infrastructure": { - "tags": ["platform:backend", "scope:radio", "type:infrastructure"] - }, "backend-shared-util-you-tube": { "tags": ["platform:backend", "scope:shared", "type:util"] }, diff --git a/package.json b/package.json index a68973b..9d74e4c 100644 --- a/package.json +++ b/package.json @@ -168,8 +168,7 @@ }, "husky": { "hooks": { - "pre-commit": "npm-run-all --parallel validate-commit validate-nx-tags format:check", - "pre-push": "npm-run-all --parallel affected:lint affected:test" + "pre-commit": "validate-commit validate-nx-tags format:check" } } } diff --git a/tsconfig.base.json b/tsconfig.base.json index ccbcc82..751d831 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -34,9 +34,6 @@ "@sdj/backend/radio/ui-slack": [ "libs/backend/radio/ui-slack/src/index.ts" ], - "@sdj/backend/radio/infrastructure": [ - "libs/backend/radio/infrastructure/src/index.ts" - ], "@sdj/backend/shared/util-you-tube": [ "libs/backend/shared/util-you-tube/src/index.ts" ],