Skip to content

Commit

Permalink
feat(data caching): add WIP read through data cache
Browse files Browse the repository at this point in the history
Uses an FsDataStore to stream data to disk as it's read from another
source. Currently writes, but never reads. Reading, when implemented,
will use the DB to look up hashes by ID.
  • Loading branch information
djwhitt committed Feb 6, 2023
1 parent 598e7ae commit 3423035
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 3 deletions.
10 changes: 8 additions & 2 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import YAML from 'yaml';
import { ArweaveCompositeClient } from './arweave/composite-client.js';
import { GatewayDataSource } from './data/gateway-data-source.js';
import { ReadThroughChunkDataCache } from './data/read-through-chunk-data-cache.js';
import { ReadThroughDataCache } from './data/read-through-data-cache.js';
import { SequentialDataSource } from './data/sequential-data-source.js';
import { TxChunksDataSource } from './data/tx-chunks-data-source.js';
import { StandaloneSqliteDatabase } from './database/standalone-sqlite.js';
Expand All @@ -46,6 +47,7 @@ import {
import { apolloServer } from './routes/graphql/index.js';
import { FsBlockStore } from './store/fs-block-store.js';
import { FsChunkDataStore } from './store/fs-chunk-data-store.js';
import { FsDataStore } from './store/fs-data-store.js';
import { FsTransactionStore } from './store/fs-transaction-store.js';
import { BlockImporter } from './workers/block-importer.js';
import { TransactionFetcher } from './workers/transaction-fetcher.js';
Expand Down Expand Up @@ -162,9 +164,13 @@ const gatewayDataSource = new GatewayDataSource({
trustedGatewayUrl,
});

const contiguousDataSource = new SequentialDataSource({
const contiguousDataSource = new ReadThroughDataCache({
log,
dataSources: [gatewayDataSource, txChunksDataSource, arweaveClient],
dataSource: new SequentialDataSource({
log,
dataSources: [gatewayDataSource, txChunksDataSource, arweaveClient],
}),
dataStore: new FsDataStore({ log, baseDir: 'data/data' }),
});

const manifestPathResolver = new StreamingManifestPathResolver({
Expand Down
85 changes: 85 additions & 0 deletions src/data/read-through-data-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import crypto from 'crypto';
import { Writable } from 'stream';
import winston from 'winston';

import { toB64Url } from '../lib/encoding.js';
import {
ContiguousData,
ContiguousDataSource,
ContiguousDataStore,
} from '../types.js';

/**
 * A read-through cache for contiguous transaction data. Data fetched from
 * the wrapped data source is streamed into a ContiguousDataStore as a side
 * effect of each read. Cache reads are not implemented yet (see TODOs).
 */
export class ReadThroughDataCache implements ContiguousDataSource {
  private log: winston.Logger;
  private dataSource: ContiguousDataSource;
  private dataStore: ContiguousDataStore;

  constructor({
    log,
    dataSource,
    dataStore,
  }: {
    log: winston.Logger;
    dataSource: ContiguousDataSource;
    dataStore: ContiguousDataStore;
  }) {
    this.log = log.child({ class: 'ReadThroughDataCache' });
    this.dataSource = dataSource;
    this.dataStore = dataStore;
  }

  /**
   * Fetches data for the given ID from the underlying data source, teeing
   * the stream into the cache store as it flows. Cache failures are logged
   * and never propagate to the caller — the returned stream is always the
   * source stream.
   *
   * @param id the data item (e.g. transaction) ID to fetch
   * @returns the data from the underlying source
   */
  async getData(id: string): Promise<ContiguousData> {
    this.log.info(`Fetching data for ${id}`);
    // TODO check if data is in FS store
    // TODO stream from FS store if it is

    const data = await this.dataSource.getData(id);
    // May stay undefined if creating the cache stream fails; caching is
    // best-effort and must not break the read path.
    let cacheStream: Writable | undefined;
    try {
      cacheStream = await this.dataStore.createWriteStream();
      // Without an 'error' listener, a cache write failure would surface
      // as an unhandled stream error and could crash the process.
      cacheStream.on('error', (error: any) => {
        this.log.error('Error writing to cache stream:', {
          id,
          message: error.message,
          stack: error.stack,
        });
      });
      data.stream.pipe(cacheStream);
    } catch (error: any) {
      this.log.error('Error creating cache stream:', {
        id,
        message: error.message,
        stack: error.stack,
      });
    }

    // Hash the data as it streams so the store can be content-addressed.
    const hash = crypto.createHash('sha256');
    data.stream.on('data', (chunk) => {
      hash.update(chunk);
    });

    data.stream.on('error', (error) => {
      this.log.error('Error reading data:', {
        id,
        message: error.message,
        stack: error.stack,
      });
      // TODO delete temp file
    });

    // TODO should this be on cacheStream?
    data.stream.on('end', () => {
      if (cacheStream !== undefined) {
        const digest = hash.digest();
        const b64uDigest = toB64Url(digest);

        this.log.info('Successfully cached data:', {
          id,
          hash: b64uDigest,
        });
        // finalize is async; catch rejections here rather than leaving an
        // unhandled promise rejection in an event handler.
        Promise.resolve(this.dataStore.finalize(cacheStream, digest)).catch(
          (error: any) => {
            this.log.error('Error finalizing cached data:', {
              id,
              message: error.message,
              stack: error.stack,
            });
          },
        );
      } else {
        this.log.error('Error caching data:', {
          id,
          message: 'no cache stream',
        });
      }
    });

    return data;
  }
}
85 changes: 85 additions & 0 deletions src/store/fs-data-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import crypto from 'crypto';
import fs from 'fs';
import { Readable } from 'stream';
import winston from 'winston';

import { toB64Url } from '../lib/encoding.js';
import { ContiguousDataStore } from '../types.js';

/**
 * A filesystem-backed, content-addressed store for contiguous data.
 * Data is written to a temp file first and renamed into place (keyed by
 * the base64url SHA-256 of its contents) when finalized.
 */
export class FsDataStore implements ContiguousDataStore {
  private log: winston.Logger;
  private baseDir: string;

  constructor({ log, baseDir }: { log: winston.Logger; baseDir: string }) {
    this.log = log.child({ class: this.constructor.name });
    this.baseDir = baseDir;
  }

  // Directory for in-progress writes before they are finalized by hash.
  private tempDir() {
    return `${this.baseDir}/tmp`;
  }

  private createTempPath() {
    return `${this.tempDir()}/${crypto.randomBytes(16).toString('hex')}`;
  }

  // Two-level fan-out by hash prefix keeps per-directory entry counts low.
  private dataDir(b64uHashString: string) {
    const hashPrefix = `${b64uHashString.substring(
      0,
      2,
    )}/${b64uHashString.substring(2, 4)}`;
    return `${this.baseDir}/data/${hashPrefix}`;
  }

  private dataPath(hash: Buffer) {
    const hashString = toB64Url(hash);
    return `${this.dataDir(hashString)}/${hashString}`;
  }

  /** Returns true if data with the given hash exists in the store. */
  async has(hash: Buffer) {
    try {
      await fs.promises.access(this.dataPath(hash), fs.constants.F_OK);
      return true;
    } catch (error) {
      return false;
    }
  }

  /**
   * Returns a read stream for the data with the given hash, or undefined
   * if it is not in the store or a stream could not be created.
   */
  async get(hash: Buffer): Promise<Readable | undefined> {
    try {
      if (await this.has(hash)) {
        return fs.createReadStream(this.dataPath(hash));
      }
    } catch (error: any) {
      this.log.error('Failed to get contiguous data stream', {
        hash: toB64Url(hash),
        message: error.message,
        stack: error.stack,
      });
    }
    return undefined;
  }

  /** Creates a write stream to a fresh temp file under the base dir. */
  async createWriteStream() {
    const tempPath = this.createTempPath();
    await fs.promises.mkdir(this.tempDir(), { recursive: true });
    const file = fs.createWriteStream(tempPath);
    return file;
  }

  /**
   * Ends the stream, waits for all buffered data to be flushed to disk,
   * then renames the temp file into its content-addressed location.
   * Errors are logged rather than thrown.
   */
  async finalize(stream: fs.WriteStream, hash: Buffer) {
    try {
      // end() is asynchronous — renaming before the flush completes can
      // move a partially written file into the store. Wait for the end()
      // callback (fires after 'finish') before renaming.
      await new Promise<void>((resolve, reject) => {
        stream.once('error', reject);
        stream.end(() => resolve());
      });
      const dataDir = this.dataDir(toB64Url(hash));
      await fs.promises.mkdir(dataDir, { recursive: true });
      await fs.promises.rename(stream.path, this.dataPath(hash));
    } catch (error: any) {
      this.log.error('Failed to finalize contiguous data stream', {
        message: error.message,
        stack: error.stack,
      });
    }
  }

  // TODO add a delete method for cache eviction?
}
9 changes: 8 additions & 1 deletion src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import { Readable } from 'stream';
import { Readable, Writable } from 'stream';

export interface B64uTag {
name: string;
Expand Down Expand Up @@ -154,6 +154,13 @@ export interface ChunkMetadataStore {
set(chunkMetadata: ChunkMetadata): Promise<void>;
}

/**
 * A content-addressed store for contiguous (non-chunked) data. Writers
 * obtain a stream via createWriteStream, write the data, and then call
 * finalize with the data's hash to commit it; has/get read back committed
 * data by hash.
 */
export interface ContiguousDataStore {
  // True if data with the given hash has been committed to the store.
  has(hash: Buffer): Promise<boolean>;
  // Read stream for committed data, or undefined if absent.
  get(hash: Buffer): Promise<Readable | undefined>;
  // Stream for writing new data; not visible to has/get until finalized.
  createWriteStream(): Promise<Writable>;
  // Commits the written data under the given hash.
  finalize(stream: Writable, hash: Buffer): Promise<void>;
}

export interface ChainSource {
getBlockByHeight(height: number): Promise<PartialJsonBlock>;
getTx(txId: string): Promise<PartialJsonTransaction>;
Expand Down

0 comments on commit 3423035

Please sign in to comment.