diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 08376b3..6c8f243 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -34,7 +34,7 @@ jobs: - name: npm install run: docker exec sdj-e2e-backend bash -c "npm ci" - name: affected:e2e - run: docker exec sdj-e2e-backend bash -c "npm run affected:e2e -- --parallel --base origin/${BASE_BRANCH}" + run: docker exec sdj-e2e-backend bash -c "npm run affected:e2e -- --base origin/${BASE_BRANCH}" env: DB_ROOT_PASSWORD: ${{ secrets.DB_PASSWORD }} DB_HOST: ${{ secrets.DB_HOST }} diff --git a/angular.json b/angular.json index 309ce45..a9ccc3b 100644 --- a/angular.json +++ b/angular.json @@ -482,34 +482,6 @@ } } }, - "backend-radio-infrastructure": { - "root": "libs/backend/radio/infrastructure", - "sourceRoot": "libs/backend/radio/infrastructure/src", - "projectType": "library", - "schematics": {}, - "architect": { - "lint": { - "builder": "@angular-devkit/build-angular:tslint", - "options": { - "tsConfig": [ - "libs/backend/radio/infrastructure/tsconfig.lib.json", - "libs/backend/radio/infrastructure/tsconfig.spec.json" - ], - "exclude": [ - "**/node_modules/**", - "!libs/backend/radio/infrastructure/**/*" - ] - } - }, - "test": { - "builder": "@nrwl/jest:jest", - "options": { - "jestConfig": "libs/backend/radio/infrastructure/jest.config.js", - "passWithNoTests": true - } - } - } - }, "backend-shared-util-you-tube": { "root": "libs/backend/shared/util-you-tube", "sourceRoot": "libs/backend/shared/util-you-tube/src", diff --git a/apps/backend-e2e/src/integration/play-radio-queue.spec.ts b/apps/backend-e2e/src/integration/play-radio-queue.spec.ts index 9efa190..d2f2240 100644 --- a/apps/backend-e2e/src/integration/play-radio-queue.spec.ts +++ b/apps/backend-e2e/src/integration/play-radio-queue.spec.ts @@ -56,16 +56,7 @@ describe('Play Radio Queue', () => { }); }); - test('returns 10-sec-of-silence', (done) => { - redisSub.on('message', (channel, message) => { - expect(channel).toEqual(channelId); - expect(message).toEqual('10-sec-of-silence'); - done(); - }); - redisPub.publish('getNext', channelId); - }); - - test('returns emits playRadio on second silence', (done) => { + test('returns 10-sec-of-silence and emits playRadio', (done) => { Promise.all([ new Promise((resolve) => { redisSub.on('message', (channel, message) => { @@ -132,23 +123,15 @@ describe('Play Radio Queue', () => { redisSub.once('message', (channel, message) => { expect(channel).toEqual(channelId); expect(message).toEqual('_D1rrdFcj1U'); - resolve(); + // @ToDo play is an event so it is possible that it emits by web socket before the track will be updated and the next test will fall + setTimeout(() => resolve(), 100); }); }), ]).then(() => done()); redisPub.publish('getNext', channelId); }); - test('returns silence when queue is empty', (done) => { - redisSub.once('message', (channel, message) => { - expect(channel).toEqual(channelId); - expect(message).toEqual('10-sec-of-silence'); - done(); - }); - redisPub.publish('getNext', channelId); - }); - - test('returns silence and emits playRadio on second silence playing', (done) => { + test('returns silence and emits playRadio when queue is empty', (done) => { Promise.all([ new Promise((resolve) => socket.once(WebSocketEvents.playRadio, resolve) diff --git a/libs/backend/radio/core/application-services/src/index.ts b/libs/backend/radio/core/application-services/src/index.ts index 912cc7c..1468ff3 100644 --- a/libs/backend/radio/core/application-services/src/index.ts +++ b/libs/backend/radio/core/application-services/src/index.ts @@ -15,13 +15,8 @@ export * from './lib/commands/set-channel-default-stream/set-channel-default-str export * from './lib/commands/thumb-down/thumb-down.command'; export * from './lib/commands/thumb-up/thumb-up.command'; export * from './lib/dtos/track-data.dto'; -export * from './lib/events/play-queued-track/play-queued-track.event'; -export * from './lib/events/play-silence/play-silence.event'; -export * from './lib/events/pozdro/pozdro.event'; -export * from './lib/events/queued-track-skiepped/queued-track-skipped.event'; export * from './lib/ports/track-data.service'; export * from './lib/ports/track.service'; -export * from './lib/ports/store.port'; export * from './lib/queries/get-channels/get-channels.query'; export * from './lib/queries/get-channels/get-channels.read-model'; export * from './lib/radio.facade'; diff --git a/libs/backend/radio/core/application-services/src/lib/backend-radio-core-application-services.module.ts b/libs/backend/radio/core/application-services/src/lib/backend-radio-core-application-services.module.ts index ed3f36b..d9e4553 100644 --- a/libs/backend/radio/core/application-services/src/lib/backend-radio-core-application-services.module.ts +++ b/libs/backend/radio/core/application-services/src/lib/backend-radio-core-application-services.module.ts @@ -19,7 +19,6 @@ import { ThumbUpHandler } from './commands/thumb-up/thumb-up.handler'; import { ChannelWillStartHandler } from './events/channel-will-start/channel-will-start.handler'; import { ChannelWillStopHandler } from './events/channel-will-stop/channel-will-stop.handler'; import { PlayQueuedTrackHandler } from './events/play-queued-track/play-queued-track.handler'; -import { PlaySilenceHandler } from './events/play-silence/play-silence.handler'; import { SongVotedNegativelyHandler } from './events/song-voted-negatively/song-voted-negatively.handler'; import { GetChannelsHandler } from './queries/get-channels/get-channels.handler'; import { RadioFacade } from './radio.facade'; @@ -47,7 +46,6 @@ export const EventHandlers = [ ChannelWillStartHandler, ChannelWillStopHandler, PlayQueuedTrackHandler, - PlaySilenceHandler, SongVotedNegativelyHandler, ]; diff --git a/libs/backend/radio/core/application-services/src/lib/commands/delete-queued-track/delete-queued-track.handler.ts b/libs/backend/radio/core/application-services/src/lib/commands/delete-queued-track/delete-queued-track.handler.ts index 172493d..ff8c749 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/delete-queued-track/delete-queued-track.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/delete-queued-track/delete-queued-track.handler.ts @@ -1,21 +1,18 @@ import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; import { QueuedTrackRepositoryInterface } from '@sdj/backend/radio/core/domain'; -import { Store } from '../../ports/store.port'; import { DeleteQueuedTrackCommand } from './delete-queued-track.command'; @CommandHandler(DeleteQueuedTrackCommand) export class DeleteQueuedTrackHandler implements ICommandHandler { constructor( - private readonly queuedTrackRepository: QueuedTrackRepositoryInterface, - private readonly storageService: Store + private readonly queuedTrackRepository: QueuedTrackRepositoryInterface ) {} async execute(command: DeleteQueuedTrackCommand): Promise { const queuedTrack = await this.queuedTrackRepository.findOneOrFail( command.queuedTrackId ); - await this.storageService.removeFromQueue(queuedTrack); await this.queuedTrackRepository.remove(queuedTrack); } } diff --git a/libs/backend/radio/core/application-services/src/lib/commands/download-and-play/download-and-play.handler.ts b/libs/backend/radio/core/application-services/src/lib/commands/download-and-play/download-and-play.handler.ts index 70c9445..892ac68 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/download-and-play/download-and-play.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/download-and-play/download-and-play.handler.ts @@ -1,6 +1,6 @@ import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs'; -import { PlayQueuedTrackEvent } from '../../events/play-queued-track/play-queued-track.event'; +import { PlayQueuedTrackEvent } from '@sdj/backend/radio/core/domain'; import { RadioFacade } from '../../radio.facade'; import { DownloadTrackCommand } from '../download-track/download-track.command'; import { DownloadAndPlayCommand } from './download-and-play.command'; diff --git a/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.spec.ts b/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.spec.ts index 05a785f..02a2a6f 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.spec.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.spec.ts @@ -1,4 +1,4 @@ -import { EventBus } from '@nestjs/cqrs'; +import { EventPublisher } from '@nestjs/cqrs'; import { Test, TestingModule } from '@nestjs/testing'; import { ChannelRepositoryInterface, @@ -17,6 +17,7 @@ describe('PlayNextTrackOrSilenceHandler', () => { let radioFacade: Mocked; let queuedTrackRepository: Mocked; let trackRepository: Mocked; + let publisher: Mocked; beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ @@ -24,8 +25,8 @@ describe('PlayNextTrackOrSilenceHandler', () => { PlayNextTrackOrSilenceHandler, { provide: RadioFacade, useValue: createSpyObj(RadioFacade) }, { - provide: EventBus, - useValue: createSpyObj(EventBus), + provide: EventPublisher, + useValue: createSpyObj(EventPublisher), }, { provide: ChannelRepositoryInterface, @@ -45,6 +46,7 @@ describe('PlayNextTrackOrSilenceHandler', () => { channelRepository = module.get(ChannelRepositoryInterface); queuedTrackRepository = module.get(QueuedTrackRepositoryInterface); radioFacade = module.get(RadioFacade); + publisher = module.get(EventPublisher); service = module.get( PlayNextTrackOrSilenceHandler ); @@ -58,6 +60,7 @@ describe('PlayNextTrackOrSilenceHandler', () => { test('#execute triggers download an play track if is in queue', async () => { channelRepository.findOrCreate = jest.fn(); channelRepository.findOrCreate.mockResolvedValue({ id: '1234' } as any); + publisher.mergeObjectContext.mockImplementation((x) => x); radioFacade.downloadAndPlay.mockResolvedValue({}); @@ -77,6 +80,7 @@ describe('PlayNextTrackOrSilenceHandler', () => { queuedTrackRepository.getNextSongInQueue.mockResolvedValue(null); queuedTrackRepository.findOneOrFail = jest.fn(); queuedTrackRepository.findOneOrFail.mockResolvedValue({ id: 2 } as any); + publisher.mergeObjectContext.mockImplementation((x) => x); appConfig.trackLengthToStartOwnRadio = 40; trackRepository.countTracks = jest.fn(); diff --git a/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.ts b/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.ts index 9d7e0ee..b367c95 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/play-next-track-or-silence/play-next-track-or-silence.handler.ts @@ -1,4 +1,4 @@ -import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs'; +import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs'; import { ChannelRepositoryInterface, QueuedTrack, @@ -6,7 +6,6 @@ import { TrackRepositoryInterface, } from '@sdj/backend/radio/core/domain'; import { appConfig } from '@sdj/backend/shared/domain'; -import { PlaySilenceEvent } from '../../events/play-silence/play-silence.event'; import { RadioFacade } from '../../radio.facade'; import { DeleteQueuedTrackCommand } from '../delete-queued-track/delete-queued-track.command'; import { DownloadAndPlayCommand } from '../download-and-play/download-and-play.command'; @@ -17,16 +16,16 @@ import { PlayNextTrackOrSilenceCommand } from './play-next-track-or-silence.comm export class PlayNextTrackOrSilenceHandler implements ICommandHandler { constructor( - private eventBus: EventBus, private channelRepository: ChannelRepositoryInterface, + private publisher: EventPublisher, private radioFacade: RadioFacade, private queuedTrackRepository: QueuedTrackRepositoryInterface, private trackRepository: TrackRepositoryInterface ) {} async execute(command: PlayNextTrackOrSilenceCommand): Promise { - const channel = await this.channelRepository.findOrCreate( - command.channelId + const channel = this.publisher.mergeObjectContext( + await this.channelRepository.findOrCreate(command.channelId) ); const queuedTrack = await this.getNextTrack(channel.id); if (queuedTrack) { @@ -41,7 +40,9 @@ export class PlayNextTrackOrSilenceHandler await this.execute(command); } } else { - this.eventBus.publish(new PlaySilenceEvent(channel.id)); + channel.playSilence(); + channel.commit(); + await this.channelRepository.save(channel); } } diff --git a/libs/backend/radio/core/application-services/src/lib/commands/queue-track/queue-track.handler.ts b/libs/backend/radio/core/application-services/src/lib/commands/queue-track/queue-track.handler.ts index 3c2ef90..c28002c 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/queue-track/queue-track.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/queue-track/queue-track.handler.ts @@ -1,22 +1,22 @@ -import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'; +import { CommandHandler, EventPublisher, ICommandHandler } from '@nestjs/cqrs'; import { ChannelRepositoryInterface, QueuedTrack, QueuedTrackRepositoryInterface, + RadioAggregate, TrackRepositoryInterface, } from '@sdj/backend/radio/core/domain'; import { appConfig } from '@sdj/backend/shared/domain'; -import { Store } from '../../ports/store.port'; import { QueueTrackCommand } from './queue-track.command'; @CommandHandler(QueueTrackCommand) export class QueueTrackHandler implements ICommandHandler { constructor( - private readonly storageService: Store, private channelRepository: ChannelRepositoryInterface, private queuedTrackRepository: QueuedTrackRepositoryInterface, - private readonly trackRepository: TrackRepositoryInterface + private readonly trackRepository: TrackRepositoryInterface, + private publisher: EventPublisher ) {} async execute(command: QueueTrackCommand): Promise { @@ -39,7 +39,8 @@ export class QueueTrackHandler implements ICommandHandler { ); } } - let queuedTrack = new QueuedTrack( + const radio = this.publisher.mergeObjectContext(new RadioAggregate()); + let queuedTrack = radio.queueTrack( track, channel, command.randomized, @@ -47,7 +48,7 @@ export class QueueTrackHandler implements ICommandHandler { ); queuedTrack = await this.queuedTrackRepository.save(queuedTrack); - await this.storageService.addToQueue(queuedTrack); + radio.commit(); return queuedTrack; } } diff --git a/libs/backend/radio/core/application-services/src/lib/commands/skip-queued-track/skip-queued-track.handler.ts b/libs/backend/radio/core/application-services/src/lib/commands/skip-queued-track/skip-queued-track.handler.ts index 7bd94c0..133253e 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/skip-queued-track/skip-queued-track.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/skip-queued-track/skip-queued-track.handler.ts @@ -1,10 +1,10 @@ import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs'; import { QueuedTrackRepositoryInterface, + QueuedTrackSkippedEvent, TrackRepositoryInterface, } from '@sdj/backend/radio/core/domain'; import { HostService } from '@sdj/backend/shared/application-services'; -import { QueuedTrackSkippedEvent } from '../../events/queued-track-skiepped/queued-track-skipped.event'; import { SkipQueuedTrackCommand } from './skip-queued-track.command'; @CommandHandler(SkipQueuedTrackCommand) diff --git a/libs/backend/radio/core/application-services/src/lib/commands/thumb-down/thumb-down.handler.ts b/libs/backend/radio/core/application-services/src/lib/commands/thumb-down/thumb-down.handler.ts index 4842b10..f19bd0d 100644 --- a/libs/backend/radio/core/application-services/src/lib/commands/thumb-down/thumb-down.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/commands/thumb-down/thumb-down.handler.ts @@ -1,12 +1,12 @@ import { CommandHandler, EventBus, ICommandHandler } from '@nestjs/cqrs'; import { QueuedTrackRepositoryInterface, + SongVotedNegativelyEvent, User, UserRepositoryInterface, Vote, VoteRepositoryInterface, } from '@sdj/backend/radio/core/domain'; -import { SongVotedNegativelyEvent } from '../../events/song-voted-negatively/song-voted-negatively.event'; import { ThumbDownCommand } from './thumb-down.command'; @CommandHandler(ThumbDownCommand) diff --git a/libs/backend/radio/core/application-services/src/lib/events/play-queued-track/play-queued-track.handler.ts b/libs/backend/radio/core/application-services/src/lib/events/play-queued-track/play-queued-track.handler.ts index 55ffa48..eee1759 100644 --- a/libs/backend/radio/core/application-services/src/lib/events/play-queued-track/play-queued-track.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/events/play-queued-track/play-queued-track.handler.ts @@ -1,38 +1,30 @@ -import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; +import { EventPublisher, EventsHandler, IEventHandler } from '@nestjs/cqrs'; import { - QueuedTrack, + ChannelRepositoryInterface, + PlayQueuedTrackEvent, QueuedTrackRepositoryInterface, } from '@sdj/backend/radio/core/domain'; -import { Store } from '../../ports/store.port'; -import { PlayQueuedTrackEvent } from './play-queued-track.event'; @EventsHandler(PlayQueuedTrackEvent) export class PlayQueuedTrackHandler implements IEventHandler { constructor( - private readonly storageService: Store, - private queuedTrackRepository: QueuedTrackRepositoryInterface + private queuedTrackRepository: QueuedTrackRepositoryInterface, + private channelRepository: ChannelRepositoryInterface, + private publisher: EventPublisher ) {} - async handle(event: PlayQueuedTrackEvent): Promise { + async handle(event: PlayQueuedTrackEvent): Promise { const queuedTrack = await this.queuedTrackRepository.findOneOrFail( event.queuedTrackId ); const channelId = queuedTrack.playedIn.id; - const prevTrack = await this.storageService.getCurrentTrack(channelId); - if (prevTrack) { - await this.storageService.removeFromQueue(prevTrack); - } - await this.storageService.setCurrentTrack(channelId, queuedTrack); - await this.updateQueuedTrackPlayedAt(queuedTrack); - return this.storageService.setSilenceCount(channelId, 0); - } - - updateQueuedTrackPlayedAt( - queuedTrack: QueuedTrack, - playedAt?: Date - ): Promise { - queuedTrack.playedAt = playedAt || new Date(); - return this.queuedTrackRepository.save(queuedTrack); + const channel = this.publisher.mergeObjectContext( + await this.channelRepository.findOrFail(channelId) + ); + channel.play(queuedTrack); + await this.channelRepository.save(channel); + await this.queuedTrackRepository.save(queuedTrack); + channel.commit(); } } diff --git a/libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.handler.ts b/libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.handler.ts deleted file mode 100644 index b72e0f0..0000000 --- a/libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.handler.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { Store } from '../../ports/store.port'; -import { PlaySilenceEvent } from './play-silence.event'; - -@EventsHandler(PlaySilenceEvent) -export class PlaySilenceHandler implements IEventHandler { - constructor(private readonly storageService: Store) {} - - async handle(command: PlaySilenceEvent): Promise { - const channelId = command.channelId; - let count = await this.storageService.getSilenceCount(channelId); - count++; - await this.storageService.setSilenceCount(channelId, count); - const prevTrack = await this.storageService.getCurrentTrack(channelId); - if (prevTrack) { - await this.storageService.removeFromQueue(prevTrack); - } - return this.storageService.setCurrentTrack(channelId, null); - } -} diff --git a/libs/backend/radio/core/application-services/src/lib/events/song-voted-negatively/song-voted-negatively.handler.ts b/libs/backend/radio/core/application-services/src/lib/events/song-voted-negatively/song-voted-negatively.handler.ts index 6ac0b96..eb19f49 100644 --- a/libs/backend/radio/core/application-services/src/lib/events/song-voted-negatively/song-voted-negatively.handler.ts +++ b/libs/backend/radio/core/application-services/src/lib/events/song-voted-negatively/song-voted-negatively.handler.ts @@ -1,12 +1,12 @@ import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; import { QueuedTrackRepositoryInterface, + SongVotedNegativelyEvent, VoteRepositoryInterface, } from '@sdj/backend/radio/core/domain'; import { appConfig } from '@sdj/backend/shared/domain'; import { SkipQueuedTrackCommand } from '../../commands/skip-queued-track/skip-queued-track.command'; import { RadioFacade } from '../../radio.facade'; -import { SongVotedNegativelyEvent } from './song-voted-negatively.event'; @EventsHandler(SongVotedNegativelyEvent) export class SongVotedNegativelyHandler diff --git a/libs/backend/radio/core/application-services/src/lib/ports/store.port.ts b/libs/backend/radio/core/application-services/src/lib/ports/store.port.ts deleted file mode 100644 index 45df59f..0000000 --- a/libs/backend/radio/core/application-services/src/lib/ports/store.port.ts +++ /dev/null @@ -1,32 +0,0 @@ -import { QueuedTrack } from '@sdj/backend/radio/core/domain'; -import { Observable } from 'rxjs'; - -export abstract class Store { - abstract async addToQueue(queuedTrack: QueuedTrack): Promise; - - abstract channelAppears(channelId: string): Promise; - - abstract channelDisappears(channelId: string): void; - - abstract async getCurrentTrack( - channelId: string - ): Promise; - - abstract getQueue(channelId: string): Observable; - - abstract async getSilenceCount(channelId: string): Promise; - - abstract isChannelActive(channelId: string): boolean; - - abstract async removeFromQueue(queuedTrack: QueuedTrack): Promise; - - abstract async setCurrentTrack( - channelId: string, - queuedTrack: QueuedTrack | null - ): Promise; - - abstract async setSilenceCount( - channelId: string, - value: number - ): Promise; -} diff --git a/libs/backend/radio/core/application-services/src/lib/radio.facade.ts b/libs/backend/radio/core/application-services/src/lib/radio.facade.ts index ba70ecf..616f943 100644 --- a/libs/backend/radio/core/application-services/src/lib/radio.facade.ts +++ b/libs/backend/radio/core/application-services/src/lib/radio.facade.ts @@ -1,7 +1,7 @@ import { Injectable } from '@nestjs/common'; import { CommandBus, EventBus, QueryBus } from '@nestjs/cqrs'; -import { QueuedTrack } from '@sdj/backend/radio/core/domain'; +import { PozdroEvent, QueuedTrack } from '@sdj/backend/radio/core/domain'; import { AddTrackToQueueCommand } from './commands/add-track-to-queue/add-track-to-queue.command'; import { DeleteQueuedTrackCommand } from './commands/delete-queued-track/delete-queued-track.command'; import { DownloadAndPlayCommand } from './commands/download-and-play/download-and-play.command'; @@ -16,7 +16,6 @@ import { SetChannelDefaultStreamCommand } from './commands/set-channel-default-s import { SkipQueuedTrackCommand } from './commands/skip-queued-track/skip-queued-track.command'; import { ThumbDownCommand } from './commands/thumb-down/thumb-down.command'; import { ThumbUpCommand } from './commands/thumb-up/thumb-up.command'; -import { PozdroEvent } from './events/pozdro/pozdro.event'; import { GetChannelsQuery } from './queries/get-channels/get-channels.query'; import { GetChannelsReadModel } from './queries/get-channels/get-channels.read-model'; diff --git a/libs/backend/radio/core/domain/src/index.ts b/libs/backend/radio/core/domain/src/index.ts index 9d76f78..908c4e4 100644 --- a/libs/backend/radio/core/domain/src/index.ts +++ b/libs/backend/radio/core/domain/src/index.ts @@ -7,8 +7,15 @@ export * from './lib/events/channel-started.event'; export * from './lib/events/channel-updated.event'; export * from './lib/events/channel-will-start.event'; export * from './lib/events/channel-will-stop.event'; +export * from './lib/events/play-queued-track.event'; +export * from './lib/events/play-silence.event'; +export * from './lib/events/pozdro.event'; +export * from './lib/events/queued-track-skipped.event'; +export * from './lib/events/song-voted-negatively.event'; +export * from './lib/events/queue-changed.event'; export * from './lib/events/user-joined-channel.event'; export * from './lib/events/user-leaved-channel.event'; +export * from './lib/radio.aggregate'; export * from './lib/repositories/channel-repository.interface'; export * from './lib/repositories/queued-track-repository.interface'; export * from './lib/repositories/track-repository.interface'; diff --git a/libs/backend/radio/core/domain/src/lib/entities/channel.entity.ts b/libs/backend/radio/core/domain/src/lib/entities/channel.entity.ts index d215bb6..eba7153 100644 --- a/libs/backend/radio/core/domain/src/lib/entities/channel.entity.ts +++ b/libs/backend/radio/core/domain/src/lib/entities/channel.entity.ts @@ -1,16 +1,29 @@ import { AggregateRoot } from '@nestjs/cqrs'; -import { Column, Entity, OneToMany, PrimaryColumn } from 'typeorm'; +import { + Column, + Entity, + JoinColumn, + OneToMany, + OneToOne, + PrimaryColumn, +} from 'typeorm'; +import { QueueChangedEvent } from '../..'; import { ChannelStartedEvent } from '../events/channel-started.event'; import { ChannelStoppedEvent } from '../events/channel-stopped.event'; import { ChannelUpdatedEvent } from '../events/channel-updated.event'; import { ChannelWillStartEvent } from '../events/channel-will-start.event'; import { ChannelWillStopEvent } from '../events/channel-will-stop.event'; +import { PlaySilenceEvent } from '../events/play-silence.event'; import { UserJoinedChannelEvent } from '../events/user-joined-channel.event'; import { UserLeavedChannelEvent } from '../events/user-leaved-channel.event'; import { QueuedTrack } from './queued-track.entity'; @Entity() export class Channel extends AggregateRoot { + @OneToOne((type) => QueuedTrack, (queuedTrack) => queuedTrack.playedIn) + @JoinColumn() + currentTrack: QueuedTrack; + @Column('varchar', { length: 200, nullable: true, @@ -86,4 +99,15 @@ export class Channel extends AggregateRoot { this.apply(new ChannelStoppedEvent(this.id)); this.apply(new ChannelUpdatedEvent(this.id)); } + + play(queuedTrack: QueuedTrack): void { + this.currentTrack = queuedTrack; + queuedTrack.play(); + this.apply(new QueueChangedEvent(this.id)); + } + + playSilence(): void { + this.currentTrack = null; + this.apply(new PlaySilenceEvent(this.id)); + } } diff --git a/libs/backend/radio/core/domain/src/lib/entities/queued-track.entity.ts b/libs/backend/radio/core/domain/src/lib/entities/queued-track.entity.ts index 9b1ac3e..997293f 100644 --- a/libs/backend/radio/core/domain/src/lib/entities/queued-track.entity.ts +++ b/libs/backend/radio/core/domain/src/lib/entities/queued-track.entity.ts @@ -64,4 +64,8 @@ export class QueuedTrack implements IQueuedTrack { this.track = track; this.randomized = randomized; } + + play(): void { + this.playedAt = new Date(); + } } diff --git a/libs/backend/radio/core/application-services/src/lib/events/play-queued-track/play-queued-track.event.ts b/libs/backend/radio/core/domain/src/lib/events/play-queued-track.event.ts similarity index 100% rename from libs/backend/radio/core/application-services/src/lib/events/play-queued-track/play-queued-track.event.ts rename to libs/backend/radio/core/domain/src/lib/events/play-queued-track.event.ts diff --git a/libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.event.ts b/libs/backend/radio/core/domain/src/lib/events/play-silence.event.ts similarity index 78% rename from libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.event.ts rename to libs/backend/radio/core/domain/src/lib/events/play-silence.event.ts index 8698830..31b135a 100644 --- a/libs/backend/radio/core/application-services/src/lib/events/play-silence/play-silence.event.ts +++ b/libs/backend/radio/core/domain/src/lib/events/play-silence.event.ts @@ -1,5 +1,8 @@ import { ICommand } from '@nestjs/cqrs'; +/** + * @ToDo rename to play radio + */ export class PlaySilenceEvent implements ICommand { constructor(public channelId: string) {} } diff --git a/libs/backend/radio/core/application-services/src/lib/events/pozdro/pozdro.event.ts b/libs/backend/radio/core/domain/src/lib/events/pozdro.event.ts similarity index 100% rename from libs/backend/radio/core/application-services/src/lib/events/pozdro/pozdro.event.ts rename to libs/backend/radio/core/domain/src/lib/events/pozdro.event.ts diff --git a/libs/backend/radio/core/domain/src/lib/events/queue-changed.event.ts b/libs/backend/radio/core/domain/src/lib/events/queue-changed.event.ts new file mode 100644 index 0000000..f9cc4a3 --- /dev/null +++ b/libs/backend/radio/core/domain/src/lib/events/queue-changed.event.ts @@ -0,0 +1,3 @@ +export class QueueChangedEvent { + constructor(public channelId: string) {} +} diff --git a/libs/backend/radio/core/application-services/src/lib/events/queued-track-skiepped/queued-track-skipped.event.ts b/libs/backend/radio/core/domain/src/lib/events/queued-track-skipped.event.ts similarity index 100% rename from libs/backend/radio/core/application-services/src/lib/events/queued-track-skiepped/queued-track-skipped.event.ts rename to libs/backend/radio/core/domain/src/lib/events/queued-track-skipped.event.ts diff --git a/libs/backend/radio/core/application-services/src/lib/events/song-voted-negatively/song-voted-negatively.event.ts b/libs/backend/radio/core/domain/src/lib/events/song-voted-negatively.event.ts similarity index 100% rename from libs/backend/radio/core/application-services/src/lib/events/song-voted-negatively/song-voted-negatively.event.ts rename to libs/backend/radio/core/domain/src/lib/events/song-voted-negatively.event.ts diff --git a/libs/backend/radio/core/domain/src/lib/radio.aggregate.ts b/libs/backend/radio/core/domain/src/lib/radio.aggregate.ts new file mode 100644 index 0000000..866c09b --- /dev/null +++ b/libs/backend/radio/core/domain/src/lib/radio.aggregate.ts @@ -0,0 +1,21 @@ +import { AggregateRoot } from '@nestjs/cqrs'; +import { + Channel, + QueuedTrack, + Track, + User, +} from '@sdj/backend/radio/core/domain'; +import { QueueChangedEvent } from './events/queue-changed.event'; + +export class RadioAggregate extends AggregateRoot { + queueTrack( + track: Track, + channel: Channel, + randomized: boolean = false, + user?: User + ): QueuedTrack { + const queuedTrack = new QueuedTrack(track, channel, randomized, user); + this.apply(new QueueChangedEvent(channel.id)); + return queuedTrack; + } +} diff --git a/libs/backend/radio/core/domain/src/lib/repositories/queued-track-repository.interface.ts b/libs/backend/radio/core/domain/src/lib/repositories/queued-track-repository.interface.ts index 4e4e630..fb559ce 100644 --- a/libs/backend/radio/core/domain/src/lib/repositories/queued-track-repository.interface.ts +++ b/libs/backend/radio/core/domain/src/lib/repositories/queued-track-repository.interface.ts @@ -25,4 +25,6 @@ export abstract class QueuedTrackRepositoryInterface { abstract createQueryBuilder(alias: string): QueryBuilder; abstract remove(queuedTrack: QueuedTrack): Promise; + + abstract getQueue(channelId: string): Promise; } diff --git a/libs/backend/radio/feature/src/lib/backend-radio-feature.module.ts b/libs/backend/radio/feature/src/lib/backend-radio-feature.module.ts index ce395ad..ef9430e 100644 --- a/libs/backend/radio/feature/src/lib/backend-radio-feature.module.ts +++ b/libs/backend/radio/feature/src/lib/backend-radio-feature.module.ts @@ -3,7 +3,6 @@ import { BackendRadioCoreApplicationServicesModule, TrackService, } from '@sdj/backend/radio/core/application-services'; -import { BackendRadioInfrastructureModule } from '@sdj/backend/radio/infrastructure'; import { BackendRadioInfrastructureMp3GainModule } from '@sdj/backend/radio/infrastructure-mp3-gain'; import { BackendRadioInfrastructureSlackApiModule } from '@sdj/backend/radio/infrastructure-slack-api'; import { BackendRadioInfrastructureTypeormModule } from '@sdj/backend/radio/infrastructure-typeorm'; @@ -14,7 +13,6 @@ import { TrackServiceAdapter } from './adapters/track-service.adapter'; @Module({ imports: [ BackendRadioCoreApplicationServicesModule, - BackendRadioInfrastructureModule, BackendRadioInfrastructureYoutubeApiModule, BackendRadioInfrastructureMp3GainModule, BackendRadioInfrastructureTypeormModule, @@ -24,7 +22,6 @@ import { TrackServiceAdapter } from './adapters/track-service.adapter'; exports: [ TrackService, BackendRadioCoreApplicationServicesModule, - BackendRadioInfrastructureModule, BackendRadioInfrastructureYoutubeApiModule, BackendRadioInfrastructureTypeormModule, BackendRadioInfrastructureSlackApiModule, diff --git a/libs/backend/radio/infrastructure-typeorm/src/lib/repositories/typeorm-queued-track.repository.ts b/libs/backend/radio/infrastructure-typeorm/src/lib/repositories/typeorm-queued-track.repository.ts index 893732a..e3879be 100644 --- a/libs/backend/radio/infrastructure-typeorm/src/lib/repositories/typeorm-queued-track.repository.ts +++ b/libs/backend/radio/infrastructure-typeorm/src/lib/repositories/typeorm-queued-track.repository.ts @@ -1,6 +1,5 @@ -import { forwardRef, Inject, Injectable } from '@nestjs/common'; +import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; -import { Store } from '@sdj/backend/radio/core/application-services'; import { QueuedTrack, QueuedTrackRepositoryInterface, @@ -10,8 +9,6 @@ import { QueryBuilder, Repository } from 'typeorm'; @Injectable() export class TypeormQueuedTrackRepository extends QueuedTrackRepositoryInterface { constructor( - @Inject(forwardRef(() => Store)) - private store: Store, @InjectRepository(QueuedTrack) private typeOrmRepository: Repository ) { @@ -53,21 +50,39 @@ export class TypeormQueuedTrackRepository extends QueuedTrackRepositoryInterface } async getCurrentTrack(channelId: string): Promise { - return this.store.getCurrentTrack(channelId); + return this.typeOrmRepository + .createQueryBuilder('queuedTrack') + .where('queuedTrack.playedIn = :channelId') + .innerJoin( + 'queuedTrack.playedIn', + 'channel', + 'queuedTrack.id = channel.currentTrack' + ) + .setParameter('channelId', channelId) + .getOne(); } getNextSongInQueue(channelId: string): Promise { - return ( - this.typeOrmRepository - .createQueryBuilder('queuedTrack') - // .addSelect('max(queuedTrack.id)') - .where('queuedTrack.playedIn = :channelId') - .leftJoinAndSelect('queuedTrack.track', 'track') - .andWhere('queuedTrack.playedAt IS NULL') - .orderBy('queuedTrack.order, queuedTrack.id', 'ASC') - .setParameter('channelId', channelId) - .getOne() - ); + return this.typeOrmRepository + .createQueryBuilder('queuedTrack') + .where('queuedTrack.playedIn = :channelId') + .leftJoinAndSelect('queuedTrack.track', 'track') + .andWhere('queuedTrack.playedAt IS NULL') + .orderBy('queuedTrack.order, queuedTrack.id', 'ASC') + .setParameter('channelId', channelId) + .getOne(); + } + + getQueue(channelId: string): Promise { + return this.typeOrmRepository + .createQueryBuilder('queuedTrack') + .where('queuedTrack.playedIn = :channelId') + .innerJoinAndSelect('queuedTrack.track', 'track') + .leftJoinAndSelect('queuedTrack.addedBy', 'addedBy ') + .andWhere('queuedTrack.playedAt IS NULL') + .orderBy('queuedTrack.createdAt') + .setParameter('channelId', channelId) + .getMany(); } remove(queuedTrack: QueuedTrack): Promise { diff --git a/libs/backend/radio/infrastructure/README.md b/libs/backend/radio/infrastructure/README.md deleted file mode 100644 index d8c4560..0000000 --- a/libs/backend/radio/infrastructure/README.md +++ /dev/null @@ -1,7 +0,0 @@ -# backend-radio-infrastructure - -This library was generated with [Nx](https://nx.dev). - -## Running unit tests - -Run `ng test backend-radio-infrastructure` to execute the unit tests via [Jest](https://jestjs.io). diff --git a/libs/backend/radio/infrastructure/jest.config.js b/libs/backend/radio/infrastructure/jest.config.js deleted file mode 100644 index 3f86d69..0000000 --- a/libs/backend/radio/infrastructure/jest.config.js +++ /dev/null @@ -1,10 +0,0 @@ -module.exports = { - name: 'backend-radio-infrastructure', - preset: '../../../../jest.config.js', - transform: { - '^.+\\.[tj]sx?$': 'ts-jest', - }, - moduleFileExtensions: ['ts', 'tsx', 'js', 'jsx', 'html'], - coverageDirectory: '../../../../coverage/libs/backend/radio/infrastructure', - globals: { 'ts-jest': { tsConfig: '/tsconfig.spec.json' } }, -}; diff --git a/libs/backend/radio/infrastructure/src/index.ts b/libs/backend/radio/infrastructure/src/index.ts deleted file mode 100644 index 85ff23c..0000000 --- a/libs/backend/radio/infrastructure/src/index.ts +++ /dev/null @@ -1,2 +0,0 @@ -export * from './lib/store.adapter'; -export * from './lib/backend-radio-infrastructure.module'; diff --git a/libs/backend/radio/infrastructure/src/lib/backend-radio-infrastructure.module.ts b/libs/backend/radio/infrastructure/src/lib/backend-radio-infrastructure.module.ts deleted file mode 100644 index d624c1c..0000000 --- a/libs/backend/radio/infrastructure/src/lib/backend-radio-infrastructure.module.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { Module } from '@nestjs/common'; -import { Store } from '@sdj/backend/radio/core/application-services'; -import { StoreAdapter } from './store.adapter'; - -@Module({ - providers: [StoreAdapter, { provide: Store, useExisting: StoreAdapter }], - exports: [Store], -}) -export class BackendRadioInfrastructureModule {} diff --git a/libs/backend/radio/infrastructure/src/lib/store.adapter.ts b/libs/backend/radio/infrastructure/src/lib/store.adapter.ts deleted file mode 100644 index 0b5974a..0000000 --- a/libs/backend/radio/infrastructure/src/lib/store.adapter.ts +++ /dev/null @@ -1,133 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { Store } from '@sdj/backend/radio/core/application-services'; -import { - QueuedTrack, - QueuedTrackRepositoryInterface, -} from '@sdj/backend/radio/core/domain'; -import { BehaviorSubject, Observable, of } from 'rxjs'; -import { distinctUntilChanged, filter, first, map } from 'rxjs/operators'; - -interface ChannelState { - silenceCount: number; - queue: QueuedTrack[]; - currentTrack: QueuedTrack | null; -} - -interface State { - [key: string]: ChannelState; -} - -const initialChannelState = { silenceCount: 0, queue: [], currentTrack: null }; -const initialPlaylistState = {}; - -@Injectable() -export class StoreAdapter implements Store { - get state(): State { - return this._state.getValue(); - } - - private _state: BehaviorSubject = new BehaviorSubject( - initialPlaylistState - ); - - constructor( - private readonly queuedTrackRepository: QueuedTrackRepositoryInterface - ) {} - - async addToQueue(queuedTrack: QueuedTrack): Promise { - const channelState = await this.getChannelState(queuedTrack.playedIn.id); - const isTrackAlreadyInQueue = - channelState.queue.findIndex( - (trackInQueue: QueuedTrack) => trackInQueue.id === queuedTrack.id - ) !== -1; - if (!isTrackAlreadyInQueue) { - this._state.next({ - ...this.state, - [queuedTrack.playedIn.id]: { - ...channelState, - queue: channelState.queue.concat(queuedTrack), - }, - }); - } - } - - channelAppears(channelId: string): Promise { - return (!!this.state && !!this.state[channelId] - ? of() - : this._state.pipe( - filter((state) => !!state[channelId]), - first() - ) - ).toPromise(); - } - - channelDisappears(channelId: string): void { - const state = this.state; - delete state[channelId]; - this._state.next(state); - } - - async getCurrentTrack(channelId: string): Promise { - return (await this.getChannelState(channelId)).currentTrack; - } - - getQueue(channelId: string): Observable { - return this._state.pipe( - map((state: State) => state[channelId]), - filter(Boolean), - map((state: ChannelState) => state.queue), - distinctUntilChanged() - ); - } - - async getSilenceCount(channelId: string): Promise { - await this.getChannelState(channelId); - return this.state[channelId].silenceCount; - } - - isChannelActive(channelId: string): boolean { - return !!this.state[channelId]; - } - - async setCurrentTrack( - channelId: string, - queuedTrack: QueuedTrack | null - ): Promise { - const channelState = await this.getChannelState(channelId); - this._state.next({ - ...this.state, - [channelId]: { ...channelState, currentTrack: queuedTrack }, - }); - } - - async setSilenceCount(channelId: string, value: number): Promise { - const channelState = await this.getChannelState(channelId); - this._state.next({ - ...this.state, - [channelId]: { ...channelState, silenceCount: value }, - }); - } - - async removeFromQueue(queuedTrack: QueuedTrack): Promise { - const channelState = await this.getChannelState(queuedTrack.playedIn.id); - this._state.next({ - ...this.state, - [queuedTrack.playedIn.id]: { - ...channelState, - queue: channelState.queue.filter( - (qTrack) => qTrack.id !== queuedTrack.id - ), - }, - }); - } - - private async getChannelState(channelId: string): Promise { - if (!this.state[channelId]) { - this.state[channelId] = { - ...initialChannelState, - queue: await this.queuedTrackRepository.findQueuedTracks(channelId), - }; - } - return this.state[channelId]; - } -} diff --git a/libs/backend/radio/infrastructure/tsconfig.json b/libs/backend/radio/infrastructure/tsconfig.json deleted file mode 100644 index d32f55e..0000000 --- a/libs/backend/radio/infrastructure/tsconfig.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "extends": "../../../../tsconfig.base.json", - "compilerOptions": { - "types": ["node", "jest"] - }, - "include": [], - "files": [], - "references": [ - { - "path": "./tsconfig.lib.json" - }, - { - "path": "./tsconfig.spec.json" - } - ] -} diff --git a/libs/backend/radio/infrastructure/tsconfig.lib.json b/libs/backend/radio/infrastructure/tsconfig.lib.json deleted file mode 100644 index a174cb0..0000000 --- a/libs/backend/radio/infrastructure/tsconfig.lib.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "extends": "./tsconfig.json", - "compilerOptions": { - "outDir": "../../../../dist/out-tsc", - "types": [] - }, - "exclude": ["**/*.spec.ts"], - "include": ["**/*.ts"] -} diff --git a/libs/backend/radio/infrastructure/tsconfig.spec.json b/libs/backend/radio/infrastructure/tsconfig.spec.json deleted file mode 100644 index 8119bae..0000000 --- a/libs/backend/radio/infrastructure/tsconfig.spec.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "extends": "./tsconfig.json", - "compilerOptions": { - "outDir": "../../../../dist/out-tsc", - "module": "commonjs", - "types": ["jest", "node"] - }, - "include": [ - "**/*.spec.ts", - "**/*.spec.tsx", - "**/*.spec.js", - "**/*.spec.jsx", - "**/*.d.ts" - ] -} diff --git a/libs/backend/radio/infrastructure/tslint.json b/libs/backend/radio/infrastructure/tslint.json deleted file mode 100644 index e9dbc53..0000000 --- a/libs/backend/radio/infrastructure/tslint.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "extends": "../../../../tslint.json", - "rules": {}, - "linterOptions": { - "exclude": ["!**/*"] - } -} diff --git a/libs/backend/radio/ui-redis/src/lib/events/play-queued-track/ices-play-queued-track.handler.ts b/libs/backend/radio/ui-redis/src/lib/events/play-queued-track/ices-play-queued-track.handler.ts index f0ad263..fa4d9e8 100644 --- a/libs/backend/radio/ui-redis/src/lib/events/play-queued-track/ices-play-queued-track.handler.ts +++ b/libs/backend/radio/ui-redis/src/lib/events/play-queued-track/ices-play-queued-track.handler.ts @@ -1,6 +1,8 @@ import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { PlayQueuedTrackEvent } from '@sdj/backend/radio/core/application-services'; -import { QueuedTrackRepositoryInterface } from '@sdj/backend/radio/core/domain'; +import { + PlayQueuedTrackEvent, + QueuedTrackRepositoryInterface, +} from '@sdj/backend/radio/core/domain'; import { RedisService } from '../../services/redis.service'; @EventsHandler(PlayQueuedTrackEvent) diff --git a/libs/backend/radio/ui-redis/src/lib/events/play-silence/ices-play-silence.handler.ts b/libs/backend/radio/ui-redis/src/lib/events/play-silence/ices-play-silence.handler.ts index aa0a751..6af92ab 100644 --- a/libs/backend/radio/ui-redis/src/lib/events/play-silence/ices-play-silence.handler.ts +++ b/libs/backend/radio/ui-redis/src/lib/events/play-silence/ices-play-silence.handler.ts @@ -1,5 +1,5 @@ import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { PlaySilenceEvent } from '@sdj/backend/radio/core/application-services'; +import { PlaySilenceEvent } from '@sdj/backend/radio/core/domain'; import { RedisService } from '../../services/redis.service'; @EventsHandler(PlaySilenceEvent) diff --git a/libs/backend/radio/ui-slack/src/lib/bot/lib/commands/pozdro.slack-command.ts b/libs/backend/radio/ui-slack/src/lib/bot/lib/commands/pozdro.slack-command.ts index 0022e84..c2b6e83 100644 --- a/libs/backend/radio/ui-slack/src/lib/bot/lib/commands/pozdro.slack-command.ts +++ b/libs/backend/radio/ui-slack/src/lib/bot/lib/commands/pozdro.slack-command.ts @@ -1,9 +1,9 @@ import { Injectable, Logger } from '@nestjs/common'; +import { RadioFacade } from '@sdj/backend/radio/core/application-services'; import { PozdroEvent, - RadioFacade, -} from '@sdj/backend/radio/core/application-services'; -import { UserRepositoryInterface } from '@sdj/backend/radio/core/domain'; + UserRepositoryInterface, +} from '@sdj/backend/radio/core/domain'; import { SlackCommand, SlackCommandHandler, diff --git a/libs/backend/radio/ui-slack/src/lib/bot/lib/events/queued-track-skipped/slack-queued-track-skipped.handler.ts b/libs/backend/radio/ui-slack/src/lib/bot/lib/events/queued-track-skipped/slack-queued-track-skipped.handler.ts index ece1681..d904688 100644 --- a/libs/backend/radio/ui-slack/src/lib/bot/lib/events/queued-track-skipped/slack-queued-track-skipped.handler.ts +++ b/libs/backend/radio/ui-slack/src/lib/bot/lib/events/queued-track-skipped/slack-queued-track-skipped.handler.ts @@ -1,6 +1,8 @@ import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { QueuedTrackSkippedEvent } from '@sdj/backend/radio/core/application-services'; -import { QueuedTrackRepositoryInterface } from '@sdj/backend/radio/core/domain'; +import { + QueuedTrackRepositoryInterface, + QueuedTrackSkippedEvent, +} from '@sdj/backend/radio/core/domain'; import { SlackService } from '@sikora00/nestjs-slack-bot'; @EventsHandler(QueuedTrackSkippedEvent) diff --git a/libs/backend/radio/ui-web-socket/src/lib/events/play-queued-track/ws-play-queued-track.handler.ts b/libs/backend/radio/ui-web-socket/src/lib/events/play-queued-track/ws-play-queued-track.handler.ts index 750ba66..71533f9 100644 --- a/libs/backend/radio/ui-web-socket/src/lib/events/play-queued-track/ws-play-queued-track.handler.ts +++ b/libs/backend/radio/ui-web-socket/src/lib/events/play-queued-track/ws-play-queued-track.handler.ts @@ -1,7 +1,9 @@ import { Logger } from '@nestjs/common'; import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { PlayQueuedTrackEvent } from '@sdj/backend/radio/core/application-services'; -import { QueuedTrackRepositoryInterface } from '@sdj/backend/radio/core/domain'; +import { + PlayQueuedTrackEvent, + QueuedTrackRepositoryInterface, +} from '@sdj/backend/radio/core/domain'; import { WebSocketEvents } from '@sdj/shared/domain'; import { Gateway } from '../../gateway/gateway'; diff --git a/libs/backend/radio/ui-web-socket/src/lib/events/play-silence/ws-play-silence.handler.ts b/libs/backend/radio/ui-web-socket/src/lib/events/play-silence/ws-play-silence.handler.ts index a6bad05..470184d 100644 --- a/libs/backend/radio/ui-web-socket/src/lib/events/play-silence/ws-play-silence.handler.ts +++ b/libs/backend/radio/ui-web-socket/src/lib/events/play-silence/ws-play-silence.handler.ts @@ -1,27 +1,16 @@ import { Logger } from '@nestjs/common'; import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { - PlaySilenceEvent, - Store, -} from '@sdj/backend/radio/core/application-services'; +import { PlaySilenceEvent } from '@sdj/backend/radio/core/domain'; import { WebSocketEvents } from '@sdj/shared/domain'; import { Gateway } from '../../gateway/gateway'; @EventsHandler(PlaySilenceEvent) export class WsPlaySilenceHandler implements IEventHandler { - constructor( - private logger: Logger, - private gateway: Gateway, - private readonly storageService: Store - ) {} + constructor(private logger: Logger, private gateway: Gateway) {} async handle(command: PlaySilenceEvent): Promise { const channelId = command.channelId; - let count = await this.storageService.getSilenceCount(channelId); - count++; - if (count > 1) { - this.logger.log('radio', channelId); - this.gateway.server.in(channelId).emit(WebSocketEvents.playRadio); - } + this.logger.log('radio', channelId); + this.gateway.server.in(channelId).emit(WebSocketEvents.playRadio); } } diff --git a/libs/backend/radio/ui-web-socket/src/lib/events/pozdro/ws-pozdro.handler.ts b/libs/backend/radio/ui-web-socket/src/lib/events/pozdro/ws-pozdro.handler.ts index cfe2f6b..9792b86 100644 --- a/libs/backend/radio/ui-web-socket/src/lib/events/pozdro/ws-pozdro.handler.ts +++ b/libs/backend/radio/ui-web-socket/src/lib/events/pozdro/ws-pozdro.handler.ts @@ -1,6 +1,6 @@ import { Logger } from '@nestjs/common'; import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; -import { PozdroEvent } from '@sdj/backend/radio/core/application-services'; +import { PozdroEvent } from '@sdj/backend/radio/core/domain'; import { WebSocketEvents } from '@sdj/shared/domain'; import { Gateway } from '../../gateway/gateway'; diff --git a/libs/backend/radio/ui-web-socket/src/lib/events/queue-changed/ws-queued-changed.handler.ts b/libs/backend/radio/ui-web-socket/src/lib/events/queue-changed/ws-queued-changed.handler.ts new file mode 100644 index 0000000..367dcae --- /dev/null +++ b/libs/backend/radio/ui-web-socket/src/lib/events/queue-changed/ws-queued-changed.handler.ts @@ -0,0 +1,15 @@ +import { EventsHandler, IEventHandler } from '@nestjs/cqrs'; +import { QueueChangedEvent } from '@sdj/backend/radio/core/domain'; +import { WsQueueSynchronizationService } from '../../services/ws-queue-synchronization.service'; + +@EventsHandler(QueueChangedEvent) +export class WsQueuedChangedHandler + implements IEventHandler { + constructor( + private queueSynchronizationService: WsQueueSynchronizationService + ) {} + + async handle(event: QueueChangedEvent): Promise { + return this.queueSynchronizationService.updateQueue(event.channelId); + } +} diff --git a/libs/backend/radio/ui-web-socket/src/lib/gateway/gateway.ts b/libs/backend/radio/ui-web-socket/src/lib/gateway/gateway.ts index c30465e..e7b4b89 100644 --- a/libs/backend/radio/ui-web-socket/src/lib/gateway/gateway.ts +++ b/libs/backend/radio/ui-web-socket/src/lib/gateway/gateway.ts @@ -9,7 +9,6 @@ import { JoinChannelCommand, LeaveChannelCommand, RadioFacade, - Store, } from '@sdj/backend/radio/core/application-services'; import { ChannelRepositoryInterface, @@ -20,6 +19,7 @@ import { WebSocketEvents } from '@sdj/shared/domain'; import { Observable, Subject } from 'rxjs'; import { map, takeUntil } from 'rxjs/operators'; import { Server, Socket } from 'socket.io'; +import { WsQueueSynchronizationService } from '../services/ws-queue-synchronization.service'; @WebSocketGateway() export class Gateway implements OnGatewayDisconnect { @@ -29,7 +29,7 @@ export class Gateway implements OnGatewayDisconnect { constructor( private readonly channelRepository: ChannelRepositoryInterface, private hostService: HostService, - private readonly storageService: Store, + private queueSynchronizationService: WsQueueSynchronizationService, private radioFacade: RadioFacade ) {} @@ -75,12 +75,14 @@ export class Gateway implements OnGatewayDisconnect { this.clientInRoomSubjects[client.id].complete(); } this.clientInRoomSubjects[client.id] = new Subject(); - return this.storageService.getQueue(JSON.parse(channel)).pipe( - takeUntil(this.clientInRoomSubjects[client.id]), - map((list) => { - return { event: WebSocketEvents.queuedTrackList, data: list }; - }) - ); + return this.queueSynchronizationService + .listenToQueue(JSON.parse(channel)) + .pipe( + takeUntil(this.clientInRoomSubjects[client.id]), + map((list) => { + return { event: WebSocketEvents.queuedTrackList, data: list }; + }) + ); } private async leaveOtherChannels( diff --git a/libs/backend/radio/ui-web-socket/src/lib/services/ws-queue-synchronization.service.ts b/libs/backend/radio/ui-web-socket/src/lib/services/ws-queue-synchronization.service.ts new file mode 100644 index 0000000..1313ae7 --- /dev/null +++ b/libs/backend/radio/ui-web-socket/src/lib/services/ws-queue-synchronization.service.ts @@ -0,0 +1,43 @@ +import { Injectable } from '@nestjs/common'; +import { + QueuedTrack, + QueuedTrackRepositoryInterface, +} from '@sdj/backend/radio/core/domain'; +import { BehaviorSubject, from, Observable, Subject } from 'rxjs'; +import { switchMap } from 'rxjs/operators'; + +@Injectable() +export class WsQueueSynchronizationService { + private readonly queues: Record> = {}; + + constructor(private queuedTrackRepository: QueuedTrackRepositoryInterface) {} + + /** + * @ToDO there is a place for a memory leak because we should remove unused subjects + * @param channelId + */ + listenToQueue(channelId: string): Observable { + let queue: Observable = this.queues[channelId]; + if (!queue) { + queue = this.addQueue(channelId); + } + + return queue; + } + + async updateQueue(channelId): Promise { + const queueSubject = this.queues[channelId]; + if (queueSubject) { + queueSubject.next(await this.queuedTrackRepository.getQueue(channelId)); + } + } + + private addQueue(channelId: string): Observable { + return from(this.queuedTrackRepository.getQueue(channelId)).pipe( + switchMap((queue) => { + this.queues[channelId] = new BehaviorSubject(queue); + return this.queues[channelId]; + }) + ); + } +} diff --git a/libs/backend/radio/ui-web-socket/src/lib/websocket.module.ts b/libs/backend/radio/ui-web-socket/src/lib/websocket.module.ts index a3e2ae9..857df47 100644 --- a/libs/backend/radio/ui-web-socket/src/lib/websocket.module.ts +++ b/libs/backend/radio/ui-web-socket/src/lib/websocket.module.ts @@ -6,8 +6,10 @@ import { WsChannelUpdatedHandler } from './events/channel-updated/ws-channel-upd import { WsPlayQueuedTrackHandler } from './events/play-queued-track/ws-play-queued-track.handler'; import { WsPlaySilenceHandler } from './events/play-silence/ws-play-silence.handler'; import { WsPozdroHandler } from './events/pozdro/ws-pozdro.handler'; +import { WsQueuedChangedHandler } from './events/queue-changed/ws-queued-changed.handler'; import { WsUserJoinedChannelHandler } from './events/user-joined-channel/ws-user-joined-channel.handler'; import { Gateway } from './gateway/gateway'; +import { WsQueueSynchronizationService } from './services/ws-queue-synchronization.service'; const EventsHandlers = [ WsChannelStartedHandler, @@ -15,12 +17,13 @@ const EventsHandlers = [ WsPlayQueuedTrackHandler, WsPlaySilenceHandler, WsPozdroHandler, + WsQueuedChangedHandler, WsUserJoinedChannelHandler, ]; @Module({ imports: [CqrsModule], - providers: [...EventsHandlers, Gateway], + providers: [...EventsHandlers, Gateway, WsQueueSynchronizationService], exports: [Gateway], }) export class WebSocketModule implements OnModuleInit { diff --git a/nx.json b/nx.json index 6618664..796aa75 100644 --- a/nx.json +++ b/nx.json @@ -45,9 +45,6 @@ "backend-radio-ui-slack": { "tags": ["platform:backend", "scope:radio", "type:ui"] }, - "backend-radio-infrastructure": { - "tags": ["platform:backend", "scope:radio", "type:infrastructure"] - }, "backend-shared-util-you-tube": { "tags": ["platform:backend", "scope:shared", "type:util"] }, diff --git a/package.json b/package.json index a68973b..9d74e4c 100644 --- a/package.json +++ b/package.json @@ -168,8 +168,7 @@ }, "husky": { "hooks": { - "pre-commit": "npm-run-all --parallel validate-commit validate-nx-tags format:check", - "pre-push": "npm-run-all --parallel affected:lint affected:test" + "pre-commit": "validate-commit validate-nx-tags format:check" } } } diff --git a/tsconfig.base.json b/tsconfig.base.json index ccbcc82..751d831 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -34,9 +34,6 @@ "@sdj/backend/radio/ui-slack": [ "libs/backend/radio/ui-slack/src/index.ts" ], - "@sdj/backend/radio/infrastructure": [ - "libs/backend/radio/infrastructure/src/index.ts" - ], "@sdj/backend/shared/util-you-tube": [ "libs/backend/shared/util-you-tube/src/index.ts" ],