/
application-code.ts
139 lines (123 loc) · 3.75 KB
/
application-code.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import * as ka from 'aws-cdk-lib/aws-kinesisanalytics';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as s3_assets from 'aws-cdk-lib/aws-s3-assets';
import { Construct } from 'constructs';
/**
* The return type of `ApplicationCode.bind`. It carries the CloudFormation
* application configuration that points at the Flink application code, along
* with the S3 bucket holding that code.
*/
export interface ApplicationCodeConfig {
/**
* Low-level CloudFormation `ApplicationConfigurationProperty` describing the
* application code.
*/
readonly applicationCodeConfigurationProperty: ka.CfnApplicationV2.ApplicationConfigurationProperty;
/**
* S3 bucket that stores the Flink application code.
*/
readonly bucket: s3.IBucket;
}
/**
 * Describes where the code of a Flink application is located. Instances are
 * obtained from the static factory methods and resolved through `bind`.
 */
export abstract class ApplicationCode {
  /**
   * Use application code that is stored as an object in an S3 bucket.
   *
   * @param bucket - the S3 bucket containing the object
   * @param fileKey - key of the object holding the Flink JAR file
   * @param objectVersion - optional version string for the object at `fileKey`
   * @returns an `ApplicationCode` referring to the given bucket object
   */
  public static fromBucket(bucket: s3.IBucket, fileKey: string, objectVersion?: string): ApplicationCode {
    const location = { bucket, fileKey, objectVersion };
    return new BucketApplicationCode(location);
  }

  /**
   * Use application code from a local path containing a Flink JAR file.
   *
   * @param path - a local directory path
   * @param options - standard S3 `AssetOptions`
   * @returns an `ApplicationCode` backed by an S3 asset
   */
  public static fromAsset(path: string, options?: s3_assets.AssetOptions): ApplicationCode {
    const code: ApplicationCode = new AssetApplicationCode(path, options);
    return code;
  }

  /**
   * Lazily binds any resources this code needs to the given scope and returns
   * the resulting configuration.
   *
   * @param scope - the construct under which resources may be created
   */
  public abstract bind(scope: Construct): ApplicationCodeConfig;
}
/**
* Constructor properties for `BucketApplicationCode`.
*/
interface BucketApplicationCodeProps {
/** S3 bucket that contains the application code object. */
readonly bucket: s3.IBucket;
/** Key pointing to the Flink JAR file within `bucket`. */
readonly fileKey: string;
/** Optional version string for the object identified by `fileKey`. */
readonly objectVersion?: string;
}
/**
 * Application code that lives as an object in an S3 bucket.
 */
class BucketApplicationCode extends ApplicationCode {
  /**
   * Bucket containing the application code object. Always set by the
   * constructor, so it is declared as required and needs no non-null assertion.
   */
  public readonly bucket: s3.IBucket;
  /** Key of the object holding the Flink JAR file. */
  public readonly fileKey: string;
  /** Optional version string for the object identified by `fileKey`. */
  public readonly objectVersion?: string;

  /**
   * @param props - bucket, object key and optional object version of the code
   */
  constructor(props: BucketApplicationCodeProps) {
    super();
    this.bucket = props.bucket;
    this.fileKey = props.fileKey;
    this.objectVersion = props.objectVersion;
  }

  /**
   * Builds the application code configuration that points at the bucket object.
   * No resources are created.
   *
   * @param _scope - unused by this implementation
   * @returns the CloudFormation code configuration and the source bucket
   */
  public bind(_scope: Construct): ApplicationCodeConfig {
    return {
      applicationCodeConfigurationProperty: {
        applicationCodeConfiguration: {
          codeContent: {
            s3ContentLocation: {
              bucketArn: this.bucket.bucketArn,
              fileKey: this.fileKey,
              objectVersion: this.objectVersion,
            },
          },
          codeContentType: 'ZIPFILE',
        },
      },
      bucket: this.bucket,
    };
  }
}
/**
 * Application code taken from a local path and staged as an S3 asset.
 */
class AssetApplicationCode extends ApplicationCode {
  private readonly path: string;
  private readonly options?: s3_assets.AssetOptions;
  private _asset?: s3_assets.Asset;

  /**
   * @param path - local path of the code to upload
   * @param options - optional asset options, spread over the asset props
   */
  constructor(path: string, options?: s3_assets.AssetOptions) {
    super();
    this.path = path;
    this.options = options;
  }

  /**
   * Creates an `Asset` with id `Code` under `scope` and returns a configuration
   * pointing at the uploaded object. A new asset is created on every call.
   *
   * @param scope - construct under which the asset is created
   * @returns the CloudFormation code configuration and the asset's bucket
   * @throws Error when the asset is not a zip archive
   */
  public bind(scope: Construct): ApplicationCodeConfig {
    const staged = new s3_assets.Asset(scope, 'Code', { path: this.path, ...this.options });
    // Record the asset before validating so the accessors expose it even when
    // the check below throws.
    this._asset = staged;

    if (!staged.isZipArchive) {
      throw new Error(`Asset must be a .zip file or a directory (${this.path})`);
    }

    const s3ContentLocation = {
      bucketArn: staged.bucket.bucketArn,
      fileKey: staged.s3ObjectKey,
    };

    return {
      applicationCodeConfigurationProperty: {
        applicationCodeConfiguration: {
          codeContent: { s3ContentLocation },
          codeContentType: 'ZIPFILE',
        },
      },
      bucket: staged.bucket,
    };
  }

  /** The asset created by the most recent `bind` call, if any. */
  get asset(): s3_assets.Asset | undefined {
    return this._asset;
  }

  /** The bucket of the most recently bound asset, or `undefined` before `bind`. */
  get bucket(): s3.IBucket | undefined {
    return this.asset?.bucket;
  }
}