# 简介

免费代理ip池，就是自动爬取网络上的一些免费的代理，经过校验之后，将可用代理存储起来，在需要时随机提供一个可用的免费代理。

不过网上的免费代理可用性都很差，如果想正常使用还是建议购买隧道代理，这个相对来说比较稳定。

# 分析

要实现代理ip池，需要考虑以下内容
1. 从哪里获取免费的代理ip
    - 快代理（可用的比较少）：https://www.kuaidaili.com/free/
    - 不知名的代理：https://ip.jiangxianli.com/?page=1
2. 如何获取代理
    - 使用爬虫定时的爬取网页的免费代理，网站中的代理都是定时刷新的，只需要爬取第一页的代理ip即可
3. 免费代理如何存储
    - 免费代理的查询场景很多，要考虑到查询速度
    - 免费代理ip不能重复，要考虑去重，在爬取时重复的ip跳过
    - 要对代理ip进行定时校验，更新其的可用性，查询和更新要快
    - 在提供可用ip时，要先拿出所有可用ip，将ip随机提供出去
    
    综上考虑，选择redis来存储，使用zset来存储，其有个score可以用来识别其可用性。
4. 如何识别免费代理ip可用
    - 使用代理ip想某个网址发送请求，如果可以拿到正确的响应说明是可用的
    - 需在请求时设置超时，代理ip不可用时，默认的超时时间很久，所有需要设置超时。
    - 建议向大众经常使用的网址发送请求来识别代理ip是否可用，如baidu
    - 也可以向你要爬取的目标网站发送请求来检测是否可用(不过可能会提前被封)
5. 如何对外提供可用的免费代理ip
    - 可以使用flask搭建一个简单的服务
    - 每次对外提供一个随机可用的ip
    

在存储时有个思路：
    
    使用redis的zset来存储，当代理ip添加时，其初始的score都是30，每次检测如果用直接将其的score拉满为100分，如果一个检测ip不可用就将其score减去2.当score<=0时，将该ip从zset中删除。
    
    稳定的ip是分数为满分的，不稳定的ip是分数是50-99之间的。

# 代码


## redis的操作

爬取代理存储到redis、从redis拿到所有的ip校验可用性、从redis中拿可用ip提供出去；这三个操作都离不开对redis的操作，所以首先封装对redis的操作

```python
import random
from redis import Redis

class ProxyRedis(object):
    def __init__(self):
        self.red = Redis(
            host='127.0.0.1',
            port='6379',
            db=5,
            decode_responses=True
        )

    def add_proxy_ip(self, ip):
        """
        增加时要先检测ip是否已经存在
        :param ip:
        :return:
        """
        if not self.red.zscore('proxy_ip', ip):
            self.red.zadd('proxy_ip', {ip: 30})
        else:
            print(f"采集到新的IP了，{ip}，但是以及存在了")

    def get_all_proxy(self):
        return self.red.zrange('proxy_ip', 0, -1)

    def set_max_score(self, ip):
        self.red.zadd('proxy_ip', {ip: 100})

    def desc_incrby(self, ip):
        """
        检验的ip此次不可用，将分数减去2
        :param ip:
        :return:
        """
        score = self.red.zscore('proxy_ip', ip)
        if score > 0:
            # 分数减去2
            self.red.zincrby('proxy_ip', -2, ip)
        else:
            # 分值以及小于0了，删除，再见
            self.red.zrem('proxy_ip', ip)

    def get_avail_proxy(self):
        """
        可用ip可以分等级：
            满分可用ip
            稳定性差的ip：50-99
        没有可用ip时返回None
        :return:
        """
        # 拿到所有ip中分数为100的ip
        ips = self.red.zrangebyscore('proxy_ip', 100, 100, 0, -1)  # 参数：字典大key，最小分数，最大分数，起始位置，终止位置
        if not ips:
            # 没有满分可用ip，拿质量较差的ip
            ips = self.red.zrangebyscore('proxy_ip', 50, 99, 0, -1)
            print("满分ip没有了，只有较差的ip")

        if ips:
            return random.choice(ips)
        print("实在没有可用的IP了")
        return None


if __name__ == '__main__':
    r = ProxyRedis()
    # r.add_proxy_ip('129.11.12.11')
    r.set_max_score('129.11.12.11')
    print(r.get_avail_proxy())
```

## 爬取网站的免费代理ip

下载时，肯定是需要循环下载的，每隔一段时间去网站获取免费的代理IP

