diff --git a/src/commands/voice/Clip.ts b/src/commands/voice/Clip.ts index 5427047..63b80cc 100644 --- a/src/commands/voice/Clip.ts +++ b/src/commands/voice/Clip.ts @@ -6,18 +6,19 @@ import {ArgumentType, CommandOptions} from "../Command" import {MessageGenerator} from "../../communication/MessageGenerator" import {GuildUtils} from "../../utils/GuildUtils" import {Logger} from "../../Logger" +import {CachedStream} from "../../voice/CachedStream" export default class ClipCommand extends VoiceCommand { readonly options: CommandOptions = { name: 'Clip', keywords: ['clip'], group: 'voice', - descriptions: ['Create a clip of what a user has said'], + descriptions: ['Create a clip of what was just said! If no user is provided, the whole channel is clipped.'], arguments: [ { key: 'user', description: 'User you would like to clip', - required: true, + required: false, type: ArgumentType.USER }, { @@ -38,22 +39,34 @@ export default class ClipCommand extends VoiceCommand { default: 'Clip' } ], - examples: ['clip @Eve -l 5 -c Eve Funny Clip'] + examples: ['clip', 'clip @Eve -l 5 -c Eve Funny Clip'] } execute(context: GuildContext, source: User, args: Map, message?: Message) { + let stream: CachedStream | undefined + let author: string + let embedMessageContents: string const user: User = args.get('user') - const voiceStream = context.getProvider().getVoiceConnectionHandler().getVoiceStreamForUser(user) - if (!voiceStream) { - Logger.w(ClipCommand.name, `No audioStream for ${user.tag} [${user.id}]`, context) - context.getProvider().getResponder().error('No listening stream registered for user', message) - return + if (user) { + author = user.tag + embedMessageContents = `Recording from [${GuildUtils.createUserMentionString(user.id)}]` + stream = context.getProvider().getVoiceConnectionHandler().getVoiceStreamForUser(user) + if (!stream) { + Logger.w(ClipCommand.name, `No audioStream for ${user.tag} [${user.id}]`, context) + context.getProvider().getResponder().error('No listening stream registered for user', message) + return + } + } else { + author = context.getGuild().name + embedMessageContents = `Recording from ${context.getVoiceConnection()?.channel?.name}` + stream = context.getProvider().getVoiceConnectionHandler().getMergedVoiceStream() } + context.getProvider().getResponder().startTyping(message) - AudioUtils.convertBufferToMp3Buffer(voiceStream.getBuffer(args.get('length')), args.get('caption'), user.tag) + AudioUtils.convertBufferToMp3Buffer(stream.getCachedBuffer(args.get('length')), args.get('caption'), author) .then((buffer) => { const embedMessage = MessageGenerator - .createBasicEmbed(`Recording from [${GuildUtils.createUserMentionString(user.id)}]`) + .createBasicEmbed(embedMessageContents) const embed = MessageGenerator.attachFileToEmbed(embedMessage, buffer, `${args.get('caption')}.mp3`) context.getProvider().getResponder().send({content: embed, message: message}).then((results) => { context.getProvider().getResponder().stopTyping(message)}) diff --git a/src/commands/voice/Recite.ts b/src/commands/voice/Recite.ts index c68d6f2..5a305df 100644 --- a/src/commands/voice/Recite.ts +++ b/src/commands/voice/Recite.ts @@ -4,18 +4,19 @@ import VoiceCommand from "../../voice/VoiceCommand" import {ArgumentType, CommandOptions} from "../Command" import {Logger} from "../../Logger" import {GuildUtils} from "../../utils/GuildUtils" +import {CachedStream} from "../../voice/CachedStream" export default class ReciteCommand extends VoiceCommand { readonly options: CommandOptions = { name: 'Recite', keywords: ['recite'], group: 'voice', - descriptions: ['Recite what a user has said'], + descriptions: ['Recite what was just said! If no user is provided, the whole channel is recited.'], arguments: [ { key: 'user', description: 'User you would like to recite', - required: true, + required: false, type: ArgumentType.USER }, { @@ -35,25 +36,31 @@ export default class ReciteCommand extends VoiceCommand { type: ArgumentType.FLAG, } ], - examples: ['recite @Eve -l 8'] + examples: ['recite', 'recite @Eve -l 8'] } execute(context: GuildContext, source: User, args: Map, message?: Message) { + let stream: CachedStream | undefined const user: User = args.get('user') - const voiceStream = context.getProvider().getVoiceConnectionHandler().getVoiceStreamForUser(user) - if (!voiceStream) { - Logger.w(ReciteCommand.name, `No audioStream for ${user.tag} [${user.id}]`, context) - context.getProvider().getResponder().error('No listening stream registered for user', message) - return + if (user) { + stream = context.getProvider().getVoiceConnectionHandler().getVoiceStreamForUser(user) + if (!stream) { + Logger.w(ReciteCommand.name, `No audioStream for ${user.tag} [${user.id}]`, context) + context.getProvider().getResponder().error('No listening stream registered for user', message) + return + } + } else { + stream = context.getProvider().getVoiceConnectionHandler().getMergedVoiceStream() } - context.getProvider().getInterruptService().playRawStream(voiceStream.getRecordedStream(args.get('length'))) - if (args.get('transcribe')) { + const audioStream = stream.getCachedStream(args.get('length')) + context.getProvider().getInterruptService().playRawStream(audioStream) + if (user && args.get('transcribe')) { const speechRecognizer = context.getVoiceDependencyProvider().getSpeechRecognizer() if (!speechRecognizer) { Logger.e(ReciteCommand.name, 'No SpeechRecognizer Registered', context) return } - speechRecognizer.recognizeTextFromSpeech(voiceStream).then((transcribed) => { + speechRecognizer.recognizeTextFromSpeech(audioStream).then((transcribed) => { const transcribedMessage = `${GuildUtils.createUserMentionString(user.id)} said ${transcribed}` context.getProvider().getResponder().send({content: transcribedMessage, message: message}) }) diff --git a/src/voice/CachedStream.ts b/src/voice/CachedStream.ts new file mode 100644 index 0000000..92fbd70 --- /dev/null +++ b/src/voice/CachedStream.ts @@ -0,0 +1,13 @@ +import { Duplex } from "stream"; + +export interface CachedStream { + getCachedStream(lengthInSeconds?: number, withSilence?: boolean): Duplex + getCachedBuffer(lengthInSeconds?: number, withSilence?: boolean): Buffer +} + +export function CreateStreamFromBuffer(buffer: Buffer): Duplex { + let duplex = new Duplex(); + duplex.push(buffer); + duplex.push(null); + return duplex; +} \ No newline at end of file diff --git a/src/voice/ConnectionHandler.ts b/src/voice/ConnectionHandler.ts index ecb8990..cfbc36a 100644 --- a/src/voice/ConnectionHandler.ts +++ b/src/voice/ConnectionHandler.ts @@ -6,6 +6,8 @@ import {PassThrough} from "stream" import {CommandDispatcher} from "../commands/Dispatcher" import {GlobalContext} from "../GlobalContext" import {Logger} from "../Logger" +import MergedStream from "./MergedStream" +import {CachedStream} from "./CachedStream" const USER_REJOIN_THRESHOLD = 5000 const VOICE_COMMAND_LENGTH = 3000 @@ -15,6 +17,8 @@ const TAG = 'ConnectionHandler' export default class VoiceConnectionHandler { private readonly context: GuildContext + private readonly lowMemoryMode: boolean + private mergeStream: MergedStream = new MergedStream() private voiceStreams: Map = new Map() private removedTimeouts: Map = new Map() private isListeningToCommand: Map = new Map() @@ -22,13 +26,18 @@ export default class VoiceConnectionHandler { constructor(guildContext: GuildContext) { this.context = guildContext + this.lowMemoryMode = false // TODO: Implement a global? parameter } - getVoiceStreams(): Map { + getMergedVoiceStream(): MergedStream { + return this.mergeStream + } + + getVoiceStreams(): Map { return this.voiceStreams } - getVoiceStreamForUser(user: User): RecorderStream | undefined { + getVoiceStreamForUser(user: User): CachedStream | undefined { return this.voiceStreams.get(user.id) } @@ -50,6 +59,7 @@ export default class VoiceConnectionHandler { }) this.removedTimeouts.clear() this.isListeningToCommand.clear() + this.mergeStream.clear() } joinVoiceChannel(voiceChannel: VoiceChannel | undefined | null): Promise { @@ -160,6 +170,7 @@ export default class VoiceConnectionHandler { this.context.getVoiceConnection()?.receiver.createStream(user).emit('end') this.context.getVoiceDependencyProvider() this.context.getVoiceDependencyProvider().getHotwordEngine()?.remove(user.id) + this.mergeStream.removeStream(user.id) } private startVoiceStreamForUser(user: User) { @@ -172,11 +183,11 @@ export default class VoiceConnectionHandler { mode: 'pcm', end: 'manual' }) - const previousStream = this.getVoiceStreamForUser(user) - const recorderStream = previousStream || new RecorderStream() + const previousStream = this.voiceStreams.get(user.id) + const recorderStream = previousStream || new RecorderStream(true) audio.pipe(recorderStream, {end: false}) this.voiceStreams.set(user.id, recorderStream) - + this.mergeStream.insertStream(user.id, recorderStream) const speechRecognizer = this.context.getVoiceDependencyProvider().getSpeechRecognizer() const hotwordEngine = this.context.getVoiceDependencyProvider().getHotwordEngine() if (!speechRecognizer || !hotwordEngine) { diff --git a/src/voice/MergedStream.ts b/src/voice/MergedStream.ts new file mode 100644 index 0000000..9519c60 --- /dev/null +++ b/src/voice/MergedStream.ts @@ -0,0 +1,53 @@ +import {Duplex} from "stream" +import RecorderStream from "./RecorderStream" +import {CachedStream, CreateStreamFromBuffer} from "./CachedStream" + +const SIGNED_16_BIT_MIN = -32768 +const SIGNED_16_BIT_MAX = 32767 +export default class MergedStream implements CachedStream { + private silenceStreams: Map = new Map() + + getCachedStream(): Duplex { + return CreateStreamFromBuffer(this.getCachedBuffer()) + } + + getCachedBuffer(): Buffer { + const buffers: Buffer[] = [] + this.silenceStreams.forEach((stream: CachedStream) => { + buffers.push(stream.getCachedBuffer(undefined, true)) + }) + let maxLengthOfBuffer = 0 + buffers.forEach((buffer) => { + maxLengthOfBuffer = Math.max(maxLengthOfBuffer, buffer.length) + }) + for (let i = 0; i < buffers.length; i++) { + if (buffers[i].length < maxLengthOfBuffer) { + const difference = maxLengthOfBuffer - buffers[i].length + buffers[i] = Buffer.concat([Buffer.alloc(difference, 0), buffers[i]]) + } + } + const result = Buffer.alloc(buffers[0].length) + for (let i = 0; i < result.length; i += 2) { + let value = 0 + buffers.forEach((buffer: Buffer) => { + value += buffer.readInt16LE(i) + }) + value = Math.max(SIGNED_16_BIT_MIN, value) + value = Math.min(SIGNED_16_BIT_MAX, value) + result.writeInt16LE(value, i) + } + return result + } + + insertStream(userID: string, stream: CachedStream) { + this.silenceStreams.set(userID, stream) + } + + removeStream(userID: string) { + this.silenceStreams.delete(userID) + } + + clear() { + this.silenceStreams.clear() + } +} \ No newline at end of file diff --git a/src/voice/RecorderStream.ts b/src/voice/RecorderStream.ts index 783cae8..21b9e00 100644 --- a/src/voice/RecorderStream.ts +++ b/src/voice/RecorderStream.ts @@ -1,39 +1,80 @@ import {Duplex, Transform, TransformCallback} from "stream" +import {CachedStream, CreateStreamFromBuffer} from "./CachedStream" const MAX_BUFFER_SIZE = 500 // Buffer in seconds is approximately MAX_BUFFER_SIZE / 50 -export default class RecorderStream extends Transform { +const SAMPLING_RATE = 20 // Discord sends a chunk (if not silent) every 20 ms +const DEBOUNCE_TIME = 30 // Debounce time for inserting silence (Don't want to accidentally insert silence) +export default class RecorderStream extends Transform implements CachedStream { private rollingBuffer: Buffer[] = [] + private rollingBufferWithSilence: Buffer[] = [] + private isWriting: Boolean = false + private readonly createSilenceStream: Boolean + private silenceDebouncer: NodeJS.Timeout | undefined = undefined + + constructor(createSilenceStream: boolean = false) { + super(); + this.createSilenceStream = createSilenceStream + if (this.createSilenceStream) { + this.setupSilenceInsertion() + } + } - getRecordedStream(lengthInSeconds: number = MAX_BUFFER_SIZE / 50): Duplex { + private setupSilenceInsertion() { + setTimeout(() => { + const silenceChunk = Buffer.from(new Array(3840).fill(0)) + this.insertChunk(this.rollingBufferWithSilence, silenceChunk, true) + this.setupSilenceInsertion() + }, SAMPLING_RATE) + } + + getCachedStream(lengthInSeconds: number = MAX_BUFFER_SIZE / 50, withSilence: boolean = false): Duplex { this.isWriting = true - let duplex = new Duplex(); - duplex.push(this.getBuffer(lengthInSeconds)); - duplex.push(null); + const stream = CreateStreamFromBuffer(this.getCachedBuffer(lengthInSeconds, withSilence)) this.isWriting = false - return duplex; + return stream; } - getBuffer(lengthInSeconds: number = MAX_BUFFER_SIZE / 50): Buffer { + getCachedBuffer(lengthInSeconds: number = MAX_BUFFER_SIZE / 50, withSilence: boolean = false): Buffer { + const buffer = !withSilence ? this.rollingBuffer : this.rollingBufferWithSilence const samplesPerSecond = 48000 const bytesPerSample = 2 const channels = 2 const numberOfBytes = lengthInSeconds * samplesPerSecond * bytesPerSample * channels - if (this.rollingBuffer.length === 0) { + if (buffer.length === 0) { return Buffer.alloc(0) } - const numberOfChunks = Math.min(numberOfBytes / this.rollingBuffer[0].length, MAX_BUFFER_SIZE) - return Buffer.concat(this.rollingBuffer.slice(-numberOfChunks)) + const numberOfChunks = Math.min(numberOfBytes / buffer[0].length, MAX_BUFFER_SIZE) + return Buffer.concat(buffer.slice(-numberOfChunks)) } _transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback) { - if (!this.isWriting) { - if (this.rollingBuffer.length > MAX_BUFFER_SIZE) { // Chunk size ~3840 - this.rollingBuffer.shift() - } - this.rollingBuffer.push(chunk) + this.insertChunk(this.rollingBuffer, chunk) + if (this.createSilenceStream) { + this.insertChunk(this.rollingBufferWithSilence, chunk) } this.push(chunk) callback() } + + private insertChunk(buffer: Buffer[], chunk: any, isSilenceChunk: boolean = false) { + if (!this.isWriting && !(isSilenceChunk && this.silenceDebouncer)) { + if (buffer.length > MAX_BUFFER_SIZE) { // Chunk size ~3840 + buffer.shift() + } + buffer.push(chunk) + if (!isSilenceChunk) { + this.resetSilenceDebounce() + } + } + } + + private resetSilenceDebounce() { + if (this.silenceDebouncer) { + clearTimeout(this.silenceDebouncer) + } + this.silenceDebouncer = setTimeout(() => { + this.silenceDebouncer = undefined + }, DEBOUNCE_TIME) + } }