Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(app): keep git-proxy alive on session shutdown #951

Merged
merged 18 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 108 additions & 15 deletions git-https-proxy/main.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,104 @@
package main

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"net/http"
"net/url"
"os"
"os/signal"
"regexp"
"strings"
"sync"
"syscall"
"time"

"github.com/elazarl/goproxy"
)

// main starts the git proxy and its health server in the background, blocks
// until SIGTERM or SIGINT is received and then shuts both servers down.
//
// For anonymous sessions the servers are stopped right away. For all other
// sessions the proxy keeps serving until the /shutdown endpoint of the health
// server has been called or config.SessionTerminationGracePeriod has elapsed,
// so that git stays usable while an autosave branch is being created.
func main() {
	config := parseEnv()
	// INFO: Make a channel that will receive the SIGTERM on shutdown
	sigTerm := make(chan os.Signal, 1)
	signal.Notify(sigTerm, syscall.SIGTERM, syscall.SIGINT)
	ctx := context.Background()
	// INFO: Used to coordinate shutdown between git-proxy and the session when the user
	// is not anonymous and there may be an autosave branch that needs to be created
	shutdownFlags := shutdownFlagsStruct{
		sigtermReceived: false,
		shutdownAllowed: false,
	}

	// INFO: Setup servers
	proxyHandler := getProxyHandler(config)
	proxyServer := http.Server{
		Addr:    fmt.Sprintf(":%s", config.ProxyPort),
		Handler: proxyHandler,
	}
	healthHandler := getHealthHandler(config, &shutdownFlags)
	healthServer := http.Server{
		Addr:    fmt.Sprintf(":%s", config.HealthPort),
		Handler: healthHandler,
	}

	// INFO: Run servers in the background
	go func() {
		log.Printf("Health server active on port %s\n", config.HealthPort)
		// ListenAndServe returns http.ErrServerClosed as soon as Shutdown is
		// called. That is the expected outcome of a graceful shutdown and must
		// not terminate the process while Shutdown is still in progress.
		if err := healthServer.ListenAndServe(); err != http.ErrServerClosed {
			log.Fatalln(err)
		}
	}()
	go func() {
		log.Printf("Git proxy active on port %s\n", config.ProxyPort)
		log.Printf("Repo Url: %v, anonymous session: %v\n", config.RepoUrl, config.AnonymousSession)
		if err := proxyServer.ListenAndServe(); err != http.ErrServerClosed {
			log.Fatalln(err)
		}
	}()

	// INFO: Block until you receive sigTerm to shutdown. All of this is necessary
	// because the proxy has to shut down only after all the other containers do so in case
	// any other containers (i.e. session or sidecar) need git right before shutting down,
	// and this is the case exactly for creating autosave branches.
	<-sigTerm
	if config.AnonymousSession {
		log.Print("SIGTERM received. Shutting down servers.\n")
	} else {
		log.Printf(
			"SIGTERM received. Waiting for /shutdown to be called or timing out in %v\n",
			config.SessionTerminationGracePeriod,
		)
		sigTermTime := time.Now()
		shutdownFlags.lock.Lock()
		shutdownFlags.sigtermReceived = true
		shutdownFlags.lock.Unlock()
		for {
			// The /shutdown handler writes this flag from another goroutine,
			// so it is only read while holding the lock.
			shutdownFlags.lock.Lock()
			shutdownAllowed := shutdownFlags.shutdownAllowed
			shutdownFlags.lock.Unlock()
			if shutdownAllowed || time.Since(sigTermTime) > config.SessionTerminationGracePeriod {
				log.Printf(
					"Shutting down servers. SIGTERM received: %v, Shutdown allowed: %v.\n",
					true,
					shutdownAllowed,
				)
				break
			}
			time.Sleep(time.Second * 5)
		}
	}

	// Always attempt to stop both servers, even when stopping the first one
	// fails, and report every error instead of dropping it.
	shutdownFailed := false
	if err := healthServer.Shutdown(ctx); err != nil {
		log.Println(err)
		shutdownFailed = true
	}
	if err := proxyServer.Shutdown(ctx); err != nil {
		log.Println(err)
		shutdownFailed = true
	}
	// A failed shutdown is fatal only for non-anonymous sessions, as before.
	if shutdownFailed && !config.AnonymousSession {
		os.Exit(1)
	}
}

