Skip to content

Commit

Permalink
fix(k8s): log tailing now returns logs for new pods at runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
edvald committed Dec 4, 2018
1 parent 95fcacd commit 432e6dc
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 47 deletions.
38 changes: 0 additions & 38 deletions garden-service/src/plugins/kubernetes/actions.ts
Expand Up @@ -333,44 +333,6 @@ export async function getTestResult(
}
}

export async function getServiceLogs(
{ ctx, service, stream, tail }: GetServiceLogsParams<ContainerModule>,
) {
const context = ctx.provider.config.context
const resourceType = service.spec.daemon ? "daemonset" : "deployment"

const kubectlArgs = ["logs", `${resourceType}/${service.name}`, "--timestamps=true"]

if (tail) {
kubectlArgs.push("--follow")
}

const namespace = await getAppNamespace(ctx, ctx.provider)
const proc = kubectl(context, namespace).spawn(kubectlArgs)
let timestamp: Date

proc.stdout
.pipe(split())
.on("data", (s) => {
if (!s) {
return
}
const [timestampStr, msg] = splitFirst(s, " ")
try {
timestamp = moment(timestampStr).toDate()
} catch { }
void stream.write({ serviceName: service.name, timestamp, msg })
})

return new Promise<GetServiceLogsResult>((resolve, reject) => {
proc.on("error", reject)

proc.on("exit", () => {
resolve({})
})
})
}

