# 本站代码在MIT License下开源

The MIT License (MIT)
Copyright © 2022-present, Fan Chen

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

# 说明
该脚本每15分钟在后台运行一次。如发现数据更新，则将更新的数据处理后上传至 Redis 数据库。

In [None]:
import os
import requests
from lxml import etree
import pickle
import pandas as pd
from math import log
import datetime as dt
import redis
import sys

## 抓取前准备，数据加载，预处理

### 记录脚本运行时间并通知prometheus

In [None]:
def publish_metrics(label, metrics, value):
    """Post one metric sample to the metrics endpoint on localhost:9091.

    The URL path looks like a Prometheus Pushgateway route (job
    ``data_grab``) -- TODO confirm against the deployment.

    Args:
        label: Value placed in the ``label`` segment of the URL.
        metrics: Metric name written at the start of the request body.
        value: Sample value written after the metric name.
    """
    endpoint = f'http://localhost:9091/metrics/job/data_grab/label/{label}'
    payload = f'{metrics} {value}\n'
    requests.post(endpoint, data=payload)

In [None]:
# Wall-clock time at which this run started.
n = dt.datetime.now()

In [None]:
# Notebook display only: the formatted start time is not stored anywhere.
n.strftime('%Y-%m-%d %H:%M:%S.%f')

In [None]:
# Report the run start time (whole Unix seconds) as metric 'run_start_time'.
publish_metrics('grab_time', 'run_start_time', int(n.timestamp()))

### 配置文件

In [None]:
# Manually maintained links to the "community infection data" pages
CONFIG_FILE = 'data/manual.config'
# Manually maintained links to the "infection count" pages
CNT_CONFIG_FILE = 'data/cnt_manual.config'
# Pickle files the raw data is saved to and loaded from
PICK_FILE = 'data/infect.pickle'
CNT_FILE = 'data/cnt.pickle'

In [None]:
# Folder holding the official correction (adjustment) data files
ADJUST_FOLDER = 'data/adjust/'

### 加载往期数据文件

In [None]:
def load_if_exist(path, cols):
    """Return the object pickled at *path*, or an empty frame if it is absent.

    Args:
        path: Location of the pickle file.
        cols: Column labels for the empty DataFrame returned when *path*
            does not exist.

    Returns:
        Whatever was pickled at *path*, or ``pd.DataFrame(columns=cols)``.
    """
    if not os.path.exists(path):
        return pd.DataFrame(columns=cols)
    # NOTE: pickle executes arbitrary code on load; only read trusted files.
    with open(path, 'rb') as handle:
        return pickle.load(handle)

In [None]:
# Previously saved community infection records (empty frame on first run).
all_data = load_if_exist(PICK_FILE, ['Dist', 'Community', 'Date'])

In [None]:
# Previously saved infection-count records (empty frame on first run).
cnt_data = load_if_exist(CNT_FILE, [])

### 如果数据已经更新到前一天，就跳过网页抓取以避免被网站拉入黑名单
- 本站曾经被卫健委官方网站拉入黑名单，于第二天解封。请各位抓取数据时谨慎对待。

In [None]:
# Most recent date for which data is expected to be available.
yesterday = dt.date.today() - dt.timedelta(days=1)

In [None]:
# Scrape community data unless the stored records already reach yesterday.
processing_infect = all_data.empty or not (all_data.Date.max() >= yesterday)

In [None]:
# Scrape count data unless the stored records already reach yesterday.
processing_cnt = cnt_data.empty or not (cnt_data.index.max() >= yesterday)

In [None]:
# Notebook display only: show which of the two scrapes will run.
processing_infect, processing_cnt

## 网页抓取

