Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 53 additions & 76 deletions lib/DBSQLClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import thrift, { HttpHeaders } from 'thrift';
import thrift from 'thrift';

import { EventEmitter } from 'events';
import TCLIService from '../thrift/TCLIService';
Expand All @@ -13,12 +13,13 @@ import HttpConnection from './connection/connections/HttpConnection';
import IConnectionOptions from './connection/contracts/IConnectionOptions';
import Status from './dto/Status';
import HiveDriverError from './errors/HiveDriverError';
import { areHeadersEqual, buildUserAgentString, definedOrError } from './utils';
import { buildUserAgentString, definedOrError } from './utils';
import PlainHttpAuthentication from './connection/auth/PlainHttpAuthentication';
import DatabricksOAuth from './connection/auth/DatabricksOAuth';
import IDBSQLLogger, { LogLevel } from './contracts/IDBSQLLogger';
import DBSQLLogger from './DBSQLLogger';
import CloseableCollection from './utils/CloseableCollection';
import IConnectionProvider from './connection/contracts/IConnectionProvider';

function prependSlash(str: string): string {
if (str.length > 0 && str.charAt(0) !== '/') {
Expand All @@ -41,13 +42,11 @@ function getInitialNamespaceOptions(catalogName?: string, schemaName?: string) {
}

export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
private client: TCLIService.Client | null = null;
private connectionProvider?: IConnectionProvider;

private authProvider: IAuthentication | null = null;
private authProvider?: IAuthentication;

private connectionOptions: ConnectionOptions | null = null;

private additionalHeaders: HttpHeaders = {};
private client?: TCLIService.Client;

private readonly logger: IDBSQLLogger;

Expand All @@ -61,30 +60,14 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
this.logger.log(LogLevel.info, 'Created DBSQLClient');
}

private getConnectionOptions(options: ConnectionOptions, headers: HttpHeaders): IConnectionOptions {
const {
host,
port,
path,
clientId,
authType,
// @ts-expect-error TS2339: Property 'token' does not exist on type 'ConnectionOptions'
token,
// @ts-expect-error TS2339: Property 'persistence' does not exist on type 'ConnectionOptions'
persistence,
// @ts-expect-error TS2339: Property 'provider' does not exist on type 'ConnectionOptions'
provider,
...otherOptions
} = options;

private getConnectionOptions(options: ConnectionOptions): IConnectionOptions {
return {
host,
port: port || 443,
path: prependSlash(path),
host: options.host,
port: options.port || 443,
path: prependSlash(options.path),
https: true,
...otherOptions,
socketTimeout: options.socketTimeout,
headers: {
...headers,
'User-Agent': buildUserAgentString(options.clientId),
},
};
Expand Down Expand Up @@ -128,7 +111,38 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
*/
public async connect(options: ConnectionOptions, authProvider?: IAuthentication): Promise<IDBSQLClient> {
this.authProvider = this.getAuthProvider(options, authProvider);
this.connectionOptions = options;

this.connectionProvider = new HttpConnection(this.getConnectionOptions(options));

const thriftConnection = await this.connectionProvider.getThriftConnection();

thriftConnection.on('error', (error: Error) => {
// Error.stack already contains error type and message, so log stack if available,
// otherwise fall back to just error type + message
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
try {
this.emit('error', error);
} catch (e) {
// EventEmitter will throw unhandled error when emitting 'error' event.
// Since we already logged it few lines above, just suppress this behaviour
}
});

thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => {
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`);
this.emit('reconnecting', params);
});

thriftConnection.on('close', () => {
this.logger.log(LogLevel.debug, 'Closing connection.');
this.emit('close');
});

thriftConnection.on('timeout', () => {
this.logger.log(LogLevel.debug, 'Connection timed out.');
this.emit('timeout');
});

return this;
}

Expand Down Expand Up @@ -158,65 +172,28 @@ export default class DBSQLClient extends EventEmitter implements IDBSQLClient {
}

private async getClient() {
if (!this.connectionOptions || !this.authProvider) {
if (!this.connectionProvider) {
throw new HiveDriverError('DBSQLClient: not connected');
}

const authHeaders = await this.authProvider.authenticate();
// When auth headers change - recreate client. Thrift library does not provide API for updating
// changed options, therefore we have to recreate both connection and client to apply new headers
if (!this.client || !areHeadersEqual(this.additionalHeaders, authHeaders)) {
if (!this.client) {
this.logger.log(LogLevel.info, 'DBSQLClient: initializing thrift client');
this.additionalHeaders = authHeaders;
const connectionOptions = this.getConnectionOptions(this.connectionOptions, this.additionalHeaders);
this.client = this.thrift.createClient(TCLIService, await this.connectionProvider.getThriftConnection());
}

const connection = await this.createConnection(connectionOptions);
this.client = this.thrift.createClient(TCLIService, connection.getConnection());
if (this.authProvider) {
const authHeaders = await this.authProvider.authenticate();
this.connectionProvider.setHeaders(authHeaders);
}

return this.client;
}

private async createConnection(options: IConnectionOptions) {
const connectionProvider = new HttpConnection();
const connection = await connectionProvider.connect(options);
const thriftConnection = connection.getConnection();

thriftConnection.on('error', (error: Error) => {
// Error.stack already contains error type and message, so log stack if available,
// otherwise fall back to just error type + message
this.logger.log(LogLevel.error, error.stack || `${error.name}: ${error.message}`);
try {
this.emit('error', error);
} catch (e) {
// EventEmitter will throw unhandled error when emitting 'error' event.
// Since we already logged it few lines above, just suppress this behaviour
}
});

thriftConnection.on('reconnecting', (params: { delay: number; attempt: number }) => {
this.logger.log(LogLevel.debug, `Reconnecting, params: ${JSON.stringify(params)}`);
this.emit('reconnecting', params);
});

thriftConnection.on('close', () => {
this.logger.log(LogLevel.debug, 'Closing connection.');
this.emit('close');
});

thriftConnection.on('timeout', () => {
this.logger.log(LogLevel.debug, 'Connection timed out.');
this.emit('timeout');
});

return connection;
}

public async close(): Promise<void> {
await this.sessions.closeAll();

this.client = null;
this.authProvider = null;
this.connectionOptions = null;
this.client = undefined;
this.connectionProvider = undefined;
this.authProvider = undefined;
}
}
6 changes: 3 additions & 3 deletions lib/connection/auth/DatabricksOAuth/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { HttpHeaders } from 'thrift';
import { HeadersInit } from 'node-fetch';
import IAuthentication from '../../contracts/IAuthentication';
import IDBSQLLogger from '../../../contracts/IDBSQLLogger';
import OAuthPersistence, { OAuthPersistenceCache } from './OAuthPersistence';
Expand All @@ -9,7 +9,7 @@ interface DatabricksOAuthOptions extends OAuthManagerOptions {
scopes?: OAuthScopes;
logger?: IDBSQLLogger;
persistence?: OAuthPersistence;
headers?: HttpHeaders;
headers?: HeadersInit;
}

export default class DatabricksOAuth implements IAuthentication {
Expand All @@ -27,7 +27,7 @@ export default class DatabricksOAuth implements IAuthentication {
this.manager = OAuthManager.getManager(this.options);
}

public async authenticate(): Promise<HttpHeaders> {
public async authenticate(): Promise<HeadersInit> {
const { host, scopes, headers } = this.options;

const persistence = this.options.persistence ?? this.defaultPersistence;
Expand Down
8 changes: 4 additions & 4 deletions lib/connection/auth/PlainHttpAuthentication.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
import { HttpHeaders } from 'thrift';
import { HeadersInit } from 'node-fetch';
import IAuthentication from '../contracts/IAuthentication';

interface PlainHttpAuthenticationOptions {
username?: string;
password?: string;
headers?: HttpHeaders;
headers?: HeadersInit;
}

export default class PlainHttpAuthentication implements IAuthentication {
private readonly username: string;

private readonly password: string;

private readonly headers: HttpHeaders;
private readonly headers: HeadersInit;

constructor(options: PlainHttpAuthenticationOptions) {
this.username = options?.username || 'anonymous';
this.password = options?.password ?? 'anonymous';
this.headers = options?.headers || {};
}

public async authenticate(): Promise<HttpHeaders> {
public async authenticate(): Promise<HeadersInit> {
return {
...this.headers,
Authorization: `Bearer ${this.password}`,
Expand Down
70 changes: 43 additions & 27 deletions lib/connection/connections/HttpConnection.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,36 @@
import thrift from 'thrift';
import https from 'https';
import http from 'http';
import { HeadersInit } from 'node-fetch';

import IThriftConnection from '../contracts/IThriftConnection';
import IConnectionProvider from '../contracts/IConnectionProvider';
import IConnectionOptions from '../contracts/IConnectionOptions';
import globalConfig from '../../globalConfig';

import ThriftHttpConnection from './ThriftHttpConnection';

export default class HttpConnection implements IConnectionProvider, IThriftConnection {
private connection: any;
export default class HttpConnection implements IConnectionProvider {
private readonly options: IConnectionOptions;

private headers: HeadersInit = {};

private connection?: ThriftHttpConnection;

constructor(options: IConnectionOptions) {
this.options = options;
}

public setHeaders(headers: HeadersInit) {
this.headers = headers;
this.connection?.setHeaders({
...this.options.headers,
...this.headers,
});
}

private async getAgent(): Promise<http.Agent> {
const { options } = this;

private async getAgent(options: IConnectionOptions): Promise<http.Agent> {
const httpAgentOptions: http.AgentOptions = {
keepAlive: true,
maxSockets: 5,
Expand All @@ -32,30 +50,28 @@ export default class HttpConnection implements IConnectionProvider, IThriftConne
return options.https ? new https.Agent(httpsAgentOptions) : new http.Agent(httpAgentOptions);
}

async connect(options: IConnectionOptions): Promise<IThriftConnection> {
const agent = await this.getAgent(options);

this.connection = new ThriftHttpConnection(
{
url: `${options.https ? 'https' : 'http'}://${options.host}:${options.port}${options.path ?? '/'}`,
transport: thrift.TBufferedTransport,
protocol: thrift.TBinaryProtocol,
},
{
agent,
timeout: options.socketTimeout ?? globalConfig.socketTimeout,
headers: options.headers,
},
);

return this;
}
public async getThriftConnection(): Promise<any> {
if (!this.connection) {
const { options } = this;
const agent = await this.getAgent();

getConnection() {
return this.connection;
}
this.connection = new ThriftHttpConnection(
{
url: `${options.https ? 'https' : 'http'}://${options.host}:${options.port}${options.path ?? '/'}`,
transport: thrift.TBufferedTransport,
protocol: thrift.TBinaryProtocol,
},
{
agent,
timeout: options.socketTimeout ?? globalConfig.socketTimeout,
headers: {
...options.headers,
...this.headers,
},
},
);
}

isConnected(): boolean {
return !!this.connection;
return this.connection;
}
}
11 changes: 9 additions & 2 deletions lib/connection/connections/ThriftHttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import { EventEmitter } from 'events';
import { TBinaryProtocol, TBufferedTransport, Thrift, TProtocol, TProtocolConstructor, TTransport } from 'thrift';
import fetch, { RequestInit, Response, FetchError } from 'node-fetch';
import fetch, { RequestInit, HeadersInit, Response, FetchError } from 'node-fetch';
// @ts-expect-error TS7016: Could not find a declaration file for module
import InputBufferUnderrunError from 'thrift/lib/nodejs/lib/thrift/input_buffer_underrun_error';

Expand Down Expand Up @@ -48,7 +48,7 @@ type ThriftClient = {
export default class ThriftHttpConnection extends EventEmitter {
private readonly url: string;

private readonly config: RequestInit;
private config: RequestInit;

// This field is used by Thrift internally, so name and type are important
private readonly transport: TTransportType;
Expand All @@ -67,6 +67,13 @@ export default class ThriftHttpConnection extends EventEmitter {
this.protocol = options.protocol ?? TBinaryProtocol;
}

public setHeaders(headers: HeadersInit) {
this.config = {
...this.config,
headers,
};
}

public write(data: Buffer, seqId: number) {
const requestConfig: RequestInit = {
...this.config,
Expand Down
4 changes: 2 additions & 2 deletions lib/connection/contracts/IAuthentication.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { HttpHeaders } from 'thrift';
import { HeadersInit } from 'node-fetch';

export default interface IAuthentication {
authenticate(): Promise<HttpHeaders>;
authenticate(): Promise<HeadersInit>;
}
7 changes: 4 additions & 3 deletions lib/connection/contracts/IConnectionProvider.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import IConnectionOptions from './IConnectionOptions';
import IThriftConnection from './IThriftConnection';
import { HeadersInit } from 'node-fetch';

export default interface IConnectionProvider {
connect(options: IConnectionOptions): Promise<IThriftConnection>;
getThriftConnection(): Promise<any>;

setHeaders(headers: HeadersInit): void;
}
5 changes: 0 additions & 5 deletions lib/connection/contracts/IThriftConnection.ts

This file was deleted.

Loading