Skip to content

Commit

Permalink
improvement(k8s): better process mgmt and logging for dev mode sync
Browse files Browse the repository at this point in the history
We now poll for more information from the mutagen daemon, and we've
overall improved the process management around it.

This unblocks more development around this feature, such as running
a postSyncCommand (like with the old hot reloading mechanism) and also
makes the use of mutagen easier for other contexts like syncing build
contexts for in-cluster building.
  • Loading branch information
edvald authored and thsig committed Aug 20, 2021
1 parent 6dd2334 commit 7a01e41
Show file tree
Hide file tree
Showing 11 changed files with 446 additions and 215 deletions.
4 changes: 2 additions & 2 deletions core/src/logger/renderers.ts
Expand Up @@ -93,7 +93,7 @@ export function renderSymbolBasic(entry: LogEntry): string {
let { symbol, status } = entry.getLatestMessage()

if (symbol === "empty") {
return " "
return " "
}
if (status === "active" && !symbol) {
symbol = "info"
Expand All @@ -106,7 +106,7 @@ export function renderSymbol(entry: LogEntry): string {
const { symbol } = entry.getLatestMessage()

if (symbol === "empty") {
return " "
return " "
}
return symbol ? `${logSymbols[symbol]} ` : ""
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/plugin-context.ts
Expand Up @@ -14,6 +14,8 @@ import { joi, joiVariables, joiStringMap, DeepPrimitiveMap } from "./config/comm
import { PluginTool } from "./util/ext-tools"
import { ConfigContext, ContextResolveOpts } from "./config/template-contexts/base"
import { resolveTemplateStrings } from "./template-string/template-string"
import { LogEntry } from "./logger/log-entry"
import { logEntrySchema } from "./types/plugin/base"

type WrappedFromGarden = Pick<
Garden,
Expand All @@ -37,6 +39,7 @@ type ResolveTemplateStringsOpts = Omit<ContextResolveOpts, "stack">

export interface PluginContext<C extends GenericProviderConfig = GenericProviderConfig> extends WrappedFromGarden {
command: CommandInfo
log: LogEntry
projectSources: SourceConfig[]
provider: Provider<C>
resolveTemplateStrings: <T>(o: T, opts?: ResolveTemplateStringsOpts) => T
Expand Down Expand Up @@ -64,6 +67,7 @@ export const pluginContextSchema = () =>
The absolute path of the project's Garden dir. This is the directory the contains builds, logs and
other meta data. A custom path can be set when initialising the Garden class. Defaults to \`.garden\`.
`),
log: logEntrySchema(),
production: joi
.boolean()
.default(false)
Expand Down Expand Up @@ -93,6 +97,7 @@ export async function createPluginContext(
command,
environmentName: garden.environmentName,
gardenDirPath: garden.gardenDirPath,
log: garden.log,
projectName: garden.projectName,
projectRoot: garden.projectRoot,
projectSources: garden.getProjectSources(),
Expand Down
3 changes: 2 additions & 1 deletion core/src/plugins/kubernetes/container/deployment.ts
Expand Up @@ -95,11 +95,12 @@ export async function startContainerDevSync({

await startDevModeSync({
ctx,
log: log.info({ section: service.name, symbol: "info", msg: chalk.gray(`Starting sync`) }),
log,
moduleRoot: service.module.path,
namespace,
target,
spec: service.spec.devMode,
serviceName: service.name,
})
}

Expand Down
180 changes: 45 additions & 135 deletions core/src/plugins/kubernetes/dev-mode.ts
Expand Up @@ -6,36 +6,24 @@
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

const AsyncLock = require("async-lock")
import { containerDevModeSchema, ContainerDevModeSpec } from "../container/config"
import { dedent, gardenAnnotationKey } from "../../util/string"
import { fromPairs, set } from "lodash"
import { set } from "lodash"
import { getResourceContainer, getResourcePodSpec } from "./util"
import { HotReloadableResource } from "./hot-reload/hot-reload"
import { LogEntry } from "../../logger/log-entry"
import { joinWithPosix } from "../../util/fs"
import chalk from "chalk"
import { pathExists, readFile, writeFile } from "fs-extra"
import { PluginContext } from "../../plugin-context"
import { join } from "path"
import { safeDump, safeLoad } from "js-yaml"
import { ConfigurationError } from "../../exceptions"
import { ensureMutagenDaemon, killSyncDaemon } from "./mutagen"
import { ensureMutagenSync, mutagenConfigLock } from "./mutagen"
import { joiIdentifier } from "../../config/common"
import { KubernetesPluginContext } from "./config"
import { prepareConnectionOpts } from "./kubectl"
import { sleep } from "../../util/util"

const syncUtilImageName = "gardendev/k8s-sync:0.1.1"
const mutagenAgentPath = "/.garden/mutagen-agent"

interface ActiveSync {
spec: ContainerDevModeSpec
}

const activeSyncs: { [key: string]: ActiveSync } = {}
const syncStartLock = new AsyncLock()

interface ConfigureDevModeParams {
target: HotReloadableResource
spec: ContainerDevModeSpec
Expand All @@ -52,8 +40,6 @@ export const kubernetesDevModeSchema = () =>
`Optionally specify the name of a specific container to sync to. If not specified, the first container in the workload is used.`
),
}).description(dedent`
**EXPERIMENTAL**
Specifies which files or directories to sync to which paths inside the running containers of the service when it's in dev mode, and overrides for the container command and/or arguments.
Note that \`serviceResource\` must also be specified to enable dev mode.
Expand Down Expand Up @@ -105,7 +91,6 @@ export function configureDevMode({ target, spec, containerName }: ConfigureDevMo
const initContainer = {
name: "garden-dev-init",
image: syncUtilImageName,
// TODO: inject agent + SSH server
command: ["/bin/sh", "-c", "cp /usr/local/bin/mutagen-agent " + mutagenAgentPath],
imagePullPolicy: "IfNotPresent",
volumeMounts: [gardenVolumeMount],
Expand All @@ -123,17 +108,12 @@ export function configureDevMode({ target, spec, containerName }: ConfigureDevMo
mainContainer.volumeMounts.push(gardenVolumeMount)
}

const mutagenModeMap = {
"one-way": "one-way-safe",
"one-way-replica": "one-way-replica",
"two-way": "two-way-safe",
}

interface StartDevModeSyncParams extends ConfigureDevModeParams {
ctx: PluginContext
log: LogEntry
moduleRoot: string
namespace: string
serviceName: string
}

export async function startDevModeSync({
Expand All @@ -144,20 +124,16 @@ export async function startDevModeSync({
namespace,
spec,
target,
serviceName,
}: StartDevModeSyncParams) {
if (spec.sync.length === 0) {
return
}
namespace = target.metadata.namespace || namespace
const resourceName = `${target.kind}/${target.metadata.name}`
const key = `${target.kind}--${namespace}--${target.metadata.name}`

return syncStartLock.acquire("start-sync", async () => {
// Check for already active sync
if (activeSyncs[key]) {
return activeSyncs[key]
}
const keyBase = `${target.kind}--${namespace}--${target.metadata.name}`

return mutagenConfigLock.acquire("start-sync", async () => {
// Validate the target
if (target.metadata.annotations?.[gardenAnnotationKey("dev-mode")] !== "true") {
throw new ConfigurationError(`Resource ${resourceName} is not deployed in dev mode`, {
Expand All @@ -177,118 +153,52 @@ export async function startDevModeSync({

const kubectl = ctx.tools["kubernetes.kubectl"]
const kubectlPath = await kubectl.getPath(log)
const k8sCtx = <KubernetesPluginContext>ctx

const mutagen = ctx.tools["kubernetes.mutagen"]
let dataDir = await ensureMutagenDaemon(log, mutagen)
let i = 0

const k8sCtx = <KubernetesPluginContext>ctx
for (const s of spec.sync) {
const key = `${keyBase}-${i}`

// Configure Mutagen with all the syncs
const syncConfigs = fromPairs(
spec.sync.map((s, i) => {
const connectionOpts = prepareConnectionOpts({
provider: k8sCtx.provider,
namespace,
})
const command = [
kubectlPath,
"exec",
"-i",
...connectionOpts,
"--container",
containerName,
`${target.kind}/${target.metadata.name}`,
"--",
mutagenAgentPath,
"synchronizer",
]

const syncConfig = {
const connectionOpts = prepareConnectionOpts({
provider: k8sCtx.provider,
namespace,
})
const command = [
kubectlPath,
"exec",
"-i",
...connectionOpts,
"--container",
containerName,
`${target.kind}/${target.metadata.name}`,
"--",
mutagenAgentPath,
"synchronizer",
]

const sourceDescription = chalk.white(s.source)
const targetDescription = `${chalk.white(s.target)} in ${chalk.white(resourceName)}`
const description = `${sourceDescription} to ${targetDescription}`

ctx.log.info({ symbol: "info", section: serviceName, msg: chalk.gray(`Syncing ${description} (${s.mode})`) })

await ensureMutagenSync({
// Prefer to log to the main view instead of the handler log context
log: ctx.log,
key,
logSection: serviceName,
sourceDescription,
targetDescription,
config: {
alpha: joinWithPosix(moduleRoot, s.source),
beta: `exec:'${command.join(" ")}':${s.target}`,
mode: mutagenModeMap[s.mode],
ignore: {
paths: s.exclude || [],
},
}

log.info(
chalk.gray(
`→ Syncing ${chalk.white(s.source)} to ${chalk.white(s.target)} in ${chalk.white(resourceName)} (${s.mode})`
)
)

return [`${key}-${i}`, syncConfig]
mode: s.mode,
ignore: s.exclude || [],
},
})
)

let config: any = {
sync: {},
}

// Commit the configuration to the Mutagen daemon

let loops = 0
const maxRetries = 10
while (true) {
// When deploying Helm services with dev mode, sometimes the first deployment (e.g. when the namespace has just
// been created) will fail because the daemon can't connect to the pod (despite the call to `waitForResources`)
// in the Helm deployment handler.
//
// In addition, when several services are deployed with dev mode, we occasionally need to retry restarting the
// mutagen daemon after the first try (we need to restart it to reload the updated mutagen project, which
// needs to contain representations of all the sync specs).
//
// When either of those happens, we simply kill the mutagen daemon, wait, and try again (up to a fixed number
// of retries).
//
// TODO: Maybe there's a more elegant way to do this?
try {
const configPath = join(dataDir, "mutagen.yml")

if (await pathExists(configPath)) {
config = safeLoad((await readFile(configPath)).toString())
}

config.sync = { ...config.sync, ...syncConfigs }

await writeFile(configPath, safeDump(config))

await mutagen.exec({
cwd: dataDir,
args: ["project", "start"],
log,
env: {
MUTAGEN_DATA_DIRECTORY: dataDir,
},
})
break
} catch (err) {
const unableToConnect = err.message.match(/unable to connect to beta/)
const alreadyRunning = err.message.match(/project already running/)
if ((unableToConnect || alreadyRunning) && loops < 10) {
loops += 1
if (unableToConnect) {
log.setState(`Synchronization daemon failed to connect, retrying (attempt ${loops}/${maxRetries})...`)
} else if (alreadyRunning) {
log.setState(`Project already running, retrying (attempt ${loops}/${maxRetries})...`)
}
await killSyncDaemon(false)
await sleep(2000 + loops * 500)
dataDir = await ensureMutagenDaemon(log, mutagen)
} else {
log.setError(err.message)
throw err
}
}
i++
}
log.setSuccess("Synchronization daemon started")

// TODO: Attach to Mutagen GRPC to poll for sync updates

const sync: ActiveSync = { spec }
activeSyncs[key] = sync

return sync
})
}
4 changes: 2 additions & 2 deletions core/src/plugins/kubernetes/helm/deployment.ts
Expand Up @@ -22,7 +22,6 @@ import { getServiceResource, getServiceResourceSpec } from "../util"
import { getModuleNamespace, getModuleNamespaceStatus } from "../namespace"
import { getHotReloadSpec, configureHotReload, getHotReloadContainerName } from "../hot-reload/helpers"
import { configureDevMode, startDevModeSync } from "../dev-mode"
import chalk from "chalk"

export async function deployHelmService({
ctx,
Expand Down Expand Up @@ -136,12 +135,13 @@ export async function deployHelmService({
if (devMode && service.spec.devMode && serviceResource && serviceResourceSpec) {
await startDevModeSync({
ctx,
log: log.info({ section: service.name, symbol: "info", msg: chalk.gray(`Starting sync`) }),
log,
moduleRoot: service.sourceModule.path,
namespace: serviceResource.metadata.namespace || namespace,
target: serviceResource,
spec: service.spec.devMode,
containerName: service.spec.devMode.containerName,
serviceName: service.name,
})
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/plugins/kubernetes/helm/status.ts
Expand Up @@ -17,7 +17,6 @@ import { getForwardablePorts } from "../port-forward"
import { KubernetesServerResource } from "../types"
import { getModuleNamespace, getModuleNamespaceStatus } from "../namespace"
import { getServiceResource, getServiceResourceSpec } from "../util"
import chalk from "chalk"
import { startDevModeSync } from "../dev-mode"
import { gardenAnnotationKey } from "../../../util/string"

Expand Down Expand Up @@ -97,12 +96,13 @@ export async function getServiceStatus({

await startDevModeSync({
ctx,
log: log.info({ section: service.name, symbol: "info", msg: chalk.gray(`Starting sync`) }),
log,
moduleRoot: service.sourceModule.path,
namespace,
target,
spec: service.spec.devMode,
containerName: service.spec.devMode.containerName,
serviceName: service.name,
})
} else {
state = "outdated"
Expand Down

0 comments on commit 7a01e41

Please sign in to comment.