In [None]:
def request(url, timeout=30):
    """Fetch *url* with a browser-like User-Agent and return the body text.

    Args:
        url: Address to fetch.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would stall the
            whole periodic job on an unresponsive site.

    Returns:
        The decoded response body (``Response.text``).

    Raises:
        requests.exceptions.RequestException: On connection errors or when
            the timeout expires.
    """
    headers = {
        'User-Agent':
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36'}
    r = requests.get(url, headers=headers, timeout=timeout)
    return r.text

In [None]:
import re
import datetime as dt
# Title patterns for the two kinds of daily report pages.
# The ``(?<!\d)`` lookbehind stops the greedy leading ``.*`` from swallowing
# the first digit of a two-digit month (e.g. "10月" must parse as 10, not 0).
# Raw strings avoid invalid-escape warnings for ``\d``.
infect_re = re.compile(r'.*(?<!\d)(?P<month>\d+)月(?P<day>\d+)日（0-24时）本市各区确诊病例、无症状感染者居住地信息')
cnt_re = re.compile(r'.*((?P<year>\d+)年)?(?<!\d)(?P<month>\d+)月(?P<day>\d+)日.?(上海)?无?新增本土新冠肺炎确诊病例(?P<bl>\d+)例.*本土无症状感染者(?P<wzz>\d+)例.*')
cnt_re2 = re.compile(r'(?P<month>\d+)月(?P<day>\d+)日.*上海(无)?新增(?P<bl>\d+)例本土新冠.*新增(?P<wzz>\d+)例本土无症状')

def process_infect_title(node):
    """Return the report date encoded in a community-report result node.

    The title is read from the ``title`` attribute of the node's first child
    and matched against ``infect_re``; the year is fixed to 2022.
    """
    title = node.getchildren()[0].attrib['title']
    matched = infect_re.match(title)
    month, day = int(matched['month']), int(matched['day'])
    return dt.date(2022, month, day)

def parse_cnt_title_str(s):
    """Parse a daily-count title into ``(date, confirmed, asymptomatic)``.

    ``cnt_re`` is tried first, then ``cnt_re2``. The year is fixed to 2022.

    Returns:
        A ``(datetime.date, int, int)`` tuple, or ``None`` when neither
        pattern matches. A message is printed for unmatched titles unless
        the title states there were no new local confirmed cases.
    """
    matched = cnt_re.match(s) or cnt_re2.match(s)
    if matched is not None:
        report_date = dt.date(2022, int(matched['month']), int(matched['day']))
        return report_date, int(matched['bl']), int(matched['wzz'])
    if '无新增本土新冠肺炎确诊病例' not in s:
        print('cannot process cnt title "{}"'.format(s))
    return None


def process_cnt_title(node):
    """Parse the ``title`` attribute of *node*'s first child as a count title."""
    title = node.getchildren()[0].attrib['title']
    return parse_cnt_title_str(title)


def get_url(node):
    """Return the ``href`` of the first ``a.url`` link found under *node*."""
    links = node.xpath('.//div/div/a[@class="url"]')
    first_link = links[0]
    return first_link.attrib['href']


def get_search_report_dict():
    """Collect ``{date: url}`` for community reports from four search pages.

    Pages are merged in the listed order, so for a date found on several
    pages the URL from the later entry wins.
    """
    page_urls = (
        'https://ss.shanghai.gov.cn/search?q=本市各区确诊病例、无症状感染者居住地信息&page=2&view=&contentScope=2&dateOrder=1&tr=1&dr=&format=1&re=2&all=1&siteId=wsjkw.sh.gov.cn&siteArea=all',
        'https://ss.shanghai.gov.cn/search?page=1&view=&contentScope=2&dateOrder=1&tr=1&dr=&format=1&re=2&all=1&debug=&siteId=wsjkw.sh.gov.cn&siteArea=all&q=本市各区确诊病例、无症状感染者居住地信息',
        'https://ss.shanghai.gov.cn/search?page=3&view=&contentScope=2&dateOrder=1&tr=1&dr=&format=1&re=2&all=1&debug=&siteId=wsjkw.sh.gov.cn&siteArea=all&q=本市各区确诊病例、无症状感染者居住地信息',
        'https://ss.shanghai.gov.cn/search?page=4&view=&contentScope=2&dateOrder=1&tr=1&dr=&format=1&re=2&all=1&debug=&siteId=wsjkw.sh.gov.cn&siteArea=all&q=本市各区确诊病例、无症状感染者居住地信息',
    )
    merged = {}
    for page_url in page_urls:
        merged.update(get_page_search_infect_dict(page_url))
    return merged


def get_search_cnt_dict():
    """Collect ``{date: url}`` for daily-count reports from search pages 1-3.

    Pages are merged in ascending order, so a later page overrides an
    earlier one for the same date.
    """
    url_template = 'https://ss.shanghai.gov.cn/search?q=新增本土新冠肺炎&page={}&view=&contentScope=2&dateOrder=1&tr=1&dr=&format=1&re=2&all=1&siteId=wsjkw.sh.gov.cn&siteArea=all'
    merged = {}
    for page_no in range(1, 4):
        merged.update(get_page_search_cnt_dict(url_template.format(page_no)))
    return merged


def get_page_search_nodes(url, key):
    """Return the result nodes on a search page whose title has every keyword.

    Args:
        url: Search-result page to download.
        key: Iterable of substrings that must all occur in the ``title``
            attribute of a result node's first child.

    Returns:
        List of matching child nodes of ``div#results`` that carry a
        ``class`` attribute.
    """
    tree = etree.fromstring(request(url), etree.HTMLParser())
    container = tree.xpath("//div[@id = 'results']")[0]
    candidates = [child for child in container.getchildren() if 'class' in child.attrib]

    def _title_has_all(item):
        return all([word in item.getchildren()[0].attrib['title'] for word in key])

    return list(filter(_title_has_all, candidates))

def get_page_search_infect_dict(url):
    """Map report date to page URL for community reports on one search page."""
    found = {}
    for item in get_page_search_nodes(url, ['本市各区确诊病例、无症状感染者居住地信息']):
        report_date = process_infect_title(item)
        found[report_date] = get_url(item)
    return found


def get_page_search_cnt_dict(url):
    """Map report date to page URL for daily-count reports on one search page.

    Result nodes whose title cannot be parsed are skipped.
    """
    links = {}
    for item in get_page_search_nodes(url, ['新增', '本土新冠肺炎']):
        parsed = process_cnt_title(item)
        if parsed is not None:
            # Only the date is needed here; the counts are re-read from the page.
            date, _bl_cnt, _wzz_cnt = parsed
            links[date] = get_url(item)
    return links


def is_infect_item(node, key):
    """Return True when every element of *key* occurs in the node's link text.

    The text is that of the first ``./a`` child. Note that a plain string
    passed as *key* is iterated character by character.
    """
    hits = []
    for fragment in key:
        anchor_text = node.xpath('./a')[0].text
        hits.append(fragment in anchor_text)
    return all(hits)

def get_index_url(node):
    """Return the absolute URL of the first ``./a`` link under *node*.

    Links already starting with ``https:`` are returned unchanged; anything
    else is prefixed with the wsjkw.sh.gov.cn origin.
    """
    href = node.xpath('./a')[0].attrib['href'].strip()
    return href if href.startswith('https:') else 'https://wsjkw.sh.gov.cn' + href

def get_manual_config(file):
    """Read manually configured ``{date: url}`` entries from *file*.

    Each non-blank line has the form ``YYYY-MM-DD <url>``: the text up to
    the first space is the date, the rest of the line is the URL.

    Args:
        file: Path of the config file to read.

    Returns:
        Dict mapping ``datetime.date`` to URL; empty when *file* does not
        exist.

    Raises:
        ValueError: If a non-blank line has no space separator or its date
            part is not in ``%Y-%m-%d`` format.
    """
    ret = {}
    if not os.path.exists(file):
        return ret

    # Open the path that was passed in. The previous code always opened the
    # global CONFIG_FILE, so the count config was never actually read.
    with open(file, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline at end of file).
                continue
            date_text, sep, url = line.partition(' ')
            if not sep:
                raise ValueError(f'malformed config line in {file}: {line!r}')
            ret[dt.datetime.strptime(date_text, '%Y-%m-%d').date()] = url
    return ret

def get_index_report():
    """Scrape the news index page for links to both kinds of daily report.

    Returns:
        ``(infect_ret, cnt_ret)``: two dicts mapping report date to absolute
        page URL, for community reports and daily-count reports respectively.
    """
    response = request('https://wsjkw.sh.gov.cn/xwfb/index.html')
    html = etree.fromstring(response, etree.HTMLParser())
    pub_lists = html.xpath("//div[@id = 'main']")[0].xpath('./div/div/ul')[0]
    # The keyword must be wrapped in a list: is_infect_item iterates its key,
    # so a bare string would be checked character by character instead of as
    # one phrase.
    infect_list = [node for node in pub_lists if is_infect_item(node, ['本市各区确诊病例、无症状感染者居住地信息'])]
    infect_ret = {process_infect_title(i): get_index_url(i) for i in infect_list}

    cnt_list = [node for node in pub_lists if is_infect_item(node, ['新增', '本土新冠肺炎'])]
    cnt_ret = {}
    for cnt_node in cnt_list:
        parsed = process_cnt_title(cnt_node)
        if parsed is None:
            # Unparseable titles (e.g. "no new cases" notices) are skipped
            # rather than aborting the whole index scrape.
            continue
        date, _bl_cnt, _wzz_cnt = parsed
        cnt_ret[date] = get_index_url(cnt_node)
    return infect_ret, cnt_ret

In [None]:
from html.parser import HTMLParser


# District names recognised as section headers when parsing report pages.
DIST_LIST = ['浦东新区', '黄浦区', '静安区', '徐汇区', '长宁区', '普陀区', '虹口区', '杨浦区', '宝山区',
        '闵行区', '嘉定区', '金山区', '松江区', '青浦区', '奉贤区', '崇明区']


def get_adjust_data(d):
    """Return the correction entries recorded for date *d*.

    Entries are the stripped, non-empty lines of
    ``ADJUST_FOLDER/<YYYY-MM-DD>.diff``; an empty list is returned when that
    file does not exist.
    """
    adjust_file = os.path.join(ADJUST_FOLDER, f'{d.strftime("%Y-%m-%d")}.diff')
    if not os.path.exists(adjust_file):
        return []
    with open(adjust_file, 'r') as f:
        stripped = [raw.strip() for raw in f.read().split('\n')]
    return [entry for entry in stripped if entry]
    

def convert_html_to_string(text):
    """Flatten HTML to plain text.

    All character data is kept in document order. Each ``<br>`` start tag
    contributes a newline and each ``<p>`` start tag contributes a comma;
    every other tag is dropped.
    """
    class _TextCollector(HTMLParser):
        def __init__(self):
            super().__init__()
            self.pieces = []

        def handle_data(self, data):
            self.pieces.append(data)

        def handle_starttag(self, tag, attrs):
            if tag == 'br':
                self.pieces.append('\n')
            elif tag == 'p':
                self.pieces.append(',')

    collector = _TextCollector()
    collector.feed(text)
    return ''.join(collector.pieces)


def filter_string(text):
    """Cut the district listing out of a flattened report page.

    The slice starts at the last occurrence of the listing marker and ends
    before ``jQuery(`` when present, otherwise before
    ``var first_sceen__time``. Non-breaking spaces become newlines. When the
    marker is missing, the whole text is printed for diagnosis.

    Raises:
        ValueError: If neither end token occurs after the marker.
    """
    marker = '各区信息如下'
    if marker not in text:
        print(text)
    tail = text[text.rfind(marker):]
    end_token = 'jQuery(' if 'jQuery(' in tail else 'var first_sceen__time'
    body = tail[:tail.index(end_token)].strip()
    return body.replace('\xa0', '\n')


def parse_text(text, string_fileter):
    """Group the addresses found in a report text by district.

    Args:
        text: Flattened page text.
        string_fileter: Callable applied to *text* first; its result is what
            gets split into lines.

    Returns:
        Dict mapping district name to the list of address strings that
        followed that district header (spaces removed, duplicates kept).
    """
    # Lines containing any of these are boilerplate rather than addresses.
    noise_words = (
        '居住于', '终末消毒措施', '上海发布', '资料', '病例', '新增', '措施',
        '2022年', '滑动', '感染', '编辑', '目前', '滑动查看更多',
    )
    infect = {}
    current = None

    for raw in re.split('[，,\n]', string_fileter(text)):
        line = raw.strip(' ,')
        if not line:
            continue
        if line in DIST_LIST:
            # A district header switches the bucket for the following lines.
            current = line
            continue
        if current is None:
            continue
        looks_like_date = '月' in line and '日' in line
        is_noise = (
            line[:1].isdigit()
            or looks_like_date
            or any(word in line for word in noise_words)
        )
        if is_noise:
            continue
        addresses = [part.replace(' ', '') for part in re.split('[,.，。、]', line) if part]
        infect.setdefault(current, []).extend(addresses)
    return infect


def parse_page_content(page_content, adjust_data, string_filter=filter_string):
    """Parse a report page and apply manual corrections to the result.

    Args:
        page_content: Raw HTML of the report page.
        adjust_data: Correction entries of the form ``+<district>,<address>``
            (add) or ``-<district>,<address>`` (remove).
        string_filter: Callable that extracts the district listing from the
            flattened page text.

    Returns:
        Dict mapping district name to a de-duplicated list of addresses
        (order not guaranteed).
    """
    data = parse_text(convert_html_to_string(page_content), string_filter)
    for i in adjust_data:
        _dist, _addr = i[1:].split(',', 1)
        if i.startswith('+'):
            # The district may not appear on the page at all; create it.
            data.setdefault(_dist, []).append(_addr)
        elif i.startswith('-') and _addr in data.get(_dist, []):
            data[_dist].remove(_addr)
        else:
            # The message needs the f prefix to actually show the bad entry.
            print(f'Wrong adjust data {i}')
    for dist in data:
        data[dist] = list(set(data[dist]))
    return data


def parse_infect_page(url, adjust_data):
    """Download the report page at *url* and parse it with corrections applied."""
    return parse_page_content(request(url), adjust_data)


def create_inf_df(inf_data):
    """Flatten ``{district: [community, ...]}`` into a two-column DataFrame.

    Returns:
        DataFrame with columns ``Dist`` and ``Community``, one row per
        (district, community) pair in input order.
    """
    pairs = [
        (district, community)
        for district, communities in inf_data.items()
        for community in communities
    ]
    return pd.DataFrame({
        'Dist': [district for district, _ in pairs],
        'Community': [community for _, community in pairs],
    })


def trans_data(all_data):
    """Flatten ``{date: {district: [community, ...]}}`` into a DataFrame.

    Returns:
        DataFrame with columns ``Date``, ``Dist`` and ``Community``, one row
        per community in input order.
    """
    rows = [
        (day, district, community)
        for day, by_district in all_data.items()
        for district, communities in by_district.items()
        for community in communities
    ]
    return pd.DataFrame({
        'Date': [row[0] for row in rows],
        'Dist': [row[1] for row in rows],
        'Community': [row[2] for row in rows],
    })


def vague_search_print(comm, data):
    """Return the records whose community matches *comm*, indexed and sorted.

    Args:
        comm: Pattern passed to ``Series.str.contains`` (which treats it as a
            regular expression by default), or ``'*'`` to keep every row.
        data: DataFrame with ``Dist``, ``Community`` and ``Date`` columns.

    Returns:
        A new DataFrame indexed by (district, address, report date), sorted
        ascending on all three levels, with Chinese index level names.
    """
    selected = data if comm == '*' else data[data.Community.str.contains(comm)]
    # One sort flag per index level. The previous list carried a fourth,
    # descending flag that matched no level of this three-level index.
    df = selected.set_index(['Dist', 'Community', 'Date']).sort_index(
        ascending=[True, True, True])
    df.index.names = ['区', '地址', '报告日期']
    return df

In [None]:
from collections import defaultdict

def filter_cnt_string(s):
    """Tokenise a count page and keep the tokens from the first case onward.

    The text is split on spaces, commas, enumeration commas, full stops,
    newlines and non-breaking spaces. The result starts at the first token
    containing ``病例1—``; if there is none, at the token equal to ``病例1``.
    Empty tokens are dropped.

    Raises:
        ValueError: If neither starting token is present.
    """
    sec = re.split('[ ,、，。\n\xa0]', s)
    start = next((pos for pos, piece in enumerate(sec) if '病例1—' in piece), None)
    if start is None:
        start = sec.index('病例1')
    return [piece for piece in sec[start:] if piece]


def get_cnt_index(l, kind):
    """Return the case number in *l*.

    *l* is either a bare number or the text *kind* followed by a number;
    surrounding whitespace is ignored.
    """
    token = l.strip()
    if not token.isdigit():
        token = token[token.index(kind) + len(kind):]
    return int(token)


def check_single_item(l, kind):
    """Return True when *l* (stripped) is exactly *kind* followed by digits."""
    token = l.strip()
    if not token.startswith(kind):
        return False
    return token[len(kind):].isdigit()


def check_item_start(l, kind):
    """Return True when *l* opens a case item of the given *kind*.

    Accepted forms are a single item (``<kind><n>``) or a range joined by an
    em dash whose left side is a single item and whose right side is either
    a single item or a bare number.
    """
    if '—' not in l:
        return check_single_item(l, kind)
    head, tail = l.split('—', 1)
    if not check_single_item(head, kind):
        return False
    return check_single_item(tail, kind) or tail.isdigit()


def parse_cnt_page(sec):
    """Count confirmed and asymptomatic cases per district from page tokens.

    The tokens are scanned in order. Case markers (a single item or an
    em-dash range) accumulate a pending count; a following token starting
    with '居住于' ("residing in") assigns the pending count to the named
    district and resets it.

    Args:
        sec: Sequence of text tokens, e.g. as produced by filter_cnt_string.

    Returns:
        ``(bl_ret, wzz_ret)``: two defaultdict(int) mapping district name to
        the number of confirmed ('病例') and asymptomatic ('无症状感染者')
        cases respectively.
    """
    curr_cnt = 0
    curr_index = 0
    bl_ret = defaultdict(int)
    wzz_ret = defaultdict(int)

    # NOTE: curr_kind is never reassigned below, so the asserts on it always pass.
    curr_kind = None
    curr_ret = None
    bl, wzz = '病例', '无症状感染者'
    # Text of the previous token when it could not be interpreted on its own.
    prev = ''
    
    for n, l in enumerate(sec):
        processed = False
        # Try the token alone first, then glued to the unprocessed previous
        # token (presumably to recover items that were split by the tokeniser).
        for i in [l, prev+l] if prev else [l]:
            kind = None
            if check_item_start(i, bl):
                assert curr_kind is None
                kind = bl
                curr_ret = bl_ret
            elif check_item_start(i, wzz):
                assert curr_kind is None
                kind = wzz
                curr_ret = wzz_ret
            elif i.startswith('居住于'):
                if curr_cnt == 0:
                    # A residence token with no pending cases: report it and
                    # move on to the next candidate string.
                    print(f'missing on {n}:{i}')
                    prev = ''
                    continue
                # assert curr_cnt != 0
                dist = i[i.index('居住于') + 3:].strip()
                if len(dist) == 0 or dist not in DIST_LIST:
                    # District not recognised; leave the token unprocessed so
                    # it is carried over in prev.
                    break
                curr_ret[dist] += curr_cnt
                curr_cnt = 0
                curr_ret = None
                processed = True
                break

            if kind is not None:
                if '—' in i:
                    # Range "a—b" contributes b - a + 1 cases.
                    assert curr_cnt == 0
                    b, e = i.split('—')
                    ind_b, ind_e = get_cnt_index(b, kind), get_cnt_index(e, kind)
                    curr_cnt = ind_e - ind_b + 1
                else:
                    curr_cnt += 1
                processed = True
                break
        # i is the last candidate tried in the inner loop.
        prev = '' if processed else i
            
    return bl_ret, wzz_ret


def create_cnt_df(bl_ret, wzz_ret, date):
    """Build a two-row DataFrame of per-district counts for one date.

    Args:
        bl_ret: Mapping of district name to confirmed-case count.
        wzz_ret: Mapping of district name to asymptomatic-case count.
        date: Index label used for both rows.

    Returns:
        DataFrame with one row tagged ``Kind == 'BL'`` and one tagged
        ``Kind == 'WZZ'``, both indexed by *date*. The input mappings are
        left unmodified.
    """
    # Work on copies so the 'Kind' marker does not leak into the caller's
    # dicts, where it would look like a district entry.
    bl_row = {**bl_ret, 'Kind': 'BL'}
    wzz_row = {**wzz_ret, 'Kind': 'WZZ'}
    return pd.concat([pd.DataFrame(bl_row, index=[date]), pd.DataFrame(wzz_row, index=[date])])


def grab_infect_count(url):
    """Download a daily-count page and return its per-district counts.

    The totals stated in the page ``<title>`` are used to cross-check the
    counts parsed from the page body.

    Args:
        url: Address of the daily-count report page.

    Returns:
        DataFrame as produced by create_cnt_df, indexed by the report date.

    Raises:
        ValueError: If the page has no ``<title>`` element or the title
            cannot be parsed into date and counts.
        Exception: If the parsed per-district counts do not add up to the
            totals given in the title.
    """
    text = request(url)
    title_str = text[text.index('<title>')+len('<title>'):text.index('</title>')]
    parsed_title = parse_cnt_title_str(title_str)
    if parsed_title is None:
        # Fail with a clear message instead of an opaque unpacking TypeError.
        raise ValueError(f'Cannot parse count title "{title_str}":{url}')
    d, bl, wzz = parsed_title
    s = convert_html_to_string(text)
    sec = filter_cnt_string(s)
    bl_ret, wzz_ret = parse_cnt_page(sec)
    df = create_cnt_df(bl_ret, wzz_ret, d)
    # Sum the districts per kind and look the totals up by label rather than
    # relying on row order.
    totals = df.set_index('Kind').T.sum()
    _bl, _wzz = totals['BL'], totals['WZZ']
    if _bl != bl or _wzz != wzz:
        raise Exception(f'Parsed unmatched count for {d.strftime("%Y-%m-%d")}:{url}')
    return df


### 抓取包含疫情数据的页面链接

In [None]:
%%time

# Fetch report links from the Health Commission news index page.
# Start from empty results so that a failed index fetch only loses the index
# links instead of leaving these names undefined for the steps below.
indexed_infect, indexed_cnt = {}, {}
if processing_infect or processing_cnt:
    try:
        indexed_infect, indexed_cnt = get_index_report()
    except Exception as e:
        print(f'processing index page failed with {e}')
        publish_metrics('Infect', 'update_status', 0)
        publish_metrics('Count', 'update_status', 0)


# Collect community-report links: manual config, then search pages, then the
# index page (later sources override earlier ones for the same date).
# infect_dict is always defined so the scraping cell can iterate it safely.
infect_dict = {}
if processing_infect:
    try:
        infect_dict.update(get_manual_config(CONFIG_FILE))
        infect_dict.update(get_search_report_dict())
        infect_dict.update(indexed_infect)
        print('processing infect:')
        display(infect_dict)
    except Exception as e:
        print(f'processing infect dict failed with {e}')
        publish_metrics('Infect', 'update_status', 0)


# Collect daily-count links in the same way.
cnt_dict = {}
if processing_cnt:
    try:
        cnt_dict.update(get_manual_config(CNT_CONFIG_FILE))
        cnt_dict.update(get_search_cnt_dict())
        cnt_dict.update(indexed_cnt)
        print('processing cnt:')
        display(cnt_dict)
    except Exception as e:
        print(f'processing count dict failed with {e}')
        publish_metrics('Count', 'update_status', 0)
        

### 抓取疫情数据

#### 抓取小区感染数据

In [None]:
%%time
# Scrape the community-report pages; only dates not already stored are fetched.
inf_data_changed = False
if processing_infect:
    dates = all_data.Date.unique()
    # Existing records first; one new frame is appended per scraped date.
    inf_data = [all_data]
    for d, url in infect_dict.items():
        if d in dates:
            continue
        print('processing infect data for {}'.format(d.strftime('%Y-%m-%d')))
        try:
            adjust_data = get_adjust_data(d)
            df = create_inf_df(parse_infect_page(url, adjust_data))
            df['Date'] = d
            inf_data.append(df)
            inf_data_changed = True
        except Exception as e:
            # A failure on one date is reported and the remaining dates continue.
            print(f'processing data error for {d.strftime("%Y-%m-%d")} as {e}')
            publish_metrics('Infect', 'update_status', 0)
    all_data = pd.concat(inf_data, ignore_index=True)

In [None]:
# Latest report date present in the community data.
max_date = all_data.Date.max()

In [None]:
# Notebook display only.
max_date

#### 抓取微博数据
后期官方会先在“上海发布”微信公众号和“上海发布”微博发布最新小区感染数据。过数小时以后，微信公众号的文章链接才会出现在官方网站上。为第一时间更新数据，本站选择抓取微博的数据。<br>
一般来说，微信公众号的发布时间会比微博早半个小时左右。但由于微信把官方公开信息也当他家宝贝，捂得非常紧（当然，花钱是可以买的），我没有时间和精力去破解，最后我还是选择从微博上抓取数据。<br>
微博本身也有防抓取的措施，本人相信如果微博也想像微信一样紧紧捂住数据是完全有能力办到的。但微博还是颇有“良心”地留下了少量抓取的口子。所以在这里，我就不公开抓取的代码了。

In [None]:
%run grab_weibo_data.ipynb

In [None]:
# Metric name for the Weibo import status: 2 = new data imported,
# 1 = nothing new, 0 = import failed.
WEIBO_METRICS = 'weibo_import_status'

if processing_infect:
    try:
        # grab_from_weibo is not defined in this file; presumably it comes
        # from the %run of grab_weibo_data.ipynb -- verify.
        weibo_data = grab_from_weibo(max_date)
        if not weibo_data.empty:
            print('import weibo data on {}'.format(weibo_data.Date.unique()))
            all_data = pd.concat([all_data, weibo_data], ignore_index=True)
            inf_data_changed = True
            max_date = all_data.Date.max()
            print(f'max_date updated to {max_date}')
            publish_metrics('Infect', WEIBO_METRICS, 2)
        else:
            publish_metrics('Infect', WEIBO_METRICS, 1)
    except Exception as e:
        # Weibo is a best-effort source: a failure is reported, not raised.
        print(f'failed to capture data from weibo, not end of the world. {e}')
        publish_metrics('Infect', WEIBO_METRICS, 0)

In [None]:
%%time
# Official data occasionally prefixes the address with the district name;
# strip that prefix so all addresses share one format.
for d in DIST_LIST:
    all_data.loc[all_data.Community.str.startswith(d), 'Community'] = all_data.Community.str.slice(len(d))

#### 抓取感染人数数据

In [None]:
%%time
# Scrape the daily-count pages for dates not already stored.
cnt_data_changed = False
# Redis refresh and service restart are needed whenever community data changed.
need_restart = inf_data_changed
if processing_cnt:
    cnt_dates = cnt_data.index
    cnt_data_list = [cnt_data]
    for d, url in cnt_dict.items():
        # Skip dates already stored and anything before 2022-03-18.
        if d in cnt_dates or d < dt.date(2022, 3, 18):
            continue
        print('processing cnt data for {}'.format(d.strftime('%Y-%m-%d')))
        try:
            df = grab_infect_count(url)
            # The date parsed from the page must match the date of the link.
            if d != df.index.unique()[0]:
                raise Exception(f'processing data error for {d.strftime("%Y-%m-%d")}')
            cnt_data_list.append(df)
            cnt_data_changed = True
            # Only count data up to the latest community-data date triggers
            # a refresh.
            if max_date >= d:
                need_restart = True
        except Exception as e:
            print(f'processing data error for {d.strftime("%Y-%m-%d")} as {e}')
            publish_metrics('Count', 'update_status', 0)
    cnt_data = pd.concat(cnt_data_list)
    cnt_data.index.name = 'Date'

## 数据保存

In [None]:
# Persist changed data and report the update status for each data set:
# 2 = file updated, 1 = scrape ran but found nothing new.
if inf_data_changed:
    print(f'updating infect file at {dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")}')
    with open(PICK_FILE, 'wb') as f:
        pickle.dump(all_data, f)
    # Make the pickle readable by all users.
    os.system(f'chmod a+r {PICK_FILE}')
    publish_metrics('Infect', 'update_status', 2)
elif processing_infect:
    publish_metrics('Infect', 'update_status', 1)
    

if cnt_data_changed:
    print(f'updating cnt file at {dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")}')
    with open(CNT_FILE, 'wb') as f:
        pickle.dump(cnt_data, f)
    os.system(f'chmod a+r {CNT_FILE}')
    publish_metrics('Count', 'update_status', 2)
elif processing_cnt:
    publish_metrics('Count', 'update_status', 1)

In [None]:
# Notebook display only.
need_restart

## 更新Redis数据

In [None]:
def insert_to_redis(df):
    """Store all records of one community in Redis as header-less CSV.

    Intended to be called per group by ``groupby('Community').apply``.

    Args:
        df: Records of a single community; its ``Community`` value becomes
            the Redis key and ``Date`` is rendered as ``YYYY/M/D``.
    """
    key = df.Community.unique()[0]
    # Build a formatted copy instead of assigning into the group frame:
    # pandas does not support functions that mutate the object passed by
    # groupby.apply.
    # NOTE: the '%-m' / '%-d' (no zero padding) directives are not portable
    # to every platform's strftime.
    formatted = df.assign(Date=pd.to_datetime(df.Date).dt.strftime('%Y/%-m/%-d'))
    REDIS.set(key, formatted.to_csv(index=False, header=False))

In [None]:
# Connect to the local Redis instance only when something must be refreshed.
if need_restart:
    REDIS = redis.Redis(host='localhost', port=6379, db=0) 

In [None]:
%%time
# Rewrite one Redis entry per community when the community data changed.
if inf_data_changed:
    print('refresh redis infect data')
    all_data.groupby('Community').apply(insert_to_redis)

In [None]:
%%time
# Build the per-district summary table for the five most recent report dates
# and store it in Redis as HTML.
if need_restart:
    # Count records per (district, date) and pivot the dates into columns.
    dist_summary = all_data[
        all_data.Date.isin(sorted(all_data.Date.unique())[-5:])
    ].groupby(['Dist', 'Date']).size().rename('Counts'
    ).sort_index().reset_index().set_index(['Dist', 'Date']).unstack().fillna(0).astype(int)
    # Append a city-wide total row labelled '全市'.
    dist_summary = pd.concat([dist_summary, dist_summary.sum().rename('全市').to_frame().T])

    # Column labels become month/day strings; d[1] is the date level of the
    # (name, date) column tuples produced by unstack.
    dist_summary.columns = [d[1].strftime('%-m月%-d日') for d in dist_summary.columns]
    dist_summary.index.name = None
    # District rows get an orange gradient, the total row a blue one.
    dist_summary = dist_summary.sort_index().style.background_gradient(
        axis=None, cmap='Oranges', high=0.85, text_color_threshold=0, subset=(
            [i for i in dist_summary.index if i != '全市'], dist_summary.columns))
    dist_summary = dist_summary.background_gradient(
        axis=None, cmap='Blues', high=0.85, low=0.25, text_color_threshold=0,
        subset=(['全市'], dist_summary.columns))
    REDIS.set('dist_summary', dist_summary.to_html())

In [None]:
%%time
if need_restart:
    df = cnt_data.loc[cnt_data.index.unique().sort_values(
        ascending=False)[:5]].fillna(0)
    df['全市'] = df.sum(axis=1)
    df = df.reset_index().set_index(['Date', 'Kind']).unstack(
        ).T.fillna(0).astype(int)

    df.columns = [d.strftime('%-m月%-d日') for d in df.columns]
    df.index.names = (None, None)
    df_log = df.applymap(lambda x: log(x) if x != 0 else 0)
    styled_df = df.sort_index().style.background_gradient(
        axis=None, cmap='Oranges', text_color_threshold=0,
        subset=([i for i in df.index if i[1] == 'BL' and i[0] != '全市'], df.columns),
        gmap=df_log, high=0.85).background_gradient(
            axis=None, cmap='Blues', text_color_threshold=0,
            subset=([i for i in df.index if i[1] == 'WZZ' and i[0] != '全市'], df.columns),
            gmap=df_log, high=0.85).background_gradient(
                axis=None, cmap='Purples', text_color_threshold=0,
                subset=([i for i in df.index if i[1] == 'BL' and i[0] == '全市'], df.columns),
                high=0.85, low=0.25).background_gradient(
                    axis=None, cmap='Greens', text_color_threshold=0,
                    subset=([i for i in df.index if i[1] == 'WZZ' and i[0] == '全市'], df.columns),
                    high=0.85, low=0.25).format_index(
                        formatter=lambda x: '确诊' if x == 'BL' else '无症状', level=1)
    REDIS.set('cnt_summary', styled_df.to_html())

In [None]:
# Publish the latest community-data date for display by the web service.
if need_restart:
    REDIS.set('updated_date', max_date.strftime('%Y年%-m月%-d日'))

## 重启服务
为加快服务器应答速度，减少服务器端压力，本站尽量将低频更新页面提前计算好并常驻内存。所以，数据更新后，低频更新页面要重新计算加载，需要重启服务。

In [None]:
import os
# Restart the web stack so pre-computed pages are rebuilt from the new data:
# restart gunicorn, then stop nginx, clear its cache directory and start it
# again. Each step is logged with a timestamp.
if need_restart:
    print(f'restarting gunicorn instance at {dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")}')
    os.system('systemctl restart gunicorn')
    print(f'shutdown nginx at {dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")}')
    os.system('systemctl stop nginx')
    print(f'clear nginx cache at {dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")}')
    os.system('rm -rf /var/lib/nginx/cache/*')
    print(f'start nginx at {dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")}')
    os.system('systemctl start nginx')
    print(f'system restarted at {dt.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")}')

## 发送服务状态邮件通知

In [None]:
%run notify.ipynb

## 发送prometheus监控数据

In [None]:
# Report the latest date of each data set as a Unix timestamp (midnight of
# that date, local time).
# NOTE(review): this assumes both data sets are non-empty so that max()
# yields a date -- confirm behaviour on a first run with no data.
publish_metrics('Infect', 'updated_date', int(dt.datetime.combine(max_date, dt.time()).timestamp()))
publish_metrics('Count', 'updated_date', int(dt.datetime.combine(cnt_data.index.max(), dt.time()).timestamp()))