Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Appears to be sending Byte message rather than Text #21

Closed
johntdyer opened this issue Jan 6, 2015 · 3 comments
Closed

Appears to be sending Byte message rather than Text #21

johntdyer opened this issue Jan 6, 2015 · 3 comments
Labels

Comments

@johntdyer
Copy link

When I send via stompngo, I seem to be publishing messages as ActiveMQ BytesMessages:

Jan 06 15:26:32 DEBUG tropo-cdr2 [voiceCdrSpringListenerBillwise-1] BillingBridge - Received message of type [class org.apache.activemq.command.ActiveMQBytesMessage] from consumer [ActiveMQMessageConsumer { value=ID:tropo-cdr2.prod.us-west-2xxxxxxx-43243-1417638886581-0:0:1:1, started=true }] of session [PooledSession { ActiveMQSession {id=ID:tropo-cdr2.prod.us-west-2.aws.xxxxxx-43243-1417638886581-0:0:1,started=true} }]

I am using the Send function, which, as I understand it, should be sending a string to the queue:

package main

import (
    "bufio"
    "fmt"
    "github.com/gmallard/stompngo"
    "gopkg.in/alecthomas/kingpin.v1"
    "net"
    "os"
)

// Command-line flags and process-wide shared state.
//
// Each flag variable is a pointer handed back by kingpin; the pointed-to values
// are filled in when init calls kingpin.Parse.
var (
    // Destination the senders publish to; defaults to /queue/client_test.
    queueName     = kingpin.Flag("queue", "Destination").Default("/queue/client_test").Short('q').String()
    // Host and port used to dial the STOMP server.
    serverAddr    = kingpin.Flag("server", "STOMP server endpoint").Default("127.0.0.1").Short('s').String()
    serverPort    = kingpin.Flag("port", "STOMP server port").Default("61613").Short('p').String()
    // Path of the input file whose lines are sent as messages.
    fileToProcess = kingpin.Flag("file", "File to process").Short('f').String()
    // Number of sender goroutines; init substitutes a default when this is 0.
    workerCount   = kingpin.Flag("workers", "Number of workers to send/receive").Short('w').Int()
    // Credentials; the flag defaults may be overridden from STOMP_USER / STOMP_PASS.
    serverUser    = kingpin.Flag("user", "Username").OverrideDefaultFromEnvar("STOMP_USER").String()
    serverPass    = kingpin.Flag("pass", "Password").OverrideDefaultFromEnvar("STOMP_PASS").String()

    // client is the single shared connection wrapper that main connects and
    // passes to every sender.
    client Client
    // done receives one value from each sender when its input channel is drained.
    done   = make(chan bool)
)

//var done = make(chan bool)

// Client bundles the settings for one STOMP session together with the live
// connections created from them.
type Client struct {
    // Server endpoint that is dialed.
    Host, Port string
    // Credentials sent as the login and passcode headers when connecting.
    User, Password string
    // Uuid is assigned from stompngo.Uuid in setOpts; Queue is the destination
    // name taken from the --queue flag.
    Uuid, Queue string

    // NetConnection is the raw TCP connection; StompConnection is the STOMP
    // session layered on top of it.
    NetConnection   net.Conn
    StompConnection *stompngo.Connection
}

// init registers the program version, parses the command line and applies the
// default worker count.
func init() {
    kingpin.Version("0.0.1")
    kingpin.Parse()

    // Fall back to 4 workers when the flag is unset (0) or not a usable count.
    // A negative value would otherwise reach make(chan string, n) in main and
    // panic.
    if *workerCount < 1 {
        *workerCount = 4
    }
}

// setOpts copies the parsed command-line settings onto the client and gives it
// a fresh identifier from stompngo.Uuid. User and Password are only overwritten
// when the corresponding flag value is non-empty.
func (client *Client) setOpts() {
    client.Host, client.Port = *serverAddr, *serverPort
    client.Uuid = stompngo.Uuid()
    client.Queue = *queueName

    if user := *serverUser; user != "" {
        client.User = user
    }
    if pass := *serverPass; pass != "" {
        client.Password = pass
    }
}

// netConnection dials client.Host:client.Port over TCP. On success the
// connection is stored in client.NetConnection and returned; on failure the
// error is printed and returned with a nil connection.
func (client *Client) netConnection() (conn net.Conn, err error) {
    addr := net.JoinHostPort(client.Host, client.Port)
    if conn, err = net.Dial("tcp", addr); err != nil {
        fmt.Println(err)
        return nil, err
    }

    client.NetConnection = conn
    return conn, nil
}

// stompConnection opens a STOMP 1.1 session over client.NetConnection using the
// client's host and credentials, stores it in client.StompConnection and
// returns it.
//
// If the STOMP connect fails, the error is printed and the process exits with
// a non-zero status so that callers (shells, scripts) can detect the failure.
func (client *Client) stompConnection() *stompngo.Connection {
    headers := stompngo.Headers{
        "accept-version", "1.1",
        "host", client.Host,
        "login", client.User,
        "passcode", client.Password,
    }

    conn, err := stompngo.Connect(client.NetConnection, headers)
    if err != nil {
        fmt.Println(err)
        // Exit status 1: a failed connect must not be reported as success.
        os.Exit(1)
    }

    client.StompConnection = conn
    return conn
}

// Connect loads the connection options, dials the server and opens the STOMP
// session, returning the resulting STOMP connection.
//
// A failed dial terminates the process with a non-zero status (netConnection
// has already printed the error); continuing would attempt the STOMP handshake
// on a nil network connection.
func (client *Client) Connect() (conn *stompngo.Connection) {
    client.setOpts()

    if _, err := client.netConnection(); err != nil {
        os.Exit(1)
    }

    conn = client.stompConnection()
    return conn
}

// Disconnect ends the STOMP session and then closes the underlying network
// connection. Either step is skipped when the corresponding connection was
// never established, and any error from either step is printed rather than
// silently dropped.
func (client *Client) Disconnect() {
    if client.StompConnection != nil {
        if err := client.StompConnection.Disconnect(stompngo.Headers{}); err != nil {
            fmt.Println(err)
        }
    }

    if client.NetConnection != nil {
        if err := client.NetConnection.Close(); err != nil {
            fmt.Println(err)
        }
    }
}

// main connects to the STOMP server, starts *workerCount sender goroutines and
// one file-reader goroutine, and waits until every sender has finished before
// disconnecting.
func main() {

    fmt.Println("Starting connection")
    client.Connect()
    defer client.Disconnect()

    dataCh := make(chan string, *workerCount)

    // Start workers
    fmt.Println("Create workers")
    for id := 1; id <= *workerCount; id++ {
        go sender(id, &client, dataCh)
    }

    //  Start reader go routine
    fmt.Println("Read file")
    go fileReader(*fileToProcess, dataCh)

    // Every sender signals on done exactly once when dataCh is drained. Wait
    // for all of them: returning after the first signal could disconnect while
    // other workers are still sending.
    for finished := 0; finished < *workerCount; finished++ {
        <-done
    }

    fmt.Println("Done")
}

// fileReader reads the file at path line by line and sends each line (without
// its line terminator) on dataCh.
//
// dataCh is always closed before fileReader returns, including when the file
// cannot be opened, so that receivers ranging over it terminate. Open and scan
// errors (for example a line longer than bufio.Scanner's token limit) are
// printed instead of being silently ignored.
func fileReader(path string, dataCh chan<- string) {
    defer close(dataCh)

    inFile, err := os.Open(path)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer inFile.Close()

    // bufio.Scanner splits on lines by default.
    scanner := bufio.NewScanner(inFile)
    for scanner.Scan() {
        dataCh <- scanner.Text()
    }

    // Scan returns false both at EOF and on error; only Err tells them apart.
    if err := scanner.Err(); err != nil {
        fmt.Println(err)
    }
}

// sender is one worker: it receives lines from dataCh until the channel is
// closed, publishes each one to the *queueName destination over the client's
// STOMP connection (printing any send error and carrying on), and finally
// signals completion on done.
func sender(id int, client *Client, dataCh <-chan string) {
    for {
        body, open := <-dataCh
        if !open {
            break
        }

        hdrs := stompngo.Headers{"destination", *queueName, "id", client.Uuid, "persistent", "true"}
        if sendErr := client.StompConnection.Send(hdrs, body); sendErr != nil {
            fmt.Println(sendErr)
        }
    }

    done <- true
}
@johntdyer
Copy link
Author

Hrm, I was able to get past this by suppressing the content-length header:

headers := stompngo.Headers{"destination", *queueName, "suppress-content-length", "true", "id", client.Uuid, "persistent", "true"}

Not sure I understand why that resolved my issue...

@gmallard
Copy link
Owner

gmallard commented Jan 6, 2015

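It solved your issue because that is how ActiveMQ works: when a STOMP SEND frame carries a content-length header, ActiveMQ delivers it to JMS consumers as a BytesMessage; without that header it is delivered as a TextMessage. This convention exists to assist with interaction between STOMP and JMS clients.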

One description of this is here:

http://activemq.apache.org/stomp.html

@gmallard
Copy link
Owner

gmallard commented Jul 6, 2016

If you are still using the stompngo package, please try the current v1.0.2 tag.

Tag information:

b1f1d96 refs/tags/v1.0.2

Document that version number in any issue reports.

Thanks for the support.

Guy

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants