Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 11 additions & 193 deletions packages/cubejs-base-driver/src/BaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,6 @@ import {
isSslCert,
} from '@cubejs-backend/shared';
import fs from 'fs';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { S3, GetObjectCommand, S3ClientConfig } from '@aws-sdk/client-s3';
import { Storage } from '@google-cloud/storage';
import {
BlobServiceClient,
StorageSharedKeyCredential,
ContainerSASPermissions,
SASProtocol,
generateBlobSASQueryParameters,
} from '@azure/storage-blob';
import {
DefaultAzureCredential,
ClientSecretCredential,
} from '@azure/identity';

import { cancelCombinator } from './utils';
import {
Expand Down Expand Up @@ -57,44 +43,10 @@ import {
InformationSchemaColumn,
} from './driver.interface';

/**
* @see {@link DefaultAzureCredential} constructor options
*/
export type AzureStorageClientConfig = {
azureKey?: string,
sasToken?: string,
/**
* The client ID of a Microsoft Entra app registration.
* In case of DefaultAzureCredential flow if it is omitted
* the Azure library will try to use the AZURE_CLIENT_ID env
*/
clientId?: string,
/**
* ID of the application's Microsoft Entra tenant. Also called its directory ID.
* In case of DefaultAzureCredential flow if it is omitted
* the Azure library will try to use the AZURE_TENANT_ID env
*/
tenantId?: string,
/**
* Azure service principal client secret.
* Enables authentication to Microsoft Entra ID using a client secret that was generated
* for an App Registration. More information on how to configure a client secret can be found here:
* https://learn.microsoft.com/entra/identity-platform/quickstart-configure-app-access-web-apis#add-credentials-to-your-web-application
* In case of DefaultAzureCredential flow if it is omitted
* the Azure library will try to use the AZURE_CLIENT_SECRET env
*/
clientSecret?: string,
/**
* The path to a file containing a Kubernetes service account token that authenticates the identity.
* In case of DefaultAzureCredential flow if it is omitted
* the Azure library will try to use the AZURE_FEDERATED_TOKEN_FILE env
*/
tokenFilePath?: string,
};

export type GoogleStorageClientConfig = {
credentials: any,
};
// Import only types, because these SDKs are quite large and should be loaded lazily
import type { AzureStorageClientConfig } from './storage-fs/azure.fs';
import type { S3StorageClientConfig } from './storage-fs/aws.fs';
import type { GoogleStorageClientConfig } from './storage-fs/gcs.fs';

export type ParsedBucketUrl = {
/**
Expand Down Expand Up @@ -800,38 +752,12 @@ export abstract class BaseDriver implements DriverInterface {
* Returns an array of signed AWS S3 URLs of the unloaded csv files.
*/
protected async extractUnloadedFilesFromS3(
clientOptions: S3ClientConfig,
clientOptions: S3StorageClientConfig,
bucketName: string,
prefix: string
): Promise<string[]> {
const storage = new S3(clientOptions);
// It looks that different driver configurations use different formats
// for the bucket - some expect only names, some - full url-like names.
// So we unify this.
bucketName = bucketName.replace(/^[a-zA-Z]+:\/\//, '');

const list = await storage.listObjectsV2({
Bucket: bucketName,
Prefix: prefix,
});
if (list) {
if (!list.Contents) {
return [];
} else {
const csvFiles = await Promise.all(
list.Contents.map(async (file) => {
const command = new GetObjectCommand({
Bucket: bucketName,
Key: file.Key,
});
return getSignedUrl(storage, command, { expiresIn: 3600 });
})
);
return csvFiles;
}
}

throw new Error('Unable to retrieve list of files from S3 storage after unloading.');
// Lazy loading, because it's using the AWS SDK, which is quite heavy.
return (await import('./storage-fs/aws.fs')).extractUnloadedFilesFromS3(clientOptions, bucketName, prefix);
}

/**
Expand All @@ -842,124 +768,16 @@ export abstract class BaseDriver implements DriverInterface {
bucketName: string,
tableName: string
): Promise<string[]> {
const storage = new Storage(
gcsConfig.credentials
? { credentials: gcsConfig.credentials, projectId: gcsConfig.credentials.project_id }
: undefined
);
const bucket = storage.bucket(bucketName);
const [files] = await bucket.getFiles({ prefix: `${tableName}/` });
if (files.length) {
const csvFiles = await Promise.all(files.map(async (file) => {
const [url] = await file.getSignedUrl({
action: 'read',
expires: new Date(new Date().getTime() + 60 * 60 * 1000)
});
return url;
}));

return csvFiles;
} else {
throw new Error('No CSV files were obtained from the bucket');
}
// Lazy loading, because it's using the Google Cloud Storage SDK, which is quite heavy.
return (await import('./storage-fs/gcs.fs')).extractFilesFromGCS(gcsConfig, bucketName, tableName);
}

protected async extractFilesFromAzure(
azureConfig: AzureStorageClientConfig,
bucketName: string,
tableName: string
): Promise<string[]> {
const splitter = bucketName.includes('blob.core') ? '.blob.core.windows.net/' : '.dfs.core.windows.net/';
const parts = bucketName.split(splitter);
const account = parts[0];
const container = parts[1].split('/')[0];
let credential: StorageSharedKeyCredential | ClientSecretCredential | DefaultAzureCredential;
let blobServiceClient: BlobServiceClient;
let getSas;

if (azureConfig.azureKey) {
credential = new StorageSharedKeyCredential(account, azureConfig.azureKey);
getSas = async (name: string, startsOn: Date, expiresOn: Date) => generateBlobSASQueryParameters(
{
containerName: container,
blobName: name,
permissions: ContainerSASPermissions.parse('r'),
startsOn,
expiresOn,
protocol: SASProtocol.Https,
version: '2020-08-04',
},
credential as StorageSharedKeyCredential
).toString();
} else if (azureConfig.clientSecret && azureConfig.tenantId && azureConfig.clientId) {
credential = new ClientSecretCredential(
azureConfig.tenantId,
azureConfig.clientId,
azureConfig.clientSecret,
);
getSas = async (name: string, startsOn: Date, expiresOn: Date) => {
const userDelegationKey = await blobServiceClient.getUserDelegationKey(startsOn, expiresOn);
return generateBlobSASQueryParameters(
{
containerName: container,
blobName: name,
permissions: ContainerSASPermissions.parse('r'),
startsOn,
expiresOn,
protocol: SASProtocol.Https,
version: '2020-08-04',
},
userDelegationKey,
account
).toString();
};
} else {
const opts = {
tenantId: azureConfig.tenantId,
clientId: azureConfig.clientId,
tokenFilePath: azureConfig.tokenFilePath,
};
credential = new DefaultAzureCredential(opts);
getSas = async (name: string, startsOn: Date, expiresOn: Date) => {
// getUserDelegationKey works only for authorization with Microsoft Entra ID
const userDelegationKey = await blobServiceClient.getUserDelegationKey(startsOn, expiresOn);
return generateBlobSASQueryParameters(
{
containerName: container,
blobName: name,
permissions: ContainerSASPermissions.parse('r'),
startsOn,
expiresOn,
protocol: SASProtocol.Https,
version: '2020-08-04',
},
userDelegationKey,
account,
).toString();
};
}

const url = `https://${account}.blob.core.windows.net`;
blobServiceClient = azureConfig.sasToken ?
new BlobServiceClient(`${url}?${azureConfig.sasToken}`) :
new BlobServiceClient(url, credential);

const csvFiles: string[] = [];
const containerClient = blobServiceClient.getContainerClient(container);
const blobsList = containerClient.listBlobsFlat({ prefix: `${tableName}` });
for await (const blob of blobsList) {
if (blob.name && (blob.name.endsWith('.csv.gz') || blob.name.endsWith('.csv'))) {
const starts = new Date();
const expires = new Date(starts.valueOf() + 1000 * 60 * 60);
const sas = await getSas(blob.name, starts, expires);
csvFiles.push(`${url}/${container}/${blob.name}?${sas}`);
}
}

if (csvFiles.length === 0) {
throw new Error('No CSV files were obtained from the bucket');
}

return csvFiles;
// Lazy loading, because it's using azure SDK, which is quite (extremely) heavy.
return (await import('./storage-fs/azure.fs')).extractFilesFromAzure(azureConfig, bucketName, tableName);
}
}
42 changes: 42 additions & 0 deletions packages/cubejs-base-driver/src/storage-fs/aws.fs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { S3, GetObjectCommand, S3ClientConfig } from '@aws-sdk/client-s3';

/**
 * Client options accepted by the S3 helpers in this module.
 * Currently a plain alias of the AWS SDK's `S3ClientConfig`.
 */
export type S3StorageClientConfig = S3ClientConfig;

/**
 * Lists every object stored under `prefix` in the given S3 bucket and returns
 * a pre-signed GET URL (valid for one hour) for each of them.
 *
 * The listing is paginated: S3 returns a bounded number of keys per
 * `listObjectsV2` response, so all pages are followed via the continuation
 * token to avoid silently dropping files of a large unload.
 *
 * @param clientOptions Options passed as-is to the S3 client constructor.
 * @param bucketName Bucket name. A leading URL-like scheme (e.g. `s3://`) is stripped.
 * @param prefix Key prefix of the unloaded files.
 * @returns Signed URLs in listing order; an empty array when nothing matches the prefix.
 * @throws Error when S3 responds without a listing.
 */
export async function extractUnloadedFilesFromS3(
  clientOptions: S3StorageClientConfig,
  bucketName: string,
  prefix: string
): Promise<string[]> {
  const storage = new S3(clientOptions);
  // It looks that different driver configurations use different formats
  // for the bucket - some expect only names, some - full url-like names.
  // So we unify this (without reassigning the caller-visible parameter).
  const bucket = bucketName.replace(/^[a-zA-Z]+:\/\//, '');

  const keys: string[] = [];
  let continuationToken: string | undefined;

  do {
    const page = await storage.listObjectsV2({
      Bucket: bucket,
      Prefix: prefix,
      ContinuationToken: continuationToken,
    });
    if (!page) {
      throw new Error('Unable to retrieve list of files from S3 storage after unloading.');
    }

    for (const file of page.Contents || []) {
      // An entry without a key cannot be signed, so it is skipped.
      if (file.Key) {
        keys.push(file.Key);
      }
    }

    // Only continue while S3 reports that the listing was truncated.
    continuationToken = page.IsTruncated ? page.NextContinuationToken : undefined;
  } while (continuationToken);

  return Promise.all(
    keys.map((key) => getSignedUrl(
      storage,
      new GetObjectCommand({ Bucket: bucket, Key: key }),
      { expiresIn: 3600 }
    ))
  );
}
Loading
Loading