Skip to content

Commit

Permalink
Merge pull request #221 from TylerHAtkinson/support-typeorm-streams
Browse files Browse the repository at this point in the history
feat: Add support for typeorm  streams
  • Loading branch information
stelescuraul authored Jul 26, 2024
2 parents 73cfdf0 + 7ad16cd commit 70963bc
Show file tree
Hide file tree
Showing 8 changed files with 396 additions and 8 deletions.
53 changes: 49 additions & 4 deletions lib/common/RLSPostgresQueryRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
TenancyModelOptions,
TenantId,
} from '../interfaces/tenant-options.interface';
import { ReadStream } from 'typeorm/platform/PlatformTools';

export class RLSPostgresQueryRunner extends PostgresQueryRunner {
tenantId: TenantId = null;
Expand All @@ -27,15 +28,23 @@ export class RLSPostgresQueryRunner extends PostgresQueryRunner {
this.actorId = tenancyModelOptions.actorId;
}

private async setOptionsInDB() {
await super.query(
`set "rls.tenant_id" = '${this.tenantId}'; set "rls.actor_id" = '${this.actorId}';`,
);
}

private async resetOptionsInDB() {
await super.query(`reset rls.actor_id; reset rls.tenant_id;`);
}

async query(
queryString: string,
params?: any[],
useStructuredResult?: boolean,
): Promise<any> {
if (!this.isTransactionCommand) {
await super.query(
`set "rls.tenant_id" = '${this.tenantId}'; set "rls.actor_id" = '${this.actorId}';`,
);
await this.setOptionsInDB();
}

let result: Promise<any>;
Expand All @@ -47,13 +56,49 @@ export class RLSPostgresQueryRunner extends PostgresQueryRunner {
}

if (!this.isTransactionCommand && !(this.isTransactionActive && error)) {
await super.query(`reset rls.actor_id; reset rls.tenant_id;`);
await this.resetOptionsInDB();
}

if (error) throw error;
else return result;
}

async stream(
queryString: string,
params?: any[],
onEnd?: () => void,
onError?: (err: Error) => void,
): Promise<ReadStream> {
await this.setOptionsInDB();
try {
return await super.stream(
queryString,
params,
async () => {
await this.resetOptionsInDB();

if (onEnd) {
onEnd();
}
},
async (err: Error) => {
if (!this.isTransactionActive) {
await this.resetOptionsInDB();
}

if (onError) {
onError(err);
}
},
);
} catch (err) {
if (!this.isTransactionActive) {
await this.resetOptionsInDB();
}
throw err;
}
}

async startTransaction(isolationLevel?: IsolationLevel): Promise<void> {
this.isTransactionCommand = true;
await super.startTransaction(isolationLevel);
Expand Down
38 changes: 38 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"nyc": "^15.1.0",
"opn": "^6.0.0",
"pg": "^8.7.3",
"pg-query-stream": "^4.5.3",
"prettier": "^2.7.1",
"rxjs": "^7.5.6",
"semantic-release": "^19.0.3",
Expand Down
105 changes: 105 additions & 0 deletions test/common/RLSConnection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { DataSource } from 'typeorm';
import { PostgresDriver } from 'typeorm/driver/postgres/PostgresDriver';
import { RLSConnection, RLSPostgresQueryRunner } from '../../lib/common';
import { Post } from '../util/entity/Post';
import { Transform } from 'stream';
import {
closeTestingConnections,
reloadTestingDatabases,
Expand Down Expand Up @@ -104,6 +105,110 @@ describe('RLSConnection', () => {
loadedPost.title.should.be.eql('Foo');
});

it('should save and return the Post using streams', async () => {
const postRepo = connection.getRepository(Post);
const post = postRepo.create();
post.title = 'Foo';
post.tenantId = tenantModelOptions.tenantId as number;
post.userId = tenantModelOptions.actorId as number;
await postRepo.save(post);

const postStream = await postRepo
.createQueryBuilder('post')
.where({ id: post.id })
.stream();

const loadedPosts = await new Promise<any>((resolve, reject) => {
const result = [];
postStream.on('data', data => result.push(data));
postStream.on('end', () => resolve(result));
postStream.on('error', reject);
});

loadedPosts.should.have.lengthOf(1);
loadedPosts[0].post_id.should.be.eql(post.id);
loadedPosts[0].post_title.should.be.eql('Foo');
});

it('should save and return the Post using streams within a transaction', async () => {
connection.transaction(async entityManager => {
const postRepo = entityManager.getRepository(Post);
const post = postRepo.create();
post.title = 'Foo';
post.tenantId = tenantModelOptions.tenantId as number;
post.userId = tenantModelOptions.actorId as number;
await postRepo.save(post);

const postStream = await postRepo
.createQueryBuilder('post')
.where({ id: post.id })
.stream();

const loadedPosts = await new Promise<any>((resolve, reject) => {
const result = [];
postStream.on('data', data => result.push(data));
postStream.on('end', () => resolve(result));
postStream.on('error', reject);
});

loadedPosts.should.have.lengthOf(1);
loadedPosts[0].post_id.should.be.eql(post.id);
loadedPosts[0].post_title.should.be.eql('Foo');
});
});

it('should not reset tenantid if a query is ran while streaming', async () => {
const postRepo = connection.getRepository(Post);
const fooPost = postRepo.create();
fooPost.title = 'Foo';
fooPost.tenantId = tenantModelOptions.tenantId as number;
fooPost.userId = tenantModelOptions.actorId as number;

const barPost = postRepo.create();
barPost.title = 'Bar';
barPost.tenantId = tenantModelOptions.tenantId as number;
barPost.userId = tenantModelOptions.actorId as number;
await postRepo.save([fooPost, barPost]);

const loadedPosts = await new Promise<Post[]>(async (resolve, reject) => {
const postStream = (
await postRepo.createQueryBuilder('post').stream()
).pipe(
new Transform({
objectMode: true,
transform: async (data, encoding, callback) => {
try {
callback(
null,
await postRepo.findOneOrFail({
where: { id: data.post_id },
}),
);
} catch (err) {
callback(err);
}
},
}),
);

const result: Post[] = [];
postStream.on('data', (data: Post) => {
result.push(data);
});
postStream.on('end', () => resolve(result));
postStream.on('error', reject);
});

loadedPosts.should.have.lengthOf(2);
loadedPosts[0].should.be.instanceOf(Post);
loadedPosts[0].id.should.be.eql(fooPost.id);
loadedPosts[0].title.should.be.eql('Foo');

loadedPosts[1].should.be.instanceOf(Post);
loadedPosts[1].id.should.be.eql(barPost.id);
loadedPosts[1].title.should.be.eql('Bar');
});

describe('#close', () => {
it('throw error if trying to close connection on RLSConnection instance', async () => {
const tempConnection = new RLSConnection(
Expand Down
Loading

0 comments on commit 70963bc

Please sign in to comment.