Skip to content

Commit

Permalink
added cache refresing functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
yianzhang14 committed Mar 7, 2024
1 parent 68ca9a3 commit c69a44f
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 18 deletions.
1 change: 1 addition & 0 deletions docs/codebase.md
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ Improvements:
- potentially remove some unused classes
- is there any point in having connectors be passed to it instead of just making a baseconnector
- think if globus transfers of data should be cached
- remove the FolderUploaderHelper -- too generic for its own good

### JupyterHub

Expand Down
151 changes: 143 additions & 8 deletions server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
jupyterGlobusMap
} from "./configs/config";
import DB from "./src/DB";
import { FolderUploaderHelper } from "./src/FolderUploader";
import JupyterHub from "./src/JupyterHub";
import GitUtil from "./src/lib/GitUtil";
import GlobusUtil, { GlobusTaskListManager } from "./src/lib/GlobusUtil";
Expand All @@ -40,6 +41,7 @@ import type {
initGlobusDownloadBody,
createJobBody,
updateJobBody,
refreshCacheBody,
} from "./src/types";

// create the express app
Expand Down Expand Up @@ -138,6 +140,12 @@ const schemas = {
},
required: ["jupyterhubApiToken", "toEndpoint", "toPath"],
},
refreshCache: {
type: "object",
properties: {
hpc: { type: "string" },
}
}
};

// handler for route errors
Expand Down Expand Up @@ -635,6 +643,124 @@ app.get("/git", async function (req, res) {
res.json({ git: await parseGit(gits) });
});

/**
* @openapi
* /git/refresh/:id:
* put:
* description: Refreshes a given git repo (with id :id) on the specified HPC, or all HPCs if HPC is not provided
* responses:
* 200:
* description: Refresh completed successfully
* 401:
* description: Cache refresh failed
* 402:
* description: Request body malformed
* 404:
* description: Provided git id was not found
*/
app.put("/git/refresh/:id", async function (req, res) {
const errors = requestErrors(
validator.validate(req.body, schemas.refreshCache)
);

if (errors.length > 0) {
res.status(402).json({ error: "invalid input", messages: errors });
return;
}

const body = req.body as refreshCacheBody;

const gitId = req.params.id;

const connection = await db.connect();
const git = await connection
.getRepository(Git)
.findOneOrFail({id: gitId});

if (!git) {
res.status(404).json({ error: "unknown folder with id " + gitId });
return;
}

try {
await GitUtil.refreshGit(git);

if (body.hpc) {
await FolderUploaderHelper.cachedUpload({ gitId: git.id }, body.hpc, "cache");
} else {
for (const hpc of Object.keys(hpcConfigMap)) {
// vv fun fact! you can avoid awaiting for a promise with the void keyword
await FolderUploaderHelper.cachedUpload({gitId: git.id}, hpc, "cache");
}
}
} catch (err) {
res.status(401).json({ error: `something went wrong with refreshing the cache; experienced error: ${Helper.assertError(err).toString()}`});
return;
}

});

/**
* @openapi
* /git/refresh/hpc/:id
* put:
* description: For the given HPC id (:id), refresh all git repos on it.
* responses:
* 200:
* description: Refresh completed successfully
* 401:
* description: Something went wrong with the cache reloading
*/
app.put("/git/refresh/hpc/:id", async function (req, res) {
const hpc = req.params.id;

const connection = await db.connect();
const repos = await connection
.getRepository(Git)
.find();

try {
for (const repo of repos) {
await GitUtil.refreshGit(repo);
await FolderUploaderHelper.cachedUpload({ gitId: repo.id }, hpc, "cache");
}
} catch (err) {
res.status(401).json({ error: `something went wrong with refreshing the cache; experienced error: ${Helper.assertError(err).toString()}`});
return;
}
});

/**
* @openapi
* /git/refresh
* put:
* description: Refresh all git repos on all HPCs.
* responses:
* 200:
* description: Refresh completed successfully
* 401:
* description: Something went wrong with the cache reloading
*/
app.put("/git/refresh", async function (req, res) {
const connection = await db.connect();

const repos = await connection.getRepository(Git).find();

try {
for (const repo of repos) {
await GitUtil.refreshGit(repo);

for (const hpc of Object.keys(hpcConfigMap)) {
// vv fun fact! you can avoid awaiting for a promise with the void keyword
await FolderUploaderHelper.cachedUpload({gitId: repo.id}, hpc, "cache");
}
}
} catch (err) {
res.status(401).json({ error: `something went wrong with refreshing the cache; experienced error: ${Helper.assertError(err).toString()}`});
return;
}
});