// shutdownFlagsStruct carries the state shared between main and the health
// server's /shutdown handler to coordinate when the git proxy may stop.
type shutdownFlagsStruct struct {
	// lock is held while the flags below are updated.
	lock sync.Mutex
	// sigtermReceived is set by main once a termination signal has arrived.
	// shutdownAllowed is set by the /shutdown handler once the proxy may stop.
	sigtermReceived, shutdownAllowed bool
}

type gitProxyConfig struct {
Expand All @@ -42,26 +107,38 @@ type gitProxyConfig struct {
AnonymousSession bool
EncodedCredentials string
RepoUrl *url.URL
SessionTerminationGracePeriod time.Duration

}

// Parse the environment variables used as the configuration for the proxy.
func parseEnv() *gitProxyConfig {
var ok, anonymousSession bool
var gitlabOauthToken, proxyPort, healthPort, anonymousSessionStr, encodedCredentials string
var gitlabOauthToken, proxyPort, healthPort, anonymousSessionStr, encodedCredentials, SessionTerminationGracePeriodSeconds string
var repoUrl *url.URL
if proxyPort, ok = os.LookupEnv("MITM_PROXY_PORT"); !ok {
var err error
var SessionTerminationGracePeriod time.Duration
if proxyPort, ok = os.LookupEnv("GIT_PROXY_PORT"); !ok {
proxyPort = "8080"
}
if healthPort, ok = os.LookupEnv("HEALTH_PORT"); !ok {
if healthPort, ok = os.LookupEnv("GIT_PROXY_HEALTH_PORT"); !ok {
healthPort = "8081"
}
if anonymousSessionStr, ok = os.LookupEnv("ANONYMOUS_SESSION"); !ok {
anonymousSessionStr = "true"
}
if SessionTerminationGracePeriodSeconds, ok = os.LookupEnv("SESSION_TERMINATION_GRACE_PERIOD_SECONDS"); !ok {
SessionTerminationGracePeriodSeconds = "600"
}
SessionTerminationGracePeriodSeconds = fmt.Sprintf("%ss", SessionTerminationGracePeriodSeconds)
SessionTerminationGracePeriod, err = time.ParseDuration(SessionTerminationGracePeriodSeconds)
if err != nil {
log.Fatalln(err)
}
anonymousSession = anonymousSessionStr == "true"
gitlabOauthToken = os.Getenv("GITLAB_OAUTH_TOKEN")
encodedCredentials = encodeCredentials(gitlabOauthToken)
repoUrl, err := url.Parse(os.Getenv("REPOSITORY_URL"))
repoUrl, err = url.Parse(os.Getenv("REPOSITORY_URL"))
if err != nil {
log.Fatal(err)
}
Expand All @@ -71,6 +148,7 @@ func parseEnv() *gitProxyConfig {
AnonymousSession: anonymousSession,
EncodedCredentials: encodedCredentials,
RepoUrl: repoUrl,
SessionTerminationGracePeriod: SessionTerminationGracePeriod,
}
}

Expand Down Expand Up @@ -151,11 +229,15 @@ func getProxyHandler(config *gitProxyConfig) *goproxy.ProxyHttpServer {
}

// The proxy does not expose a health endpoint. Therefore the purpose of this server
// handler is to just fill that functionality. To ensure that the proxy is fully up
// handler is to fill that functionality. To ensure that the proxy is fully up
// and running the health server will use the proxy as a proxy for the health endpoint.
// This is necessary because sending any requests directly to the proxy results in a 500
// with a message that the proxy only accepts proxy requests and no direct requests.
func getHealthHandler(config *gitProxyConfig) *http.ServeMux {
// In addition, this server also handles the shutdown of the git proxy. This is necessary because
// k8s does not enforce a shutdown order for containers. But we need the git proxy to wait for the
// autosave creation to finish before it shuts down. Otherwise, once the session is shut down,
// in many cases the git proxy shuts down before the session does and the autosave creation fails.
func getHealthHandler(config *gitProxyConfig, shutdownFlags *shutdownFlagsStruct) *http.ServeMux {
handler := http.NewServeMux()
handler.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
Expand Down Expand Up @@ -185,6 +267,17 @@ func getHealthHandler(config *gitProxyConfig) *http.ServeMux {
w.WriteHeader(http.StatusBadRequest)
}
})

handler.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) {
if !shutdownFlags.sigtermReceived {
// INFO: Cannot shut down yet
w.WriteHeader(http.StatusConflict)
} else {
// INFO: Ok to shut down
shutdownFlags.lock.Lock()
defer shutdownFlags.lock.Unlock()
w.WriteHeader(http.StatusOK)
shutdownFlags.shutdownAllowed = true
}
})
return handler
}
2 changes: 2 additions & 0 deletions git-https-proxy/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http/httptest"
"net/url"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -59,6 +60,7 @@ func getTestConfig(isSessionAnonymous bool, token string, injectionUrl *url.URL)
AnonymousSession: isSessionAnonymous,
EncodedCredentials: encodeCredentials(token),
RepoUrl: injectionUrl,
SessionTerminationGracePeriod: 30 * time.Second,
}
}

