/
fileSystemPersist.ts
200 lines (182 loc) · 6.31 KB
/
fileSystemPersist.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.
import * as fs from "fs";
import * as os from "os";
import * as path from "path";
import { diag } from "@opentelemetry/api";
import { PersistentStorage } from "../../../types";
import { DEFAULT_EXPORTER_CONFIG, AzureExporterInternalConfig } from "../../../config";
import { FileAccessControl } from "./fileAccessControl";
import { confirmDirExists, getShallowDirectorySize } from "./fileSystemHelpers";
import { promisify } from "util";
// Promise-returning wrappers around the callback-style `fs` functions,
// created with `util.promisify` so they can be awaited by the class below.
const statAsync = promisify(fs.stat);
const readdirAsync = promisify(fs.readdir);
const readFileAsync = promisify(fs.readFile);
const unlinkAsync = promisify(fs.unlink);
const writeFileAsync = promisify(fs.writeFile);
/**
 * File system persist class.
 *
 * Stores payloads as individual JSON files inside a directory located under
 * the OS temp directory and named after the configured instrumentation key.
 * Persistence is disabled (every `push` resolves to `false`, every `shift`
 * resolves to `null`) when file protection is not available or when no
 * instrumentation key was supplied.
 * @internal
 */
export class FileSystemPersist implements PersistentStorage {
  /** Prefix of the directory created under the OS temp directory. */
  static TEMPDIR_PREFIX = "ot-azure-exporter-";
  /** Suffix that identifies files written by this class. */
  static FILENAME_SUFFIX = ".ai.json";

  /**
   * Age, in milliseconds, after which a persisted file is considered expired.
   * (The spelling of the property name is kept for backward compatibility.)
   */
  fileRetemptionPeriod = 7 * 24 * 60 * 60 * 1000; // 7 days
  /** Delay, in milliseconds, before the cleanup task is run. */
  cleanupTimeOut = 60 * 60 * 1000; // 1 hour
  /** Directory size, in bytes, above which new payloads are rejected. */
  maxBytesOnDisk: number = 50_000_000; // ~50MB

  private _enabled: boolean;
  private _tempDirectory: string = "";
  private _fileCleanupTimer: NodeJS.Timer | null = null;
  private readonly _options: AzureExporterInternalConfig;

  /**
   * @param options - Exporter configuration; missing fields are filled in from
   *   `DEFAULT_EXPORTER_CONFIG`.
   */
  constructor(options: Partial<AzureExporterInternalConfig> = {}) {
    this._options = { ...DEFAULT_EXPORTER_CONFIG, ...options };
    this._enabled = true;
    FileAccessControl.checkFileProtection();

    if (!FileAccessControl.OS_PROVIDES_FILE_PROTECTION) {
      this._enabled = false;
      diag.error(
        "Sufficient file protection capabilities were not detected. Files will not be persisted"
      );
    }

    if (!this._options.instrumentationKey) {
      this._enabled = false;
      diag.error(
        `No instrumentation key was provided to FileSystemPersister. Files will not be persisted`
      );
    }

    if (this._enabled) {
      this._tempDirectory = path.join(
        os.tmpdir(),
        FileSystemPersist.TEMPDIR_PREFIX + this._options.instrumentationKey
      );

      // Starts file cleanup task
      if (!this._fileCleanupTimer) {
        this._fileCleanupTimer = setTimeout(() => {
          // The task handles its own errors; `void` marks the promise as
          // intentionally not awaited.
          void this._fileCleanupTask();
        }, this.cleanupTimeOut);
        // Do not let the pending timer keep the process alive.
        this._fileCleanupTimer.unref();
      }
    }
  }

  /**
   * Serializes `value` as JSON and writes it to a new file in the persistence
   * directory.
   * @param value - The payload to persist.
   * @returns A promise resolving to `true` when the file was written, `false`
   *   when persistence is disabled or the payload could not be stored.
   */
  push(value: unknown[]): Promise<boolean> {
    if (this._enabled) {
      diag.debug("Pushing value to persistent storage", value.toString());
      return this._storeToDisk(JSON.stringify(value));
    }
    return Promise.resolve(false);
  }

  /**
   * Reads one persisted file, removes it from disk and returns its parsed
   * JSON content.
   * @returns The parsed payload, or `null` when persistence is disabled, no
   *   file is available, or reading/parsing failed.
   */
  async shift(): Promise<unknown> {
    if (!this._enabled) {
      return null;
    }

    diag.debug("Searching for filesystem persisted files");
    try {
      const buffer = await this._getFirstFileOnDisk();
      if (buffer) {
        return JSON.parse(buffer.toString("utf8"));
      }
    } catch (e: any) {
      diag.debug("Failed to read persisted file", e);
    }
    return null;
  }

