# Python RQ初学者指南
## 让 Python 任务排队执行

<h1>目录<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#让-Python-任务排队执行" data-toc-modified-id="让-Python-任务排队执行-1">让 Python 任务排队执行</a></span><ul class="toc-item"><li><span><a href="#引言" data-toc-modified-id="引言-1.1">引言</a></span></li><li><span><a href="#为什么我的-Python程序需要队列？" data-toc-modified-id="为什么我的-Python程序需要队列？-1.2">为什么我的 Python程序需要队列？</a></span><ul class="toc-item"><li><span><a href="#第一步，你需要有个-redis。" data-toc-modified-id="第一步，你需要有个-redis。-1.2.1">第一步，你需要有个 redis。</a></span></li><li><span><a href="#第二步-安装-RQ" data-toc-modified-id="第二步-安装-RQ-1.2.2">第二步 安装 RQ</a></span></li><li><span><a href="#第三步-建立Redis-Queue" data-toc-modified-id="第三步-建立Redis-Queue-1.2.3">第三步 建立Redis Queue</a></span></li><li><span><a href="#第四步-准备好你的任务" data-toc-modified-id="第四步-准备好你的任务-1.2.4">第四步 准备好你的任务</a></span></li><li><span><a href="#第五步-把任务加入RQ队列" data-toc-modified-id="第五步-把任务加入RQ队列-1.2.5">第五步 把任务加入RQ队列</a></span></li><li><span><a href="#第六步-起个工人开始干活吧" data-toc-modified-id="第六步-起个工人开始干活吧-1.2.6">第六步 起个工人开始干活吧</a></span></li><li><span><a href="#第七步-如果-Job-间有依赖关系" data-toc-modified-id="第七步-如果-Job-间有依赖关系-1.2.7">第七步 如果 Job 间有依赖关系</a></span></li><li><span><a href="#出错处理" data-toc-modified-id="出错处理-1.2.8">出错处理</a></span></li></ul></li></ul></li></ul></div>


### 引言

