Implement Merged Streams for Channel Clipping #9

Merged
merged 3 commits on Dec 5, 2020
33 changes: 23 additions & 10 deletions src/commands/voice/Clip.ts
@@ -6,18 +6,19 @@ import {ArgumentType, CommandOptions} from "../Command"
import {MessageGenerator} from "../../communication/MessageGenerator"
import {GuildUtils} from "../../utils/GuildUtils"
import {Logger} from "../../Logger"
import {CachedStream} from "../../voice/CachedStream"

export default class ClipCommand extends VoiceCommand {
readonly options: CommandOptions = {
name: 'Clip',
keywords: ['clip'],
group: 'voice',
descriptions: ['Create a clip of what a user has said'],
descriptions: ['Create a clip of what was just said! If no user is provided, the whole channel is clipped.'],
arguments: [
{
key: 'user',
description: 'User you would like to clip',
required: true,
required: false,
type: ArgumentType.USER
},
{
@@ -38,22 +39,34 @@ export default class ClipCommand extends VoiceCommand {
default: 'Clip'
}
],
examples: ['clip @Eve -l 5 -c Eve Funny Clip']
examples: ['clip', 'clip @Eve -l 5 -c Eve Funny Clip']
}

execute(context: GuildContext, source: User, args: Map<string, any>, message?: Message) {
let stream: CachedStream | undefined
let author: string
let embedMessageContents: string
const user: User = args.get('user')
const voiceStream = context.getProvider().getVoiceConnectionHandler().getVoiceStreamForUser(user)
if (!voiceStream) {
Logger.w(ClipCommand.name, `No audioStream for ${user.tag} [${user.id}]`, context)
context.getProvider().getResponder().error('No listening stream registered for user', message)
return
if (user) {
author = user.tag
embedMessageContents = `Recording from [${GuildUtils.createUserMentionString(user.id)}]`
stream = context.getProvider().getVoiceConnectionHandler().getVoiceStreamForUser(user)
if (!stream) {
Logger.w(ClipCommand.name, `No audioStream for ${user.tag} [${user.id}]`, context)
context.getProvider().getResponder().error('No listening stream registered for user', message)
return
}
} else {
author = context.getGuild().name
embedMessageContents = `Recording from ${context.getVoiceConnection()?.channel?.name}`
stream = context.getProvider().getVoiceConnectionHandler().getMergedVoiceStream()
}

context.getProvider().getResponder().startTyping(message)
AudioUtils.convertBufferToMp3Buffer(voiceStream.getBuffer(args.get('length')), args.get('caption'), user.tag)
AudioUtils.convertBufferToMp3Buffer(stream.getCachedBuffer(args.get('length')), args.get('caption'), author)
.then((buffer) => {
const embedMessage = MessageGenerator
.createBasicEmbed(`Recording from [${GuildUtils.createUserMentionString(user.id)}]`)
.createBasicEmbed(embedMessageContents)
const embed = MessageGenerator.attachFileToEmbed(embedMessage, buffer, `${args.get('caption')}.mp3`)
context.getProvider().getResponder().send({content: embed, message: message}).then((results) => {
context.getProvider().getResponder().stopTyping(message)})
29 changes: 18 additions & 11 deletions src/commands/voice/Recite.ts
@@ -4,18 +4,19 @@ import VoiceCommand from "../../voice/VoiceCommand"
import {ArgumentType, CommandOptions} from "../Command"
import {Logger} from "../../Logger"
import {GuildUtils} from "../../utils/GuildUtils"
import {CachedStream} from "../../voice/CachedStream"

export default class ReciteCommand extends VoiceCommand {
readonly options: CommandOptions = {
name: 'Recite',
keywords: ['recite'],
group: 'voice',
descriptions: ['Recite what a user has said'],
descriptions: ['Recite what was just said! If no user is provided, the whole channel is recited.'],
arguments: [
{
key: 'user',
description: 'User you would like to recite',
required: true,
required: false,
type: ArgumentType.USER
},
{
@@ -35,25 +36,31 @@ type: ArgumentType.FLAG,
type: ArgumentType.FLAG,
}
],
examples: ['recite @Eve -l 8']
examples: ['recite', 'recite @Eve -l 8']
}

execute(context: GuildContext, source: User, args: Map<string, any>, message?: Message) {
let stream: CachedStream | undefined
const user: User = args.get('user')
const voiceStream = context.getProvider().getVoiceConnectionHandler().getVoiceStreamForUser(user)
if (!voiceStream) {
Logger.w(ReciteCommand.name, `No audioStream for ${user.tag} [${user.id}]`, context)
context.getProvider().getResponder().error('No listening stream registered for user', message)
return
if (user) {
stream = context.getProvider().getVoiceConnectionHandler().getVoiceStreamForUser(user)
if (!stream) {
Logger.w(ReciteCommand.name, `No audioStream for ${user.tag} [${user.id}]`, context)
context.getProvider().getResponder().error('No listening stream registered for user', message)
return
}
} else {
stream = context.getProvider().getVoiceConnectionHandler().getMergedVoiceStream()
}
context.getProvider().getInterruptService().playRawStream(voiceStream.getRecordedStream(args.get('length')))
if (args.get('transcribe')) {
const audioStream = stream.getCachedStream(args.get('length'))
context.getProvider().getInterruptService().playRawStream(audioStream)
if (user && args.get('transcribe')) {
const speechRecognizer = context.getVoiceDependencyProvider().getSpeechRecognizer()
if (!speechRecognizer) {
Logger.e(ReciteCommand.name, 'No SpeechRecognizer Registered', context)
return
}
speechRecognizer.recognizeTextFromSpeech(voiceStream).then((transcribed) => {
speechRecognizer.recognizeTextFromSpeech(audioStream).then((transcribed) => {
const transcribedMessage = `${GuildUtils.createUserMentionString(user.id)} said ${transcribed}`
context.getProvider().getResponder().send({content: transcribedMessage, message: message})
})
13 changes: 13 additions & 0 deletions src/voice/CachedStream.ts
@@ -0,0 +1,13 @@
import { Duplex } from "stream";

export interface CachedStream {
getCachedStream(lengthInSeconds?: number, withSilence?: boolean): Duplex
getCachedBuffer(lengthInSeconds?: number, withSilence?: boolean): Buffer
}

export function CreateStreamFromBuffer(buffer: Buffer): Duplex {
let duplex = new Duplex();
duplex.push(buffer);
duplex.push(null);
return duplex;
}
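
For reference, a consumer of this interface only needs the two getters. The sketch below is hypothetical (the dumpLastSeconds helper and the output path are not from this diff); it shows how the last few seconds of cached audio could be pulled out and piped somewhere, relying on the Duplex returned by CreateStreamFromBuffer already being ended:

import {createWriteStream} from "fs"
import {CachedStream} from "./CachedStream"

// Hypothetical helper: write the last `seconds` of cached audio to disk as raw PCM.
function dumpLastSeconds(stream: CachedStream, seconds: number, path: string): void {
    // getCachedStream returns a Duplex that has already been ended with push(null),
    // so piping it drains the cached audio and then closes the destination file.
    stream.getCachedStream(seconds).pipe(createWriteStream(path))
}

// e.g. dumpLastSeconds(connectionHandler.getMergedVoiceStream(), 10, "channel.pcm")
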
21 changes: 16 additions & 5 deletions src/voice/ConnectionHandler.ts
@@ -6,6 +6,8 @@ import {PassThrough} from "stream"
import {CommandDispatcher} from "../commands/Dispatcher"
import {GlobalContext} from "../GlobalContext"
import {Logger} from "../Logger"
import MergedStream from "./MergedStream"
import {CachedStream} from "./CachedStream"

const USER_REJOIN_THRESHOLD = 5000
const VOICE_COMMAND_LENGTH = 3000
@@ -15,20 +17,27 @@ const TAG = 'ConnectionHandler'

export default class VoiceConnectionHandler {
private readonly context: GuildContext
private readonly lowMemoryMode: boolean
private mergeStream: MergedStream = new MergedStream()
private voiceStreams: Map<string, RecorderStream> = new Map()
private removedTimeouts: Map<string, NodeJS.Timeout> = new Map()
private isListeningToCommand: Map<string, boolean> = new Map()
private noUsersInVoiceChannelTimeout: NodeJS.Timeout | undefined

constructor(guildContext: GuildContext) {
this.context = guildContext
this.lowMemoryMode = false // TODO: Implement a global? parameter
}

getVoiceStreams(): Map<string, RecorderStream> {
getMergedVoiceStream(): MergedStream {
return this.mergeStream
}

getVoiceStreams(): Map<string, CachedStream> {
return this.voiceStreams
}

getVoiceStreamForUser(user: User): RecorderStream | undefined {
getVoiceStreamForUser(user: User): CachedStream | undefined {
return this.voiceStreams.get(user.id)
}

@@ -50,6 +59,7 @@ export default class VoiceConnectionHandler {
})
this.removedTimeouts.clear()
this.isListeningToCommand.clear()
this.mergeStream.clear()
}

joinVoiceChannel(voiceChannel: VoiceChannel | undefined | null): Promise<void> {
@@ -160,6 +170,7 @@ this.context.getVoiceConnection()?.receiver.createStream(user).emit('end')
this.context.getVoiceConnection()?.receiver.createStream(user).emit('end')
this.context.getVoiceDependencyProvider()
this.context.getVoiceDependencyProvider().getHotwordEngine()?.remove(user.id)
this.mergeStream.removeStream(user.id)
}

private startVoiceStreamForUser(user: User) {
@@ -172,11 +183,11 @@ mode: 'pcm',
mode: 'pcm',
end: 'manual'
})
const previousStream = this.getVoiceStreamForUser(user)
const recorderStream = previousStream || new RecorderStream()
const previousStream = this.voiceStreams.get(user.id)
const recorderStream = previousStream || new RecorderStream(true)
audio.pipe(recorderStream, {end: false})
this.voiceStreams.set(user.id, recorderStream)

this.mergeStream.insertStream(user.id, recorderStream)
const speechRecognizer = this.context.getVoiceDependencyProvider().getSpeechRecognizer()
const hotwordEngine = this.context.getVoiceDependencyProvider().getHotwordEngine()
if (!speechRecognizer || !hotwordEngine) {
53 changes: 53 additions & 0 deletions src/voice/MergedStream.ts
@@ -0,0 +1,53 @@
import {Duplex} from "stream"
import RecorderStream from "./RecorderStream"
import {CachedStream, CreateStreamFromBuffer} from "./CachedStream"

const SIGNED_16_BIT_MIN = -32768
const SIGNED_16_BIT_MAX = 32767
export default class MergedStream implements CachedStream {
private silenceStreams: Map<string, CachedStream> = new Map<string, RecorderStream>()

getCachedStream(): Duplex {
return CreateStreamFromBuffer(this.getCachedBuffer())
}

getCachedBuffer(): Buffer {
const buffers: Buffer[] = []
this.silenceStreams.forEach((stream: CachedStream) => {
buffers.push(stream.getCachedBuffer(undefined, true))
})
let maxLengthOfBuffer = 0
buffers.forEach((buffer) => {
maxLengthOfBuffer = Math.max(maxLengthOfBuffer, buffer.length)
})
for (let i = 0; i < buffers.length; i++) {
if (buffers[i].length < maxLengthOfBuffer) {
const difference = maxLengthOfBuffer - buffers[i].length
buffers[i] = Buffer.concat([Buffer.alloc(difference, 0), buffers[i]])
}
}
const result = Buffer.alloc(buffers[0].length)
for (let i = 0; i < result.length; i += 2) {
let value = 0
buffers.forEach((buffer: Buffer) => {
value += buffer.readInt16LE(i)
})
value = Math.max(SIGNED_16_BIT_MIN, value)
value = Math.min(SIGNED_16_BIT_MAX, value)
result.writeInt16LE(value, i)
}
return result
}

insertStream(userID: string, stream: CachedStream) {
this.silenceStreams.set(userID, stream)
}

removeStream(userID: string) {
this.silenceStreams.delete(userID)
}

clear() {
this.silenceStreams.clear()
}
}
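
The mix in getCachedBuffer is a sample-wise sum of signed 16-bit little-endian PCM, saturated at the 16-bit limits, after the shorter per-user buffers are left-padded with silence so they all end at the most recent sample. A tiny standalone illustration of the mix-and-clamp step (hypothetical code, not part of this diff):

// Two buffers holding two 16-bit samples each.
const a = Buffer.alloc(4)
a.writeInt16LE(30000, 0)
a.writeInt16LE(-100, 2)
const b = Buffer.alloc(4)
b.writeInt16LE(5000, 0)
b.writeInt16LE(-200, 2)

const mixed = Buffer.alloc(4)
for (let i = 0; i < mixed.length; i += 2) {
    const sum = a.readInt16LE(i) + b.readInt16LE(i)
    // Clamp to [-32768, 32767] so overlapping loud speech saturates instead of wrapping around.
    mixed.writeInt16LE(Math.min(32767, Math.max(-32768, sum)), i)
}
// mixed now holds [32767, -300]: the first sample saturates, the second is a plain sum.

Note that getCachedBuffer sizes its result from buffers[0], so it appears to assume at least one per-user stream has been registered before a channel-wide clip is requested.
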
71 changes: 56 additions & 15 deletions src/voice/RecorderStream.ts
@@ -1,39 +1,80 @@
import {Duplex, Transform, TransformCallback} from "stream"
import {CachedStream, CreateStreamFromBuffer} from "./CachedStream"

const MAX_BUFFER_SIZE = 500 // Buffer in seconds is approximately MAX_BUFFER_SIZE / 50
export default class RecorderStream extends Transform {
const SAMPLING_RATE = 20 // Discord sends a chunk (if not silent) every 20 ms
const DEBOUNCE_TIME = 30 // Debounce time for inserting silence (Don't want to accidentally insert silence)
export default class RecorderStream extends Transform implements CachedStream {
private rollingBuffer: Buffer[] = []
private rollingBufferWithSilence: Buffer[] = []

private isWriting: Boolean = false
private readonly createSilenceStream: Boolean
private silenceDebouncer: NodeJS.Timeout | undefined = undefined

constructor(createSilenceStream: boolean = false) {
super();
this.createSilenceStream = createSilenceStream
if (this.createSilenceStream) {
this.setupSilenceInsertion()
}
}

getRecordedStream(lengthInSeconds: number = MAX_BUFFER_SIZE / 50): Duplex {
private setupSilenceInsertion() {
setTimeout(() => {
const silenceChunk = Buffer.from(new Array(3840).fill(0))
this.insertChunk(this.rollingBufferWithSilence, silenceChunk, true)
this.setupSilenceInsertion()
}, SAMPLING_RATE)
}

getCachedStream(lengthInSeconds: number = MAX_BUFFER_SIZE / 50, withSilence: boolean = false): Duplex {
this.isWriting = true
let duplex = new Duplex();
duplex.push(this.getBuffer(lengthInSeconds));
duplex.push(null);
const stream = CreateStreamFromBuffer(this.getCachedBuffer(lengthInSeconds, withSilence))
this.isWriting = false
return duplex;
return stream;
}

getBuffer(lengthInSeconds: number = MAX_BUFFER_SIZE / 50): Buffer {
getCachedBuffer(lengthInSeconds: number = MAX_BUFFER_SIZE / 50, withSilence: boolean = false): Buffer {
const buffer = !withSilence ? this.rollingBuffer : this.rollingBufferWithSilence
const samplesPerSecond = 48000
const bytesPerSample = 2
const channels = 2
const numberOfBytes = lengthInSeconds * samplesPerSecond * bytesPerSample * channels
if (this.rollingBuffer.length === 0) {
if (buffer.length === 0) {
return Buffer.alloc(0)
}
const numberOfChunks = Math.min(numberOfBytes / this.rollingBuffer[0].length, MAX_BUFFER_SIZE)
return Buffer.concat(this.rollingBuffer.slice(-numberOfChunks))
const numberOfChunks = Math.min(numberOfBytes / buffer[0].length, MAX_BUFFER_SIZE)
return Buffer.concat(buffer.slice(-numberOfChunks))
}

_transform(chunk: any, encoding: BufferEncoding, callback: TransformCallback) {
if (!this.isWriting) {
if (this.rollingBuffer.length > MAX_BUFFER_SIZE) { // Chunk size ~3840
this.rollingBuffer.shift()
}
this.rollingBuffer.push(chunk)
this.insertChunk(this.rollingBuffer, chunk)
if (this.createSilenceStream) {
this.insertChunk(this.rollingBufferWithSilence, chunk)
}
this.push(chunk)
callback()
}

private insertChunk(buffer: Buffer[], chunk: any, isSilenceChunk: boolean = false) {
if (!this.isWriting && !(isSilenceChunk && this.silenceDebouncer)) {
if (buffer.length > MAX_BUFFER_SIZE) { // Chunk size ~3840
buffer.shift()
}
buffer.push(chunk)
if (!isSilenceChunk) {
this.resetSilenceDebounce()
}
}
}

private resetSilenceDebounce() {
if (this.silenceDebouncer) {
clearTimeout(this.silenceDebouncer)
}
this.silenceDebouncer = setTimeout(() => {
this.silenceDebouncer = undefined
}, DEBOUNCE_TIME)
}
}
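
The constants fit together as follows, assuming the 48 kHz, 16-bit, stereo PCM that getCachedBuffer itself uses: each 20 ms chunk is 3840 bytes, and 500 chunks cover roughly 10 seconds, which is where the MAX_BUFFER_SIZE / 50 approximation in the top comment comes from. A back-of-the-envelope check (illustrative code only, not part of this diff):

// These names mirror the constants used in RecorderStream and getCachedBuffer.
const SAMPLES_PER_SECOND = 48000   // PCM sample rate of the decoded voice stream
const BYTES_PER_SAMPLE = 2         // signed 16-bit samples
const CHANNELS = 2                 // stereo
const CHUNK_MS = 20                // one chunk roughly every 20 ms (SAMPLING_RATE above)

const bytesPerChunk = SAMPLES_PER_SECOND * BYTES_PER_SAMPLE * CHANNELS * (CHUNK_MS / 1000)
// 3840 bytes, matching the "Chunk size ~3840" comment in insertChunk

const MAX_BUFFER_SIZE = 500        // chunks kept in the rolling buffer
const bufferedSeconds = (MAX_BUFFER_SIZE * CHUNK_MS) / 1000
// 10 seconds, i.e. MAX_BUFFER_SIZE / 50 as the comment at the top of the file says

console.log(bytesPerChunk, bufferedSeconds)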