Skip to content

Commit

Permalink
ensure canceling fetch body in integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
elf-pavlik committed Apr 13, 2024
1 parent 5dc8afa commit e8c00f4
Showing 1 changed file with 140 additions and 120 deletions.
260 changes: 140 additions & 120 deletions test/integration/StreamingHttpChannel2023.test.ts
Expand Up @@ -80,18 +80,19 @@ describe.each(stores)('A server supporting StreamingHTTPChannel2023 using %s', (
const streamingResponse = await fetch(receiveFrom);
const reader = streamingResponse.body!.getReader();
const decoder = new TextDecoder();

const notification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(notification).toBeDefined();

const parser = new Parser();
const quads = new Store(parser.parse(notification));

expect(quads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Update ]);
expect(quads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(topic) ]);

reader.releaseLock();
await streamingResponse.body!.cancel();
try {
const notification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(notification).toBeDefined();

const quads = new Store(parser.parse(notification));
expect(quads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Update ]);
expect(quads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(topic) ]);
} finally {
reader.releaseLock();
await streamingResponse.body!.cancel();
}
});

it('emits initial Delete if topic does not exist.', async(): Promise<void> => {
Expand All @@ -101,18 +102,19 @@ describe.each(stores)('A server supporting StreamingHTTPChannel2023 using %s', (
const streamingResponse = await fetch(receiveFrom);
const reader = streamingResponse.body!.getReader();
const decoder = new TextDecoder();

const notification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(notification).toBeDefined();

const parser = new Parser();
const quads = new Store(parser.parse(notification));

expect(quads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Delete ]);
expect(quads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(topic) ]);

reader.releaseLock();
await streamingResponse.body!.cancel();
try {
const notification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(notification).toBeDefined();

const quads = new Store(parser.parse(notification));
expect(quads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Delete ]);
expect(quads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(topic) ]);
} finally {
reader.releaseLock();
await streamingResponse.body!.cancel();
}
});

it.todo('does not emit initial notification when other receivers connect.');
Expand All @@ -124,84 +126,93 @@ describe.each(stores)('A server supporting StreamingHTTPChannel2023 using %s', (
const streamingResponse = await fetch(receiveFrom);
const reader = streamingResponse.body!.getReader();
const decoder = new TextDecoder();
// Ignore initial notification
await reader.read().then(({ value }): string => decoder.decode(value));

// Create resource
const response = await fetch(topic, {
method: 'PUT',
headers: { 'content-type': 'text/plain' },
body: 'abc',
});
expect(response.status).toBe(201);

const notification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(notification).toBeDefined();

const parser = new Parser();
const quads = new Store(parser.parse(notification));

expect(quads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Create ]);
expect(quads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(topic) ]);

reader.releaseLock();
await streamingResponse.body!.cancel();
try {
// Ignore initial notification
await reader.read().then(({ value }): string => decoder.decode(value));

// Create resource
const response = await fetch(topic, {
method: 'PUT',
headers: { 'content-type': 'text/plain' },
body: 'abc',
});
expect(response.status).toBe(201);

const notification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(notification).toBeDefined();

const quads = new Store(parser.parse(notification));

expect(quads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Create ]);
expect(quads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(topic) ]);
} finally {
reader.releaseLock();
await streamingResponse.body!.cancel();
}
});

it('emits Update events.', async(): Promise<void> => {
await store.setRepresentation({ path: topic }, new BasicRepresentation('new', 'text/plain'));
const streamingResponse = await fetch(receiveFrom);
const reader = streamingResponse.body!.getReader();
const decoder = new TextDecoder();
// Ignore initial notification
await reader.read().then(({ value }): string => decoder.decode(value));

// Update resource
const response = await fetch(topic, {
method: 'PUT',
headers: { 'content-type': 'text/plain' },
body: 'abc',
});
expect(response.status).toBe(205);

const notification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(notification).toBeDefined();

const parser = new Parser();
const quads = new Store(parser.parse(notification));

expect(quads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Update ]);
expect(quads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(topic) ]);

reader.releaseLock();
await streamingResponse.body!.cancel();
try {
// Ignore initial notification
await reader.read().then(({ value }): string => decoder.decode(value));

// Update resource
const response = await fetch(topic, {
method: 'PUT',
headers: { 'content-type': 'text/plain' },
body: 'abc',
});
expect(response.status).toBe(205);

const notification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(notification).toBeDefined();

const quads = new Store(parser.parse(notification));

expect(quads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Update ]);
expect(quads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(topic) ]);
} finally {
reader.releaseLock();
await streamingResponse.body!.cancel();
}
});

