Skip to content

Commit

Permalink
Merge pull request #14 from cloudtrust/events_db
Browse files Browse the repository at this point in the history
Persist events in BD - First stage
  • Loading branch information
harture committed Mar 27, 2019
2 parents 38d529d + 5b44bec commit 575588a
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 87 deletions.
129 changes: 79 additions & 50 deletions cmd/keycloakb/keycloak_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@ import (
"time"

common "github.com/cloudtrust/common-healthcheck"
"github.com/cloudtrust/go-jobs"
controller "github.com/cloudtrust/go-jobs"
"github.com/cloudtrust/go-jobs/job"
job_lock "github.com/cloudtrust/go-jobs/lock"
job_status "github.com/cloudtrust/go-jobs/status"
fb_flaki "github.com/cloudtrust/keycloak-bridge/api/flaki/fb"
"github.com/cloudtrust/keycloak-bridge/api/user/fb"
"github.com/cloudtrust/keycloak-bridge/internal/elasticsearch"
"github.com/cloudtrust/keycloak-bridge/internal/idgenerator"
"github.com/cloudtrust/keycloak-bridge/internal/keycloakb"
"github.com/cloudtrust/keycloak-bridge/pkg/event"
Expand All @@ -41,7 +40,8 @@ import (
gokit_influx "github.com/go-kit/kit/metrics/influx"
"github.com/go-kit/kit/ratelimit"
grpc_transport "github.com/go-kit/kit/transport/grpc"
"github.com/google/flatbuffers/go"
_ "github.com/go-sql-driver/mysql"
flatbuffers "github.com/google/flatbuffers/go"
"github.com/gorilla/mux"
influx "github.com/influxdata/influxdb/client/v2"
_ "github.com/lib/pq"
Expand Down Expand Up @@ -98,20 +98,16 @@ func main() {
// Keycloak Timeout
//keycloakClientCreationTimeout = c.GetDuration("keycloak-client-creation-timeout")

// Elasticsearch
esAddr = c.GetString("elasticsearch-host-port")
esIndex = c.GetString("elasticsearch-index-name")

// Enabled units
// Enabled units - health checks (keycloak) or enabled (all the rest)
cockroachEnabled = c.GetBool("cockroach")
esEnabled = c.GetBool("elasticsearch")
flakiEnabled = c.GetBool("flaki")
influxEnabled = c.GetBool("influx")
jaegerEnabled = c.GetBool("jaeger")
keycloakEnabled = c.GetBool("keycloak")
sentryEnabled = c.GetBool("sentry")
jobEnabled = c.GetBool("job")
pprofRouteEnabled = c.GetBool("pprof-route-enabled")
eventsDBEnabled = c.GetBool("events-DB")

// Influx
influxHTTPConfig = influx.HTTPConfig{
Expand Down Expand Up @@ -152,16 +148,23 @@ func main() {
cockroachDB = c.GetString("cockroach-database")
cockroachCleanInterval = c.GetDuration("cockroach-clean-interval")

// EventsDB
eventsDBHostPort = c.GetString("db-host-port")
eventsDBUsername = c.GetString("db-username")
eventsDBPassword = c.GetString("db-password")
eventsDBDatabase = c.GetString("db-database")
//eventsDBTable = c.GetString("db-table")
eventsDBProtocol = c.GetString("protocol")

// Jobs
healthChecksValidity = map[string]time.Duration{
"cockroach": c.GetDuration("job-cockroach-health-validity"),
"elasticsearch": c.GetDuration("job-es-health-validity"),
"flaki": c.GetDuration("job-flaki-health-validity"),
"influx": c.GetDuration("job-influx-health-validity"),
"jaeger": c.GetDuration("job-jaeger-health-validity"),
"keycloak": c.GetDuration("job-keycloak-health-validity"),
"redis": c.GetDuration("job-redis-health-validity"),
"sentry": c.GetDuration("job-sentry-health-validity"),
"cockroach": c.GetDuration("job-cockroach-health-validity"),
"flaki": c.GetDuration("job-flaki-health-validity"),
"influx": c.GetDuration("job-influx-health-validity"),
"jaeger": c.GetDuration("job-jaeger-health-validity"),
"keycloak": c.GetDuration("job-keycloak-health-validity"),
"redis": c.GetDuration("job-redis-health-validity"),
"sentry": c.GetDuration("job-sentry-health-validity"),
}

// Rate limiting
Expand Down Expand Up @@ -192,14 +195,13 @@ func main() {
}
// Authorized health checks for each module.
authorizedHC = map[string]map[string]struct{}{
"cockroach": healthcheckNames("", "ping"),
"elasticsearch": healthcheckNames("", "ping"),
"flaki": healthcheckNames("", "nextid"),
"influx": healthcheckNames("", "ping"),
"jaeger": healthcheckNames("", "agent", "collector"),
"keycloak": healthcheckNames("", "createuser", "deleteuser"),
"redis": healthcheckNames("", "ping"),
"sentry": healthcheckNames("", "ping"),
"cockroach": healthcheckNames("", "ping"),
"flaki": healthcheckNames("", "nextid"),
"influx": healthcheckNames("", "ping"),
"jaeger": healthcheckNames("", "agent", "collector"),
"keycloak": healthcheckNames("", "createuser", "deleteuser"),
"redis": healthcheckNames("", "ping"),
"sentry": healthcheckNames("", "ping"),
}
)

Expand Down Expand Up @@ -351,8 +353,27 @@ func main() {
}
}

// Elasticsearch client.
var esClient = elasticsearch.NewClient(esAddr, http.DefaultClient)
// Audit events DB.
type EventsDB interface {
Exec(query string, args ...interface{}) (sql.Result, error)
//Ping() error
Query(query string, args ...interface{}) (*sql.Rows, error)
QueryRow(query string, args ...interface{}) *sql.Row
}

var eventsDBConn EventsDB = keycloakb.NoopEventsDB{}
if eventsDBEnabled {
var err error
eventsDBConn, err = sql.Open("mysql", fmt.Sprintf("%s:%s@%s(%s)/%s", eventsDBUsername, eventsDBPassword, eventsDBProtocol, eventsDBHostPort, eventsDBDatabase))
//eventsDBConn, err = sql.Open("mysql", "root:admin@tcp(127.0.0.1:3306)/auditevents")

//logger.Log("msg", fmt.Sprintf("%s:%s@%s(%s)/%s", eventsDBUsername, eventsDBPassword, eventsDBProtocol, eventsDBHostPort, eventsDBDatabase))

if err != nil {
logger.Log("msg", "could not create DB connection for audit events", "error", err)
return
}
}

// User service.
var userEndpoints = user.Endpoints{}
Expand Down Expand Up @@ -398,7 +419,7 @@ func main() {

var consoleModule event.ConsoleModule
{
consoleModule = event.NewConsoleModule(log.With(eventLogger, "module", "console"), esClient, esIndex, ComponentName, ComponentID)
consoleModule = event.NewConsoleModule(log.With(eventLogger, "module", "console"))
consoleModule = event.MakeConsoleModuleInstrumentingMW(influxMetrics.NewHistogram("console_module"))(consoleModule)
consoleModule = event.MakeConsoleModuleLoggingMW(log.With(eventLogger, "mw", "module", "unit", "console"))(consoleModule)
consoleModule = event.MakeConsoleModuleTracingMW(tracer)(consoleModule)
Expand All @@ -412,9 +433,19 @@ func main() {
statisticModule = event.MakeStatisticModuleTracingMW(tracer)(statisticModule)
}

// new module for sending the events to the DB
var eventsDBModule event.EventsDBModule
{
eventsDBModule = event.NewEventsDBModule(eventsDBConn)
eventsDBModule = event.MakeEventsDBModuleInstrumentingMW(influxMetrics.NewHistogram("eventsDB_module"))(eventsDBModule)
eventsDBModule = event.MakeEventsDBModuleLoggingMW(log.With(eventLogger, "mw", "module", "unit", "eventsDB"))(eventsDBModule)
eventsDBModule = event.MakeEventsDBModuleTracingMW(tracer)(eventsDBModule)

}

var eventAdminComponent event.AdminComponent
{
var fns = []event.FuncEvent{consoleModule.Print, statisticModule.Stats}
var fns = []event.FuncEvent{consoleModule.Print, statisticModule.Stats, eventsDBModule.Store}
eventAdminComponent = event.NewAdminComponent(fns, fns, fns, fns)
eventAdminComponent = event.MakeAdminComponentInstrumentingMW(influxMetrics.NewHistogram("admin_component"))(eventAdminComponent)
eventAdminComponent = event.MakeAdminComponentLoggingMW(log.With(eventLogger, "mw", "component", "unit", "admin_event"))(eventAdminComponent)
Expand All @@ -423,13 +454,15 @@ func main() {

var eventComponent event.Component
{
var fns = []event.FuncEvent{consoleModule.Print, statisticModule.Stats}
var fns = []event.FuncEvent{consoleModule.Print, statisticModule.Stats, eventsDBModule.Store}
eventComponent = event.NewComponent(fns, fns)
eventComponent = event.MakeComponentInstrumentingMW(influxMetrics.NewHistogram("component"))(eventComponent)
eventComponent = event.MakeComponentLoggingMW(log.With(eventLogger, "mw", "component", "unit", "event"))(eventComponent)
eventComponent = event.MakeComponentTracingMW(tracer)(eventComponent)
}

// add ct_type

var muxComponent event.MuxComponent
{
muxComponent = event.NewMuxComponent(eventComponent, eventAdminComponent)
Expand Down Expand Up @@ -710,12 +743,6 @@ func main() {
flakiHM = common.MakeHealthCheckerLoggingMW(log.With(healthLogger, "module", "flaki"))(flakiHM)
flakiHM = common.MakeValidationMiddleware(authorizedHC["flaki"])(flakiHM)
}
var elasticsearchHM HealthChecker
{
elasticsearchHM = health.NewElasticsearchModule(http.DefaultClient, esAddr, esEnabled)
elasticsearchHM = common.MakeHealthCheckerLoggingMW(log.With(healthLogger, "module", "elasticsearch"))(elasticsearchHM)
elasticsearchHM = common.MakeValidationMiddleware(authorizedHC["elasticsearch"])(elasticsearchHM)
}
var keycloakHM HealthChecker
{
var err error
Expand All @@ -727,13 +754,12 @@ func main() {
}

var healthCheckers = map[string]health.HealthChecker{
"cockroach": cockroachHM,
"elasticsearch": elasticsearchHM,
"flaki": flakiHM,
"influx": influxHM,
"jaeger": jaegerHM,
"keycloak": keycloakHM,
"sentry": sentryHM,
"cockroach": cockroachHM,
"flaki": flakiHM,
"influx": influxHM,
"jaeger": jaegerHM,
"keycloak": keycloakHM,
"sentry": sentryHM,
}

var healthComponent health.HealthCheckers
Expand All @@ -757,7 +783,7 @@ func main() {
if jobEnabled {
var ctrl = controller.NewController(ComponentName, ComponentID, idgenerator.New(flakiClient, tracer), &job_lock.NoopLocker{}, controller.EnableStatusStorage(job_status.New(cockroachConn)))

for _, job := range []string{"cockroach", "elasticsearch", "flaki", "influx", "jaeger", "keycloak", "redis", "sentry"} {
for _, job := range []string{"cockroach", "flaki", "influx", "jaeger", "keycloak", "redis", "sentry"} {
var job, err = health_job.MakeHealthJob(healthCheckers[job], job, healthChecksValidity[job], healthStorage, logger)
if err != nil {
logger.Log("msg", fmt.Sprintf("could not create %s health job", job), "error", err)
Expand Down Expand Up @@ -1004,11 +1030,6 @@ func config(logger log.Logger) *viper.Viper {
v.SetDefault("keycloak-timeout", "5s")
v.SetDefault("keycloak-client-creation-timeout", "50s")

// Elasticsearch default.
v.SetDefault("elasticsearch", false)
v.SetDefault("elasticsearch-host-port", "elasticsearch:9200")
v.SetDefault("elasticsearch-index-name", "keycloak_business")

// Influx DB client default.
v.SetDefault("influx", false)
v.SetDefault("influx-host-port", "")
Expand Down Expand Up @@ -1053,6 +1074,15 @@ func config(logger log.Logger) *viper.Viper {
v.SetDefault("job-sentry-health-validity", "1m")
v.SetDefault("job-keycloak-health-validity", "1m")

//Storage events in DB
v.SetDefault("events-DB", false)
v.SetDefault("db-host-port", "")
v.SetDefault("db-username", "")
v.SetDefault("db-password", "")
v.SetDefault("db-database", "")
v.SetDefault("db-table", "")
v.SetDefault("protocol", "")

// Rate limiting (in requests/second)
v.SetDefault("rate-event", 1000)
v.SetDefault("rate-user", 1000)
Expand All @@ -1070,7 +1100,6 @@ func config(logger log.Logger) *viper.Viper {
}

// If the host/port is not set, we consider the components deactivated.
v.Set("elasticsearch", v.GetString("elasticsearch-host-port") != "")
v.Set("influx", v.GetString("influx-host-port") != "")
v.Set("sentry", v.GetString("sentry-dsn") != "")
v.Set("jaeger", v.GetString("jaeger-sampler-host-port") != "")
Expand Down
14 changes: 9 additions & 5 deletions configs/keycloak_bridge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@ keycloak-username: admin
keycloak-password: admin
keycloak-timeout: 5s
keycloak-client-creation-timeout: 40s

# Elasticsearch configs
elasticsearch: false
elasticsearch-host-port: elasticsearch-data:9200cd
elasticsearch-index-name: audit

# Redis
redis: false
Expand Down Expand Up @@ -75,6 +70,15 @@ job-redis-health-validity: 1m
job-sentry-health-validity: 1m
job-keycloak-health-validity: 1m

#Audit events
events-DB: true
db-host-port: 127.0.0.1:3306
db-username: root
db-password: admin
db-database: auditevents
db-table: audit
protocol: tcp

# Rate limiting in requests/second.
rate-event: 1000
rate-user: 1000
23 changes: 23 additions & 0 deletions internal/keycloakb/eventsdb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package keycloakb

import (
"database/sql"
)

// NoopEventsDB is a eventsDB client that does nothing.
type NoopEventsDB struct{}

// Exec does nothing.
func (NoopEventsDB) Exec(query string, args ...interface{}) (sql.Result, error) {
return NoopResult{}, nil
}

// Query does nothing.
func (NoopEventsDB) Query(query string, args ...interface{}) (*sql.Rows, error) {
return nil, nil
}

// QueryRow does nothing.
func (NoopEventsDB) QueryRow(query string, args ...interface{}) *sql.Row {
return nil
}
24 changes: 24 additions & 0 deletions pkg/event/instrumenting.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,27 @@ func (m *statisticModuleInstrumentingMW) Stats(ctx context.Context, mp map[strin
}(time.Now())
return m.next.Stats(ctx, mp)
}

// Instrumenting middleware at module level.
type eventsDBModuleInstrumentingMW struct {
h metrics.Histogram
next EventsDBModule
}

// MakeStatisticModuleInstrumentingMW makes an instrumenting middleware at module level.
func MakeEventsDBModuleInstrumentingMW(h metrics.Histogram) func(EventsDBModule) EventsDBModule {
return func(next EventsDBModule) EventsDBModule {
return &eventsDBModuleInstrumentingMW{
h: h,
next: next,
}
}
}

// consoleModuleInstrumentingMW implements Module.
func (m *eventsDBModuleInstrumentingMW) Store(ctx context.Context, mp map[string]string) error {
defer func(begin time.Time) {
m.h.With("correlation_id", ctx.Value("correlation_id").(string)).Observe(time.Since(begin).Seconds())
}(time.Now())
return m.next.Store(ctx, mp)
}
24 changes: 24 additions & 0 deletions pkg/event/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,27 @@ func (m *statisticModuleLoggingMW) Stats(ctx context.Context, mp map[string]stri
}(time.Now())
return m.next.Stats(ctx, mp)
}

// Logging middleware for the statistic module.
type eventsDBModuleLoggingMW struct {
logger log.Logger
next EventsDBModule
}

// MakeStatisticModuleLoggingMW makes a logging middleware for the statistic module.
func MakeEventsDBModuleLoggingMW(log log.Logger) func(EventsDBModule) EventsDBModule {
return func(next EventsDBModule) EventsDBModule {
return &eventsDBModuleLoggingMW{
logger: log,
next: next,
}
}
}

// statisticModuleLoggingMW implements StatisticModule.
func (m *eventsDBModuleLoggingMW) Store(ctx context.Context, mp map[string]string) error {
defer func(begin time.Time) {
m.logger.Log("method", "Store", "args", mp, "took", time.Since(begin))
}(time.Now())
return m.next.Store(ctx, mp)
}
Loading

0 comments on commit 575588a

Please sign in to comment.