Skip to content

Latest commit

 

History

History
1250 lines (903 loc) · 48.3 KB

File metadata and controls

1250 lines (903 loc) · 48.3 KB

八、并发应用架构

By now, we've designed small bits of concurrent programs, primarily in a single piece keeping concurrency largely isolated. What we haven't done yet is tie everything together to build something a little more robust, complex, and more daunting to manage from an administrator's perspective.

简单的聊天应用和 web 服务器都很不错。然而,最终您将需要更复杂的环境,并需要外部软件来满足所有更高级的需求。

在本例中,我们将构建一些不协调的服务来满足需求:一个具有修订控制的文件管理器,提供 web 和 shell 访问。Dropbox 和 Google Drive 等服务允许用户在同行之间保存和共享文件。另一方面,GitHub 及其同类产品允许使用类似的平台,但具有修订控制的关键附加好处。

Many organizations face problems with the following sharing and distribution options:

  • 对存储库、存储或文件数量的限制
  • 如果服务关闭,则可能无法访问
  • 安全问题,特别是敏感信息的安全问题

Dropbox 和 Google Drive 等简单的共享应用在存储数据方面非常出色,而无需大量的修订控制选项。GitHub 是一个优秀的协作版本控制和分发系统,但它会带来很多成本,开发人员的错误可能会导致严重的安全漏洞。

我们将结合版本控制的目标(以及 GitHub 的理想)和 Dropbox/Google Drive 的简单性和开放性。这种类型的应用将非常适合作为内部网的替代品,完全隔离,可以通过自定义身份验证进行访问,而不必依赖云服务。将所有信息都保存在内部的能力消除了任何潜在的网络安全问题,并允许管理员以适合其组织的方式设计永久备份解决方案。

组织内的文件共享将允许分叉、备份、文件锁定和修订控制,所有这些都可以通过命令行进行,也可以通过简单的 web 界面进行。

设计我们的并发应用

在设计并发应用时,我们将有三个组件在单独的进程中运行。将提醒文件侦听器对指定位置的文件进行更改。web CLI 界面将允许用户增加或修改文件,备份过程将绑定到侦听器,以提供新文件更改的自动副本。考虑到这一点,这三个过程看起来有点像下图所示:

Designing our concurrent application

我们的文件侦听器进程将完成以下三件事:

  • 密切关注任何文件更改
  • 广播到我们的 web/CLI 服务器和备份过程
  • 维护数据库/数据存储中任何给定文件的状态

