Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mis/portal): 集群停用功能 #1266

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/eleven-feet-turn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@scow/mis-server": minor
---

在管理系统和门户系统中增加依赖于管理系统的集群停用功能,并在数据库中新增 Cluster 数据表。
**注意:停用后集群将不可用,集群所有数据不再更新。重新启用后请手动同步平台数据!**
5 changes: 5 additions & 0 deletions .changeset/great-starfishes-pump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@scow/ai": patch
---

同步操作日志服务中的日志类型,增加“启用集群”和“停用集群”两种类型
5 changes: 5 additions & 0 deletions .changeset/grumpy-months-cover.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@scow/config": patch
---

增加集群停用功能通用类型
12 changes: 12 additions & 0 deletions .changeset/long-kids-wash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
"@scow/portal-server": minor
"@scow/portal-web": minor
"@scow/mis-web": minor
"@scow/lib-server": minor
"@scow/cli": minor
"@scow/lib-web": minor
"@scow/docs": minor
---

在管理系统和门户系统中增加依赖于管理系统的集群停用功能。
**注意:停用后集群将不可用,集群所有数据不再更新。重新启用后请手动同步平台数据!**
6 changes: 6 additions & 0 deletions .changeset/weak-chicken-worry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@scow/grpc-api": minor
---

新增集群停用功能 API:getClustersRuntimeInfo、activateCluster、deactivateCluster
新增获取集群配置信息 API:getClusterConfigFiles
2 changes: 2 additions & 0 deletions apps/ai/src/models/operationLog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ export const OperationType: OperationTypeEnum = {
setAccountBlockThreshold: "setAccountBlockThreshold",
setAccountDefaultBlockThreshold: "setAccountDefaultBlockThreshold",
userChangeTenant: "userChangeTenant",
activateCluster: "activateCluster",
deactivateCluster: "deactivateCluster",
customEvent: "customEvent",
};