function getTestResultKey(module: ContainerModule, testName: string, version: ModuleVersion) {
return `test-result--${module.name}--${testName}--${version.versionString}`
}
2 changes: 1 addition & 1 deletion garden-service/src/plugins/kubernetes/kubernetes.ts
Expand Up @@ -20,7 +20,6 @@ import { getGenericTaskStatus } from "../generic"
import {
deleteService,
execInService,
getServiceLogs,
getServiceOutputs,
getTestResult,
hotReload,
Expand All @@ -34,6 +33,7 @@ import { helmHandlers } from "./helm"
import { getSecret, setSecret, deleteSecret } from "./secrets"
import { containerRegistryConfigSchema, ContainerRegistryConfig } from "../container"
import { getRemoteEnvironmentStatus, prepareRemoteEnvironment, cleanupEnvironment } from "./init"
import { getServiceLogs } from "./logs"

export const name = "kubernetes"

Expand Down
117 changes: 117 additions & 0 deletions garden-service/src/plugins/kubernetes/logs.ts
@@ -0,0 +1,117 @@
/*
* Copyright (C) 2018 Garden Technologies, Inc. <info@garden.io>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

import * as split from "split"
import moment = require("moment")
import Stream from "ts-stream"

import { GetServiceLogsResult, ServiceLogEntry } from "../../types/plugin/outputs"
import { GetServiceLogsParams } from "../../types/plugin/params"
import { ContainerModule } from "../container"
import { getAppNamespace } from "./namespace"
import { splitFirst } from "../../util/util"
import { BinaryCmd } from "../../util/ext-tools"
import { kubectl } from "./kubectl"
import { ContainerService } from "../../../tmp/dist/build/plugins/container"
import { LogEntry } from "../../logger/log-entry"

export async function getServiceLogs(
{ ctx, log, service, stream, tail }: GetServiceLogsParams<ContainerModule>,
) {
const context = ctx.provider.config.context
const namespace = await getAppNamespace(ctx, ctx.provider)

const proc = tail
? await tailLogs(context, namespace, service, stream, log)
: await getLogs(context, namespace, service, stream)

return new Promise<GetServiceLogsResult>((resolve, reject) => {
proc.on("error", reject)

proc.on("exit", () => {
resolve({})
})
})
}

async function tailLogs(
context: string, namespace: string, service: ContainerService, stream: Stream<ServiceLogEntry>, log: LogEntry,
) {
const args = [
"--color", "never",
"--context", context,
"--namespace", namespace,
"--output", "json",
"--selector", `service=${service.name}`,
"--timestamps",
]

console.log(args.join(" "))

const proc = await stern.spawn({ args, log })
let timestamp: Date | undefined

proc.stdout
.pipe(split())
.on("data", (line) => {
if (!line || line[0] !== "{") {
return
}
const obj = JSON.parse(line)
const [timestampStr, msg] = splitFirst(obj.message, " ")
try {
timestamp = moment(timestampStr).toDate()
} catch { }
void stream.write({ serviceName: service.name, timestamp, msg: msg.trimRight() })
})

return proc
}

async function getLogs(
context: string, namespace: string, service: ContainerService, stream: Stream<ServiceLogEntry>,
) {
const resourceType = service.spec.daemon ? "daemonset" : "deployment"
const kubectlArgs = ["logs", `${resourceType}/${service.name}`, "--timestamps=true"]

const proc = kubectl(context, namespace).spawn(kubectlArgs)
let timestamp: Date

proc.stdout
.pipe(split())
.on("data", (s) => {
if (!s) {
return
}
const [timestampStr, msg] = splitFirst(s, " ")
try {
timestamp = moment(timestampStr).toDate()
} catch { }
void stream.write({ serviceName: service.name, timestamp, msg })
})

return proc
}

const stern = new BinaryCmd({
name: "stern",
specs: {
darwin: {
url: "https://github.com/wercker/stern/releases/download/1.10.0/stern_darwin_amd64",
sha256: "b91dbcfd3bbda69cd7a7abd80a225ce5f6bb9d6255b7db192de84e80e4e547b7",
},
linux: {
url: "https://github.com/wercker/stern/releases/download/1.10.0/stern_linux_amd64",
sha256: "a0335b298f6a7922c35804bffb32a68508077b2f35aaef44d9eb116f36bc7eda",
},
win32: {
url: "https://github.com/wercker/stern/releases/download/1.10.0/stern_windows_amd64.exe",
sha256: "8cb94d3f47c831f2b0a59286336b41569ab38cb1528755545cb490536274f885",
},
},
})
4 changes: 2 additions & 2 deletions garden-service/src/tasks/deploy.ts
Expand Up @@ -112,7 +112,7 @@ export class DeployTask extends BaseTask {
})

// TODO: get version from build task results
const { versionString } = await this.service.module.version
const { versionString } = this.version
const hotReloadEnabled = includes(this.hotReloadServiceNames, this.service.name)
const status = await this.garden.actions.getServiceStatus({
service: this.service,
Expand All @@ -133,7 +133,7 @@ export class DeployTask extends BaseTask {
return status
}

log.setState("Deploying")
log.setState(`Deploying version ${versionString}...`)

const dependencies = await this.garden.getServices(this.service.config.dependencies)

Expand Down
2 changes: 1 addition & 1 deletion garden-service/src/types/plugin/outputs.ts
Expand Up @@ -97,7 +97,7 @@ export const execInServiceResultSchema = Joi.object()

export interface ServiceLogEntry {
serviceName: string
timestamp: Date
timestamp?: Date
msg: string
}

Expand Down
17 changes: 12 additions & 5 deletions garden-service/src/util/ext-tools.ts
Expand Up @@ -19,6 +19,7 @@ import { LogEntry } from "../logger/log-entry"
import { Extract } from "unzipper"
import { createHash } from "crypto"
import * as uuid from "uuid"
import * as spawn from "cross-spawn"

const globalGardenPath = join(homedir(), ".garden")
const toolsPath = join(globalGardenPath, "tools")
Expand All @@ -27,6 +28,7 @@ interface ExecParams {
cwd?: string
log: LogEntry
args?: string[]
timeout?: number
}

abstract class Cmd {
Expand Down Expand Up @@ -110,8 +112,8 @@ export class BinaryCmd extends Cmd {
const tmpPath = join(this.toolPath, this.versionDirname + "." + uuid.v4().substr(0, 8))
const tmpExecutable = join(tmpPath, ...this.executableSubpath)

log.setState(`Fetching ${this.name}...`)
const debug = log.debug(`Downloading ${this.spec.url}...`)
const logEntry = log.verbose(`Fetching ${this.name}...`)
const debug = logEntry.debug(`Downloading ${this.spec.url}...`)

await ensureDir(tmpPath)

Expand All @@ -136,19 +138,24 @@ export class BinaryCmd extends Cmd {
}

debug && debug.setSuccess("Done")
log.setSuccess(`Fetched ${this.name}`)
logEntry.setSuccess(`Fetched ${this.name}`)
}

async exec({ cwd, args, log }: ExecParams) {
async exec({ cwd, args, log, timeout }: ExecParams) {
await this.download(log)
return execa(this.executablePath, args || [], { cwd: cwd || this.defaultCwd })
return execa(this.executablePath, args || [], { cwd: cwd || this.defaultCwd, timeout })
}

async stdout(params: ExecParams) {
const res = await this.exec(params)
return res.stdout
}

async spawn({ cwd, args, log }: ExecParams) {
await this.download(log)
return spawn(this.executablePath, args || [], { cwd })
}

private async fetch(targetPath: string, log: LogEntry) {
const response = await Axios({
method: "GET",
Expand Down

0 comments on commit 432e6dc

Please sign in to comment.