备份过程将接受来自文件侦听器(#2)的任何广播,并以迭代设计方式创建备份文件。

我们的通用服务器(web 和 CLI)将报告单个文件的详细信息,并允许使用可定制的语法进行前后版本控制。当提交新文件或请求修订时,应用的这一部分还必须广播回文件侦听器。

确定我们的需求

在我们的架构设计过程中,最关键的一步是放大我们需要实现的功能、包和技术。对于我们的文件管理和版本控制应用,有几个关键点将非常突出:

  • 允许文件上传、下载和修订的 web 界面。
  • 一个命令行界面,允许我们直接回滚更改和修改文件。
  • 查找对共享位置所做更改的文件系统侦听器。
  • 一个数据存储系统,具有强大的 Go 连接,允许我们以最一致的方式维护有关文件和用户的信息。该系统还将维护用户记录。
  • 一种并发日志系统,用于维护和循环已更改文件的日志。

通过允许以下三种不同的方式与整个应用接口,我们使事情变得有些复杂:

  • 通过需要用户和登录的 Web。这还允许我们的用户访问和修改文件,即使它们恰好位于未连接到共享驱动器的位置。
  • 通过命令行。这是过时的,但在用户遍历文件系统时,尤其是不在 GUI 中的高级用户,这也是非常有价值的。
  • 通过改变自身的文件系统。这是一种共享驱动机制,我们假设任何有权访问该机制的用户都将对任何文件进行有效修改。

为了解决所有这些问题,我们可以确定以下几个关键技术:

  • 用于管理文件系统修订的数据库或数据存储。在 NoSQL 中选择事务性、符合 ACID 的 SQL 和 fast 文档存储时,通常需要权衡性能和一致性。然而,由于我们的大多数锁定机制都存在于应用中,因此复制锁(即使在行级别)将增加我们不需要的潜在缓慢和粗糙程度。因此,我们将使用 NoSQL 解决方案。
  • 这个解决方案需要很好地处理并发性。
  • 我们将使用一个 web 界面,它引入了强大而干净的路由/多路复用,并与 Go 强大的内置模板系统配合良好。
  • 一个文件系统通知库,允许我们监视对文件的更改以及备份修订。

我们为满足这些需求而发现或构建的任何解决方案都需要高度并发和非阻塞。我们希望确保不允许同时更改文件,包括更改内部修订本身。

考虑到所有这些,让我们逐一确定我们的各个部分,并决定它们将如何在我们的应用中发挥作用。

我们还将介绍一些选项,这些选项可以在不影响功能或核心需求的情况下进行交换。当平台或偏好使我们的主要选择令人不快时,这将允许一些灵活性。在我们设计应用的任何时候,如果软件(或其使用条款)发生变化,或者在未来的规模上使用它不再令人满意,那么知道还有什么是一个好主意。

让我们从数据存储开始。

在 Go 中使用 NoSQL 作为数据存储

使用 NoSQL 的最大让步之一显然是,在 CRUD 操作(创建、读取、更新和删除)方面缺乏标准化。自 1986 年以来,SQL 已被标准化,从 MySQL 到 SQL Server,从 Microsoft 和 Oracle 一直到 PostgreSQL,在许多数据库中都是非常严密的。

Note

您可以在上阅读更多关于 NoSQL 和各种 NoSQL 平台的http://nosql-database.org/

Martin Fowler 还在他的书NoSQL Decreated中的中对该概念和一些用例做了一个广受欢迎的介绍 http://martinfowler.com/books/nosql.html

根据 NoSQL 平台的不同,还可能会失去耐酸性和耐用性。这意味着您的数据不是 100%安全的。如果服务器崩溃,如果对过时或不存在的数据进行读取,等等,可能会发生事务性丢失。后者被称为脏读。

这一点非常值得注意,因为它适用于我们的应用,特别是并发性,因为我们在前面的章节中已经讨论了一个潜在的第三方瓶颈。

对于我们在 Go 中的文件共享应用,我们将利用 NoSQL 存储有关文件以及修改/与这些文件交互的用户的元数据。

在这里使用 NoSQL 数据存储时,我们有很多选择,几乎所有的大型数据存储都有一个库或接口。当我们在这里讨论 Couchbase 时,我们将简要地讨论游戏中的其他一些大玩家,以及它们各自的优点。

下面几节中的代码片段还应该让您了解如何在不太担心的情况下将 Couchbase 切换到其他任何一款。虽然我们没有深入讨论其中任何一个,但维护文件和修改信息的代码将尽可能通用,以确保易于交换。

蒙哥达

MongoDB是目前最流行的 NoSQL 平台之一。写在 2009 年,它也是最成熟的平台之一,但伴随着一些折衷,使得它在最近几年有些失宠。

Even so, Mongo does what it does in a reliable fashion and with a great deal of speed. Utilizing indices, as is the case with most databases and data stores, improves query speed on reads greatly.

Mongo 还允许对应用于读、写和一致性的保证进行一些非常精细的控制。您可以将其视为支持语法脏读的任何语言和/或引擎的非常模糊的模拟。

最重要的是,Mongo 在 Go 中很容易支持并发,并且隐式地设计为在分布式系统中工作。

Mongo 最大的 Go 接口是mgo,其可在上找到 http://godoc.org/labix.org/v2/mgo

Should you wish to experiment with Mongo in Go, it's a relatively straightforward process to take your data store record and inject it into a custom struct. The following is a quick and dirty example:

import
(
    "labix.org/v2/mgo"
    "labix.org/v2/mgo/bson"
)

type User struct {
  name string
}

func main() {
  servers, err := mgo.Dial("localhost")
  defer servers.Close()
  data := servers.DB("test").C("users")
  result := User{}
  err = c.Find(bson.M{"name": "John"}).One(&result)
}

与其他 NoSQL 解决方案相比,Mongo 的一个缺点是默认情况下没有任何 GUI。这意味着我们要么需要绑定另一个应用或 web 服务,要么坚持使用命令行来管理其数据存储。对于许多应用来说,这并不是什么大问题,但我们希望将此项目保持尽可能的分区化和省区化,以限制故障点。

Mongo 在容错和数据丢失方面也受到了一些批评,但许多 NoSQL 解决方案也是如此。此外,它在许多方面都是快速数据存储的一项功能,因此灾难恢复往往以牺牲速度和性能为代价。

也可以公平地说,这是对 Mongo 及其同龄人普遍夸大的批评。Mongo 会发生什么坏事吗?当然基于 Oracle 的托管系统是否也会发生这种情况?绝对地缓解这一领域的大规模故障更多的是系统管理员的责任,而不是软件本身,软件本身只能提供设计此类应急计划所需的工具。

综上所述,我们希望有一个快速、高可用的管理界面,因此 Mongo 满足了我们的需求,但如果这些需求价值不高,可以很容易地插入到这个解决方案中。

Redis

Redis 是另一个关键/价值数据存储,截至最近,在总使用率和流行度方面排名第一。在理想的 Redis 世界中,整个数据集都保存在内存中。考虑到许多数据集的大小,这并不总是可能的;然而,再加上 Redis 避免持久性的能力,这在并发应用中使用时可能会产生一些非常高的性能结果。

Redis 的另一个有用特性是它可以固有地保存不同的数据结构。虽然可以通过在 Mongo(和其他数据存储)中解组 JSON 对象/数组来抽象这些数据,但 Redis 可以处理集合、字符串、数组和散列。

Go 中的 Redis 有两个主要公认的库:

  • 基数:这是一个简约主义客户,赤裸裸、快速、肮脏。要安装基数,请运行以下命令:

    go get github.com/fzzy/radix/redis
  • Redigo:这个更健壮,更复杂,但提供了很多我们可能不需要的更复杂的功能。要安装 Redigo,请运行以下命令:

    go get github.com/garyburd/redigo/redis

我们现在将看到一个快速示例,使用Redigo 从 Redis 中的Users数据存储中获取用户名:

package main

import
(
    "fmt"
    "github.com/garyburd/redigo/redis"
)

func main() {

  connection,_ := dial()
  defer connection.Close()

  data, err := redis.Values(connection.Do("SORT", "Users", "BY", "User:*->name", 
    "GET", "User:*->name"))

  if (err) {
    fmt.Println("Error getting values", err)
  }

  for i:= range data {
    var Uname string
    data,err := redis.Scan(data, &Uname)
    if (err) {
      fmt.Println("Error getting value",err)
    }else {
      fmt.Println("Name Uname")
    }
  }
}

综上所述,您可能会注意到一些非编程访问语法,例如:

  data, err := redis.Values(connection.Do("SORT", "Users", "BY", "User:*->name", 
    "GET", "User:*->name"))

这确实是为什么我们不会选择 Redis in Go 的原因之一。这里的两个库都提供了对某些功能的接近 API 级别的访问,并带有一些更详细的内置功能,用于直接交互。Do命令直接将查询传递给 Redis,如果您需要使用该库,这很好,但这是一个有点不雅观的全面解决方案。

这两个库都很好地利用了 Go 的并发特性,并且您可以通过它们中的任何一个对 Redis 进行无阻塞的网络调用。

值得注意的是,Redis 只支持针对 Windows 的实验构建,因此它主要用于*nix 平台。确实存在的端口来自微软,可在找到 https://github.com/MSOpenTech/redis

铁点

如果你对 NoSQL 做了很多工作,那么前面的引擎对你来说可能都很熟悉。Redis、Coach、Mongo 等都是相对年轻的技术领域的忠实拥护者。

Tiedot, on the other hand, probably isn't as familiar. We're including it here only because the document store itself is written in Go directly. Document manipulation is handled primarily through a web interface, and it's a JSON document store like several other NoSQL solutions.

由于文档访问和处理是通过 HTTP 进行管理的,因此有一个有点违反直觉的工作流,如下所示:

Tiedot

由于这会导致潜在的延迟或失败,因此这并不是我们应用的理想解决方案。请记住,这也是前面提到的一些其他解决方案的一个功能,但由于 Tiedot 是用 Go 编写的,因此使用包连接到它并读取/修改数据将非常容易。虽然这本书正在编写中,但它并不存在。

与其他以 HTTP 或 REST 为中心的替代方案(如 CouchDB)不同,Tiedot 依赖 URL 端点来指示操作,而不是 HTTP 方法。

您可以在以下代码中看到我们如何通过标准库处理类似的内容:

package main

import
(
  "fmt"
  "json"
  "http"
)

type Collection struct {
  Name string
}

简单地说,这是您希望通过数据选择、查询等将任何记录引入 Go 应用的数据结构。您在我们之前使用 SQL Server 本身时看到了这一点,这与之前的使用没有任何区别:

func main() {

  Col := Collection{
    Name: ''
  }

  data, err := http.Get("http://localhost:8080/all")
  if (err != nil) {
    fmt.Println("Error accessing tiedot")
  }
  collections,_ = json.Unmarshal(data,&Col)
}

虽然 Tiedot 不如许多同行那样健壮、强大或可扩展,但它确实值得一玩,或者更好的是,为之做出贡献。

You can find Tiedot at https://github.com/HouzuoGuo/tiedot.

CouchDB

来自 Apache 孵化器的 CouchDB是 NoSQL 大数据的另一个大男孩。作为 JSON 文档存储,CouchDB 在数据存储方法方面提供了很大的灵活性。

CouchDB 支持ACID 语义,并且可以并发执行,如果绑定到这些属性,这将提供大量性能优势。在我们的应用中,对酸稠度的依赖有些灵活。根据设计,它将具有容错性和可恢复性,但对于许多人来说,即使是具有可恢复性的数据丢失的可能性也被认为是灾难性的。

与 CouchDB 的接口是通过 HTTP 实现的,这意味着不需要直接实现或 GoSQL 数据库钩子来使用它。有趣的是,CouchDB 使用 HTTP 头语法操作数据,如下所示:

  • 获取:此表示读取操作
  • PUT:此表示创建操作
  • 删除:此表示删除更新操作

当然,这些都是 HTTP 1.1 中最初打算使用的头方法,但是 Web 上的很多内容都集中在 GET/POST 上,以至于这些方法往往在这场争论中迷失了方向。

Coach还提供了方便的管理 web 界面。当 CouchDB 运行时,您可以在http://localhost:5984/_utils/处访问它,如以下屏幕截图所示:

CouchDB

也就是说,有一些包装器为一些更复杂和更高级的特性提供了抽象级别。

卡桑德拉

卡桑德拉,另一个 Apache 基金会项目,不是技术上的 NoSQL 解决方案,而是一个集群(或集群)数据库管理平台。

与许多 NoSQL 应用一样,Cassandra 中的传统查询方法也存在局限性,例如,通常不支持子查询和联接。

我们在这里提到主要是因为它关注分布式计算,以及通过编程调整数据一致性或性能是否更重要的能力。我们的解决方案 Couchbase 中同样表达了这一点,但 Cassandra 更关注分布式数据存储。

然而,Cassandra 确实支持 SQL 的一个子集,这将使涉足 MySQL、PostgreSQL 或类似领域的开发人员更加熟悉 SQL。Cassandra 对高度并发集成的内置处理在许多方面使其成为 Go 的理想选择,尽管这对于这个项目来说是一种过分的技巧。

与 Cassandra 接口的最值得注意的库是 gocql,它关注速度和与 Cassandra 连接的干净连接。若您选择使用 Cassandra 代替 Couchbase(或其他 NoSQL),您将发现许多方法可以简单地替换。

以下是连接到集群并编写简单查询的示例:

package main

import
(
    "github.com/gocql/gocql"
    "log"
)

func main() {

  cass := gocql.NewCluster("127.0.0.1")
  cass.Keyspace = "filemaster"
  cass.Consistency = gocql.LocalQuorum

  session, _ := cass.CreateSession()
  defer session.Close()

  var fileTime int;

  if err := session.Query(`SELECT file_modified_time FROM filemaster 
  WHERE filename = ? LIMIT 1`, "test.txt").Consistency(gocql.One).Scan(&fileTime); err != nil {
    log.Fatal(err)
  }
  fmt.Println("Last modified",fileTime)
}

Cassandra可能是一个理想的解决方案,如果您计划快速扩展此应用,将其广泛分发,或者使用 SQL 比使用 data store/JSON 访问舒服得多。

For our purposes here, SQL is not a requirement and we value speed over anything else, including durability.

沙发床

Couchbase在该领域是一个相对较新的产品,但它是由 CouchDB 和 memcached 的人共同建造的。它是用 Erlang 编写的,在并发性、速度和非阻塞行为方面与我们从大量 Go 应用中所期望的有许多相同之处。

Couchbase 还支持我们在前几章中讨论过的许多其他功能,包括易于基于分发的安装、可调节的 ACID 兼容性和低资源消耗。

Couchbase 的一个警告是,它在一些资源较低的机器或虚拟机上运行不好(或者根本运行不好)。事实上,64 位安装至少需要 4 GB 内存和 4 个内核,所以不要考虑在小型、小型、甚至中型实例或旧硬件上启动它。

虽然本文(或其他地方)介绍的大多数 NoSQL 解决方案总体上比 SQL 解决方案提供了性能优势,但 Couchbase 在 NoSQL 领域中的表现与同行相比非常出色。

Couchbase(如 CouchDB)提供了一个基于 web 的图形界面,简化了安装和维护过程。在设置中,您可以使用的高级功能包括基本存储桶存储引擎(Couchbase 或 memcached)、自动备份过程(副本)和读写并发级别。

In addition to configuration and management tools, it also presents some real-time monitoring in the web dashboard as shown in the following screenshot:

Couchbase

虽然不能替代全面的服务器管理(当服务器停止运行而您没有洞察时会发生什么情况),但在不需要命令行方法或外部工具的情况下准确地知道资源的去向非常有用。

Couchbase 中的方言略有不同,这在许多解决方案中都是如此。将 NoSQL 与陈旧的 SQL 解决方案略微分离的初衷将不时出现。

With Couchbase, a database is a data bucket and records are documents. However, views, an old transactional SQL standby, bring a bit of familiarity to the table. The big takeaway here is views allow you to create more complex queries using simple JavaScript, in some cases, replicating otherwise difficult features such as joins, unions, and pagination.

在 Couchbase 中创建的每个视图都成为 HTTP 访问点。因此,您命名为select_all_files的视图可以通过http://localhost:8092/file_manager/_design/select_all_files/_view/Select%20All%20Files?connection_timeout=60000&limit=10&skip=0等 URL 访问。

最值得注意的 Couchbase 接口库是 Go Couchbase,如果没有其他功能的话,它可能会使您避免在代码中进行 HTTP 调用以访问 CouchDB 的冗余。

Go Couchbase 可在找到 https://github.com/couchbaselabs/go-couchbase

Go Couchbase 通过 Go 抽象使与 Couchbase 的接口变得简单而强大。下面的代码以一种让人感觉原生的精益方式连接并获取有关各种数据池的信息:

package main

import
(
  "fmt"
  "github.com/couchbaselabs/go-couchbase"
)

func main() {

    conn, err := couchbase.Connect("http://localhost:8091")
    if err != nil {
      fmt.Println("Error:",err)
    }
    for _, pn := range conn.Info.Pools {
        fmt.Printf("Found pool:  %s -> %s\n", pn.Name, pn.URI)
    }
}

Setting up our data store

安装Couchbase 后,您可以在 localhost 和端口 8091 默认访问其管理面板。

您将有机会设置管理员、要连接的其他 IP(如果您要加入群集)和常规数据存储设计。

之后,您需要设置一个 bucket,我们将使用它来存储有关单个文件的所有信息。以下是 bucket 设置的界面:

Setting up our data store

在我们的示例中,我们在一台机器上工作,因此不支持复制(在数据库术语中也称为复制)。我们把它命名为file_manager,但这显然可以称为任何有意义的东西。

我们还将数据使用率保持在相当低的水平,在存储文件操作和记录旧操作时,不需要超过 256MB 的内存。换句话说,我们不一定要将test.txt的修改历史永久保存在内存中。

我们还将继续使用 Couchbase 作为存储引擎的等效工具,尽管您可以使用 memcache(d)来回切换,而不会有太多明显的变化。

让我们先创建一个种子文档:稍后将删除该文档,但它将表示数据存储的模式。我们可以使用任意 JSON 结构化对象创建此文档,如以下屏幕截图所示:

Setting up our data store

由于此数据存储中存储的所有内容都应该是有效的 JSON,因此我们可以混合和匹配字符串、整数、布尔、数组和对象。这为我们在使用什么数据方面提供了一些灵活性。以下是一个示例文档:

{
  "file_name": "test.txt",
  "hash": "",
  "created": 1,
  "created_user": 0,
  "last_modified": "",
  "last_modified_user": "",
  "revisions": [],
  "version": 1
}

监控文件系统更改

当谈到 NoSQL 选项时,我们有各种各样的解决方案可供选择。对于监视文件系统更改的应用,情况并非如此。虽然 Linux flavors 在 inotify 中有一个相当好的内置解决方案,但这确实限制了应用的可移植性。

所以 Chris Howey 的 fsnotify 中有一个跨平台的库来处理这个问题,这是非常有用的。

Fsnotify在 Linux、OSX 和 Windows 上工作,允许我们检测任何给定目录中的文件何时被创建、删除、修改或重命名,这对于我们来说已经足够了。

实现 fsnotify 也不容易。最重要的是,它是无阻塞的,因此如果我们将侦听器放在 goroutine 后面,我们可以将其作为主服务器应用代码的一部分运行。

以下代码显示了一个简单的目录侦听器:

package main

import (
    "github.com/howeyc/fsnotify""fmt"
  "log""
)

func main() {

    scriptDone := make(chan bool)
    dirSpy, err := fsnotify.NewWatcher()
    if err != nil {
        log.Fatal(err)
    }

    go func() {
        for {
            select {
            case fileChange := <-dirSpy.Event:
                log.Println("Something happened to a file:", 
                  fileChange)
            case err := <-dirSpy.Error:
                log.Println("Error with fsnotify:", err)
            }
        }
    }()

    err = dirSpy.Watch("/mnt/sharedir")
    if err != nil {
      fmt.Println(err)
    }

    <-scriptDone

    dirSpy.Close()
}

管理日志文件

与开发人员工具箱中的许多基本功能一样,Go 提供了一个相当完整的内置日志解决方案。它处理许多基础工作,例如创建带有时间戳标记的日志项和保存到磁盘或控制台。

基本包遗漏的一件事是内置格式和日志循环,这是我们的文件管理器应用的关键要求。

请记住,我们的应用的关键需求包括在并发环境中无缝工作的能力,以及在需要时准备扩展到分布式网络的能力。这就是精细的log4go应用派上用场的地方。Log4go 允许日志记录到文件、控制台和内存,并固有地处理日志旋转。

Log4go可在找到 https://code.google.com/p/log4go/

要安装 Log4go,请运行以下命令:

go get code.google.com/p/log4go

Creating a logfile that handles warnings, notices, debug information, and critical errors is simple and appending log rotation to that is similarly simple, as shown in the following code:

package main

import
(
  logger "code.google.com/p/log4go"
)
func main() {
  logMech := make(logger.Logger);
  logMech.AddFilter("stdout", logger.DEBUG, 
    logger.NewConsoleLogWriter())

  fileLog := logger.NewFileLogWriter("log_manager.log", false)
  fileLog.SetFormat("[%D %T] [%L] (%S) %M")
  fileLog.SetRotate(true)
  fileLog.SetRotateSize(256)
  fileLog.SetRotateLines(20)
  fileLog.SetRotateDaily(true)
  logMech.AddFilter("file", logger.FINE, fileLog)

  logMech.Trace("Received message: %s)", "All is well")
  logMech.Info("Message received: ", "debug!")
  logMech.Error("Oh no!","Something Broke")
}

处理配置文件

When it comes to configuration files and parsing them, you have a lot of options, from simple to complicated.

当然,我们可以简单地将所需内容存储在 JSON 中,但这种格式直接用于人类有点棘手,它需要转义字符等等,这使得它容易出错。

相反,我们将通过使用 gcfg 中的标准ini config文件库来保持简单,该库处理gitconfig文件和传统的老式.ini格式,如以下代码片段所示:

[revisions]
count = 2
revisionsuffix = .rev
lockfiles = false

[logs]
rotatelength = 86400

[alarms]
emails = sysadmin@example.com,ceo@example.com

您可以在找到 gcfghttps://code.google.com/p/gcfg/

本质上,这个库获取配置文件的值,并将它们推送到 Go 中的结构中。我们将如何做到这一点的示例如下:

package main

import
(
  "fmt"
  "code.google.com/p/gcfg"
)

type Configuration struct {
  Revisions struct {
    Count int
    Revisionsuffix string
    Lockfiles bool
  }
  Logs struct {
    Rotatelength int
  }
  Alarms struct {
    Emails string
  }
}

func main() {
  configFile := Configuration{}
  err := gcfg.ReadFileInto(&configFile, "example.ini")
  if err != nil {
    fmt.Println("Error",err)
  }
  fmt.Println("Rotation duration:",configFile.Logs.Rotatelength)
}

检测文件更改

现在我们需要关注我们的文件侦听器。您可能还记得,这是应用的一部分,它将接受来自 web 服务器和备份应用的客户端连接,并宣布对文件的任何更改。

本部分的基本流程如下:

  1. 侦听对 goroutine 中文件的更改。
  2. 接受连接并添加到 goroutine 中的池中。
  3. 如果检测到任何更改,请向整个池宣布这些更改。

这三种情况都是同时发生的,第一种和第三种情况可以在池中没有任何连接的情况下发生,尽管我们假设 web 服务器和备份应用都会始终保持连接。

文件侦听器将完成的另一个关键角色是在第一次加载时分析目录,并将其与 Couchbase 中的数据存储进行协调。由于 Go Couchbase 库处理获取、更新和添加操作,因此我们不需要任何自定义视图。在以下代码中,我们将检查文件侦听器进程,并展示如何侦听文件夹中的更改:

package main

import
(
  "fmt"
  "github.com/howeyc/fsnotify"
  "net"
  "time"
  "io"  
  "io/ioutil"
  "github.com/couchbaselabs/go-couchbase"
  "crypto/md5"
  "encoding/hex"
  "encoding/json"  
  "strings"

)

var listenFolder = "mnt/sharedir"

type Client struct {
  ID int
  Connection *net.Conn  
}

在这里,我们声明了我们的共享文件夹以及一个连接的Client结构。在此应用中,Client是 web 侦听器或备份侦听器,我们将使用以下 JSON 编码结构向一个方向传递消息:

type File struct {
  Hash string "json:hash"
  Name string "json:file_name"
  Created int64 "json:created"
  CreatedUser  int "json:created_user"
  LastModified int64 "json:last_modified"
  LastModifiedUser int "json:last_modified_user"
  Revisions int "json:revisions"
  Version int "json:version"
}

如果这看起来很熟悉,可能是因为它也是我们最初设置的示例文档格式。

Note

如果您不熟悉前面表达的语法 sugar,那么这些被称为 struct 标记。标记只是一段附加元数据,可通过reflect包应用于结构字段以进行键/值查找。在本例中,它们用于将结构字段映射到 JSON 字段。

我们先来看看我们的整体Message struct

type Message struct {
  Hash string "json:hash"
  Action string "json:action"
  Location string "json:location"  
  Name string "json:name"
  Version int "json:version"
}

我们将我们的文件划分为一条消息,该消息提醒我们的其他两个更改过程:

func generateHash(name string) string {

  hash := md5.New()
  io.WriteString(hash,name)
  hashString := hex.EncodeToString(hash.Sum(nil))

  return hashString
}

这是一种不太可靠的方法来生成对文件的哈希引用,如果文件名发生更改,该方法将失败。但是,它允许我们跟踪创建、删除或修改的文件。

向客户端发送更改

以下是发送到所有现有连接的广播消息。我们传递 JSON 编码的Message结构以及当前版本、当前位置和哈希值,以供参考。我们的其他服务器将做出相应的反应:

func alertServers(hash string, name string, action string, location string, version int) {

  msg := Message{Hash:hash,Action:action,Location:location,Name:name,Version:version}
  msgJSON,_ := json.Marshal(msg)

  fmt.Println(string(msgJSON))

  for i := range Clients {
    fmt.Println("Sending to clients")
    fmt.Fprintln(*Clients[i].Connection,string(msgJSON))
  }
}

Our backup server will create a copy of that file with the .[VERSION] extension in the backup folder.

我们的 web 服务器只需通过我们的 web 界面提醒用户文件已更改:

func startServer(listener net.Listener) {
  for {  
    conn,err := listener.Accept()
    if err != nil {

    }
    currentClient := Client{ ID: 1, Connection: &conn}
    Clients = append(Clients,currentClient)
      for i:= range Clients {
        fmt.Println("Client",Clients[i].ID)
      }    
  }  

}

这个代码看起来熟悉吗?我们已经将几乎完全相同的聊天服务器Client处理程序带到这里,几乎完好无损:

func removeFile(name string, bucket *couchbase.Bucket) {
  bucket.Delete(generateHash(name))
}

removeFile函数只做一件事,那就是从我们的 Couchbase 数据存储中删除文件。由于它是被动的,我们不需要在文件服务器端做任何事情,因为文件已经被删除了。此外,不需要删除任何备份,因为这样可以恢复。接下来,让我们看看更新现有文件的函数:

func updateExistingFile(name string, bucket *couchbase.Bucket) int {
  fmt.Println(name,"updated")
  hashString := generateHash(name)

  thisFile := Files[hashString]
  thisFile.Hash = hashString
  thisFile.Name = name
  thisFile.Version = thisFile.Version + 1
  thisFile.LastModified = time.Now().Unix()
  Files[hashString] = thisFile
  bucket.Set(hashString,0,Files[hashString])
  return thisFile.Version
}

此函数实质上用新值覆盖 Couchbase 中的任何值,复制现有的File结构并更改LastModified日期:

func evalFile(event *fsnotify.FileEvent, bucket *couchbase.Bucket) {
  fmt.Println(event.Name,"changed")
  create := event.IsCreate()
  fileComponents := strings.Split(event.Name,"\\")
  fileComponentSize := len(fileComponents)
  trueFileName := fileComponents[fileComponentSize-1]
  hashString := generateHash(trueFileName)

  if create == true {
    updateFile(trueFileName,bucket)
    alertServers(hashString,event.Name,"CREATE",event.Name,0)
  }
  delete := event.IsDelete()
  if delete == true {
    removeFile(trueFileName,bucket)
    alertServers(hashString,event.Name,"DELETE",event.Name,0)    
  }
  modify := event.IsModify()
  if modify == true {
    newVersion := updateExistingFile(trueFileName,bucket)
    fmt.Println(newVersion)
    alertServers(hashString,trueFileName,"MODIFY",event.Name,newVersion)
  }
  rename := event.IsRename()
  if rename == true {

  }
}

在这里,我们对监视目录中的文件系统的任何更改作出反应。我们不会对重命名做出反应,但您也可以处理这些问题。下面是我们如何处理一般的updateFile函数:

func updateFile(name string, bucket *couchbase.Bucket) {
  thisFile := File{}
  hashString := generateHash(name)

  thisFile.Hash = hashString
  thisFile.Name = name
  thisFile.Created = time.Now().Unix()
  thisFile.CreatedUser = 0
  thisFile.LastModified = time.Now().Unix()
  thisFile.LastModifiedUser = 0
  thisFile.Revisions = 0
  thisFile.Version = 1

  Files[hashString] = thisFile

  checkFile := File{}
  err := bucket.Get(hashString,&checkFile)
  if err != nil {
    fmt.Println("New File Added",name)
    bucket.Set(hashString,0,thisFile)
  }
}

对照 Couchbase 检查记录

当涉及到根据 Couchbase 检查现有记录时,我们检查 Couchbase bucket 中是否存在哈希。如果没有,我们就创建它。如果有,我们什么也不做。为了更可靠地处理关闭,我们还应该将现有记录吸收到应用中。执行此操作的代码如下所示:

var Clients []Client
var Files map[string] File

func main() {
  Files = make(map[string]File)
  endScript := make(chan bool)

  couchbaseClient, err := couchbase.Connect("http://localhost:8091/")
    if err != nil {
      fmt.Println("Error connecting to Couchbase", err)
    }
  pool, err := couchbaseClient.GetPool("default")
    if err != nil {
      fmt.Println("Error getting pool",err)
    }
  bucket, err := pool.GetBucket("file_manager")
    if err != nil {
      fmt.Println("Error getting bucket",err)
    }  

  files, _ := ioutil.ReadDir(listenFolder)
  for _, file := range files {
    updateFile(file.Name(),bucket)
  }

    dirSpy, err := fsnotify.NewWatcher()
    defer dirSpy.Close()

  listener, err := net.Listen("tcp", ":9000")
  if err != nil {
    fmt.Println ("Could not start server!",err)
  }

  go func() {
        for {
            select {
            case ev := <-dirSpy.Event:
                evalFile(ev,bucket)
            case err := <-dirSpy.Error:
                fmt.Println("error:", err)
            }
        }
    }()
    err = dirSpy.Watch(listenFolder)  
  startServer(listener)

  <-endScript
}

Finally, main() handles setting up our connections and goroutines, including a file watcher, our TCP server, and connecting to Couchbase.

现在,让我们看一下整个过程中的另一个步骤,我们将自动创建修改文件的备份。

Backing up our files

因为我们正在通过线路发送我们的命令,可以说,我们的备份过程需要在该线路上进行监听,并对任何更改做出响应。鉴于修改将通过 localhost 发送,我们应该在网络和文件端都有最小的延迟。

我们还将返回一些关于该文件发生了什么的信息,尽管在这一点上我们并没有对这些信息做太多的处理。其代码如下:

package main

import
(
  "fmt"
  "net"
  "io"
  "os"
  "strconv"
  "encoding/json"
)

var backupFolder = "mnt/backup/"

请注意,在本例中,我们在 Windows 计算机上有一个单独的备份文件夹。如果我们碰巧不小心使用了同一个目录,我们将面临无限复制和备份文件的风险。在下面的代码片段中,我们将看到Message结构本身和backup函数,这部分应用的核心:

type Message struct {
  Hash string "json:hash"
  Action string "json:action"
  Location string "json:location"
  Name string "json:name"  
  Version int "json:version"
}

func backup (location string, name string, version int) {

  newFileName := backupFolder + name + "." + 
    strconv.FormatInt(int64(version),10)
  fmt.Println(newFileName)
  org,_ := os.Open(location)
  defer org.Close()
  cpy,_ := os.Create(newFileName)
  defer cpy.Close()
  io.Copy(cpy,org)
}

这是我们的基本文件操作。Go 没有一步复制功能;相反,您需要创建一个文件,然后使用io.Copy将另一个文件的内容复制到其中:

func listen(conn net.Conn) {
  for {

      messBuff := make([]byte,1024)
    n, err := conn.Read(messBuff)
    if err != nil {

    }

    resultMessage := Message{}
    json.Unmarshal(messBuff[:n],&resultMessage)

    if resultMessage.Action == "MODIFY" {
      fmt.Println("Back up file",resultMessage.Location)
      newVersion := resultMessage.Version + 1
      backup(resultMessage.Location,resultMessage.Name,newVersion)
    }

  }

}

对于我们的聊天客户端的listen()功能来说,这段代码几乎是一字不差,除了我们获取流式 JSON 数据的内容,解组并将其转换为Message{}结构,然后再转换为File{}结构。最后,我们来看一下main函数和 TCP 初始化:

func main() {
  endBackup := make(chan bool)
  conn, err := net.Dial("tcp","127.0.0.1:9000")
  if err != nil {
    fmt.Println("Could not connect to File Listener!")
  }
  go listen(conn)

  <- endBackup
}

设计我们的 web 界面

为了与文件系统进行交互,我们需要一个界面来显示所有当前文件的版本、上次修改时间和更改警报,并允许拖放创建/替换文件。

获取文件列表很简单,因为我们将直接从file_manager沙发桶中获取它们。更改将通过我们的文件管理器进程通过 TCP 发送,这将触发一个 API 调用,为我们的 web 用户阐明对文件的更改。

我们在这里使用的一些方法与我们在备份过程中使用的方法是重复的,当然可以从一些整合中获益;不过,以下是 web 服务器的代码,它允许上载并显示更改通知:

package main

import
(
  "net"
  "net/http"
  "html/template"
  "log"
  "io"
  "os"
  "io/ioutil"
  "github.com/couchbaselabs/go-couchbase"
  "time"  
  "fmt"
  "crypto/md5"
  "encoding/hex"
  "encoding/json"
)

type File struct {
  Hash string "json:hash"
  Name string "json:file_name"
  Created int64 "json:created"
  CreatedUser  int "json:created_user"
  LastModified int64 "json:last_modified"
  LastModifiedUser int "json:last_modified_user"
  Revisions int "json:revisions"
  Version int "json:version"
}

对于示例,这与我们在文件侦听器和备份过程中使用的File结构相同:

type Page struct {
  Title string
  Files map[string] File
}

我们的Page结构表示通用 web 数据,这些数据被转换为网页模板的相应变量:

type ItemWrapper struct {

  Items []File
  CurrentTime int64
  PreviousTime int64

}

type Message struct {
  Hash string "json:hash"
  Action string "json:action"
  Location string "json:location"
  Name string "json:name"  
  Version int "json:version"
}

ItemWrapper结构只是一个 JSON 包装器,它将保存一个从Files结构转换而来的数组。这对于在客户端循环使用 API 的 JSON 非常重要。我们的Message结构与我们在文件侦听器和备份过程中看到的类型相同。在以下代码段中,我们将指定一些常规配置变量和哈希生成函数:

var listenFolder = "/wamp/www/shared/"
var Files map[string] File
var webTemplate = template.Must(template.ParseFiles("ch8_html.html"))
var fileChange chan File
var lastChecked int64

func generateHash(name string) string {

  hash := md5.New()
  io.WriteString(hash,name)
  hashString := hex.EncodeToString(hash.Sum(nil))

  return hashString
}

我们的md5散列方法也与此应用相同。值得注意的是,每次从文件侦听器获得信号时,我们都会派生一个lastChecked变量,它是 Unix 风格的时间戳。我们使用它与客户端上的文件更改进行比较,以了解是否要在 Web 上提醒用户。现在我们来看一下 web 界面的updateFile功能:

func updateFile(name string, bucket *couchbase.Bucket) {
  thisFile := File{}
  hashString := generateHash(name)

  thisFile.Hash = hashString
  thisFile.Name = name
  thisFile.Created = time.Now().Unix()
  thisFile.CreatedUser = 0
  thisFile.LastModified = time.Now().Unix()
  thisFile.LastModifiedUser = 0
  thisFile.Revisions = 0
  thisFile.Version = 1

  Files[hashString] = thisFile

  checkFile := File{}
  err := bucket.Get(hashString,&checkFile)
  if err != nil {
    fmt.Println("New File Added",name)
    bucket.Set(hashString,0,thisFile)
  }else {
    Files[hashString] = checkFile
  }
}

This is the same function as our backup process, except that instead of creating a duplicate file, we simply overwrite our internal File struct to allow it represent its updated LastModified value when the /api is next called. And as with our last example, let's check out the listen() function:

func listen(conn net.Conn) {
  for {

      messBuff := make([]byte,1024)
    n, err := conn.Read(messBuff)
    if err != nil {

    }
    message := string(messBuff[:n])
    message = message[0:]

    resultMessage := Message{}
    json.Unmarshal(messBuff[:n],&resultMessage)

    updateHash := resultMessage.Hash
    tmp := Files[updateHash]
    tmp.LastModified = time.Now().Unix()
    Files[updateHash] = tmp
  }

}

这里是我们的消息被读取、解组和设置为散列映射键的地方。如果不存在,将创建一个文件;如果存在,将更新当前文件。接下来,我们来看看main()函数,它设置了我们的应用和 web 服务器:

func main() {
  lastChecked := time.Now().Unix()
  Files = make(map[string]File)
  fileChange = make(chan File)
  couchbaseClient, err := couchbase.Connect("http://localhost:8091/")
    if err != nil {
      fmt.Println("Error connecting to Couchbase", err)
    }
  pool, err := couchbaseClient.GetPool("default")
    if err != nil {
      fmt.Println("Error getting pool",err)
    }
  bucket, err := pool.GetBucket("file_manager")
    if err != nil {
      fmt.Println("Error getting bucket",err)
    }    

  files, _ := ioutil.ReadDir(listenFolder)
  for _, file := range files {
    updateFile(file.Name(),bucket)
  }

  conn, err := net.Dial("tcp","127.0.0.1:9000")
  if err != nil {
    fmt.Println("Could not connect to File Listener!")
  }
  go listen(conn)

  http.HandleFunc("/api", func(w http.ResponseWriter, r 
    *http.Request) {
    apiOutput := ItemWrapper{}
    apiOutput.PreviousTime = lastChecked
    lastChecked = time.Now().Unix()
    apiOutput.CurrentTime = lastChecked

    for i:= range Files {
      apiOutput.Items = append(apiOutput.Items,Files[i])
    }
    output,_ := json.Marshal(apiOutput)
    fmt.Fprintln(w,string(output))

  })
  http.HandleFunc("/", func(w http.ResponseWriter, r 
    *http.Request) {
    output := Page{Files:Files,Title:"File Manager"}
    tmp, _ := template.ParseFiles("ch8_html.html")
    tmp.Execute(w, output)
  })
  http.HandleFunc("/upload", func(w http.ResponseWriter, r 
    *http.Request) {
    err := r.ParseMultipartForm(10000000)
    if err != nil {
      return
    }
    form := r.MultipartForm

    files := form.File["file"]
    for i, _ := range files {
      newFileName := listenFolder + files[i].Filename
      org,_:= files[i].Open()
      defer org.Close()
      cpy,_ := os.Create(newFileName)
      defer cpy.Close()
      io.Copy(cpy,org)
    }
  })  

  log.Fatal(http.ListenAndServe(":8080",nil))

}

在我们的 web服务器组件中,main()负责控制建立与文件侦听器和 Couchbase 的连接,并创建 web 服务器(具有相关路由)。

如果您通过将文件拖动到Drop files here to upload框来上传文件,几秒钟内您将看到该文件在 web 界面中被标记为已更改,如以下屏幕截图所示:

Designing our web interface

我们没有包含 web 界面客户端的代码;不过,关键是通过 API 进行检索。我们使用了一个名为Dropzone.js的JavaScript 库,它允许拖放上传,并使用 jQuery 进行 API 访问。

恢复文件的历史记录–命令行

我们想添加到此应用套件的最后一个组件是命令行文件修订过程。我们可以使这一点相当简单,因为我们知道文件位于何处,其备份位于何处,以及如何用后者替换前者。与之前一样,我们有一些全局配置变量和我们的generateHash()功能的复制:

var liveFolder = "/mnt/sharedir "
var backupFolder = "/mnt/backup

func generateHash(name string) string {

  hash := md5.New()
  io.WriteString(hash,name)
  hashString := hex.EncodeToString(hash.Sum(nil))

  return hashString
}

func main() {
  revision := flag.Int("r",0,"Number of versions back")
  fileName := flag.String("f","","File Name")
  flag.Parse()

  if *fileName == "" {

    fmt.Println("Provide a file name to use!")
    os.Exit(0)
  }

  couchbaseClient, err := couchbase.Connect("http://localhost:8091/")
    if err != nil {
      fmt.Println("Error connecting to Couchbase", err)
    }
  pool, err := couchbaseClient.GetPool("default")
    if err != nil {
      fmt.Println("Error getting pool",err)
    }
  bucket, err := pool.GetBucket("file_manager")
    if err != nil {
      fmt.Println("Error getting bucket",err)
    }  

  hashString := generateHash(*fileName)
  checkFile := File{}    
  bucketerr := bucket.Get(hashString,&checkFile)
  if bucketerr != nil {

  }else {
    backupLocation := backupFolder + checkFile.Name + "." + strconv.FormatInt(int64(checkFile.Version-*revision),10)
    newLocation := liveFolder + checkFile.Name
    fmt.Println(backupLocation)
    org,_ := os.Open(backupLocation)
      defer org.Close()
    cpy,_ := os.Create(newLocation)
      defer cpy.Close()
    io.Copy(cpy,org)
    fmt.Println("Revision complete")
  }

}

此应用最多可接受两个参数:

  • -f:表示文件名
  • -r:表示要恢复的版本数

请注意,这本身创建了一个新的版本,并因此创建了一个备份,所以-2 需要变成-3,然后变成-6,以此类推,以便不断地递归备份。

例如,如果希望将example.txt还原回三个版本,可以使用以下命令:

fileversion -f example.txt -r -3

使用 Go-in 守护进程并将其作为服务

关于运行类似于应用这一部分的程序的一个小提示,理想情况下,您希望将这些应用保持为活动的、可重启的服务,而不是独立的、手动执行的后台进程。这样做将允许您保持应用的活动状态,并通过外部或服务器进程管理其生命周期。

This sort of application suite would be best suited on a Linux box (or boxes) and managed with a daemon manager such as daemontools or Ubuntu's built-in Upstart service. The reason for this is that any long-term downtime can result in lost data and inconsistency. Even storing file data details in the memory (Couchbase and memcached) provides a vulnerability for lost data.

检查我们服务器的运行状况

在众多检查常规服务器运行状况的方法中,我们在这里处于良好的位置,无需构建自己的系统,这在很大程度上要感谢 Couchbase 本身。如果您访问 Couchbase web admin,在集群、服务器和存储桶视图下,单击任意项将显示一些实时统计信息,如以下屏幕截图所示:

Checking the health of our server

如果您希望将这些区域包括在应用中以使您的日志记录和错误处理更加全面,也可以通过 REST 访问这些区域。

总结

我们现在有了一个自上而下的应用套件,该套件高度并发,包含在多个第三方库中,并通过日志记录和灾难恢复来缓解潜在故障。

在这一点上,构建一个复杂的软件包,重点是在 Go 中维护并发性、可靠性和性能,应该没有问题。我们的文件监视应用可以很容易地进行修改,以执行更多操作、使用替代服务或扩展到一个健壮的分布式环境。

在下一章中,我们将更详细地了解测试并发性和吞吐量,探索 panic 和 recover 的价值,以及在 Go 中以安全、并发的方式记录重要信息和错误。