# 海量数据处理技巧小结

2020年6月15日

上个月（2020年5月），做了一个5G搜网的DEMO，需要把历史的众包数据全量地练库。
做了一个粗略的估计，每天的众包大概7KW条记录，要处理3个月的，就达到几十亿的量。
以前没有处理过这么大的数据量，最多就是用ELK来处理流数据。所以在我看来，这次也是一个小小的挑战。


以下简单回顾一下处理的思路，以及通过python-cookbook等书籍中看到的一些小技巧后的思考。
首先，根据粗略的估算，大概有几十亿的量。就要考虑两个问题：一是数据库的性能；二是处理数据的性能。
因为业务逻辑还是很简单的，主要是写入到数据库，再查询出来。主要的矛盾在于性能方面。
所以刚开始还做了一些关于数据库的调研。


当前主流数据库就两种：关系型数据库和NoSQL。
要处理数十亿的量，首选是NoSQL。关第型需要也有可能处理，但要分库分表，还要使用一些中间件来分流。相当复杂，成本相当高。
至于NoSQL，基本都是为了处理海量数据而设计的。
但NoSQL也有很多类型，纷繁复杂，网络上分类就不在这里复述了。这里只讲一些网上没有的。
在一一比较了每个NoSQL类型后，发现要想快速部署起应用，把DEMO做起来，除了考虑数据库本身的性能外，还需要考虑当前的环境。
就是当前其它项目已经在选择了Cassadra，还有必要去重新部署另一套数据库吗？还有多余的资源来部署吗？部署的时间足够吗？稳定性如何？
这几个问题，在以前我都不会考虑到。随着对C* 的深入了解，发现它的写入性能是读取性能的好几倍，原因是写入过程会先写内存，内存写成功了就可以返回了。
这是以提供较低的一致性————最终一致性为代价的。再考虑项目的实际需求，写性能的确是比读性能要求更高。
因为几十亿的记录要在尽可能短的时间内写完，而读的数量很少，并发量不大。
所以，结合NoSQL的自身特点与项目需求的客观环境因素，C* 是最佳选择。

选定数据库后，就是数据表的设计。
根据资料显示，C* 的设计理念是磁盘是便宜的，可以为每种查询语句创建对应的表，即使有多个字段是重复的，但可以带来读性能的提升，以及表设计时更加简单。
当然这其中还是有不少技巧的，比如主键、分区键的设计、使用适当的字段排序以提升分页的性能等等。
小结一句，就是要根据业务特点来设计表。最理想的情况是可以知道查询语句后，再来设计表。这样就非常的直接了。
回到这个项目来讲，众包数据，更加关心的是地理位置，所以直接按地理位置来设计主键就可以了。

数据库和数据表都定下来了，就要考虑处理性能了。
因为之前使用ELK做过日志收集分析系统，第一感觉是ELK的性能相当高啊。
就考虑使用ELK来做这个DEMO，但调研后发现Logstage并不支持C* ，还需要自己开发插件。
后来到了尝试部署时就发现，这个部署会更加麻烦。ELK都需要重新部署，而且整个项目组只有我一个人熟悉ELK，以后作交接会很麻烦。
综合上面的因素，还有数据库选型，数据处理部分还是需要自己写。

接下来就测试一下Python的处理性能。原来Python的处理性能并不有听闻的那么差。遍历3W的数据库只要0.1秒，就是加上处理逻辑后，
只要在1W CPS也是可以接受的。
那接下来就是开发数据处理逻辑了。这里结合了一部分刚从python-cookbook上看到的知识点来写。
选开发单进程的版本，然后在外层增加一个多进程的调度器，就可以形成多进程版本。
这里有几点要注意的：
1. 多进程启动时，要选择‘spawn’方式，这时父进程会启动一个亲的的Python解释器进程。
子进程只会继承那些运行进程对象的run()方法需要的资源。特别是父进程中非必须的文件描述符和句柄不会被继承。
相对于使用‘fork’或者‘forkserver’，使用这个方法启动进程相当慢。
但可会避免不必要的继承，在对启动速度要求不高，启动后就要长时间保留的场景下相当适合。
```multiprocess.set_start_method('spawn')```
1. 不要随便使用队列。
队列是多进程通信机制。在这个项目中，每个进程的任务就是处理数据，然后写到数据库里。相互之前是不需要通信的，也就不需要使用到队列。
当然，主进程会需要收集每个子进程的结果，这个只需要使用multiproceing模块的map_async函数，就可以指定回调函数来处理子进程的结果。
在这个项目中，主进程的工作就是把待处理的文件名分配给子进程，然后收集子进程已经处理好的文件名，再记录好，以便重启后不会重复处理文件。
1. 经过这个项目，发现选择合适的数据流模型是很重要的。比如，在一开始时考虑的ELK模型是不适合这个简单的任务，因为ELK是多个生产者把数据汇合到一处的。
再比如刚开始动手写时，参考了一些使用队列的模型也是不适合的，因为这个项目的需要是每个子进程可能单独工作，不需要相互通信。
所以最后得到的数据流处理模型，就是主进程负责找到未处理的文件名，然后分配给各个子进程，再收集子进程已经处理好的文件名，并保存下来。
每个子进程只负责把指定的文件过滤，拆分，练库，入库，最后返回处理完的文件名。
这个模型简单明了，边界清楚，开发简单。
1. 后来再研究了一下python的并发模块，当前主要提供两类接口。一是concurrent.futures下的ThreadPoolExecutor和ProcessPoolExecutor，这是一个相当高层的接口。低层也是调用了threading.Thread和multiprocess.Process，但提供了池的管理。这个模块的优点是把线程和进程的调用接口统一，可以方便切换。当一个项目在一开始还不知道是选择线程还是选择进程时，使用这个模块是很方便的。因为每个任务可能涉及IO和计算，在一开始不很难知道是IO密集型还是计算密集型任务的。这时使用这个接口是明智的。同时这个模块提供map接口和submit接口，即可以处理简单统一的任务，也为复杂的任务提供了相当的灵活性。但缺点就是没有提供更多版本的map函数，不支持回调函数。这就比不上另一个模块————multiprocessing。这是只提供多进程的接口，没有多线程的接口。适合那些一开始就明确使用多进程的任务。这模块提供了多个map函数的版本是，最好用的提支持回调函数。（这里要吐槽一下，多线程完全没有对应的接口，害我找了好半天。这段内容完全原创，并不是从网上抄来的。如有雷同，纯属巧合。）


在开发完成后，就开始部署了。要部署这样一个长时间运行的任务，当时还不会写守护进程，就只是设置了一个crotab周期任务来定时重启。
重启是为了避免一些没处理的异常使用任务完全挂了的情况。现在看来还真的好原始的方法。
最后再提一下，就是日志的使用，很后悔没有一开始就使用日志模块。可能是以前配置过太复杂的内容，现在都怕了使用。
但后来看到python-cookbook的介绍，日志模块已经提供了很简单的使用方式，基本3行就可以使用起来了。
这时必须要使用日志啊，这会为后面的运维和问题排查提供极大的方便。

In [1]:
# 以下是简单的日志示例
import logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
log = logging.getLogger(__name__)

后来学到了一些函数式编程技巧，比当前这个项目的写法又好了太多，但这是后话了。

尾记：[多进程访问数据库示例](https://github.com/aholmberg/driver-multiprocessing/blob/master/multiprocess_concurrent.py)