Skip to content

Commit

Permalink
Merge pull request #31 from amenzhinsky/actions
Browse files Browse the repository at this point in the history
Add github actions
  • Loading branch information
amenzhinsky committed Nov 15, 2020
2 parents 001a670 + 73489ed commit e718b48
Show file tree
Hide file tree
Showing 19 changed files with 144 additions and 83 deletions.
13 changes: 0 additions & 13 deletions .github/main.workflow

This file was deleted.

36 changes: 36 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Test and Lint
on:
push:
branches:
- actions
- master
workflow_dispatch:

jobs:
test:
name: Test
runs-on: ubuntu-20.04
steps:
- name: Check out repository
uses: actions/checkout@v2
- name: Set up go
uses: actions/setup-go@v2
with:
go-version: '^1.15.5'
- name: Run go test
run: go test -v ./...
env:
TEST_IOTHUB_SERVICE_CONNECTION_STRING: ${{ secrets.TEST_IOTHUB_SERVICE_CONNECTION_STRING }}
TEST_EVENTHUB_CONNECTION_STRING: ${{ secrets.TEST_EVENTHUB_CONNECTION_STRING }}

lint:
name: Lint
runs-on: ubuntu-20.04
steps:
- name: Check out repository
uses: actions/checkout@v2
- name: Run golangci-lint
uses: golangci/golangci-lint-action@v2
with:
version: v1.32
args: --enable=goimports,gofumpt,whitespace,gocritic,scopelint
15 changes: 5 additions & 10 deletions common/sas.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ type EdgeSignRequestPayload struct {

// Validate the properties on EdgeSignRequestPayload
func (esrp *EdgeSignRequestPayload) Validate() error {

if len(esrp.Algo) < 1 {
esrp.Algo = "HMACSHA256"
}
Expand All @@ -213,8 +212,10 @@ type EdgeSignRequestResponse struct {
Message string `json:"message"`
}

var sharedUnixHTTPClient http.Client
var doOnce sync.Once
var (
sharedUnixHTTPClient http.Client
doOnce sync.Once
)

func setSharedUnixHTTPClient(addrName string) {
doOnce.Do(func() {
Expand All @@ -241,19 +242,14 @@ func edgeSignRequest(workloadURI, name, genid string, payload *EdgeSignRequestPa

// catch unix domain sockets URIs
if strings.Contains(workloadURI, "unix://") {

addr, err := net.ResolveUnixAddr("unix", strings.TrimPrefix(workloadURI, "unix://"))
if err != nil {
fmt.Printf("Failed to resolve: %v\n", err)
return "", err
}

setSharedUnixHTTPClient(addr.Name)

var response *http.Response
//var err error

response, err = sharedUnixHTTPClient.Post("http://iotedge"+fmt.Sprintf("/modules/%s/genid/%s/sign?api-version=2018-06-28", name, genid), "text/plain", bytes.NewBuffer(payloadJSON))
response, err := sharedUnixHTTPClient.Post("http://iotedge"+fmt.Sprintf("/modules/%s/genid/%s/sign?api-version=2018-06-28", name, genid), "text/plain", bytes.NewBuffer(payloadJSON))
if err != nil {
return "", fmt.Errorf("sign: unable to sign request (resp): %s", err.Error())
}
Expand All @@ -268,7 +264,6 @@ func edgeSignRequest(workloadURI, name, genid string, payload *EdgeSignRequestPa
if err != nil {
return "", fmt.Errorf("sign: unable to sign request (unm): %s", err.Error())
}

} else {
// format uri string for base uri
uri := fmt.Sprintf("%smodules/%s/genid/%s/sign?api-version=2018-06-28", workloadURI, name, genid)
Expand Down
11 changes: 1 addition & 10 deletions common/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,19 @@ type TrustBundleResponse struct {

// TrustBundle root CA certificates pool for connecting to EdgeHub Gateway.
func TrustBundle(workloadURI string) (*x509.CertPool, error) {

tbr := TrustBundleResponse{}
var err error

// catch unix domain sockets URIs
if strings.Contains(workloadURI, "unix://") {

addr, err := net.ResolveUnixAddr("unix", strings.TrimPrefix(workloadURI, "unix://"))
if err != nil {
fmt.Printf("Failed to resolve: %v\n", err)
return nil, err
}

setSharedUnixHTTPClient(addr.Name)

var response *http.Response
//var err error

response, err = sharedUnixHTTPClient.Get("http://iotedge" + "/trust-bundle?api-version=2019-11-05")
response, err := sharedUnixHTTPClient.Get("http://iotedge" + "/trust-bundle?api-version=2019-11-05")
if err != nil {
return nil, fmt.Errorf("tls: unable to append certificates: %s", err.Error())
}
Expand All @@ -132,9 +126,7 @@ func TrustBundle(workloadURI string) (*x509.CertPool, error) {
if err != nil {
return nil, fmt.Errorf("tls: unable to append certificates: %s", err.Error())
}

} else {

// format uri string
uri := fmt.Sprintf("%strust-bundle?api-version=2019-11-05", workloadURI)

Expand Down Expand Up @@ -163,5 +155,4 @@ func TrustBundle(workloadURI string) (*x509.CertPool, error) {
}

return p, err

}
2 changes: 1 addition & 1 deletion eventhub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (c *Client) getPartitionIDs(ctx context.Context, sess *amqp.Session) ([]str
if msg.Properties.CorrelationID != mid {
return nil, errors.New("message-id mismatch")
}
if err := msg.Accept(); err != nil {
if err := msg.Accept(ctx); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion eventhub/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestClient_Subscribe(t *testing.T) {
defer cancel()

if err := c.Subscribe(ctx, func(msg *Event) error {
return msg.Accept()
return msg.Accept(ctx)
},
WithSubscribeSince(time.Now()),
); err != nil && err != context.DeadlineExceeded {
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
module github.com/amenzhinsky/iothub

require (
github.com/Azure/go-amqp v0.12.7
github.com/Azure/go-amqp v0.13.1
github.com/eclipse/paho.mqtt.golang v1.2.0
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b // indirect
)

go 1.11
go 1.13
24 changes: 16 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
github.com/Azure/go-amqp v0.12.7 h1:/Uyqh30J5JrDFAOERQtEqP0qPWkrNXxr94vRnSa54Ac=
github.com/Azure/go-amqp v0.12.7/go.mod h1:qApuH6OFTSKZFmCOxccvAv5rLizBQf4v8pRmG138DPo=
github.com/Azure/go-amqp v0.13.1 h1:dXnEJ89Hf7wMkcBbLqvocZlM4a3uiX9uCxJIvU77+Oo=
github.com/Azure/go-amqp v0.13.1/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs=
github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0=
github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/google/go-cmp v0.5.1 h1:JFrFEBb2xKufg6XkJsJr+WbKb4FQlURi5RUcBveYu9k=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa h1:F+8P+gmewFQYRk6JoLQLwjBCTu3mcIURZfNkVweuRKA=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
5 changes: 3 additions & 2 deletions iotdevice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"os"
"sync"

"github.com/amenzhinsky/iothub/common"
Expand Down Expand Up @@ -82,7 +83,7 @@ func New(

ready: make(chan struct{}),
done: make(chan struct{}),
logger: logger.New(logger.LevelWarn, nil),
logger: logger.NewFromString(os.Getenv("IOTHUB_DEVICE_LOG_LEVEL")),

evMux: newEventsMux(),
tsMux: newTwinStateMux(),
Expand Down Expand Up @@ -209,7 +210,7 @@ func (s TwinState) Version() int {
}

// RetrieveTwinState returns desired and reported twin device states.
func (c *Client) RetrieveTwinState(ctx context.Context) (desired TwinState, reported TwinState, err error) {
func (c *Client) RetrieveTwinState(ctx context.Context) (desired, reported TwinState, err error) {
if err := c.checkConnection(ctx); err != nil {
return nil, nil, err
}
Expand Down
1 change: 0 additions & 1 deletion iotdevice/module_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func NewModuleFromConnectionString(
edge bool,
opts ...ClientOption,
) (*ModuleClient, error) {

creds, err := ParseModuleConnectionString(cs)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion iotdevice/module_credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

// ModuleSharedAccessKeyCredentials is a SharedAccessKeyCredentials struct adapted for module connections
type ModuleSharedAccessKeyCredentials struct {
SharedAccessKeyCredentials //embedded SharedAccessKeyCredentials struct
SharedAccessKeyCredentials // embedded SharedAccessKeyCredentials struct
ModuleID string // moduleID
Gateway string // name of host gateway
GenerationID string // module generation ID
Expand Down
2 changes: 0 additions & 2 deletions iotdevice/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,11 @@ func (m *eventsMux) once(fn func() error) error {
func (m *eventsMux) Dispatch(msg *common.Message) {
m.mu.RLock()
for _, s := range m.subs {
//go func() {
select {
case <-s.done:
case <-m.done:
case s.ch <- msg:
}
//}()
}
m.mu.RUnlock()
}
Expand Down
14 changes: 6 additions & 8 deletions iotdevice/transport/mqtt/module_mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,19 @@ func (tr *ModuleTransport) Connect(ctx context.Context, creds transport.Credenti
return username, ""
}
audience := creds.GetHostName() + "/devices/" + url.QueryEscape(creds.GetDeviceID()) + "/modules/" + url.QueryEscape(creds.GetModuleID())
sas := common.SharedAccessSignature{}
if creds.UseEdgeGateway() {
sas, err := creds.TokenFromEdge(creds.GetWorkloadURI(), creds.GetModuleID(), creds.GetGenerationID(), audience, time.Hour)
if err != nil {
tr.logger.Errorf("cannot generate token: %s", err)
return "", ""
}
return username, sas.String()
} else {
sas, err := creds.Token(url.QueryEscape(audience), time.Hour)
if err != nil {
tr.logger.Errorf("cannot generate token: %s", err)
return "", ""
}
return username, sas.String()
}

sas, err := creds.Token(url.QueryEscape(audience), time.Hour)
if err != nil {
tr.logger.Errorf("cannot generate token: %s", err)
return "", ""
}
return username, sas.String()
})
Expand Down
2 changes: 1 addition & 1 deletion iotdevice/transport/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (tr *Transport) request(ctx context.Context, topic string, b []byte) (*resp

select {
case r := <-rch:
if r.code < 200 && r.code > 299 {
if r.code < 200 || r.code > 299 {
return nil, fmt.Errorf("request failed with %d response code", r.code)
}
return r, nil
Expand Down
2 changes: 1 addition & 1 deletion iotdevice/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package transport
import (
"context"
"crypto/tls"
"github.com/amenzhinsky/iothub/logger"
"time"

"github.com/amenzhinsky/iothub/common"
"github.com/amenzhinsky/iothub/logger"
)

// Transport interface.
Expand Down
19 changes: 10 additions & 9 deletions iotservice/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -73,7 +74,7 @@ func New(sak *common.SharedAccessKey, opts ...ClientOption) (*Client, error) {
c := &Client{
sak: sak,
done: make(chan struct{}),
logger: logger.New(logger.LevelWarn, nil),
logger: logger.NewFromString(os.Getenv("IOTHUB_SERVICE_LOG_LEVEL")),
}
for _, opt := range opts {
opt(c)
Expand Down Expand Up @@ -225,7 +226,7 @@ func (c *Client) putToken(
if err != nil {
return err
}
if err = msg.Accept(); err != nil {
if err = msg.Accept(ctx); err != nil {
return err
}
return eventhub.CheckMessageResponse(msg)
Expand Down Expand Up @@ -301,7 +302,7 @@ func (c *Client) SubscribeEvents(ctx context.Context, fn EventHandler) error {
if err := fn(&Event{FromAMQPMessage(msg.Message)}); err != nil {
return err
}
return msg.Accept()
return msg.Accept(ctx)
},
eventhub.WithSubscribeSince(time.Now()),
)
Expand Down Expand Up @@ -484,7 +485,7 @@ func (c *Client) SubscribeFeedback(ctx context.Context, fn FeedbackHandler) erro
return err
}
}
if err = msg.Accept(); err != nil {
if err = msg.Accept(ctx); err != nil {
return err
}
}
Expand Down Expand Up @@ -543,7 +544,7 @@ func (c *Client) SubscribeFileNotifications(
if err := fn(&FileNotification{msg}); err != nil {
return err
}
if err = msg.Accept(); err != nil {
if err = msg.Accept(ctx); err != nil {
return err
}
}
Expand Down Expand Up @@ -741,7 +742,7 @@ func (c *Client) bulkRequest(
&res,
)
if err != nil {
if re, ok := err.(*RequestError); ok && re.Res.StatusCode == http.StatusBadRequest {
if re, ok := err.(*RequestError); ok && re.Code == http.StatusBadRequest {
if err = json.Unmarshal(re.Body, &res); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1387,20 +1388,20 @@ func (c *Client) call(
return nil, &e
}
}
return nil, &RequestError{Res: res, Body: body}
return nil, &RequestError{Code: res.StatusCode, Body: body}
}

// RequestError is an API request error.
//
// Response body is already read out to Body attribute,
// so there's no need read it manually and call `e.Res.Body.Close()`
type RequestError struct {
Res *http.Response
Code int
Body []byte
}

func (e *RequestError) Error() string {
return fmt.Sprintf("code = %d, body = %q", e.Res.StatusCode, e.Body)
return fmt.Sprintf("code = %d, body = %q", e.Code, e.Body)
}

func genID() string {
Expand Down

0 comments on commit e718b48

Please sign in to comment.