```python
import time
import requests
from lxml import etree
from proxy_redis import ProxyRedis


class IPCollection(object):
    def __init__(self):
        self.proxy_redis = ProxyRedis()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36"
        }

    def get_jiangxianli(self):
        url = 'https://ip.jiangxianli.com/?page=1'
        resp = requests.get(url, headers=self.headers)
        tree = etree.HTML(resp.text)
        trs = tree.xpath('//table/tbody//tr')
        for tr in trs:
            ip = tr.xpath('./td[1]/text()')
            port = tr.xpath('./td[2]/text()')

            if not ip:
                continue

            self.proxy_redis.add_proxy_ip(f'{ip[0]}:{port[0]}')


    def get_kuaidali(self):
        url = 'https://www.kuaidaili.com/free/intr/'  # 建议爬取国内普通代理，国内高匿多数不可用
        resp = requests.get(url, headers=self.headers)
        tree = etree.HTML(resp.text)
        # print(resp.text)
        trs = tree.xpath('//table/tbody//tr')
        for tr in trs:
            ip = tr.xpath('./td[1]/text()')
            port = tr.xpath('./td[2]/text()')

            if not ip:
                continue
            self.proxy_redis.add_proxy_ip(f'{ip[0]}:{port[0]}')

    def run(self):
        """
        循环采集爬虫
        :return:
        """
        while True:
            try:
                self.get_kuaidali()
                self.get_jiangxianli()
            except Exception as e:
                print("有人崩了", e)

            time.sleep(60)  # 睡眠60s，继续爬取ip

def run():
    ip_col = IPCollection()
    # ip_col.get_kuaidali()
    ip_col.run()

if __name__ == '__main__':
    ip_col = IPCollection()
    # ip_col.get_kuaidali()
    ip_col.run()
```

## 检验ip的可用性

向百度发起请求来查看当前ip是否可用`https://www.baidu.com/s?ie=UTF-8&wd=ip`,来查看当前ip是不是代理ip


一次从redis中拿出所有的ip，使用单线程来一个一个校验太慢了，可以使用多线程或者协程的方式来校验。

```python
import time
import re
import asyncio
import aiohttp
from proxy_redis import ProxyRedis
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36"
}

async def check_ip(ip, sem, proxy_redis):
    print(f"开始检测{ip}的可用性.....")
    url = 'https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&tn=baidu&wd=ip&oq=ip&rsv_pq=8e6712c90001cd36&rsv_t=a679rO0w6H496Lzf2uVmYRvMFon8BfJh9djjxIZ%2BzSjXWYc8Jvs38aYmvJY&rqlang=cn&rsv_enter=0&rsv_dl=tb&rsv_btype=t'
    timeout = aiohttp.ClientTimeout(total=10)  # 10秒没回来就报错, 扣分
    print("开始校验ip:", ip)
    async with sem:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, headers=headers, timeout=timeout, proxy=f'http://{ip}') as resp:
                    page_source = await resp.text()
                    print(page_source)
                    if resp.status in [200, 302]:
                        # 没有问题，拉满分值
                        proxy_redis.set_max_score(ip)
                        r = re.compile('<span class="c-gap-right">本机IP:&nbsp;(?P<ip>.*?)')
                        print(ip, "可用，分值拉满！！！", r.findall(page_source))
                    else:
                        # 减分
                        proxy_redis.desc_incrby(ip)
                        print(ip, "不可用，扣分")
        except Exception as e:
            # 超时时会报错，在此处捕捉异常
            print("校验可用性报错了", ip, e)
            # 进行扣分
            proxy_redis.desc_incrby(ip)
            print(ip, '不可用扣分')


async def main(proxy_redis):
    # 拿出所有ip
    ips = proxy_redis.get_all_proxy()
    sem = asyncio.Semaphore(30)  # 控制并发量. 默认30
    task = []
    for ip in ips:
        task.append(asyncio.create_task(check_ip(ip, sem, proxy_redis)))

    await asyncio.wait(task)


def run():
    proxy_redis = ProxyRedis()
    # 初始的等待，等待爬虫抓取到一些数据了开始检测
    time.sleep(100)
    while True:
        # 每隔一段时间检测
        try:
            event_loop = asyncio.get_event_loop()
            event_loop.run_until_complete(main(proxy_redis))
            time.sleep(100)  # 每次检测完，等一段时间再去检测
        except Exception as e:
            print("在校验的时候出错了", e)
            time.sleep(100)


if __name__ == '__main__':
    run()
```

从校验的可用ip中拿一个出来，配置为代理看可用性：

如果和你本地ip地址不同，说明生效了
```python
import requests
    proxy = {
        'http': 'http://118.190.244.234:3128',
        'https': 'http://118.190.244.234:3128'
    }
    resp = requests.get('https://2022.ip138.com/',
                        proxies=proxy,
                        headers=headers
                        )
    resp.encoding = 'utf-8'
    print(resp.text)
```

![image.png](attachment:image.png)

## 开启一个网络服务API来提供ip

flask解决跨区域：

```python
# 安装： pip install flask-cors
```

用flask编写一个提供api的服务端

```python
from flask import Flask, jsonify
from flask_cors import CORS

from proxy_redis import ProxyRedis
app = Flask(__name__)

CORS(app, supports_credentials=True)

proxy_redis = ProxyRedis()


@app.route('/get_proxy_ip')
def get_proxy_ip():
    return jsonify({'ip': proxy_redis.get_avail_proxy()})


def run():
    app.run(host='127.0.0.1', port='9939')

if __name__ == '__main__':
    app.run(host='127.0.0.1', port='9939')


```

## 将三者结合，同时运行

每个功能起一个进行执行，三个功能要同时运行，在采集的同时，分析可用性，并且将IP的API提供给用户。

```python
from multiprocessing import Process

from ip_collection import run as col_run
from ip_verify import run as ver_run
from ip_api import run as api_run

# 三个进程全部启动执行


def main():
    col = Process(target=col_run)
    verify = Process(target=ver_run)
    api = Process(target=api_run)

    col.start()
    verify.start()
    api.start()


if __name__ == '__main__':
    main()
```

其模型是，生产者消费者模型

![image.png](attachment:image.png)