Skip to content

Commit

Permalink
fix: Manage stream connection release by orchestrator instead of driver
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Apr 10, 2021
1 parent 893e467 commit adf059e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 30 deletions.
54 changes: 25 additions & 29 deletions packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts
Expand Up @@ -136,36 +136,32 @@ export class CubeStoreDriver extends BaseDriver {
}

private async importStream(columns: Column[], tableData: any, table: string, indexes) {
try {
const writer = csvWriter({ headers: columns.map(c => c.name) });
const gzipStream = createGzip();

await new Promise(
(resolve, reject) => pipeline(
tableData.rowStream, writer, gzipStream, (err) => (err ? reject(err) : resolve(null))
)
);
const fileName = `${table}.csv.gz`;
const res = await fetch(`${this.baseUrl.replace(/^ws/, 'http')}/upload-temp-file?name=${fileName}`, {
method: 'POST',
body: gzipStream,
});

const createTableSql = this.createTableSql(table, columns);
// eslint-disable-next-line no-unused-vars
const createTableSqlWithLocation = `${createTableSql} ${indexes} LOCATION ?`;

if (res.status !== 200) {
const err = await res.json();
throw new Error(`Error during create table: ${createTableSqlWithLocation}: ${err.error}`);
}
await this.query(createTableSqlWithLocation, [`temp://${fileName}`]).catch(e => {
e.message = `Error during create table: ${createTableSqlWithLocation}: ${e.message}`;
throw e;
});
} finally {
await tableData.release();
const writer = csvWriter({ headers: columns.map(c => c.name) });
const gzipStream = createGzip();

await new Promise(
(resolve, reject) => pipeline(
tableData.rowStream, writer, gzipStream, (err) => (err ? reject(err) : resolve(null))
)
);
const fileName = `${table}.csv.gz`;
const res = await fetch(`${this.baseUrl.replace(/^ws/, 'http')}/upload-temp-file?name=${fileName}`, {
method: 'POST',
body: gzipStream,
});

const createTableSql = this.createTableSql(table, columns);
// eslint-disable-next-line no-unused-vars
const createTableSqlWithLocation = `${createTableSql} ${indexes} LOCATION ?`;

if (res.status !== 200) {
const err = await res.json();
throw new Error(`Error during create table: ${createTableSqlWithLocation}: ${err.error}`);
}
await this.query(createTableSqlWithLocation, [`temp://${fileName}`]).catch(e => {
e.message = `Error during create table: ${createTableSqlWithLocation}: ${e.message}`;
throw e;
});
}

public static dialectClass() {
Expand Down
Expand Up @@ -674,7 +674,13 @@ class PreAggregationLoader {
...capabilities,
}
));
await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn);
try {
await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn);
} finally {
if (tableData.release) {
await tableData.release();
}
}
await this.loadCache.fetchTables(this.preAggregation);
}

Expand Down

0 comments on commit adf059e

Please sign in to comment.