Skip to content

Commit

Permalink
Merge pull request #8 from creztfallen/refactor/dependencies-interchange
Browse files Browse the repository at this point in the history
Refactor/dependencies interchange
  • Loading branch information
creztfallen committed Sep 27, 2023
2 parents 6bdde8b + 97418d2 commit 8227764
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 198 deletions.
2 changes: 1 addition & 1 deletion api/.env
Original file line number Diff line number Diff line change
@@ -1 +1 @@
API_KEY=7cc97052c8384d669d2eba12b32d3a36
API_URL="https://api.currencyfreaks.com/v2.0/rates/latest?apikey=7cc97052c8384d669d2eba12b32d3a36"
73 changes: 44 additions & 29 deletions api/handlers/latestExchangeRate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,60 +4,75 @@ import (
"encoding/json"
"fmt"
"go-rich/models"
"go-rich/pubsub/utils"
mb "go-rich/pubsub/message_broker"
"log"
"net/http"
"os"
"time"

"github.com/joho/godotenv"
)

func LatestExchangeRateHandler(w http.ResponseWriter, r *http.Request) {
if err := godotenv.Load(); err != nil {
log.Fatalf("Error loading .env file %v", err)

rabbitmq, err := mb.NewRabbitMQ("amqp://localhost:5672")
if err != nil {
panic(err)
}
defer rabbitmq.Close()

apiKey := os.Getenv("API_KEY")
currency := r.URL.Query().Get("currency")
apiEndpoint := fmt.Sprintf("https://api.currencyfreaks.com/latest?apikey=%s", apiKey)

message := models.ExchangeRateMessage{
Currency: currency,
Url: apiEndpoint,
}
if err := godotenv.Load(); err != nil {
log.Fatalf("Error loading .env file %v", err)
}

utils.Publish(message, "amqp://localhost:5672", "exchange_rates")
apiUrl := os.Getenv("API_URL")
currency := r.URL.Query().Get("currency")

msgs, ch := utils.Consumer("amqp://localhost:5672", "api")
defer ch.Close()
message := models.ExchangeRateMessage{
Currency: currency,
Url: apiUrl,
}

var result models.ExchangeRateResult
rabbitmq.SendMessage(message, "exchange_rates")

go func() {
for d := range msgs {
err := json.Unmarshal(d.Body, &result)
if err != nil {
panic(err)
}
msgs, err := rabbitmq.ReceiveMessage("api")
if err != nil {
panic(err)
}
}()

time.Sleep(2 * time.Second)
defer rabbitmq.Close()

var result models.ExchangeRateResult
var resultCh = make(chan models.ExchangeRateResult)

go func() {
for d := range msgs {
err := json.Unmarshal(d.Body, &result)
if err != nil {
panic(err)
}
resultCh <- result
fmt.Println("RESULT1", result)
}
close(resultCh)
}()

result = <-resultCh
fmt.Println("RESULT2", result)

w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(result)
}

w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(result)
}

func LatestExchangeRatesHandler(w http.ResponseWriter, r *http.Request) {
if err := godotenv.Load(); err != nil {
log.Fatalf("Error loading .env file %v", err)
}

apiKey := os.Getenv("API_KEY")
apiEndpoint := fmt.Sprintf("https://api.currencyfreaks.com/latest?apikey=%s", apiKey)
apiUrl := os.Getenv("API_URL")

response, err := http.Get(apiEndpoint)
response, err := http.Get(apiUrl)
if err != nil {
panic(err)
}
Expand Down
2 changes: 2 additions & 0 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"go-rich/api/handlers"
)


func main() {

http.HandleFunc("/latest", handlers.LatestExchangeRateHandler)
http.HandleFunc("/latests", handlers.LatestExchangeRatesHandler)

Expand Down
43 changes: 43 additions & 0 deletions pubsub/message_broker/index.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package message_broker

import (
amqp "github.com/rabbitmq/amqp091-go"
)

type MessageQueue interface {
SendMessage(message interface{}, queueName string) error
ReceiveMessage(queueName string) (<- chan amqp.Delivery, error)
Close()
}

type RabbitMQ struct {
conn *amqp.Connection
ch *amqp.Channel
CleanUp func()
}



func NewRabbitMQ(url string) (*RabbitMQ, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, err
}

ch, err := conn.Channel()
if err != nil {
return nil, err
}

rabbitmqInstance := &RabbitMQ{
conn: conn,
ch: ch,
}

rabbitmqInstance.CleanUp = func() {
ch.Close()
}

return rabbitmqInstance, nil
}

77 changes: 77 additions & 0 deletions pubsub/message_broker/rabbitmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package message_broker

import (
"context"
"encoding/json"
"time"

amqp "github.com/rabbitmq/amqp091-go"
)

func (r *RabbitMQ) SendMessage(message interface{}, queueName string) error {
_, err := r.ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}

encryptedMessage, err := json.Marshal(message)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err = r.ch.PublishWithContext(ctx,
"", // exchange
queueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: []byte(encryptedMessage),
})

return err
}

func (r *RabbitMQ) ReceiveMessage(queueName string) (<- chan amqp.Delivery, error) {
_, err := r.ch.QueueDeclare(
queueName, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, err
}

msgs, err := r.ch.Consume(
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, err
}


return msgs, nil
}

func (r *RabbitMQ) Close() {
r.CleanUp()
}
44 changes: 0 additions & 44 deletions pubsub/utils/consumer.go

This file was deleted.

50 changes: 0 additions & 50 deletions pubsub/utils/message_broker.go

This file was deleted.

45 changes: 0 additions & 45 deletions pubsub/utils/sender.go

This file was deleted.

Loading

0 comments on commit 8227764

Please sign in to comment.