Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@
"@babel/preset-env": "^7.26.8",
"@babel/preset-typescript": "^7.26.0",
"@rushstack/eslint-patch": "^1.4.0",
"@types/express": "^5.0.0",
"@types/jsdom": "^21.1.7",
"@types/jsonwebtoken": "^9.0.6",
"@types/markdown-it": "^14.1.1",
"@types/node": "^20.17.17",
"@types/node": "^22.13.4",
"@vitejs/plugin-vue": "^4.3.0",
"@vitejs/plugin-vue-jsx": "^3.1.0",
"@vue/eslint-config-prettier": "^9.0.0",
Expand All @@ -79,6 +80,7 @@
"eslint-plugin-vue": "^9.12.0",
"jest": "^29.7.0",
"jsdom": "^25.0.1",
"node-mocks-http": "^1.16.2",
"npm-run-all2": "^7.0.1",
"prettier": "^3.0.1",
"superagent": "^9.0.0",
Expand Down
78 changes: 51 additions & 27 deletions server/controllers/OpeyIIController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import { UserInput } from '../schema/OpeySchema'

export class OpeyController {
constructor(
private obpClientService: OBPClientService,
private opeyClientService: OpeyClientService,
public obpClientService: OBPClientService,
public opeyClientService: OpeyClientService,
) {}

@Get('/')
async getStatus(
@Res() response: Response
): Response {
): Promise<Response | any> {

try {
const opeyStatus = await this.opeyClientService.getOpeyStatus()
Expand Down Expand Up @@ -67,33 +67,57 @@ export class OpeyController {
callback();
}
})

let nodeStream: NodeJS.ReadableStream | null = null

try {
const nodeStream = await this.opeyClientService.stream(user_input)
console.log(`Stream received from OpeyClientService.stream: ${nodeStream.readable}`)
nodeStream.pipe(streamMiddlewareTransform).pipe(response)
// Read stream from OpeyClientService
nodeStream = await this.opeyClientService.stream(user_input)
console.debug(`Stream received readable: ${nodeStream.readable}`)

} catch (error) {
console.error("Error reading stream: ", error)
response.status(500).json({ error: 'Internal Server Error' })
return
}

if (!nodeStream || !nodeStream.readable) {
console.error("Stream is not readable")
response.status(500).json({ error: 'Internal Server Error' })
return
}

try {
// response.writeHead(200, {
// 'Content-Type': "text/event-stream",
// 'Cache-Control': "no-cache",
// 'Connection': "keep-alive"
// });

response.status(200)
response.setHeader('Content-Type', 'text/event-stream')
response.setHeader('Cache-Control', 'no-cache')
response.setHeader('Connection', 'keep-alive')

// nodeStream.on('data', (chunk) => {
// const data = chunk.toString()
// console.log(`data: ${data}`)
// response.write(`data: ${data}\n\n`)
// })
// nodeStream.on('end', () => {
// console.log('Stream ended')
// response.end()
// })
// nodeStream.on('error', (error) => {
// console.error(error)
// response.write(`data: Error reading stream\n\n`)
// response.end()
// })
let data: any[] = []

nodeStream.on('data', (chunk) => {
const bufferChunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
data.push(bufferChunk);
response.write(`data: ${chunk.toString()}\n\n`)
})
nodeStream.on('end', () => {
//console.log('Stream ended')
const totalData = Buffer.concat(data)
response.write(totalData)
response.end()
})
nodeStream.on('error', (error) => {
console.error(error)
response.write(`data: Error reading stream\n\n`)
response.end()
})
} catch (error) {
console.error(error)
console.error("Error writing data: ", error)
response.status(500).json({ error: 'Internal Server Error' })
}
}
Expand All @@ -103,7 +127,7 @@ export class OpeyController {
@Session() session: any,
@Req() request: Request,
@Res() response: Response
): Response {
): Promise<Response | any> {

let user_input: UserInput
try {
Expand All @@ -113,14 +137,14 @@ export class OpeyController {
"is_tool_call_approval": request.body.is_tool_call_approval
}
} catch (error) {
console.error("Error in stream endpoint, could not parse into UserInput: ", error)
console.error("Error in invoke endpoint, could not parse into UserInput: ", error)
return response.status(500).json({ error: 'Internal Server Error' })
}

try {
const opey_response = await this.opeyClientService.invoke(user_input)

console.log("Opey response: ", opey_response)
//console.log("Opey response: ", opey_response)
return response.status(200).json(opey_response)
} catch (error) {
console.error(error)
Expand All @@ -136,7 +160,7 @@ export class OpeyController {
@Session() session: any,
@Req() request: Request,
@Res() response: Response
): Response {
): Promise<Response | any> {
try {
console.log("Getting consent from OBP")
// Check if consent is already in session
Expand Down Expand Up @@ -190,7 +214,7 @@ export class OpeyController {
@Session() session: any,
@Req() request: Request,
@Res() response: Response
): Response {
): Promise<Response | any> {
try {
const oauthConfig = session['clientConfig']
const version = this.obpClientService.getOBPVersion()
Expand Down
158 changes: 158 additions & 0 deletions tests/opey-unit.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import { OpeyController } from "../server/controllers/OpeyIIController";
import OpeyClientService from '../server/services/OpeyClientService';
import OBPClientService from '../server/services/OBPClientService';
import Stream, { Readable } from 'stream';
import { Request, Response } from 'express';
import httpMocks from 'node-mocks-http'
import { EventEmitter } from 'events';
import {jest} from '@jest/globals';

jest.mock("../server/services/OpeyClientService", () => {
return {
OpeyClientService: jest.fn().mockImplementation(() => {
return {
getOpeyStatus: jest.fn(async () => {
return {status: 'running'}
}),
stream: jest.fn(async () => {
const readableStream = new Stream.Readable();

for (let i=0; i<10; i++) {
readableStream.push(`Chunk ${i}`);
}

return readableStream as NodeJS.ReadableStream;
}),
invoke: jest.fn(async () => {
return {
content: 'Hi this is Opey',
}
})
}

}),
};
});

// jest.mock("./A", () => {
// return {
// A: jest.fn().mockImplementation(() => {
// return {
// getSomething: getSomethingMock
// }
// })
// };
// });
// Mock the OpeyClientService class


// jest.mocked(OpeyClientService).mockImplementation(() => {
// return {
// getOpeyStatus: jest.fn(async () => {
// return {status: 'running'}
// }),
// stream: jest.fn(async () => {
// const readableStream = new Stream.Readable();

// for (let i=0; i<10; i++) {
// readableStream.push(`Chunk ${i}`);
// }

// return readableStream as NodeJS.ReadableStream;
// }),
// invoke: jest.fn(async () => {
// return {
// content: 'Hi this is Opey',
// }
// })
// }
// });



describe('OpeyController', () => {
// Mock the OpeyClientService class

const MockOpeyClientService = {
authConfig: {},
opeyConfig: {},
getOpeyStatus: jest.fn(async () => {
return {status: 'running'}
}),
stream: jest.fn(async () => {

async function * generator() {
for (let i=0; i<10; i++) {
yield `Chunk ${i}`;
}
}

const readableStream = Stream.Readable.from(generator());

return readableStream as NodeJS.ReadableStream;
}),
invoke: jest.fn(async () => {
return {
content: 'Hi this is Opey',
}
})
} as unknown as jest.Mocked<OpeyClientService>


// Instantiate OpeyController with the mocked OpeyClientService
const opeyController = new OpeyController(new OBPClientService, MockOpeyClientService)


it('getStatus', async () => {
const res = httpMocks.createResponse();

await opeyController.getStatus(res)
expect(MockOpeyClientService.getOpeyStatus).toHaveBeenCalled();
expect(res.statusCode).toBe(200);
})

it('streamOpey', () => {

const _eventEmitter = new EventEmitter();
_eventEmitter.addListener('data', () => {
console.log('Data received')
})
// The default event emitter does nothing, so replace
const res = httpMocks.createResponse({
eventEmitter: _eventEmitter,
writableStream: Stream.Writable
});

const req = {
body: {
message: 'Hello Opey',
thread_id: '123',
is_tool_call_approval: false
}
} as unknown as Request;

// Define handelrs for events
res.on('end', () => {
console.log('Stream ended')
console.log(res._getData())
expect(res.statusCode).toBe(200);
})

let chunks: any[] = [];
res.on('data', (chunk) => {
console.log(chunk)
chunks.push(chunk);
expect(chunk).toBeDefined();
})

opeyController.streamOpey({}, req, res)
.then((res) => {
console.log(res)
})

expect(chunks.length).toBe(10);
expect(MockOpeyClientService.stream).toHaveBeenCalled();
expect(res).toBeDefined();

})
})
Loading