Skip to content

Commit

Permalink
Add initial (rough) database support
Browse files Browse the repository at this point in the history
  • Loading branch information
dstotijn committed Sep 26, 2020
1 parent 618ed61 commit 2e88b4a
Show file tree
Hide file tree
Showing 10 changed files with 771 additions and 54 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,3 +1,4 @@
**/rice-box.go
dist
hetty
hetty
hetty.bolt
13 changes: 11 additions & 2 deletions cmd/hetty/main.go
Expand Up @@ -11,6 +11,7 @@ import (
"strings"

rice "github.com/GeertJohan/go.rice"
"github.com/dstotijn/hetty/db/cayley"
"github.com/dstotijn/hetty/pkg/api"
"github.com/dstotijn/hetty/pkg/proxy"
"github.com/dstotijn/hetty/pkg/reqlog"
Expand All @@ -23,13 +24,15 @@ import (
var (
caCertFile string
caKeyFile string
dbFile string
addr string
adminPath string
)

func main() {
flag.StringVar(&caCertFile, "cert", "", "CA certificate file path")
flag.StringVar(&caKeyFile, "key", "", "CA private key file path")
flag.StringVar(&dbFile, "db", "hetty.db", "Database file path")
flag.StringVar(&addr, "addr", ":80", "TCP address to listen on, in the form \"host:port\"")
flag.StringVar(&adminPath, "adminPath", "", "File path to admin build")
flag.Parse()
Expand All @@ -44,7 +47,13 @@ func main() {
log.Fatalf("[FATAL] Could not parse CA: %v", err)
}

reqLogService := reqlog.NewService()
db, err := cayley.NewDatabase(dbFile)
if err != nil {
log.Fatalf("[FATAL] Could not initialize database: %v", err)
}
defer db.Close()

reqLogService := reqlog.NewService(db)

p, err := proxy.NewProxy(caCert, tlsCA.PrivateKey)
if err != nil {
Expand Down Expand Up @@ -77,7 +86,7 @@ func main() {
// GraphQL server.
adminRouter.Path("/api/playground").Handler(playground.Handler("GraphQL Playground", "/api/graphql"))
adminRouter.Path("/api/graphql").Handler(handler.NewDefaultServer(api.NewExecutableSchema(api.Config{Resolvers: &api.Resolver{
RequestLogService: &reqLogService,
RequestLogService: reqLogService,
}})))

// Admin interface.
Expand Down
64 changes: 64 additions & 0 deletions db/cayley/bolt.go
@@ -0,0 +1,64 @@
package cayley

import (
"os"
"path/filepath"

"github.com/cayleygraph/cayley/clog"
"github.com/cayleygraph/cayley/graph"
hkv "github.com/hidal-go/hidalgo/kv"
"github.com/hidal-go/hidalgo/kv/bolt"
)

const Type = bolt.Name

func boltFilePath(path, filename string) string {
return filepath.Join(path, filename)
}

func boltCreate(path string, opt graph.Options) (hkv.KV, error) {
filename, err := opt.StringKey("filename", "indexes.bolt")
if err != nil {
return nil, err
}

err = os.MkdirAll(path, 0700)
if err != nil {
return nil, err
}

db, err := bolt.Open(boltFilePath(path, filename), nil)
if err != nil {
clog.Errorf("Error: couldn't create Bolt database: %v", err)
return nil, err
}

return db, nil
}

func boltOpen(path string, opt graph.Options) (hkv.KV, error) {
filename, err := opt.StringKey("filename", "indexes.bolt")
if err != nil {
return nil, err
}

db, err := bolt.Open(boltFilePath(path, filename), nil)
if err != nil {
clog.Errorf("Error, couldn't open! %v", err)
return nil, err
}

bdb := db.DB()
bdb.NoSync, err = opt.BoolKey("nosync", false)
if err != nil {
db.Close()
return nil, err
}

bdb.NoGrowSync = bdb.NoSync
if bdb.NoSync {
clog.Infof("Running in nosync mode")
}

return db, nil
}
278 changes: 278 additions & 0 deletions db/cayley/cayley.go
@@ -0,0 +1,278 @@
package cayley

import (
"context"
"fmt"
"log"
"net/http"
"net/url"
"path"
"strings"
"time"

"github.com/cayleygraph/cayley"
"github.com/cayleygraph/cayley/graph"
"github.com/cayleygraph/cayley/graph/kv"
"github.com/cayleygraph/cayley/schema"
"github.com/cayleygraph/quad"
"github.com/cayleygraph/quad/voc"
"github.com/cayleygraph/quad/voc/rdf"
"github.com/google/uuid"

"github.com/dstotijn/hetty/pkg/reqlog"
)

type HTTPRequest struct {
rdfType struct{} `quad:"@type > hy:HTTPRequest"`
ID quad.IRI `quad:"@id"`
Proto string `quad:"hy:proto"`
URL string `quad:"hy:url"`
Method string `quad:"hy:method"`
Body string `quad:"hy:body,optional"`
Headers []HTTPHeader `quad:"hy:header"`
Timestamp time.Time `quad:"hy:timestamp"`
Response *HTTPResponse `quad:"hy:request < *,optional"`
}

type HTTPResponse struct {
rdfType struct{} `quad:"@type > hy:HTTPResponse"`
RequestID quad.IRI `quad:"hy:request"`
Proto string `quad:"hy:proto"`
Status string `quad:"hy:status"`
StatusCode int `quad:"hy:status_code"`
Headers []HTTPHeader `quad:"hy:header"`
Body string `quad:"hy:body,optional"`
Timestamp time.Time `quad:"hy:timestamp"`
}

type HTTPHeader struct {
rdfType struct{} `quad:"@type > hy:HTTPHeader"`
Key string `quad:"hy:key"`
Value string `quad:"hy:value,optional"`
}

type Database struct {
store *cayley.Handle
schema *schema.Config
}

func init() {
voc.RegisterPrefix("hy:", "https://hetty.xyz/")
schema.RegisterType(quad.IRI("hy:HTTPRequest"), HTTPRequest{})
schema.RegisterType(quad.IRI("hy:HTTPResponse"), HTTPResponse{})
schema.RegisterType(quad.IRI("hy:HTTPHeader"), HTTPHeader{})

kv.Register(Type, kv.Registration{
NewFunc: boltOpen,
InitFunc: boltCreate,
IsPersistent: true,
})
}

func NewDatabase(filename string) (*Database, error) {
dir, file := path.Split(filename)
if dir == "" {
dir = "."
}
opts := graph.Options{
"filename": file,
}

schemaCfg := schema.NewConfig()
schemaCfg.GenerateID = func(_ interface{}) quad.Value {
return quad.BNode(uuid.New().String())
}

// Initialize the database.
err := graph.InitQuadStore("bolt", dir, opts)
if err != nil && err != graph.ErrDatabaseExists {
return nil, fmt.Errorf("cayley: could not initialize database: %v", err)
}

// Open the database.
store, err := cayley.NewGraph("bolt", dir, opts)
if err != nil {
return nil, fmt.Errorf("cayley: could not open database: %v", err)
}

return &Database{
store: store,
schema: schemaCfg,
}, nil
}

func (db *Database) Close() error {
return db.store.Close()
}

func (db *Database) FindAllRequestLogs(ctx context.Context) ([]reqlog.Request, error) {
var reqLogs []reqlog.Request
var reqs []HTTPRequest

path := cayley.StartPath(db.store, quad.IRI("hy:HTTPRequest")).In(quad.IRI(rdf.Type))
err := path.Iterate(ctx).EachValue(nil, func(v quad.Value) {
var req HTTPRequest
if err := db.schema.LoadToDepth(ctx, db.store, &req, -1, v); err != nil {
log.Printf("[ERROR] Could not load sub-graph for http requests: %v", err)
return
}
reqs = append(reqs, req)
})
if err != nil {
return nil, fmt.Errorf("cayley: could not iterate over http requests: %v", err)
}

for _, req := range reqs {
reqLog, err := parseRequestQuads(req, nil)
if err != nil {
return nil, fmt.Errorf("cayley: could not parse request quads (id: %v): %v", req.ID, err)
}
reqLogs = append(reqLogs, reqLog)
}

return reqLogs, nil
}

func (db *Database) FindRequestLogByID(ctx context.Context, id uuid.UUID) (reqlog.Request, error) {
var req HTTPRequest
err := db.schema.LoadTo(ctx, db.store, &req, iriFromUUID(id))
if schema.IsNotFound(err) {
return reqlog.Request{}, reqlog.ErrRequestNotFound
}
if err != nil {
return reqlog.Request{}, fmt.Errorf("cayley: could not load value: %v", err)
}

reqLog, err := parseRequestQuads(req, nil)
if err != nil {
return reqlog.Request{}, fmt.Errorf("cayley: could not parse request log (id: %v): %v", req.ID, err)
}

return reqLog, nil
}

func (db *Database) AddRequestLog(ctx context.Context, reqLog reqlog.Request) error {
httpReq := HTTPRequest{
ID: iriFromUUID(reqLog.ID),
Proto: reqLog.Request.Proto,
Method: reqLog.Request.Method,
URL: reqLog.Request.URL.String(),
Headers: httpHeadersSliceFromMap(reqLog.Request.Header),
Body: string(reqLog.Body),
Timestamp: reqLog.Timestamp,
}

qw := graph.NewWriter(db.store)
defer qw.Close()

_, err := db.schema.WriteAsQuads(qw, httpReq)
if err != nil {
return fmt.Errorf("cayley: could not write quads: %v", err)
}

return nil
}

func (db *Database) AddResponseLog(ctx context.Context, resLog reqlog.Response) error {
httpRes := HTTPResponse{
RequestID: iriFromUUID(resLog.RequestID),
Proto: resLog.Response.Proto,
Status: resLog.Response.Status,
StatusCode: resLog.Response.StatusCode,
Headers: httpHeadersSliceFromMap(resLog.Response.Header),
Body: string(resLog.Body),
Timestamp: resLog.Timestamp,
}

qw := graph.NewWriter(db.store)
defer qw.Close()

_, err := db.schema.WriteAsQuads(qw, httpRes)
if err != nil {
return fmt.Errorf("cayley: could not write response quads: %v", err)
}

return nil
}

func iriFromUUID(id uuid.UUID) quad.IRI {
return quad.IRI("hy:" + id.String()).Full().Short()
}

func uuidFromIRI(iri quad.IRI) (uuid.UUID, error) {
iriString := iri.Short().String()
stripped := strings.TrimRight(strings.TrimLeft(iriString, "<hy:"), ">")
id, err := uuid.Parse(stripped)
if err != nil {
return uuid.Nil, err
}

return id, nil
}

func httpHeadersSliceFromMap(hm http.Header) []HTTPHeader {
if hm == nil {
return nil
}
var hs []HTTPHeader
for key, values := range hm {
for _, value := range values {
hs = append(hs, HTTPHeader{Key: key, Value: value})
}
}
return hs
}

func httpHeadersMapFromSlice(hs []HTTPHeader) http.Header {
if hs == nil {
return nil
}
hm := make(http.Header)
for _, header := range hs {
hm.Add(header.Key, header.Value)
}
return hm
}

func parseRequestQuads(req HTTPRequest, _ *HTTPResponse) (reqlog.Request, error) {
reqID, err := uuidFromIRI(req.ID)
if err != nil {
return reqlog.Request{}, fmt.Errorf("cannot parse request id: %v", err)
}

u, err := url.Parse(req.URL)
if err != nil {
return reqlog.Request{}, fmt.Errorf("cannot parse request url: %v", err)
}

reqLog := reqlog.Request{
ID: reqID,
Request: http.Request{
Method: req.Method,
URL: u,
Proto: req.Proto,
Header: httpHeadersMapFromSlice(req.Headers),
},
Timestamp: req.Timestamp,
}
if req.Body != "" {
reqLog.Body = []byte(reqLog.Body)
}

if req.Response != nil {
reqLog.Response = &reqlog.Response{
RequestID: reqID,
Response: http.Response{
Proto: req.Response.Proto,
Status: req.Response.Status,
StatusCode: req.Response.StatusCode,
Header: httpHeadersMapFromSlice(req.Response.Headers),
},
}
if req.Response.Body != "" {
reqLog.Response.Body = []byte(req.Response.Body)
}
}

return reqLog, nil
}

0 comments on commit 2e88b4a

Please sign in to comment.