diff --git a/cmd/consume.go b/cmd/consume.go new file mode 100644 index 0000000..7b98233 --- /dev/null +++ b/cmd/consume.go @@ -0,0 +1,92 @@ +/******************************************************************************* + * Copyright (c) 2019 Red Hat Inc + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/ctron/hot/pkg/utils" + "pack.ag/amqp" +) + +func consume(messageType string, uri string, tenant string) error { + + fmt.Printf("Consuming %s from %s ...", messageType, uri) + fmt.Println() + + opts := make([]amqp.ConnOption, 0) + if insecure { + opts = append(opts, amqp.ConnTLSConfig(createTlsConfig())) + } + + client, err := amqp.Dial(uri, opts...) + if err != nil { + return err + } + + defer func() { + if err := client.Close(); err != nil { + log.Fatal("Failed to close client:", err) + } + }() + + var ctx = context.Background() + + session, err := client.NewSession() + if err != nil { + return err + } + + defer func() { + if err := session.Close(ctx); err != nil { + log.Fatal("Failed to close session:", err) + } + }() + + receiver, err := session.NewReceiver( + amqp.LinkSourceAddress(fmt.Sprintf("%s/%s", messageType, tenant)), + amqp.LinkCredit(10), + ) + if err != nil { + return err + } + defer func() { + ctx, cancel := context.WithTimeout(ctx, 1*time.Second) + if err := receiver.Close(ctx); err != nil { + log.Fatal("Failed to close receiver: ", err) + } + cancel() + }() + + fmt.Printf("Consumer running, press Ctrl+C to stop...") + fmt.Println() + + for { + // Receive next message + msg, err := receiver.Receive(ctx) + if err != nil { + return err + } + + // Accept message + if err := msg.Accept(); err != nil { + return nil + } + + utils.PrintMessage(msg) + } +} diff --git a/cmd/main.go b/cmd/main.go index b8e76bd..7ae7992 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -14,109 +14,60 @@ package main import ( - "context" "crypto/tls" - "fmt" "log" - "time" - - "github.com/ctron/hot/pkg/utils" "github.com/spf13/cobra" - "pack.ag/amqp" ) var insecure bool +var contentType string = "text/plain" -func consume(messageType string, uri string, tenant string) error { - - fmt.Printf("Consuming %s from %s ...", messageType, uri) - fmt.Println() - - opts := make([]amqp.ConnOption, 0) - if insecure { - var tlsConfig = &tls.Config{ - InsecureSkipVerify: true, - } - opts = append(opts, amqp.ConnTLSConfig(tlsConfig)) - } - - client, err := amqp.Dial(uri, opts...) - if err != nil { - return err - } - - defer func() { - if err := client.Close(); err != nil { - log.Fatal("Failed to close client:", err) - } - }() - - var ctx = context.Background() - - session, err := client.NewSession() - if err != nil { - return err - } - - defer func() { - if err := session.Close(ctx); err != nil { - log.Fatal("Failed to close session:", err) - } - }() - - receiver, err := session.NewReceiver( - amqp.LinkSourceAddress(fmt.Sprintf("%s/%s", messageType, tenant)), - amqp.LinkCredit(10), - ) - if err != nil { - return err - } - defer func() { - ctx, cancel := context.WithTimeout(ctx, 1*time.Second) - if err := receiver.Close(ctx); err != nil { - log.Fatal("Failed to close receiver: ", err) - } - cancel() - }() - - fmt.Printf("Consumer running, press Ctrl+C to stop...") - fmt.Println() - - for { - // Receive next message - msg, err := receiver.Receive(ctx) - if err != nil { - return err - } - - // Accept message - if err := msg.Accept(); err != nil { - return nil - } - - utils.PrintMessage(msg) +func createTlsConfig() *tls.Config { + return &tls.Config{ + InsecureSkipVerify: insecure, } } func main() { - var cmdConsume = &cobra.Command{ + cmdConsume := &cobra.Command{ Use: "consume [telemetry|event] [message endpoint uri] [tenant]", Short: "Consume and print messages", Long: `Consume messages from the endpoint and print it on the console.`, - Args: cobra.MinimumNArgs(3), + Args: cobra.ExactArgs(3), Run: func(cmd *cobra.Command, args []string) { if err := consume(args[0], args[1], args[2]); err != nil { - log.Fatal("Failed to consume messages: ", err) + log.Fatal("Failed to consume messages:", err) } }, } - cmdConsume.Flags().BoolVar(&insecure, "insecure", false, "Skip TLS validation") + cmdPublish := &cobra.Command{ + Use: "publish", + Short: "Publish messages", + } + + cmdPublishHttp := &cobra.Command{ + Use: "http [telemetry|event] [http endpoint uri] [tenant] [deviceId] [authId] [password] [payload]", + Short: "Publish via HTTP", + Args: cobra.ExactArgs(7), + Run: func(cmd *cobra.Command, args []string) { + if err := publishHttp(args[0], args[1], args[2], args[3], args[4], args[5], contentType, args[6]); err != nil { + log.Fatal("Failed to publish via HTTP:", err) + } + }, + } + + cmdPublish.AddCommand(cmdPublishHttp) + cmdPublish.Flags().StringVarP(&contentType, "content-type", "t", "text/plain", "content type") + + // root command var rootCmd = &cobra.Command{Use: "hot"} - rootCmd.AddCommand(cmdConsume) + rootCmd.AddCommand(cmdConsume, cmdPublish) + + rootCmd.Flags().BoolVar(&insecure, "insecure", false, "Skip TLS validation") if err := rootCmd.Execute(); err != nil { panic(err) diff --git a/cmd/publish_http.go b/cmd/publish_http.go new file mode 100644 index 0000000..af9cddb --- /dev/null +++ b/cmd/publish_http.go @@ -0,0 +1,69 @@ +/******************************************************************************* + * Copyright (c) 2019 Red Hat Inc + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + *******************************************************************************/ + +package main + +import ( + "bytes" + "fmt" + "net/http" + neturl "net/url" +) + +func publishHttp(messageType string, uri string, tenant string, deviceId string, authId string, password string, contentType string, payload string) error { + + url, err := neturl.Parse(uri) + if err != nil { + return err + } + + url.Path = url.Path + neturl.PathEscape(messageType) + "/" + neturl.PathEscape(tenant) + "/" + deviceId + fmt.Println("URL:", url) + + buf := bytes.NewBufferString(payload) + + tr := &http.Transport{ + TLSClientConfig: createTlsConfig(), + } + + client := &http.Client{Transport: tr} + request, err := http.NewRequest("PUT", url.String(), buf) + if err != nil { + return err + } + + request.SetBasicAuth(authId+"@"+tenant, password) + + request.Header.Set("Content-Type", contentType) + + response, err := client.Do(request) + if err != nil { + return err + } + + fmt.Printf("Publish result: %s", response.Status) + fmt.Println() + + body := new(bytes.Buffer) + if _, err := body.ReadFrom(response.Body); err != nil { + return err + } + + fmt.Println(body.String()) + + if err := response.Body.Close(); err != nil { + fmt.Printf("Failed to close response: %v", err) + } + + return nil +}