已经有一些关于 Python RQ 的教程了，比如[简易教程-RQ](https://www.twle.cn/go/rq)，

我写这篇 Python RQ 指南时站在工程师的角度，让你通过动手一步步走下来，看完这一篇文章基本搞懂 RQ怎么用。

我在这篇文章中也会带入我用 Python 处理数据时的一些思考。

### 为什么我的 Python程序需要队列？

我学习使用 Python 已经有一段时间了，主要是用做数据处理，需要把数据从机器上下载下来，经过处理后放到结构化的数据库中。我自己设计的爬虫需要用到这个 Python 程序，它每天定时执行，把需要处理的数据下载产生报表。爬虫要按照顺序执行好几个步骤，比如下载数据，解析数据，存储到数据库，然后产生摘要。我的同事还设计了 web 应用，允许用户手动调用这些步骤。

但是，让人困扰的是怎么监控每个任务的运行状态，通过日志和任务进程，我个人感觉都不是规范的做法。当出现错误时候，怎么处理也是问题啊。

#### 第一步，你需要有个 redis。

如果你是在Linux x86或者 mac 上，这条 docker命令能让你一步拥有redis 环境。

In [None]:
%%bash
docker run --name my-redis-container -p 6379:6379 -d redis

如果你的环境是LinuxOne/s390x，那就用下面这条命令。

In [None]:
%%bash
docker run --name my-redis-container -p 6379:6379 -d s390x/redis

#### 第二步 安装 RQ

In [49]:
%%bash
pip install --user rq
# 为了执行后面的测试程序，我们还需要安装 lxml和 requests,但是这不是RQ需要的
pip install --user lxml
pip install --user requests



You are using pip version 18.1, however version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
You are using pip version 18.1, however version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
You are using pip version 18.1, however version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.


#### 第三步 建立Redis Queue

默认的 RQ 是连接到 redis://@localhost:6379/0。如果你的配置不是这样，自己去查参数吧。

In [1]:
from redis import Redis
from rq import Queue

q = Queue(connection=Redis())

#### 第四步 准备好你的任务

你需要随便写个函数，并把他放在某个独立的 python 文件中。注：RQ不允许使用当前“\_\_main\_\_”所在的文件中的函数。所以，不要偷懒，单独准备个文件吧。'

下面这个例子时官方文档提供的，会点网页的词数, 代码如下:

代码在 [my_module.py](my_module.py)

```python
import requests

def count_words_at_url(url):
    resp = requests.get(url)
    return len(resp.text.split())
```

#### 第五步 把任务加入RQ队列

In [2]:
from my_module import count_words_at_url
result = q.enqueue(
             count_words_at_url, 'http://nvie.com')

RQ不仅仅提供了 python 库，还提供了rq命令。通过"rq info"子命令，我们可以看到任务已经在队列里面了。

In [3]:
%%bash
rq info

default      |█ 1
1 queues, 1 jobs total

31b8a4a03a524c1a9a106d00419f8532 (miaocx-mbp.local 27711): busy default
1 workers, 1 queues

Updated: 2020-03-26 14:33:13.925443


#### 第六步 起个工人开始干活吧

我们可以通过 rq worker子命令启动一个worker来处理任务，但是要注意的是
1. 如果你的机器上有多个Python环境，请把Path指向你要用的python可执行程序所在的目录。
2. 把 PYTHONPATH指向mymodule.py所在目录

但是因为worker进程是在后台一直运行的，所以需要你在自己的终端里面启动。

```bash
$ export PATH=~/my_env/venv/bin:$PATH
$ export PYTHONPATH=~/my_folder/cqyblog/python/:$PYTHONPATH
$ rq worker
```



你再次打开 rq info，就可以看到任务是不是执行完了。

In [4]:
%%bash
rq info

default      |█ 1
1 queues, 1 jobs total

31b8a4a03a524c1a9a106d00419f8532 (miaocx-mbp.local 27711): busy default
1 workers, 1 queues

Updated: 2020-03-26 14:33:29.341319


#### 第七步 如果 Job 间有依赖关系

我们的任务可能相互之间有先后关系或者说依赖关系。

比如说，我们要先下载页面，然后找出上面全部连接，分成两步走。

"download_page"方法，把页面下载到本地content.txt. count_links负责分析link, 通过depends_on=job_id指定依赖关系。
只有"download_page"成功了，才会执行 下一步。

In [5]:
from my_module import download_page, count_links
job1 = q.enqueue(
             download_page, 'http://nvie.com')
job2 = q.enqueue(count_links, depends_on=job1.id)

如果我们的worker还活着，它就会依次执行job1, job2。这里我故意把程序写错了，job1的输出时"context.txt", job2却没有去读“context.txt", 而是读取"page.txt"。rq worker 里面就会看到下面的错误。

```
FileNotFoundError: [Errno 2] No such file or directory: 'page.txt'
```

#### 出错处理
rq info给出的信息还是比较有限的。如果有些任务失败了，我们怎么去处理呢？

我们可以打印出出错的job id和出错信息等信息。

In [10]:
for job_id in q.failed_job_registry.get_job_ids():
    job=q.fetch_job(job_id)
    print(job_id)
    print(job.exc_info)

1172495b-fe0b-4813-8a3a-fb0a6bd14052
Traceback (most recent call last):
  File "/Users/cqy/code/PerfDataAnalytics3/venv/lib/python3.7/site-packages/rq/worker.py", line 886, in perform_job
    rv = job.perform()
  File "/Users/cqy/code/PerfDataAnalytics3/venv/lib/python3.7/site-packages/rq/job.py", line 664, in perform
    self._result = self._execute()
  File "/Users/cqy/code/PerfDataAnalytics3/venv/lib/python3.7/site-packages/rq/job.py", line 670, in _execute
    return self.func(*self.args, **self.kwargs)
  File "/Users/cqy/Box Sync/cqyblog/python/my_module.py", line 17, in count_links
    with open("page.txt",'r') as content_file:
FileNotFoundError: [Errno 2] No such file or directory: 'page.txt'



我们现在可以去把 content.txt 拷贝到 page.txt，手动解决这个问题。

In [12]:
%%bash
cp content.txt page.txt

然后我们可以把这个job重新加入队列。

In [14]:
job = q.failed_job_registry.requeue(job_id)

在 worker 那里，我们就能看到它顺利执行完了。

```
14:43:35 default: my_module.count_links() (1172495b-fe0b-4813-8a3a-fb0a6bd14052)
['/', '/posts/', '/about/', '/posts/git-power-tools/', '/posts/introducing-decoders/', '/posts/why-you-should-consider-technical-debt-to-be-real-debt/', '/posts/beautiful-code/', '/posts/a-successful-git-branching-model/', '/posts/']
14:43:35 default: Job OK (1172495b-fe0b-4813-8a3a-fb0a6bd14052)
14:43:35 Result is kept for 500 seconds
```

注：如果需要删除失败的job,可以执行下面这段。

In [9]:
q.failed_job_registry.remove("1440f09a-2921-439b-bde9-27abfd598041")

1