Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes in adapters in order to store parentID #985

Open
wants to merge 1 commit into
base: devel
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/adapters/src/docker-instance-adapter.ts
Original file line number Diff line number Diff line change
@@ -186,6 +186,8 @@ IComponent {

const networkSetup = await this.getNetworkSetup();

const volumeId = config.id + "_" + config.parent_id;

const envs = getRunnerEnvEntries({
sequencePath: path.join(config.sequenceDir, config.entrypointPath),
instancesServerPort,
@@ -200,7 +202,7 @@ IComponent {
imageName: config.container.image,
volumes: [
...extraVolumes,
{ mountPoint: config.sequenceDir, volume: config.id, writeable: false }
{ mountPoint: config.sequenceDir, volume: volumeId, writeable: false }
],
labels: {
"scramjet.sequence.name": config.name
30 changes: 22 additions & 8 deletions packages/adapters/src/docker-sequence-adapter.ts
Original file line number Diff line number Diff line change
@@ -108,13 +108,18 @@ class DockerSequenceAdapter implements ISequenceAdapter {
this.logger.debug("Identify started", volume, this.config.docker.prerunner?.maxMem || 0);

const ret = await this.parsePackage(streams, wait, volume);
const [, parentId] = volume.split("_");

if (!ret.id) {
return undefined;
}

this.logger.info("Identified image for volume", { volume, image: ret.container?.image });

if (parentId) {
ret.parent_id = parentId;
}

return ret;
} catch (e: any) {
this.logger.error("Docker failed", e.message, volume);
@@ -132,17 +137,18 @@ class DockerSequenceAdapter implements ISequenceAdapter {
* @param {Readable} stream Stream containing sequence to be identified.
* @param {string} id Id for the new docker volume where sequence will be stored.
* @param {boolean} override Removes previous sequence
* @param {string} parentId Id which indicates sequence's source.

* @returns {Promise<SequenceConfig>} Promise resolving to sequence config.
*/
async identify(stream: Readable, id: string, override = false): Promise<SequenceConfig> {
async identify(stream: Readable, id: string, override = false, parentId: string): Promise<SequenceConfig> {
const volStart = new Date();

if (override) {
await this.dockerHelper.removeVolume(id);
}

const volumeId = await this.createVolume(id);

const volumeId = await this.createVolume(id, parentId);
const volSecs = (new Date().getTime() - volStart.getTime()) / 1000;

appendFile("timing-log.ndjson", JSON.stringify({
@@ -189,6 +195,10 @@ class DockerSequenceAdapter implements ISequenceAdapter {

await this.fetch(config.container.image);

if (parentId) {
config.parent_id = parentId;
}

return config;
} catch (err: any) {
this.logger.error("Identify failed on volume", id);
@@ -204,11 +214,12 @@ class DockerSequenceAdapter implements ISequenceAdapter {
* Creates volume with provided id.
*
* @param {string} id Volume id.
* @param {string} parentId Sequence's parentId.
* @returns {DockerVolume} Created volume.
*/
private async createVolume(id: string): Promise<DockerVolume> {
private async createVolume(id: string, parentId?: string): Promise<DockerVolume> {
try {
return await this.dockerHelper.createVolume(id);
return await this.dockerHelper.createVolume(id, parentId);
} catch (error: any) {
this.logger.error("Error creating volume", id);

@@ -233,7 +244,6 @@ class DockerSequenceAdapter implements ISequenceAdapter {
const parseStart = new Date();

const [preRunnerResult] = (await Promise.all([readStreamedJSON(streams.stdout as Readable), wait])) as any;

const parseSecs = (new Date().getTime() - parseStart.getTime()) / 1000;

appendFile("timing-log.ndjson", JSON.stringify({
@@ -252,6 +262,7 @@ class DockerSequenceAdapter implements ISequenceAdapter {
const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(preRunnerResult);
const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {};
const config = validPackageJson.scramjet?.config ? { ...validPackageJson.scramjet.config } : {};
const [id, parentId] = volumeId.split("_");

const container = Object.assign({}, this.config.docker.runner);

@@ -268,7 +279,8 @@ class DockerSequenceAdapter implements ISequenceAdapter {
config,
sequenceDir: PACKAGE_DIR,
entrypointPath: validPackageJson.main,
id: volumeId,
id: id,
parent_id: parentId,
description: validPackageJson.description,
author: validPackageJson.author,
keywords: validPackageJson.keywords,
@@ -288,7 +300,9 @@ class DockerSequenceAdapter implements ISequenceAdapter {
throw new Error(`Incorrect SequenceConfig passed to DockerSequenceAdapter: ${config.type}`);
}

await this.dockerHelper.removeVolume(config.id);
const volumeId = config.id + "_" + config.parent_id;

await this.dockerHelper.removeVolume(volumeId);

this.logger.debug("Volume removed", config.id);
}
5 changes: 4 additions & 1 deletion packages/adapters/src/dockerode-docker-helper.ts
Original file line number Diff line number Diff line change
@@ -261,9 +261,12 @@ export class DockerodeDockerHelper implements IDockerHelper {
* Creates docker volume.
*
* @param name Volume name. Optional. If not provided, volume will be named with unique name.
* @param parentId Volume parentId. Optional. If not provided, volume will be named the same as the name.
* @returns Volume name.
*/
async createVolume(name: string = ""): Promise<DockerVolume> {
async createVolume(name: string = "", parentId: string = name): Promise<DockerVolume> {
name += "_" + parentId;

return this.dockerode.createVolume({
Name: name,
Labels: {
33 changes: 26 additions & 7 deletions packages/adapters/src/kubernetes-sequence-adapter.ts
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ import { isDefined, readStreamedJSON } from "@scramjet/utility";
import { sequencePackageJSONDecoder } from "./validate-sequence-package-json";
import { adapterConfigDecoder } from "./kubernetes-config-decoder";
import { detectLanguage } from "./utils";
import { IDProvider } from "@scramjet/model";

/**
* Returns existing Sequence configuration.
@@ -24,8 +25,23 @@ import { detectLanguage } from "./utils";
* @param {string} id Sequence Id.
* @returns {ProcessSequenceConfig} Sequence configuration.
*/
async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string): Promise<KubernetesSequenceConfig> {
const sequenceDir = path.join(sequencesRoot, id);
// eslint-disable-next-line max-len
async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string, parentId?: string): Promise<KubernetesSequenceConfig> {
let sequenceDir: string;

if (parentId) {
sequenceDir = path.join(sequencesRoot, id + "_" + parentId);
} else {
[id, parentId] = id.split("_");
const valid = IDProvider.isValid(id);

if (valid) {
sequenceDir = path.join(sequencesRoot, id + "_" + parentId);
} else {
sequenceDir = path.join(sequencesRoot, id);
}
}

const packageJsonPath = path.join(sequenceDir, "package.json");
const packageJson = await readStreamedJSON(createReadStream(packageJsonPath));

@@ -38,6 +54,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin
version: validPackageJson.version ?? "",
name: validPackageJson.name ?? "",
id,
parent_id: parentId || id,
sequenceDir,
engines,
description: validPackageJson.description,
@@ -110,13 +127,15 @@ class KubernetesSequenceAdapter implements ISequenceAdapter {
*
* @param {Readable} stream Stream with packed sequence.
* @param {string} id Sequence Id.
* @param {boolean} override Removes previous sequence
* @param {boolean} override Removes previous sequence.
* @param {string} parentId Sequence's parentId.

* @returns {Promise<SequenceConfig>} Promise resolving to identified sequence configuration.
*/
async identify(stream: Readable, id: string, override = false): Promise<SequenceConfig> {
async identify(stream: Readable, id: string, override = false, parentId = id): Promise<SequenceConfig> {
// 1. Unpack package.json to stdout and map to config
// 2. Create compressed package on the disk
const sequenceDir = path.join(this.adapterConfig.sequencesRoot, id);
const sequenceDir = path.join(this.adapterConfig.sequencesRoot, id + "_" + parentId);

if (override) {
await fs.rm(sequenceDir, { recursive: true, force: true });
@@ -134,7 +153,7 @@ class KubernetesSequenceAdapter implements ISequenceAdapter {

await new Promise(res => uncompressingProc.on("close", res));

return getRunnerConfigForStoredSequence(this.adapterConfig.sequencesRoot, id);
return getRunnerConfigForStoredSequence(this.adapterConfig.sequencesRoot, id, parentId);
}

/**
@@ -148,7 +167,7 @@ class KubernetesSequenceAdapter implements ISequenceAdapter {
throw new Error(`Incorrect SequenceConfig passed to KubernetesSequenceAdapter: ${config.type}`);
}

const sequenceDir = path.join(this.adapterConfig.sequencesRoot, config.id);
const sequenceDir = config.sequenceDir;

this.logger.debug("Removing sequence directory...", sequenceDir);

35 changes: 26 additions & 9 deletions packages/adapters/src/process-sequence-adapter.ts
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ import path from "path";
import { exec } from "child_process";
import { isDefined, readStreamedJSON } from "@scramjet/utility";
import { sequencePackageJSONDecoder } from "./validate-sequence-package-json";
import { SequenceAdapterError } from "@scramjet/model";
import { IDProvider, SequenceAdapterError } from "@scramjet/model";
import { detectLanguage } from "./utils";

/**
@@ -23,12 +23,25 @@ import { detectLanguage } from "./utils";
* @param {string} id Sequence Id.
* @returns {ProcessSequenceConfig} Sequence configuration.
*/
// eslint-disable-next-line complexity
async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string): Promise<ProcessSequenceConfig> {
const sequenceDir = path.join(sequencesRoot, id);
// eslint-disable-next-line complexity, max-len
async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: string, parentId?: string): Promise<ProcessSequenceConfig> {
let sequenceDir: string;

if (parentId) {
sequenceDir = path.join(sequencesRoot, id + "_" + parentId);
} else {
[id, parentId] = id.split("_");
const valid = IDProvider.isValid(id);

if (valid) {
sequenceDir = path.join(sequencesRoot, id + "_" + parentId);
} else {
sequenceDir = path.join(sequencesRoot, id);
}
}

const packageJsonPath = path.join(sequenceDir, "package.json");
const packageJson = await readStreamedJSON(createReadStream(packageJsonPath));

const validPackageJson = await sequencePackageJSONDecoder.decodeToPromise(packageJson);
const engines = validPackageJson.engines ? { ...validPackageJson.engines } : {};

@@ -39,6 +52,7 @@ async function getRunnerConfigForStoredSequence(sequencesRoot: string, id: strin
version: validPackageJson.version ?? "",
name: validPackageJson.name ?? "",
id,
parent_id: parentId || id,
sequenceDir,
description: validPackageJson.description,
author: validPackageJson.author,
@@ -82,6 +96,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter {
*/
async list(): Promise<SequenceConfig[]> {
const storedSequencesIds = await fs.readdir(this.config.sequencesRoot);

const sequencesConfigs = (await Promise.all(
storedSequencesIds
.map((id) => getRunnerConfigForStoredSequence(this.config.sequencesRoot, id))
@@ -100,10 +115,12 @@ class ProcessSequenceAdapter implements ISequenceAdapter {
* @param {Readable} stream Stream with packed sequence.
* @param {string} id Sequence Id.
* @param {boolean} override Removes previous sequence
* @param {string} parentId Sequence's parentId.

* @returns {Promise<SequenceConfig>} Promise resolving to identified sequence configuration.
*/
async identify(stream: Readable, id: string, override = false): Promise<SequenceConfig> {
const sequenceDir = path.join(this.config.sequencesRoot, id);
async identify(stream: Readable, id: string, override = false, parentId = id): Promise<SequenceConfig> {
const sequenceDir = path.join(this.config.sequencesRoot, id + "_" + parentId);

if (override) {
await fs.rm(sequenceDir, { recursive: true, force: true });
@@ -140,7 +157,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter {

this.logger.debug("Unpacking sequence succeeded", stderrOutput);

return getRunnerConfigForStoredSequence(this.config.sequencesRoot, id);
return getRunnerConfigForStoredSequence(this.config.sequencesRoot, id, parentId);
}

/**
@@ -154,7 +171,7 @@ class ProcessSequenceAdapter implements ISequenceAdapter {
throw new Error(`Incorrect SequenceConfig passed to ProcessSequenceAdapter: ${config.type}`);
}

const sequenceDir = path.join(this.config.sequencesRoot, config.id);
const sequenceDir = config.sequenceDir;

return fs.rm(sequenceDir, { recursive: true });
}
2 changes: 1 addition & 1 deletion packages/adapters/src/types.ts
Original file line number Diff line number Diff line change
@@ -260,7 +260,7 @@ export interface IDockerHelper {
*
* @returns {Promise<DockerVolume>} Created volume.
*/
createVolume: (name?: string) => Promise<DockerVolume>;
createVolume: (name?: string, parentId?: string) => Promise<DockerVolume>;

/**
* Removes volume.
3 changes: 2 additions & 1 deletion packages/host/src/lib/csi-controller.ts
Original file line number Diff line number Diff line change
@@ -790,7 +790,8 @@ export class CSIController extends TypedEmitter<Events> {
id: this.sequence.id,
config: this.sequence.config,
name: this.sequence.name,
location : this.sequence.location
location : this.sequence.location,
parent_id: this.sequence.parent_id
},
ports: this.info.ports,
created: this.info.created,
Loading
Oops, something went wrong.