From e73430b2a51cb6b1bebd2adabae47e23ba80ed60 Mon Sep 17 00:00:00 2001 From: Ahmed Sabbour <103856+sabbour@users.noreply.github.com> Date: Sun, 11 Nov 2018 13:15:42 +0400 Subject: [PATCH] Updated to use Service Bus using @torosent code --- .dockerignore | 4 + Dockerfile | 6 +- README.md | 13 +-- controllers/order.go | 3 +- models/order.go | 263 +++++++++++++------------------------------ routers/router.go | 11 ++ 6 files changed, 102 insertions(+), 198 deletions(-) create mode 100644 .dockerignore diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..7940a865 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +Dockerfile +README.md +.gitignore +azure-pipelines.yml \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index d2c72815..188f64ef 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,13 +29,15 @@ COPY --from=builder /go/src/captureorderfd . # Define environment variables # Application Insights ENV APPINSIGHTS_KEY= + +# PLEASE DO NOT OVERRIDE UNLESS INSTRUCTED BY PROCTORS ENV CHALLENGEAPPINSIGHTS_KEY=23c6b1ec-ca92-4083-86b6-eba851af9032 # Challenge Logging ENV TEAMNAME= -# AMQP -ENV AMQPURL= +# AMQP for Service Bus +ENV AMQPURL="amqps://[policy name]:[policy key]@[yourServiceBus].servicebus.windows.net/[queuename]" # Mongo/Cosmos ENV MONGOURL= diff --git a/README.md b/README.md index 47af0267..181c3aff 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,7 @@ The following environment variables need to be passed to the container: ``` ENV TEAMNAME=[YourTeamName] ENV APPINSIGHTS_KEY=[YourCustomApplicationInsightsKey] # Optional, create your own App Insights resource +ENV CHALLENGEAPPINSIGHTS_KEY=[Challenge Application Insights Key] # Override, if given one by the proctors ``` ### For MongoDB @@ -47,19 +48,13 @@ ENV MONGOURL=mongodb://[mongoinstance].[namespace] ENV MONGOURL=mongodb://[CosmosDBInstanceName]:[CosmosDBPrimaryPassword]=@[CosmosDBInstanceName].documents.azure.com:10255/?ssl=true&replicaSet=globaldb ``` -### For RabbitMQ +### For Service Bus ``` -ENV AMQPURL=amqp://[url]:5672 +ENV AMQPURL=amqps://[policy name]:[policy key]@[yourServiceBus].servicebus.windows.net/[queuename] ``` -### For Event Hubs - -``` -ENV AMQPURL=amqps://[policy name]:[policy key]@[youreventhub].servicebus.windows.net/[eventhubname] -``` - -Make sure your _policy key_ is URL Encoded. Use a tool like: +> Make sure your _policy key_ is URL Encoded. Use a tool like: ## Contributing diff --git a/controllers/order.go b/controllers/order.go index 87bf370b..48818fab 100644 --- a/controllers/order.go +++ b/controllers/order.go @@ -23,7 +23,8 @@ func (this *OrderController) Post() { var ob models.Order json.Unmarshal(this.Ctx.Input.RequestBody, &ob) - + models.TrackInitialOrder(ob) + // Add the order to MongoDB addedOrder, err := models.AddOrderToMongoDB(ob) diff --git a/models/order.go b/models/order.go index 9aa36369..7df05d8e 100644 --- a/models/order.go +++ b/models/order.go @@ -18,7 +18,6 @@ import ( "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" amqp10 "pack.ag/amqp" - amqp091 "github.com/streadway/amqp" "gopkg.in/matryer/try.v1" ) @@ -51,16 +50,11 @@ var mongoDatabaseName = "k8orders" var mongoCollectionName = "orders" var mongoCollectionShardKey = "partition" -// AMQP 0.9.1 variables -var amqp091Client *amqp091.Connection -var amqp091Channel *amqp091.Channel -var amqp091Queue amqp091.Queue - // AMQP 1.0 variables var amqp10Client *amqp10.Client var amqp10Session *amqp10.Session var amqpSender *amqp10.Sender -var eventHubName string +var serivceBusName string // Application Insights telemetry clients var challengeTelemetryClient appinsights.TelemetryClient @@ -68,9 +62,21 @@ var customTelemetryClient appinsights.TelemetryClient // For tracking and code branching purposes var isCosmosDb = strings.Contains(mongoURL, "documents.azure.com") -var isEventHub = strings.Contains(amqpURL, "servicebus.windows.net") var db string // CosmosDB or MongoDB? -var queueType string // EventHub or RabbitMQ + +// TrackInitialOrder send telemetry data for initial order, to track challenge +func TrackInitialOrder(order Order) { + eventTelemetry := appinsights.NewEventTelemetry("Initial order") + eventTelemetry.Properties["team"] = teamName + eventTelemetry.Properties["sequence"] = "0" + eventTelemetry.Properties["type"] = "http" + eventTelemetry.Properties["service"] = "CaptureOrder" + eventTelemetry.Properties["orderId"] = order.OrderID + challengeTelemetryClient.Track(eventTelemetry) + if customTelemetryClient != nil { + customTelemetryClient.Track(eventTelemetry) + } +} // AddOrderToMongoDB Adds the order to MongoDB/CosmosDB func AddOrderToMongoDB(order Order) (Order, error) { @@ -118,10 +124,12 @@ func AddOrderToMongoDB(order Order) (Order, error) { if success { // Track the event for the challenge purposes - eventTelemetry := appinsights.NewEventTelemetry("CapureOrder: - Team Name " + teamName + " db " + db) + eventTelemetry := appinsights.NewEventTelemetry("CaptureOrder to " + db) eventTelemetry.Properties["team"] = teamName - eventTelemetry.Properties["challenge"] = "1-captureorder" + eventTelemetry.Properties["sequence"] = "1" eventTelemetry.Properties["type"] = db + eventTelemetry.Properties["service"] = "CaptureOrder" + eventTelemetry.Properties["orderId"] = order.OrderID challengeTelemetryClient.Track(eventTelemetry) } @@ -161,13 +169,9 @@ func AddOrderToMongoDB(order Order) (Order, error) { return order, mongoDBSessionError } -// AddOrderToAMQP Adds the order to AMQP (EventHub/RabbitMQ) +// AddOrderToAMQP Adds the order to AMQP (Service Bus Queue) func AddOrderToAMQP(order Order) { - if isEventHub { - addOrderToAMQP10(order) - } else { - addOrderToAMQP091(order) - } + addOrderToAMQP10(order) } //// BEGIN: NON EXPORTED FUNCTIONS @@ -223,9 +227,7 @@ func initMongoDial() (success bool, mErr error) { url, err := url.Parse(mongoURL) if err != nil { // If the team provided an Application Insights key, let's track that exception - if customTelemetryClient != nil { - customTelemetryClient.TrackException(err) - } + trackException(err) log.Fatal(fmt.Sprintf("Problem parsing Mongo URL %s: ",url), err) } @@ -242,7 +244,7 @@ func initMongoDial() (success bool, mErr error) { var dialInfo *mgo.DialInfo mongoUsername := "" mongoPassword := "" - if url.User!=nil { + if url.User != nil { mongoUsername = url.User.Username() mongoPassword, _ = url.User.Password() } @@ -286,10 +288,11 @@ func initMongoDial() (success bool, mErr error) { mongoDBSession, mongoDBSessionError = mgo.DialWithInfo(dialInfo) if mongoDBSessionError != nil { log.Println(fmt.Sprintf("Can't connect to mongo at [%s], go error: ", mongoURL), mongoDBSessionError) + trackException(mongoDBSessionError) mErr = mongoDBSessionError } else { success = true - log.Println("\tConnected") + log.Println("\tConnected") } mongoDBSession.SetMode(mgo.Monotonic, true) @@ -298,7 +301,7 @@ func initMongoDial() (success bool, mErr error) { mongoDBSession.SetPoolLimit(mongoPoolLimit) endTime := time.Now() - + // Track the dependency, if the team provided an Application Insights key, let's track that dependency if customTelemetryClient != nil { if isCosmosDb { @@ -306,12 +309,12 @@ func initMongoDial() (success bool, mErr error) { "CosmosDB", "MongoDB", mongoURL, - success) + success) dependency.Data = "Create session" - if mongoDBSessionError != nil { - dependency.ResultCode = mongoDBSessionError.Error() - } + if mongoDBSessionError != nil { + dependency.ResultCode = mongoDBSessionError.Error() + } dependency.MarkTime(startTime, endTime) customTelemetryClient.TrackException(mongoDBSessionError) @@ -321,12 +324,12 @@ func initMongoDial() (success bool, mErr error) { "MongoDB", "MongoDB", mongoURL, - success) + success) dependency.Data = "Create session" - if mongoDBSessionError != nil { - dependency.ResultCode = mongoDBSessionError .Error() - } + if mongoDBSessionError != nil { + dependency.ResultCode = mongoDBSessionError.Error() + } dependency.MarkTime(startTime, endTime) customTelemetryClient.TrackException(mongoDBSessionError) @@ -338,15 +341,14 @@ func initMongoDial() (success bool, mErr error) { // Initialize the MongoDB client func initMongo() { - - success,err := initMongoDial() + + success, err := initMongoDial() if !success { os.Exit(1) } mongoDBSessionCopy := mongoDBSession.Copy() defer mongoDBSessionCopy.Close() - // SetSafe changes the mongoDBSessionCopy safety mode. // If the safe parameter is nil, the mongoDBSessionCopy is put in unsafe mode, and writes become fire-and-forget, @@ -360,7 +362,7 @@ func initMongo() { bson.D{ { "shardCollection", - fmt.Sprintf("%s.%s",mongoDatabaseName,mongoCollectionName), + fmt.Sprintf("%s.%s", mongoDatabaseName, mongoCollectionName), }, { "key", @@ -368,9 +370,10 @@ func initMongo() { mongoCollectionShardKey: "hashed", }, }, - }, &result); + }, &result) if err != nil { + trackException(err) // The collection is most likely created and already sharded. I couldn't find a more elegant way to check this. log.Println("Could not create/re-create sharded MongoDB collection. Either collection is already sharded or sharding is not supported. You can ignore this error: ", err) } else { @@ -390,99 +393,37 @@ func initAMQP() { log.Fatal(fmt.Sprintf("Problem parsing AMQP Host %s. Make sure you URL Encoded your policy/password.",url), err) } - // Figure out if we're running on EventHub or elsewhere - if isEventHub { - log.Println("Using EventHub") - queueType = "EventHub" - // Parse the eventHubName (last part of the url) - eventHubName = url.Path - initAMQP10() - } else { - log.Println("Using RabbitMQ") - queueType = "RabbitMQ" - initAMQP091() - } + log.Println("Using Service Bus") + queueType = "ServiceBus" + + // Parse the eventHubName (last part of the url) + serivceBusName = url.Path + initAMQP10() + log.Println("\tAMQP URL: " + amqpURL) log.Println("** READY TO TAKE ORDERS **") } -func initAMQP091() { - log.Println("Attempting to connect to RabbitMQ") +func initAMQP10() { // Try to establish the connection to AMQP // with retry logic err := try.Do(func(attempt int) (bool, error) { var err error - amqp091Client, err = amqp091.Dial(amqpURL) - if err != nil { - // If the team provided an Application Insights key, let's track that exception - if customTelemetryClient != nil { - customTelemetryClient.TrackException(err) - } - } - - if err != nil { - log.Println("Error connecting to Rabbit instance. Will retry in 5 seconds:", err) - time.Sleep(5 * time.Second) // wait - } - return attempt < 3, err - }) - - // If we still can't connect - if err != nil { - log.Println("Couldn't connect to Rabbit after 3 retries:", err) - } else { - log.Println("\tConnected to RabbitMQ. Establishing Channel and Queue") - - // Otherwise, let's continue and establish the channel and queue - amqp091Channel, err = amqp091Client.Channel() - if err != nil { - // If the team provided an Application Insights key, let's track that exception - if customTelemetryClient != nil { - customTelemetryClient.TrackException(err) - } - } - - amqp091Queue, err = amqp091Channel.QueueDeclare( - "order", // name - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - } -} - -func initAMQP10() { - - // Try to establish the connection to AMQP - // with retry logic - err := try.Do(func(attempt int) (bool, error) { - var err error - - log.Println("Attempting to connect to EventHub") + log.Println("Attempting to connect to ServiceBus") amqp10Client, err = amqp10.Dial(amqpURL) if err != nil { - // If the team provided an Application Insights key, let's track that exception - if customTelemetryClient != nil { - customTelemetryClient.TrackException(err) - } + trackException(err) } - //defer amqp10Client.Close() - // Open a session if we managed to get an amqpClient log.Println("\tConnected to EventHub") if amqp10Client != nil { - log.Println("\tCreating a new AMQP session") + log.Println("\tCreating a new AMQP session") amqp10Session, err = amqp10Client.NewSession() if err != nil { - // If the team provided an Application Insights key, let's track that exception - if customTelemetryClient != nil { - customTelemetryClient.TrackException(err) - } + trackException(err) log.Fatal("\t\tCreating AMQP session: ", err) } } @@ -490,7 +431,7 @@ func initAMQP10() { // Create a sender log.Println("\tCreating AMQP sender") amqpSender, err = amqp10Session.NewSender( - amqp10.LinkTargetAddress(eventHubName), + amqp10.LinkTargetAddress(serivceBusName), ) if err != nil { // If the team provided an Application Insights key, let's track that exception @@ -501,7 +442,7 @@ func initAMQP10() { } if err != nil { - log.Println("Error connecting to EventHub instance. Will retry in 5 seconds:", err) + log.Println("Error connecting to Service Bus instance. Will retry in 5 seconds:", err) time.Sleep(5 * time.Second) // wait } return attempt < 3, err @@ -509,69 +450,7 @@ func initAMQP10() { // If we still can't connect if err != nil { - log.Println("Couldn't connect to EventHub after 3 retries:", err) - } -} - -// addOrderToAMQP091 Adds the order to AMQP 0.9.1 -func addOrderToAMQP091(order Order) { - if amqp091Channel == nil { - log.Println("Skipping AMQP. It is either not configured or improperly configured") - } else { - success := false - startTime := time.Now() - body := fmt.Sprintf("{\"order\": \"%s\", \"source\": \"%s\"}", order.OrderID, teamName) - - // Send message - err := amqp091Channel.Publish( - "", // exchange - amqp091Queue.Name, // routing key - false, // mandatory - false, // immediate - amqp091.Publishing{ - DeliveryMode: amqp091.Persistent, - ContentType: "application/json", - Body: []byte(body), - }) - if err != nil { - // If the team provided an Application Insights key, let's track that exception - if customTelemetryClient != nil { - customTelemetryClient.TrackException(err) - } - log.Println("Sending message:", err) - } else { - success = true - } - - endTime := time.Now() - - if success { - // Track the event for the challenge purposes - eventTelemetry := appinsights.NewEventTelemetry("SendOrder: - Team Name " + teamName + " - RabbitMQ") - eventTelemetry.Properties["team"] = teamName - eventTelemetry.Properties["challenge"] = "2-sendmessage" - eventTelemetry.Properties["type"] = "rabbitmq" - challengeTelemetryClient.Track(eventTelemetry) - } - - // Track the dependency, if the team provided an Application Insights key, let's track that dependency - if customTelemetryClient != nil { - dependency := appinsights.NewRemoteDependencyTelemetry( - "RabbitMQ", - "AMQP", - amqpURL, - success) - dependency.Data = "Send message" - - if err != nil { - dependency.ResultCode = err.Error() - } - - dependency.MarkTime(startTime, endTime) - customTelemetryClient.Track(dependency) - } - - log.Printf("Sent to AMQP 0.9.1 (RabbitMQ) - %t, %s: %s", success, amqpURL, body) + log.Println("Couldn't connect to Service Bus after 3 retries:", err) } } @@ -589,7 +468,7 @@ func addOrderToAMQP10(order Order) { // Get an empty context amqp10Context := context.Background() - log.Printf("AMQP URL: %s, Target: %s", amqpURL, eventHubName) + log.Printf("AMQP URL: %s, Target: %s", amqpURL, serivceBusName) // Prepare the context to timeout in 5 seconds amqp10Context, cancel := context.WithTimeout(amqp10Context, 5*time.Second) @@ -605,13 +484,11 @@ func addOrderToAMQP10(order Order) { default: log.Println("Encountered an error sending AMQP. Will not retry: ", err) // If the team provided an Application Insights key, let's track that exception - if customTelemetryClient != nil { - customTelemetryClient.TrackException(err) - } + trackException(err) // This is an unhandled error, don't retry return false, err case *amqp10.DetachError: - log.Println("EventHub detached. Will reconnect and retry: " , t, err) + log.Println("Service Bus detached. Will reconnect and retry: " , t, err) initAMQP10() } } @@ -629,21 +506,23 @@ func addOrderToAMQP10(order Order) { if success { // Track the event for the challenge purposes - eventTelemetry := appinsights.NewEventTelemetry("SendOrder: - Team Name " + teamName + " - EventHub") + eventTelemetry := appinsights.NewEventTelemetry("SendOrder to ServiceBus") eventTelemetry.Properties["team"] = teamName - eventTelemetry.Properties["challenge"] = "2-sendmessage" - eventTelemetry.Properties["type"] = "eventhub" + eventTelemetry.Properties["sequence"] = "2" + eventTelemetry.Properties["type"] = "servicebus" + eventTelemetry.Properties["service"] = "CaptureOrder" + eventTelemetry.Properties["orderId"] = order.OrderID challengeTelemetryClient.Track(eventTelemetry) } // Track the dependency, if the team provided an Application Insights key, let's track that dependency if customTelemetryClient != nil { dependency := appinsights.NewRemoteDependencyTelemetry( - "EventHub", + "ServiceBus", "AMQP", amqpURL, success) - dependency.Data = "Send message" + dependency.Data = "Send message" if err != nil { dependency.ResultCode = err.Error() @@ -653,7 +532,19 @@ func addOrderToAMQP10(order Order) { customTelemetryClient.Track(dependency) } - log.Printf("Sent to AMQP 1.0 (EventHub) - %t, %s: %s", success, amqpURL, body) + log.Printf("Sent to AMQP 1.0 (ServiceBus) - %t, %s: %s", success, amqpURL, body) + } +} + +func trackException(err error) { + if err != nil { + log.Println(err) + if challengeTelemetryClient != nil { + challengeTelemetryClient.TrackException(err) + } + if customTelemetryClient != nil { + customTelemetryClient.TrackException(err) + } } } diff --git a/routers/router.go b/routers/router.go index 6f80031f..036a163a 100644 --- a/routers/router.go +++ b/routers/router.go @@ -11,6 +11,8 @@ import ( "captureorderfd/controllers" "github.com/astaxie/beego" + "github.com/astaxie/beego/context" + "github.com/astaxie/beego/plugins/cors" ) func init() { @@ -22,4 +24,13 @@ func init() { ), ) beego.AddNamespace(ns) + beego.Get("/healthz", func(ctx *context.Context) { + ctx.Output.Body([]byte("i'm alive!")) + }) + beego.InsertFilter("*", beego.BeforeRouter, cors.Allow(&cors.Options{ + AllowAllOrigins: true, + AllowMethods: []string{"GET", "POST", "PUT", "DELETE", "OPTIONS"}, + AllowHeaders: []string{"Origin", "Authorization", "Access-Control-Allow-Origin"}, + ExposeHeaders: []string{"Content-Length", "Access-Control-Allow-Origin"}, + })) }