/**
* @openapi
* /folder:
Expand Down Expand Up @@ -745,13 +871,17 @@ app.delete("/folder/:folderId", authMiddleWare, async function (req, res) {
* description: Returns "unknown folder with id" when the specified folder is not found
*/
app.put("/folder/:folderId", authMiddleWare, async function (req, res) {
const body = req.body as updateFolderBody;
const errors = requestErrors(validator.validate(body, schemas.updateFolder));
const errors = requestErrors(
validator.validate(req.body, schemas.updateFolder)
);

if (errors.length > 0) {
res.status(402).json({ error: "invalid input", messages: errors });
return;
}

const body = req.body as updateFolderBody;

if (!res.locals.username) {
res.status(402).json({ error: "invalid token" });
return;
Expand Down Expand Up @@ -812,15 +942,17 @@ app.post(
"/folder/:folderId/download/globus-init",
authMiddleWare,
async function (req, res) {
const body = req.body as initGlobusDownloadBody;
const errors = requestErrors(
validator.validate(body, schemas.initGlobusDownload)
validator.validate(req.body, schemas.initGlobusDownload)
);

if (errors.length > 0) {
res.status(402).json({ error: "invalid input", messages: errors });
return;
}

const body = req.body as initGlobusDownloadBody;

if (!res.locals.username) {
res.status(402).json({ error: "invalid token" });
return;
Expand Down Expand Up @@ -973,14 +1105,15 @@ app.get(
* description: Returns "invalid input" and a list of errors with the format of the req body
*/
app.post("/job", authMiddleWare, async function (req, res) {
const body = req.body as createJobBody;
const errors = requestErrors(validator.validate(body, schemas.createJob));
const errors = requestErrors(validator.validate(req.body, schemas.createJob));

if (errors.length > 0) {
res.status(402).json({ error: "invalid input", messages: errors });
return;
}

const body = req.body as createJobBody;

// try to extract maintainer and hpc associated with the job
const maintainerName: string = body.maintainer ?? "community_contribution"; // default to community contribution job maintainer
const maintainer = maintainerConfigMap[maintainerName];
Expand Down Expand Up @@ -1069,13 +1202,15 @@ app.post("/job", authMiddleWare, async function (req, res) {
* description: Returns internal error when there is an exception while updating the job details
*/
app.put("/job/:jobId", authMiddleWare, async function (req, res) {
const body = req.body as updateJobBody;
const errors = requestErrors(validator.validate(body, schemas.updateJob));
const errors = requestErrors(validator.validate(req.body, schemas.updateJob));

if (errors.length > 0) {
res.status(402).json({ error: "invalid input", messages: errors });
return;
}

const body = req.body as updateJobBody;

if (!res.locals.username) {
res.status(402).json({ error: "invalid token" });
return;
Expand Down
2 changes: 1 addition & 1 deletion src/FolderUploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ abstract class CachedFolderUploader extends BaseFolderUploader {
await this.refreshCache();
}

await this.register(); // TODO: possibly indicate this was a cached upload
// await this.register(); // TODO: reenable this and mark it as cached somehow
await this.pullFromCache();
}
}
Expand Down
16 changes: 15 additions & 1 deletion src/Supervisor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import { config, maintainerConfigMap, hpcConfigMap } from "../configs/config";
import connectionPool from "./connectors/ConnectionPool";
import DB from "./DB";
import Emitter from "./Emitter";
import { FolderUploaderHelper } from "./FolderUploader";
import GitUtil from "./lib/GitUtil";
import * as Helper from "./lib/Helper";
import BaseMaintainer from "./maintainers/BaseMaintainer";
import { Git } from "./models/Git";
import { Job } from "./models/Job";
import Queue from "./Queue";
import { SSH } from "./types";
Expand Down Expand Up @@ -65,7 +68,18 @@ class Supervisor {

// in the main loop, check if we should rezip things to keep things up to date
if (Date.now() - this.prevRefreshTime >= 24 * 60 * 60 * 1000) { // number of milliseconds in a day
// TODO
const connection = await this.db.connect();

const repos = await connection.getRepository(Git).find();

for (const repo of repos) {
await GitUtil.refreshGit(repo);

for (const hpc of Object.keys(hpcConfigMap)) {
// vv fun fact! you can avoid awaiting for a promise with the void keyword
void FolderUploaderHelper.cachedUpload({gitId: repo.id}, hpc, "cache");
}
}
}

// iterate over all HPCs
Expand Down
2 changes: 1 addition & 1 deletion src/maintainers/CommunityContributionMaintainer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class CommunityContributionMaintainer extends BaseMaintainer {
await FolderUploaderHelper.cachedUpload(
localExecutableFolder,
this.job.hpc,
this.job.userId,
"cache", // this.job.userId // upload it under the 'cache' user
this.job.id,
this.connector
)
Expand Down
18 changes: 11 additions & 7 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,17 @@ export interface createJobBody {

export interface updateJobBody {
jupyterhubApiToken: string,
param?: object,
env?: object,
slurm?: object,
localExecutableFolder?: object,
localDataFolder?: object,
remoteDataFolder?: object,
remoteExecutableFolder?: object,
param?: object,
env?: object,
slurm?: object,
localExecutableFolder?: object,
localDataFolder?: object,
remoteDataFolder?: object,
remoteExecutableFolder?: object,
}

export interface refreshCacheBody {
hpc?: string
}

export type PushFunction = (_args: unknown[]) => Promise<number>;
Expand Down

0 comments on commit c69a44f

Please sign in to comment.