  /**
   * Lists the entries of the persistence directory whose name contains
   * `FILENAME_SUFFIX`.
   * @returns The matching file names; an empty array when the path exists but
   *   is not a directory.
   * @throws Whatever `fs.stat` / `fs.readdir` reject with (for example
   *   `ENOENT` when the directory does not exist).
   */
  private async _listPersistedFiles(): Promise<string[]> {
    const stats = await statAsync(this._tempDirectory);
    if (!stats.isDirectory()) {
      return [];
    }
    const entries = await readdirAsync(this._tempDirectory);
    return entries.filter((f) => path.basename(f).includes(FileSystemPersist.FILENAME_SUFFIX));
  }

  /**
   * Check for temp telemetry files
   * reads the first file if exist, deletes it and returns its content
   * @returns The file content, or `null` when there is nothing to read.
   * @throws Any file system error other than `ENOENT`.
   */
  private async _getFirstFileOnDisk(): Promise<Buffer | null> {
    try {
      const files = await this._listPersistedFiles();
      if (files.length === 0) {
        return null;
      }
      const filePath = path.join(this._tempDirectory, files[0]);
      const payload = await readFileAsync(filePath);
      // delete the file before handing the payload back to prevent double sending
      await unlinkAsync(filePath);
      return payload;
    } catch (e: any) {
      if (e.code === "ENOENT") {
        // File does not exist -- return null instead of throwing
        return null;
      }
      throw e;
    }
  }

  /**
   * Writes `payload` to a new file named after the current timestamp.
   * @param payload - The serialized content to write.
   * @returns `true` when the file was written; `false` when the directory
   *   check, the size check or the write itself failed, or when the directory
   *   size already exceeds `maxBytesOnDisk`.
   */
  private async _storeToDisk(payload: string): Promise<boolean> {
    try {
      await confirmDirExists(this._tempDirectory);
    } catch (error: any) {
      diag.warn(`Error while checking/creating directory: `, error && error.message);
      return false;
    }

    try {
      const size = await getShallowDirectorySize(this._tempDirectory);
      if (size > this.maxBytesOnDisk) {
        diag.warn(
          `Not saving data due to max size limit being met. Directory size in bytes is: ${size}`
        );
        return false;
      }
    } catch (error: any) {
      diag.warn(`Error while checking size of persistence directory: `, error && error.message);
      return false;
    }

    // The timestamp prefix is what the cleanup task later parses to decide expiry.
    const fileName = `${new Date().getTime()}${FileSystemPersist.FILENAME_SUFFIX}`;
    const fileFullPath = path.join(this._tempDirectory, fileName);

    // Mode 600 is w/r for creator and no read access for others
    diag.info(`saving data to disk at: ${fileFullPath}`);
    try {
      await writeFileAsync(fileFullPath, payload, { mode: 0o600 });
    } catch (writeError: any) {
      diag.warn(`Error writing file to persistent file storage`, writeError);
      return false;
    }
    return true;
  }

  /**
   * Deletes persisted files whose timestamp (parsed from the file name) is
   * older than `fileRetemptionPeriod`.
   *
   * All deletions are awaited, so the returned promise settles only after the
   * expired files have been removed, and a failed deletion is reported through
   * the log instead of surfacing as an unhandled promise rejection.
   * @returns `true` when persisted files were found and every expired one was
   *   removed; `false` when there was nothing to inspect or cleanup failed.
   */
  private async _fileCleanupTask(): Promise<boolean> {
    try {
      const files = await this._listPersistedFiles();
      if (files.length === 0) {
        return false;
      }

      const cutoffMs = Date.now() - this.fileRetemptionPeriod;
      const expiredFiles = files.filter((file) => {
        // File names start with the creation time in epoch milliseconds.
        const createdMs = new Date(
          parseInt(file.split(FileSystemPersist.FILENAME_SUFFIX)[0], 10)
        ).getTime();
        // An unparsable name yields NaN, which never compares as expired.
        return cutoffMs > createdMs;
      });

      await Promise.all(
        expiredFiles.map((file) => unlinkAsync(path.join(this._tempDirectory, file)))
      );
      return true;
    } catch (error: any) {
      diag.info(`Failed cleanup of persistent file storage expired files`, error);
      return false;
    }
  }
}