3 changes: 3 additions & 0 deletions apps/cli/src/compose/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ export const createComposeSpec = (config: InstallConfigSchema) => {
environment: {
SCOW_LAUNCH_APP: "portal-server",
PORTAL_BASE_PATH: portalBasePath,
MIS_DEPLOYED: config.mis ? "true" : "false",
MIS_SERVER_URL: config.mis ? "mis-server:5000" : "",
...serviceLogEnv,
...nodeOptions ? { NODE_OPTIONS: nodeOptions } : {},
},
Expand All @@ -269,6 +271,7 @@ export const createComposeSpec = (config: InstallConfigSchema) => {
"BASE_PATH": portalBasePath,
"MIS_URL": join(BASE_PATH, MIS_PATH),
"MIS_DEPLOYED": config.mis ? "true" : "false",
"MIS_SERVER_URL": config.mis ? "mis-server:5000" : "",
"AI_URL": join(BASE_PATH, AI_PATH),
"AI_DEPLOYED": config.ai ? "true" : "false",
"AUTH_EXTERNAL_URL": config.auth.custom?.external?.url || join(BASE_PATH, "/auth"),
Expand Down
1 change: 1 addition & 0 deletions apps/cli/tests/compose.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ it("generate correct paths", async () => {
const composeConfig = createComposeSpec(config);

expect(composeConfig.services["portal-web"].environment).toContain("MIS_URL=/mis");
expect(composeConfig.services["portal-web"].environment).toContain("MIS_SERVER_URL=mis-server:5000");
expect(composeConfig.services["mis-web"].environment).toContain("PORTAL_URL=/");
expect(composeConfig.services["ai"].environment).toContain("MIS_URL=/mis");
});
Expand Down
1 change: 1 addition & 0 deletions apps/mis-server/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export async function createServer() {
for (const plugin of plugins) {
await server.register(plugin);
}

await server.register(accountServiceServer);
await server.register(userServiceServer);
await server.register(adminServiceServer);
Expand Down
8 changes: 5 additions & 3 deletions apps/mis-server/src/bl/PriceMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { Logger } from "@ddadaal/tsgrpc-server";
import { MySqlDriver, SqlEntityManager } from "@mikro-orm/mysql";
import { Partition } from "@scow/scheduler-adapter-protos/build/protos/config";
import { calculateJobPrice } from "src/bl/jobPrice";
import { clusters } from "src/config/clusters";
import { configClusters } from "src/config/clusters";
import { misConfig } from "src/config/mis";
import { JobPriceInfo } from "src/entities/JobInfo";
import { AmountStrategy, JobPriceItem } from "src/entities/JobPriceItem";
Expand Down Expand Up @@ -90,7 +90,10 @@ export async function createPriceMap(

// partitions info for all clusters
const partitionsForClusters: Record<string, Partition[]> = {};

// call for all config clusters
const reply = await clusterPlugin.callOnAll(
configClusters,
logger,
async (client) => await asyncClientCall(client.config, "getClusterConfig", {}),
);
Expand All @@ -106,10 +109,9 @@ export async function createPriceMap(

const missingPaths = [] as string[];

for (const cluster in clusters) {
for (const cluster in configClusters) {
for (const partition of partitionsForClusters[cluster]) {
const path = [cluster, partition.name];

const { qos } = partition;

if (path.join(".") in defaultPrices) {
Expand Down
64 changes: 53 additions & 11 deletions apps/mis-server/src/bl/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ import { asyncClientCall } from "@ddadaal/tsgrpc-client";
import { Logger } from "@ddadaal/tsgrpc-server";
import { Loaded } from "@mikro-orm/core";
import { MySqlDriver, SqlEntityManager } from "@mikro-orm/mysql";
import { ClusterConfigSchema } from "@scow/config/build/cluster";
import { BlockedFailedUserAccount } from "@scow/protos/build/server/admin";
import { Account } from "src/entities/Account";
import { UserAccount, UserStatus } from "src/entities/UserAccount";
import { ClusterPlugin } from "src/plugins/clusters";
import { callHook } from "src/plugins/hookClient";

import { getActivatedClusters } from "./clustersUtils";


/**
* Update block status of accounts and users in the slurm.
* If it is whitelisted, it doesn't block.
Expand All @@ -31,15 +35,33 @@ export async function updateBlockStatusInSlurm(
) {
const blockedAccounts: string[] = [];
const blockedFailedAccounts: string[] = [];
const blockedUserAccounts: [string, string][] = [];
const blockedFailedUserAccounts: BlockedFailedUserAccount[] = [];

const accounts = await em.find(Account, { blockedInCluster: true });

const currentActivatedClusters = await getActivatedClusters(em, logger).catch((e) => {
logger.info(e);
return {};
});

if (Object.keys(currentActivatedClusters).length === 0) {
logger.info("No available activated clusters in SCOW.");
return {
blockedAccounts,
blockedFailedAccounts,
blockedUserAccounts,
blockedFailedUserAccounts,
};
}

for (const account of accounts) {
if (account.whitelist) {
continue;
}

try {
await clusterPlugin.callOnAll(logger, async (client) =>
await clusterPlugin.callOnAll(currentActivatedClusters, logger, async (client) =>
await asyncClientCall(client.account, "blockAccount", {
accountName: account.accountName,
}),
Expand All @@ -50,15 +72,14 @@ export async function updateBlockStatusInSlurm(
}
}

const blockedUserAccounts: [string, string][] = [];
const blockedFailedUserAccounts: BlockedFailedUserAccount[] = [];

const userAccounts = await em.find(UserAccount, {
blockedInCluster: UserStatus.BLOCKED,
}, { populate: ["user", "account"]});

for (const ua of userAccounts) {
try {
await clusterPlugin.callOnAll(logger, async (client) =>
await clusterPlugin.callOnAll(currentActivatedClusters, logger, async (client) =>
await asyncClientCall(client.user, "blockUserInAccount", {
accountName: ua.account.$.accountName,
userId: ua.user.$.userId,
Expand Down Expand Up @@ -108,9 +129,22 @@ export async function updateUnblockStatusInSlurm(
const unblockedAccounts: string[] = [];
const unblockedFailedAccounts: string[] = [];

const currentActivatedClusters = await getActivatedClusters(em, logger).catch((e) => {
logger.info(e);
return {};
});

if (Object.keys(currentActivatedClusters).length === 0) {
logger.info("No available activated clusters in SCOW.");
return {
unblockedAccounts,
unblockedFailedAccounts,
};
}

for (const account of accounts) {
try {
await clusterPlugin.callOnAll(logger, async (client) =>
await clusterPlugin.callOnAll(currentActivatedClusters, logger, async (client) =>
await asyncClientCall(client.account, "unblockAccount", {
accountName: account.accountName,
}),
Expand Down Expand Up @@ -140,7 +174,10 @@ export async function updateUnblockStatusInSlurm(
* @returns Operation result
**/
export async function blockAccount(
account: Loaded<Account, "tenant">, clusterPlugin: ClusterPlugin["clusters"], logger: Logger,
account: Loaded<Account, "tenant">,
currentActivatedClusters: Record<string, ClusterConfigSchema>,
clusterPlugin: ClusterPlugin["clusters"],
logger: Logger,
): Promise<"AlreadyBlocked" | "Whitelisted" | "OK"> {

if (account.blockedInCluster) { return "AlreadyBlocked"; }
Expand All @@ -149,7 +186,7 @@ export async function blockAccount(
return "Whitelisted";
}

await clusterPlugin.callOnAll(logger, async (client) => {
await clusterPlugin.callOnAll(currentActivatedClusters, logger, async (client) => {
await asyncClientCall(client.account, "blockAccount", {
accountName: account.accountName,
});
Expand All @@ -170,12 +207,15 @@ export async function blockAccount(
* @returns Operation result
**/
export async function unblockAccount(
account: Loaded<Account, "tenant">, clusterPlugin: ClusterPlugin["clusters"], logger: Logger,
account: Loaded<Account, "tenant">,
currentActivatedClusters: Record<string, ClusterConfigSchema>,
clusterPlugin: ClusterPlugin["clusters"],
logger: Logger,
): Promise<"OK" | "ALREADY_UNBLOCKED"> {

if (!account.blockedInCluster) { return "ALREADY_UNBLOCKED"; }

await clusterPlugin.callOnAll(logger, async (client) => {
await clusterPlugin.callOnAll(currentActivatedClusters, logger, async (client) => {
await asyncClientCall(client.account, "unblockAccount", {
accountName: account.accountName,
});
Expand All @@ -193,6 +233,7 @@ export async function unblockAccount(
* */
export async function blockUserInAccount(
ua: Loaded<UserAccount, "user" | "account">,
currentActivatedClusters: Record<string, ClusterConfigSchema>,
clusterPlugin: ClusterPlugin, logger: Logger,
) {
if (ua.blockedInCluster == UserStatus.BLOCKED) {
Expand All @@ -202,7 +243,7 @@ export async function blockUserInAccount(
const accountName = ua.account.$.accountName;
const userId = ua.user.$.userId;

await clusterPlugin.clusters.callOnAll(logger, async (client) =>
await clusterPlugin.clusters.callOnAll(currentActivatedClusters, logger, async (client) =>
await asyncClientCall(client.user, "blockUserInAccount", {
accountName,
userId,
Expand All @@ -222,6 +263,7 @@ export async function blockUserInAccount(
* */
export async function unblockUserInAccount(
ua: Loaded<UserAccount, "user" | "account">,
currentActivatedClusters: Record<string, ClusterConfigSchema>,
clusterPlugin: ClusterPlugin, logger: Logger,
) {
if (ua.blockedInCluster === UserStatus.UNBLOCKED) {
Expand All @@ -231,7 +273,7 @@ export async function unblockUserInAccount(
const accountName = ua.account.getProperty("accountName");
const userId = ua.user.getProperty("userId");

await clusterPlugin.clusters.callOnAll(logger, async (client) =>
await clusterPlugin.clusters.callOnAll(currentActivatedClusters, logger, async (client) =>
await asyncClientCall(client.user, "unblockUserInAccount", {
accountName,
userId,
Expand Down
27 changes: 18 additions & 9 deletions apps/mis-server/src/bl/charging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import { Logger } from "@ddadaal/tsgrpc-server";
import { Loaded } from "@mikro-orm/core";
import { SqlEntityManager } from "@mikro-orm/mysql";
import { ClusterConfigSchema } from "@scow/config/build/cluster";
import { Decimal, decimalToMoney } from "@scow/lib-decimal";
import { blockAccount, blockUserInAccount, unblockAccount, unblockUserInAccount } from "src/bl/block";
import { Account } from "src/entities/Account";
Expand Down Expand Up @@ -58,6 +59,7 @@ export function checkShouldUnblockAccount(account: Loaded<Account, "tenant">) {

export async function pay(
request: PayRequest, em: SqlEntityManager,
currentActivatedClusters: Record<string, ClusterConfigSchema>,
logger: Logger, clusterPlugin: ClusterPlugin,
) {
const {
Expand Down Expand Up @@ -92,15 +94,15 @@ export async function pay(
&& checkShouldUnblockAccount(target)
) {
logger.info("Unblock account %s", target.accountName);
await unblockAccount(target, clusterPlugin.clusters, logger);
await unblockAccount(target, currentActivatedClusters, clusterPlugin.clusters, logger);
}

if (
target instanceof Account
&& checkShouldBlockAccount(target)
) {
logger.info("Block account %s", target.accountName);
await blockAccount(target, clusterPlugin.clusters, logger);
await blockAccount(target, currentActivatedClusters, clusterPlugin.clusters, logger);
}

return {
Expand All @@ -120,6 +122,7 @@ type ChargeRequest = {

export async function charge(
request: ChargeRequest, em: SqlEntityManager,
currentActivatedClusters: Record<string, ClusterConfigSchema>,
logger: Logger, clusterPlugin: ClusterPlugin,
) {
const { target, amount, comment, type, userId, metadata } = request;
Expand All @@ -144,7 +147,7 @@ export async function charge(
&& checkShouldBlockAccount(target)
) {
logger.info("Block account %s due to out of balance.", target.accountName);
await blockAccount(target, clusterPlugin.clusters, logger);
await blockAccount(target, currentActivatedClusters, clusterPlugin.clusters, logger);
}

return {
Expand All @@ -155,7 +158,10 @@ export async function charge(

export async function addJobCharge(
ua: Loaded<UserAccount, "user" | "account">,
charge: Decimal, clusterPlugin: ClusterPlugin, logger: Logger,
charge: Decimal,
currentActivatedClusters: Record<string, ClusterConfigSchema>,
clusterPlugin: ClusterPlugin,
logger: Logger,
) {
if (ua.usedJobCharge && ua.jobChargeLimit) {
ua.usedJobCharge = ua.usedJobCharge.plus(charge);
Expand All @@ -167,16 +173,19 @@ export async function addJobCharge(
).shouldBlockInCluster;

if (shouldBlockUserInCluster) {
await blockUserInAccount(ua, clusterPlugin, logger);
await blockUserInAccount(ua, currentActivatedClusters, clusterPlugin, logger);
} else {
await unblockUserInAccount(ua, clusterPlugin, logger);
await unblockUserInAccount(ua, currentActivatedClusters, clusterPlugin, logger);
}
}
}

export async function setJobCharge(
ua: Loaded<UserAccount, "user" | "account">,
charge: Decimal, clusterPlugin: ClusterPlugin, logger: Logger,
charge: Decimal,
currentActivatedClusters: Record<string, ClusterConfigSchema>,
clusterPlugin: ClusterPlugin,
logger: Logger,
) {
ua.jobChargeLimit = charge;
if (!ua.usedJobCharge) {
Expand All @@ -190,9 +199,9 @@ export async function setJobCharge(
).shouldBlockInCluster;

if (shouldBlockUserInCluster) {
await blockUserInAccount(ua, clusterPlugin, logger);
await blockUserInAccount(ua, currentActivatedClusters, clusterPlugin, logger);
} else {
await unblockUserInAccount(ua, clusterPlugin, logger);
await unblockUserInAccount(ua, currentActivatedClusters, clusterPlugin, logger);
}
}
}
Loading
Loading