it('emits Delete events.', async(): Promise<void> => {
await store.setRepresentation({ path: topic }, new BasicRepresentation('new', 'text/plain'));
const streamingResponse = await fetch(receiveFrom);
const reader = streamingResponse.body!.getReader();
const decoder = new TextDecoder();
// Ignore initial notification
await reader.read().then(({ value }): string => decoder.decode(value));

// Delete resource
const response = await fetch(topic, {
method: 'DELETE',
});
expect(response.status).toBe(205);

const notification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(notification).toBeDefined();

const parser = new Parser();
const quads = new Store(parser.parse(notification));

expect(quads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Delete ]);
expect(quads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(topic) ]);

reader.releaseLock();
await streamingResponse.body!.cancel();
try {
// Ignore initial notification
await reader.read().then(({ value }): string => decoder.decode(value));

// Delete resource
const response = await fetch(topic, {
method: 'DELETE',
});
expect(response.status).toBe(205);

const notification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(notification).toBeDefined();

const quads = new Store(parser.parse(notification));

expect(quads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Delete ]);
expect(quads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(topic) ]);
} finally {
reader.releaseLock();
await streamingResponse.body!.cancel();
}
});

it('prevents connecting to channels of restricted topics.', async(): Promise<void> => {
Expand All @@ -224,17 +235,23 @@ describe.each(stores)('A server supporting StreamingHTTPChannel2023 using %s', (

// Unauthenticated fetch fails
const unauthenticatedResponse = await fetch(restrictedReceiveFrom);
expect(unauthenticatedResponse.status).toBe(401);
try {
expect(unauthenticatedResponse.status).toBe(401);
} finally {
await unauthenticatedResponse.body?.cancel();
}

// Authenticated fetch succeeds
const authenticatedResponse = await fetch(restrictedReceiveFrom, {
headers: {
authorization: `WebID ${webId}`,
},
});
expect(authenticatedResponse.status).toBe(200);

await authenticatedResponse.body!.cancel();
try {
expect(authenticatedResponse.status).toBe(200);
} finally {
await authenticatedResponse.body!.cancel();
}
});

it('emits container notifications if contents get added or removed.', async(): Promise<void> => {
Expand All @@ -246,44 +263,47 @@ describe.each(stores)('A server supporting StreamingHTTPChannel2023 using %s', (
const reader = streamingResponse.body!.getReader();
const decoder = new TextDecoder();
const parser = new Parser();
// Ignore initial notification
await reader.read().then(({ value }): string => decoder.decode(value));

// Create contained resource
const createResponse = await fetch(resource, {
method: 'PUT',
headers: { 'content-type': 'text/plain' },
body: 'abc',
});
expect(createResponse.status).toBe(201);

// Will receive the Add notification
const addNotification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(addNotification).toBeDefined();

const addQuads = new Store(parser.parse(addNotification));

expect(addQuads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Add ]);
expect(addQuads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(resource) ]);
expect(addQuads.getObjects(null, AS.terms.target, null)).toEqual([ namedNode(baseUrl) ]);

// Remove contained resource
const removeResponse = await fetch(resource, {
method: 'DELETE',
});
expect(removeResponse.status).toBe(205);

// Will receive the Remove notification
const removeNotification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(removeNotification).toBeDefined();

const removeQuads = new Store(parser.parse(removeNotification));

expect(removeQuads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Remove ]);
expect(removeQuads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(resource) ]);
expect(removeQuads.getObjects(null, AS.terms.target, null)).toEqual([ namedNode(baseUrl) ]);

reader.releaseLock();
await streamingResponse.body!.cancel();
try {
// Ignore initial notification
await reader.read().then(({ value }): string => decoder.decode(value));

// Create contained resource
const createResponse = await fetch(resource, {
method: 'PUT',
headers: { 'content-type': 'text/plain' },
body: 'abc',
});
expect(createResponse.status).toBe(201);

// Will receive the Add notification
const addNotification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(addNotification).toBeDefined();

const addQuads = new Store(parser.parse(addNotification));

expect(addQuads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Add ]);
expect(addQuads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(resource) ]);
expect(addQuads.getObjects(null, AS.terms.target, null)).toEqual([ namedNode(baseUrl) ]);

// Remove contained resource
const removeResponse = await fetch(resource, {
method: 'DELETE',
});
expect(removeResponse.status).toBe(205);

// Will receive the Remove notification
const removeNotification = await reader.read().then(({ value }): string => decoder.decode(value));
expect(removeNotification).toBeDefined();

const removeQuads = new Store(parser.parse(removeNotification));

expect(removeQuads.getObjects(null, RDF.terms.type, null)).toEqual([ AS.terms.Remove ]);
expect(removeQuads.getObjects(null, AS.terms.object, null)).toEqual([ namedNode(resource) ]);
expect(removeQuads.getObjects(null, AS.terms.target, null)).toEqual([ namedNode(baseUrl) ]);
} finally {
reader.releaseLock();
await streamingResponse.body!.cancel();
}
});
});

0 comments on commit e8c00f4

Please sign in to comment.