Expand Down
5 changes: 3 additions & 2 deletions git_services/Dockerfile.init
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM python:3.9-slim
LABEL maintainer="Swiss Data Science Center <info@datascience.ch>"

RUN apt-get update && \
apt-get install -y --no-install-recommends git=1:2.30.2-1 git-lfs=2.13.2-1+b5 curl && \
apt-get install -y --no-install-recommends git=1:2.30.2-1 git-lfs=2.13.2-1+b5 curl tini && \
apt-get purge -y --auto-remove && \
rm -rf /var/lib/apt/lists/* && \
useradd jovyan -u1000 -g100 --create-home
Expand All @@ -20,4 +20,5 @@ ENV PATH "/home/jovyan/.local/bin:$PATH"
RUN curl -sSL https://install.python-poetry.org | python3 - && \
poetry install --no-dev

ENTRYPOINT ["poetry", "run", "python", "-m", "git_services.init.clone"]
ENTRYPOINT ["tini", "-g", "--"]
CMD ["poetry", "run", "python", "-m", "git_services.init.clone"]
5 changes: 3 additions & 2 deletions git_services/Dockerfile.sidecar
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ FROM python:3.9-slim
LABEL maintainer="Swiss Data Science Center <info@datascience.ch>"

RUN apt-get update && \
apt-get install -y --no-install-recommends git=1:2.30.2-1 git-lfs=2.13.2-1+b5 curl && \
apt-get install -y --no-install-recommends git=1:2.30.2-1 git-lfs=2.13.2-1+b5 curl tini && \
apt-get purge -y --auto-remove && \
rm -rf /var/lib/apt/lists/* && \
useradd jovyan -u1000 -g100 --create-home
Expand All @@ -25,4 +25,5 @@ ENV HOST="0.0.0.0"
# even if this is an invalid rpc request.
HEALTHCHECK CMD curl http://$HOST:4000

ENTRYPOINT ["poetry", "run", "python", "-m", "git_services.sidecar.rpc_server"]
ENTRYPOINT ["tini", "-g", "--"]
CMD ["poetry", "run", "python", "-m", "git_services.sidecar.rpc_server"]
2 changes: 1 addition & 1 deletion git_services/git_services/cli/sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class SentryConfig:
def __post_init__(self):
if type(self.enabled) is str:
# NOTE: Required because bool("False") == True and environment vars are always strings
self.enabled = (self.enabled.lower() == "true")
self.enabled = self.enabled.lower() == "true"
# INFO: Convert empty strings to None
for attr_name in ["dsn", "environment"]:
attr_val = getattr(self, attr_name)
Expand Down
4 changes: 1 addition & 3 deletions git_services/git_services/init/cloner.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ def _wait_for_server(self, timeout_mins=None):
start = datetime.now()

while True:
logging.info(
f"Waiting for git to become available with timeout mins {timeout_mins}..."
)
logging.info(f"Waiting for git to become available with timeout mins {timeout_mins}...")
res = requests.get(self.git_url)
if res.status_code >= 200 and res.status_code < 400:
logging.info("Git is available")
Expand Down
82 changes: 53 additions & 29 deletions git_services/git_services/sidecar/rpc_server.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
from git_services.sidecar.config import config_from_env
from jsonrpc import JSONRPCResponseManager, dispatcher
import os
import requests
from werkzeug.wrappers import Request, Response
from werkzeug.serving import run_simple
from pathlib import Path
from subprocess import PIPE, Popen
import shlex

from git_services.cli import GitCLI
from git_services.cli.sentry import setup_sentry
from git_services.sidecar.config import config_from_env


@dispatcher.add_method
Expand Down Expand Up @@ -53,40 +57,60 @@ def status(path: str = ".", **kwargs):
@dispatcher.add_method
def autosave(**kwargs):
    """Create an autosave branch with uncommitted work.

    Does nothing when the repository at MOUNT_PATH is clean and not ahead of
    its remote. Otherwise the work is committed (if needed) on a temporary
    ``renku/autosave/...`` branch, pushed to ``origin``, and the original
    branch is restored with the changes left in place.

    Whatever the outcome, the git proxy is told through its ``/shutdown``
    endpoint that it may stop. Exceptions raised while autosaving propagate
    to the caller.
    """
    import logging

    git_proxy_health_port = os.getenv("GIT_PROXY_HEALTH_PORT", "8081")
    try:
        repo_path = os.environ.get("MOUNT_PATH")
        status_result = status(path=repo_path)
        should_commit = not status_result["clean"]
        should_push = status_result["ahead"] > 0

        if not (should_commit or should_push):
            return

        initial_commit = os.environ["CI_COMMIT_SHA"][0:7]
        current_commit = status_result["commit"][0:7]
        current_branch = status_result["branch"]

        user = os.environ["RENKU_USERNAME"]

        autosave_branch_name = (
            f"renku/autosave/{user}/{current_branch}/{initial_commit}/{current_commit}"
        )

        cli = GitCLI(Path(repo_path))

        cli.git_checkout(f"-b {autosave_branch_name}")

        if should_commit:
            # INFO: Find large files that should be checked in git LFS
            autosave_min_file_size = os.getenv("AUTOSAVE_MINIMUM_LFS_FILE_SIZE_BYTES", "1000000")
            cmd_res = Popen(
                [
                    "find",
                    ".",
                    # Git's own object store must never be tracked by LFS.
                    "-path",
                    "./.git",
                    "-prune",
                    "-o",
                    "-type",
                    "f",
                    "-size",
                    f"+{autosave_min_file_size}c",
                    # NUL-separated output keeps paths containing whitespace intact.
                    "-print0",
                ],
                cwd=Path(repo_path),
                stdout=PIPE,
                stderr=PIPE,
            )
            stdout, _ = cmd_res.communicate()
            lfs_files = [name for name in stdout.decode("utf-8").split("\0") if name]
            if len(lfs_files) > 0:
                # NOTE(review): assumes GitCLI parses its argument string with
                # shell-like quoting (the commit message below already relies
                # on single quotes) - confirm against GitCLI.
                cli.git_lfs("track " + " ".join(shlex.quote(name) for name in lfs_files))
            cli.git_add("-A")
            cli.git_commit(
                "--no-verify "
                f"-m 'Auto-saving for {user} on branch "
                f"{current_branch} from commit {initial_commit}'"
            )

        cli.git_push(f"origin {autosave_branch_name}")

        cli.git_reset(f"--soft {current_branch}")
        cli.git_checkout(f"{current_branch}")
        cli.git_branch(f"-D {autosave_branch_name}")
    finally:
        # INFO: Inform the proxy it can shut down
        # NOTE: Do not place return, break or continue here, otherwise
        # the exception from try will be completely discarded.
        try:
            # A timeout keeps an unresponsive proxy from blocking forever.
            requests.get(
                f"http://localhost:{git_proxy_health_port}/shutdown",
                timeout=10,
            )
        except requests.RequestException:
            # Failing to notify the proxy must not replace an exception
            # raised by the autosave itself.
            logging.exception("Could not notify the git proxy that it can shut down")


@Request.application
Expand Down
Loading