Revert "Revert "added compression/archive functionality, cleaned up p…
Browse files Browse the repository at this point in the history
…arameter ch..."
  • Loading branch information
jaredbischof committed Apr 24, 2015
1 parent 6229b25 commit a61b5f6
Showing 17 changed files with 816 additions and 368 deletions.
14 changes: 9 additions & 5 deletions shock-server.sh
@@ -8,8 +8,8 @@
# config: /etc/shock/shock-server.conf

NAME="shock-server"
PID_FILE="/var/run/${NAME}.pid"
LOG_FILE="/var/log/${NAME}.log"
PID_FILE="/etc/shock/data/pidfile"
CONF_FILE="/etc/shock/${NAME}.conf"

start() {
Expand All @@ -18,8 +18,7 @@ start() {
echo "is already running!"
else
$NAME -conf $CONF_FILE > $LOG_FILE 2>&1 &
sleep 2
echo `ps -ef | grep -v grep | grep 'shock-server' | awk '{print $2}'` > $PID_FILE
sleep 1
echo "(Done)"
fi
return 0
@@ -30,7 +29,7 @@ stop() {
if [ -f $PID_FILE ]; then
PIDN=`cat $PID_FILE`
kill $PIDN 2>&1
sleep 2
sleep 1
rm $PID_FILE
echo "(Done)"
else
@@ -42,7 +41,12 @@ stop() {
status() {
if [ -f $PID_FILE ]; then
PIDN=`cat $PID_FILE`
echo "$NAME is running with pid $PIDN."
PSTAT=`ps -p $PIDN | grep -v -w 'PID'`
if [ -z "$PSTAT" ]; then
echo "$NAME has pidfile ($PIDN) but is not running."
else
echo "$NAME is running with pid $PIDN."
fi
else
echo "$NAME is not running."
fi
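The reworked status() above now distinguishes a stale pidfile from a live process instead of trusting the pidfile blindly. A minimal Go sketch of the same liveness probe, using the conventional signal-0 check (the pidfile path comes from the diff; everything else is illustrative):

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"syscall"
)

func main() {
	// Pidfile path taken from the PID_FILE change above.
	raw, err := os.ReadFile("/etc/shock/data/pidfile")
	if err != nil {
		fmt.Println("shock-server is not running.")
		return
	}
	pid, err := strconv.Atoi(strings.TrimSpace(string(raw)))
	if err != nil {
		fmt.Println("pidfile is corrupt:", err)
		return
	}
	// Signal 0 tests for process existence without delivering a signal --
	// the same question the script answers with `ps -p $PIDN`.
	if err := syscall.Kill(pid, 0); err != nil {
		fmt.Printf("shock-server has pidfile (%d) but is not running.\n", pid)
	} else {
		fmt.Printf("shock-server is running with pid %d.\n", pid)
	}
}
```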
2 changes: 1 addition & 1 deletion shock-server/conf/conf.go
@@ -177,7 +177,7 @@ func Initialize() {

VERSIONS["ACL"] = 2
VERSIONS["Auth"] = 1
VERSIONS["Node"] = 2
VERSIONS["Node"] = 4
}

// Bool is a convenience wrapper around strconv.ParseBool
10 changes: 10 additions & 0 deletions shock-server/controller/node/create.go
@@ -71,6 +71,16 @@ func (cr *NodeController) Create(ctx context.Context) error {
}
}

// special case, creates multiple nodes
if archiveId, hasArchiveNode := params["unpack_node"]; hasArchiveNode {
ns, err := node.CreateNodesFromArchive(u, params, files, archiveId)
if err != nil {
err_msg := "err@node_CreateNodesFromArchive: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
return responder.RespondWithData(ctx, ns)
}
// Create node
n, err := node.CreateNodeUpload(u, params, files)
if err != nil {
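The new unpack_node parameter routes node creation through CreateNodesFromArchive, so a single request against the creation endpoint can explode an already-uploaded archive node into many nodes, with the response carrying the whole list. A hypothetical client call (host, port, node ID, and the archive_format field are assumptions, not taken from this diff):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
)

func main() {
	var body bytes.Buffer
	w := multipart.NewWriter(&body)
	// ID of a previously uploaded archive node -- hypothetical.
	w.WriteField("unpack_node", "130cadb5-9435-4bd9-be13-715ec40b2bb5")
	// Assumed companion field naming the archive type.
	w.WriteField("archive_format", "tar")
	w.Close()

	resp, err := http.Post("http://localhost:7445/node", w.FormDataContentType(), &body)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	out, _ := io.ReadAll(resp.Body)
	fmt.Println(string(out)) // JSON envelope listing the created nodes
}
```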
2 changes: 2 additions & 0 deletions shock-server/controller/node/index/index.go
@@ -15,6 +15,7 @@ import (
"net/http"
"os"
"strconv"
"time"
)

type getRes struct {
@@ -276,6 +277,7 @@ func IndexTypedRequest(ctx context.Context) {
TotalUnits: count,
AvgUnitSize: n.File.Size / count,
Format: indexFormat,
CreatedOn: time.Now(),
}

//if idxType == "chunkrecord" {
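For reference, a self-contained sketch of the index-metadata record this hunk touches; the four field names come from the struct literal above, while the type name, field types, and values are guesses:

```go
package main

import (
	"fmt"
	"time"
)

// IdxInfo is a guessed name for the index-metadata record; only the
// field names are taken from the diff.
type IdxInfo struct {
	TotalUnits  int64
	AvgUnitSize int64
	Format      string
	CreatedOn   time.Time
}

func main() {
	var fileSize, count int64 = 1 << 20, 256 // stand-ins for n.File.Size and count
	info := IdxInfo{
		TotalUnits:  count,
		AvgUnitSize: fileSize / count,
		Format:      "array",     // hypothetical index format
		CreatedOn:   time.Now(),  // the field this commit adds
	}
	fmt.Printf("%+v\n", info)
}
```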
34 changes: 20 additions & 14 deletions shock-server/controller/node/multi.go
@@ -70,13 +70,9 @@ func (cr *NodeController) ReadMany(ctx context.Context) error {
// bson.M is a convenient alias for a map[string]interface{} map, useful for dealing with BSON in a native way.
var OptsMArray []bson.M

// default sort field and direction (can only be changed with querynode operator, not query operator)
order := "created_on"
direction := "-"

// Gather params to make db query. Do not include the following list.
if _, ok := query["query"]; ok {
paramlist := map[string]int{"limit": 1, "offset": 1, "query": 1}
paramlist := map[string]int{"query": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1}
for key := range query {
if _, found := paramlist[key]; !found {
keyStr := fmt.Sprintf("attributes.%s", key)
@@ -90,7 +86,7 @@ }
}
}
} else if _, ok := query["querynode"]; ok {
paramlist := map[string]int{"limit": 1, "offset": 1, "querynode": 1, "order": 1, "direction": 1, "owner": 1, "read": 1, "write": 1, "delete": 1, "public_owner": 1, "public_read": 1, "public_write": 1, "public_delete": 1}
paramlist := map[string]int{"querynode": 1, "limit": 1, "offset": 1, "order": 1, "direction": 1, "owner": 1, "read": 1, "write": 1, "delete": 1, "public_owner": 1, "public_read": 1, "public_write": 1, "public_delete": 1}
for key := range query {
if _, found := paramlist[key]; !found {
for _, value := range query[key] {
Expand All @@ -102,14 +98,6 @@ func (cr *NodeController) ReadMany(ctx context.Context) error {
}
}
}
if _, ok := query["order"]; ok {
order = query.Get("order")
}
if _, ok := query["direction"]; ok {
if query.Get("direction") == "asc" {
direction = ""
}
}
}

if len(OptsMArray) > 0 {
@@ -157,8 +145,26 @@ func (cr *NodeController) ReadMany(ctx context.Context) error {
q["$and"] = []bson.M{qPerm, qOpts, qAcls}

// defaults
order := "created_on"
direction := "-"
limit := 25
offset := 0

// get from query
if _, ok := query["query"]; ok {
if _, ok := query["order"]; ok {
order = fmt.Sprintf("attributes.%s", query.Get("order"))
}
} else {
if _, ok := query["order"]; ok {
order = query.Get("order")
}
}
if _, ok := query["direction"]; ok {
if query.Get("direction") == "asc" {
direction = ""
}
}
if _, ok := query["limit"]; ok {
limit = util.ToInt(query.Get("limit"))
}
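After this change, order and direction are parsed once for both operators, and under the query operator the sort key is rewritten onto the embedded attributes document; direction defaults to descending ("-" prefix) unless asc is passed. A hypothetical request built in Go (host, port, and the attribute/field names are invented):

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	v := url.Values{}
	v.Set("query", "")         // attribute-query operator (presence is enough)
	v.Set("project", "mgrast") // matched as attributes.project
	v.Set("order", "name")     // server rewrites this to sort key attributes.name
	v.Set("direction", "asc")  // drops the default "-" (descending) prefix
	v.Set("limit", "10")
	fmt.Println("http://localhost:7445/node?" + v.Encode())
}
```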
128 changes: 50 additions & 78 deletions shock-server/controller/node/single.go
@@ -7,6 +7,7 @@ import (
e "github.com/MG-RAST/Shock/shock-server/errors"
"github.com/MG-RAST/Shock/shock-server/logger"
"github.com/MG-RAST/Shock/shock-server/node"
"github.com/MG-RAST/Shock/shock-server/node/archive"
"github.com/MG-RAST/Shock/shock-server/node/file"
"github.com/MG-RAST/Shock/shock-server/node/file/index"
"github.com/MG-RAST/Shock/shock-server/node/filter"
@@ -68,26 +69,35 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {

// Gather query params
query := ctx.HttpRequest().URL.Query()

// set defaults
filename := n.Id
if n.File.Name != "" {
filename = n.File.Name
}
var fFunc filter.FilterFunc = nil
var compressionFormat string = ""
// use query params if exist
if _, ok := query["filename"]; ok {
filename = query.Get("filename")
}
if _, ok := query["filter"]; ok {
if filter.Has(query.Get("filter")) {
fFunc = filter.Filter(query.Get("filter"))
}
}
if _, ok := query["compression"]; ok {
if archive.IsValidCompress(query.Get("compression")) {
compressionFormat = query.Get("compression")
}
}

// Switch through param flags
// ?download=1 or ?download_raw=1

_, download_raw := query["download_raw"]
if _, ok := query["download"]; ok || download_raw {
if !n.HasFile() {
return responder.RespondWithError(ctx, http.StatusBadRequest, "Node has no file")
}
filename := n.Id
if _, ok := query["filename"]; ok {
filename = query.Get("filename")
}

_, seek_ok := query["seek"]
if _, length_ok := query["length"]; seek_ok || length_ok {
@@ -130,24 +140,13 @@
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusInternalServerError, err_msg)
}
s := &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: length, Filter: fFunc}
s := &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: length, Filter: fFunc, Compression: compressionFormat}
s.R = append(s.R, io.NewSectionReader(r, seek, length))
if download_raw {
err = s.StreamRaw()
if err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.StreamRaw: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
} else {
err = s.Stream()
if err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.Stream: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
if err = s.Stream(download_raw); err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.Stream: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
} else if _, ok := query["index"]; ok {
//handling bam file
@@ -156,7 +155,7 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
return responder.RespondWithError(ctx, http.StatusBadRequest, "subset nodes do not support bam indices")
}

s := &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: fFunc}
s := &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: fFunc, Compression: compressionFormat}

var region string
if _, ok := query["region"]; ok {
@@ -199,6 +198,7 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
TotalUnits: totalunits,
AvgUnitSize: conf.CHUNK_SIZE,
Format: "dynamic",
CreatedOn: time.Now(),
}
err = n.Save()
if err != nil {
@@ -232,7 +232,7 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
}

var size int64 = 0
s := &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Filter: fFunc}
s := &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Filter: fFunc, Compression: compressionFormat}

_, hasPart := query["part"]
if n.Type == "subset" && idxName == "chunkrecord" {
@@ -322,22 +322,11 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
}
}
s.Size = size
if download_raw {
err = s.StreamRaw()
if err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.StreamRaw: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
} else {
err = s.Stream()
if err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.Stream: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
if err = s.Stream(download_raw); err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.Stream: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
// download full file
} else {
@@ -353,7 +342,7 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {

idx := index.New()

s := &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: fFunc}
s := &request.Streamer{R: []file.SectionReader{}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: fFunc, Compression: compressionFormat}

fullRange := "1-" + strconv.FormatInt(n.Subset.Index.TotalUnits, 10)
recSlice, err := idx.Range(fullRange, n.Path()+"/"+n.Id+".subset.idx", n.Subset.Index.TotalUnits)
@@ -363,23 +352,11 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
for _, rec := range recSlice {
s.R = append(s.R, io.NewSectionReader(r, rec[0], rec[1]))
}

if download_raw {
err = s.StreamRaw()
if err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.StreamRaw: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
} else {
err = s.Stream()
if err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.Stream: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
if err = s.Stream(download_raw); err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.Stream: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
} else {
nf, err := n.FileReader()
@@ -391,23 +368,12 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
s := &request.Streamer{R: []file.SectionReader{nf}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: fFunc}
if download_raw {
err = s.StreamRaw()
if err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.StreamRaw: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
} else {
err = s.Stream()
if err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.Stream: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
s := &request.Streamer{R: []file.SectionReader{nf}, W: ctx.HttpResponseWriter(), ContentType: "application/octet-stream", Filename: filename, Size: n.File.Size, Filter: fFunc, Compression: compressionFormat}
if err = s.Stream(download_raw); err != nil {
// causes "multiple response.WriteHeader calls" error but better than no response
err_msg := "err:@node_Read s.Stream: " + err.Error()
logger.Error(err_msg)
return responder.RespondWithError(ctx, http.StatusBadRequest, err_msg)
}
}
}
Expand All @@ -419,10 +385,16 @@ func (cr *NodeController) Read(id string, ctx context.Context) error {
if !n.HasFile() {
return responder.RespondWithError(ctx, http.StatusBadRequest, "Node has no file")
} else {
// add options
options := map[string]string{}
if _, ok := query["filename"]; ok {
options["filename"] = query.Get("filename")
options["filename"] = filename
if fFunc != nil {
options["filter"] = query.Get("filter")
}
if compressionFormat != "" {
options["compression"] = compressionFormat
}
// set preauth
if p, err := preauth.New(util.RandString(20), "download", n.Id, options); err != nil {
err_msg := "err:@node_Read download_url: " + err.Error()
logger.Error(err_msg)
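Taken together, the single.go changes thread a validated compression query parameter (checked via archive.IsValidCompress) into every Streamer literal, collapse the four duplicated StreamRaw/Stream branches into a single s.Stream(download_raw) call, and record filename, filter, and compression in preauth download URLs. A hypothetical download request asking the server to compress the stream (host, port, node ID, and the assumption that "gzip" is an accepted format are illustrative only):

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

func main() {
	// ?download with the new compression parameter from this commit.
	url := "http://localhost:7445/node/130cadb5-9435-4bd9-be13-715ec40b2bb5" +
		"?download&compression=gzip&filename=reads.fna"
	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Save the (assumed gzip-compressed) stream to disk.
	out, err := os.Create("reads.fna.gz")
	if err != nil {
		panic(err)
	}
	defer out.Close()
	if _, err := io.Copy(out, resp.Body); err != nil {
		panic(err)
	}
	fmt.Println("saved reads.fna.gz")
}
```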