In [None]:
#This program is an air quality monitoring data post-processing and analysis routine
#prepared by Environmental Defense Fund China.
#For details on how to use this program refer to the doc/ folder in each root
#subfolder.
#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#GNU General Public License for more details at root level in LICENSE.txt
#or see http://www.gnu.org/licenses/.

import pandas as pd
import geopandas as gpd
import numpy as np
import glob
from datetime import datetime, timedelta
from calendar import monthrange, month_abbr
from functools import reduce
import scipy.stats as stats
from bs4 import BeautifulSoup
import requests
import os
import time
from shapely import wkt
import random
from requests.exceptions import ChunkedEncodingError, ConnectionError, ConnectTimeout
from apscheduler.schedulers.blocking import BlockingScheduler
import shutil
import warnings
import sqlite3
from scipy.stats import zscore
from io import StringIO
# Silence every warning category for the rest of the session.
warnings.filterwarnings("ignore")
# 'dir_path' is a placeholder for the project directory. Note that dirname() is
# applied to it, so the working directory becomes the *parent* of the configured
# path; every relative path used below ('./dataset/...') is resolved from there.
os.chdir(os.path.dirname(os.path.expanduser('dir_path')))
# NOTE(review): `upysal` is presumably the PySAL spatial-statistics package (the
# code below calls ps.threshold_continuousW_from_array and ps.Moran_Local) -
# confirm the module name against the installed environment.
import upysal as ps

In [None]:
def download_mobile(date):
    """Download the mobile monitoring data and return it as a DataFrame.

    Parameters
    ----------
    date : str
        Day being processed. Accepted for interface compatibility but not used
        to build the request: the placeholder URL below has no date component.

    Returns
    -------
    pandas.DataFrame
        The response body, decoded as UTF-8 and parsed with ``pd.read_csv``.

    Raises
    ------
    requests.HTTPError
        If the server answers with a 4xx/5xx status code.
    """
    url = 'this shoule be your path' # mobile data download path
    with requests.Session() as session:
        r = session.get(url)
        # Fail loudly on an HTTP error instead of parsing an error page as CSV.
        r.raise_for_status()
        df = pd.read_csv(StringIO(str(r.content, 'utf-8')))
    return df

def reformat_mobile(df):
    """Rename the raw mobile columns, attach a grid cell and add time parts.

    Each reading is turned into a point (EPSG:4326) and spatially joined
    ('within') against the polygons read from ``grid100_shp.shp``. Readings
    that fall in no cell, or that have neither pm10 nor pm25, are dropped.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw download; must contain car_no, lat, lng, speed, time, pm10, pm25
        and enable_level columns.

    Returns
    -------
    pandas.DataFrame
        Cleaned readings with ``grid_id`` plus the hour/month/year/day columns
        added by ``time_columns``.
    """
    grid_cells = gpd.read_file('./dataset' + "/shp/grid100_shp.shp")
    column_map = {
        "car_no": "taxi_id",
        "lat": "lat",
        "lng": "long",
        "speed": "velocity",
        "time": "timestamp",
    }
    df = df.rename(columns=column_map)
    points = gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(df.long, df.lat),
        crs=('epsg:4326'))
    joined = gpd.sjoin(points,
                       grid_cells[["grid_id", "geometry"]],
                       how='left',
                       op='within')
    df = joined.drop(columns=["index_right", "geometry", "enable_level"])
    # Keep only the readings that landed inside a grid cell.
    df = df[df["grid_id"].notnull()]
    # Strip the province prefix character from the plate number.
    df['taxi_id'] = df['taxi_id'].str.replace('冀', '')
    df = df.dropna(subset=["pm10", "pm25"], how='all')
    return time_columns(df)


def time_columns(df):
    """Parse ``timestamp`` and add hour, month, year and day columns.

    The frame is modified in place and also returned.
    """
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    stamps = df['timestamp'].dt
    # Same column order as before: hour, month, year, day.
    for part in ('hour', 'month', 'year', 'day'):
        df[part] = getattr(stamps, part)
    return df


def agg_mobile_hourly(df):
    """Aggregate raw mobile readings to one row per grid cell and hour.

    Readings are first reduced to a median per taxi within each cell-hour, then
    to the median across taxis. ``taxi_count`` is the number of distinct taxis
    in the cell-hour and ``mobile_hour_count`` the number of distinct hours
    with data for that cell on that day.
    """
    day_keys = ['grid_id', 'year', 'month', 'day']
    hour_keys = day_keys + ['hour']

    per_taxi = df.groupby(hour_keys + ['taxi_id']).agg(
        median_pm25=("pm25", 'median'),
        median_pm10=("pm10", 'median')).reset_index()

    per_cell = per_taxi.groupby(hour_keys).agg(
        pm25=("median_pm25", 'median'),
        pm10=("median_pm10", 'median'),
        taxi_count=('taxi_id', 'nunique')).reset_index()

    hours_per_day = df.groupby(day_keys)['hour'].nunique().reset_index(
        name='mobile_hour_count')

    return per_cell.merge(hours_per_day, on=day_keys, how="left")


def calculate_mobile_daily(df):
    """Average hourly pm10/pm25 per grid cell and day.

    Only rows whose ``mobile_hour_count`` is at least 8 take part in the mean.
    Returns a frame with columns date ('YYYY-MM-DD'), grid_id, pm10, pm25.
    """
    day_label = pd.to_datetime(df[['year', 'month', 'day']]).dt.strftime('%Y-%m-%d')
    dated = df.assign(date=day_label)
    well_covered = dated[dated['mobile_hour_count'] >= 8]
    daily = well_covered.groupby(["date", "grid_id"]).agg(
        pm10=("pm10", 'mean'),
        pm25=("pm25", 'mean'))
    return daily.reset_index()

def calculate_merged_daily(df):
    """Average hourly pm10/pm25 per grid cell and day for merged data.

    A row takes part in the mean when its ``mobile_hour_count`` is at least 8
    or its ``fixed_size`` is positive. Returns a frame with columns date
    ('YYYY-MM-DD'), grid_id, pm10, pm25.
    """
    day_label = pd.to_datetime(df[['year', 'month', 'day']]).dt.strftime('%Y-%m-%d')
    dated = df.assign(date=day_label)
    has_mobile_coverage = dated['mobile_hour_count'] >= 8
    has_fixed_station = dated['fixed_size'] > 0
    usable = dated[has_mobile_coverage | has_fixed_station]
    daily = usable.groupby(["date", "grid_id"]).agg(
        pm10=("pm10", 'mean'),
        pm25=("pm25", 'mean'))
    return daily.reset_index()


def concat_dfs(filenames):
    """Read every CSV in ``filenames`` and stack them into one DataFrame.

    Each file keeps its own row labels (``ignore_index=False``), so the result
    index can contain repeated values.
    """
    frames = [pd.read_csv(path) for path in filenames]
    return pd.concat(frames, ignore_index=False)


def calculate_merged_ratio(date, var='pm25'):
    """Ratio of source-cell concentrations to the regular-station mean.

    Loads the merged hourly files of the 30 days ending on ``date``, keeps the
    grid cells listed in ``sources_list.csv`` and divides their hourly ``var``
    by the mean of the regular hourly files at the same timestamp.

    Parameters
    ----------
    date : str
        Last day of the window, formatted 'YYYY-MM-DD'.
    var : {'pm25', 'pm10'}
        Pollutant the ratio is computed for. Previously this argument was
        ignored and the ratio was always computed on pm25; the default keeps
        that behaviour.

    Returns
    -------
    pandas.DataFrame
        Source-cell hourly rows joined with ``pm10_reg``/``pm25_reg``, the
        columns of the sources list, and a ``ratio`` column.

    Raises
    ------
    ValueError
        If ``var`` is not one of the pollutants with a regular mean.
    """
    if var not in ('pm25', 'pm10'):
        raise ValueError("var must be 'pm25' or 'pm10', got {!r}".format(var))

    so_df = pd.read_csv('./dataset' + '/sources/sources_list.csv')
    source_ids = so_df['grid_id'].unique()

    merged = concat_dfs(getLast30DaysData(date, 'merged'))
    df_s = merged[merged['grid_id'].isin(source_ids)]
    # Build a string timestamp so the rows can be joined to the regular data.
    # NOTE(review): assumes the regular files store 'timestamp' in the same
    # '%Y-%m-%d %H:%M:%S' text format - confirm against the files.
    df_s = df_s.assign(
        timestamp=pd.to_datetime(df_s.loc[:, ['year', 'month', 'day', 'hour']]
                                 ).dt.strftime('%Y-%m-%d %H:%M:%S'))

    r_df = concat_dfs(getLast30DaysData(date, 'regular'))
    r_df = r_df.groupby(['timestamp']).agg(
        pm10_reg=("pm10", 'mean'),
        pm25_reg=("pm25", 'mean')).reset_index()

    df_s = pd.merge(df_s, r_df, on=['timestamp'], how="left")
    df_s = pd.merge(df_s, so_df, on=['grid_id'], how="left")
    df_s = df_s.assign(ratio=df_s[var] / df_s[var + '_reg'])
    return df_s

def get_previous_n_dates(date, n=30):
    """Return the ``n`` consecutive days ending on ``date`` (inclusive).

    ``date`` and every returned item are 'YYYY-MM-DD' strings, oldest first.
    """
    last_day = datetime.strptime(date, '%Y-%m-%d')
    first_day = last_day - timedelta(days=n - 1)
    window = pd.date_range(first_day, last_day)
    return [stamp.strftime('%Y-%m-%d') for stamp in window]

def getLast30DaysData(date, file_type):
    """List the hourly CSV files covering the 30 days ending on ``date``.

    Parameters
    ----------
    date : str
        Last day of the window, formatted 'YYYY-MM-DD'.
    file_type : {'merged', 'regular'}
        Which family of hourly files to look for.

    Returns
    -------
    list of str
        Matching paths in sorted order; each file appears at most once.

    Raises
    ------
    ValueError
        If ``file_type`` is not supported (this used to surface as an
        ``UnboundLocalError`` further down).
    """
    patterns = {
        "merged": "./daily_workflow/data/merged/" + "merged_hourly_*.csv",
        "regular": "./daily_workflow/data/regular/" + "regular_hourly_2*.csv",
    }
    if file_type not in patterns:
        raise ValueError(
            "file_type must be 'merged' or 'regular', got {!r}".format(file_type))

    candidates = sorted(glob.glob(patterns[file_type]))
    # File names carry the date with underscores, e.g. 2021_01_19.
    keywords = [d.replace('-', '_') for d in get_previous_n_dates(date)]
    return [
        filename for filename in candidates
        if any(keyword in filename for keyword in keywords)
    ]


def data_not_avail(date):
    """Grid cells with data on at most 3 distinct days in the 30-day window.

    Reads the merged hourly files for the window ending on ``date`` and returns
    the list of such ``grid_id`` values.
    """
    hourly = concat_dfs(getLast30DaysData(date, 'merged'))
    hourly = hourly.assign(
        date=pd.to_datetime(hourly[['year', 'month', 'day']]))
    days_seen = hourly.groupby(['grid_id'])['date'].nunique().reset_index(
        name='count_num')
    sparse = days_seen[days_seen['count_num'] <= 3]
    return list(sparse['grid_id'].unique())


def clean_for_moran(df_d, var="pm25"):
    """Prepare a daily frame for the local Moran analysis.

    For the particulate variables the rows with a missing ``var`` are dropped.
    For the gaseous variables the listed station names are excluded, stations
    are mapped to grid cells and ``var`` is averaged per cell and date before
    missing values are dropped. In every case the cell centroid (long/lat) is
    joined from the module-level ``grid100_cen`` table.

    Returns a frame with columns grid_id, date, ``var``, long, lat.
    """
    particulate = ("pm25", 'pm10', 'pm25_ratio', 'pm10_ratio', 'pp_ratio')
    gaseous = ("o3", "so2", "co", "no2", "o3_8h")

    if var not in particulate and var in gaseous:
        excluded = ["names of regular station"]
        df_d = df_d[~(df_d['name'].isin(excluded))]
        # NOTE(review): map_fixed_grid100 is not defined in this part of the
        # file; presumably it maps station names to grid ids - confirm.
        df_d = df_d.assign(grid_id=map_fixed_grid100(df_d["name"]))
        df_d = df_d.groupby(['grid_id', 'date'])[var].mean().reset_index()

    if var in particulate or var in gaseous:
        df_d = df_d.dropna(subset=[var], how='any').reset_index(drop=True)

    centroids = grid100_cen[['grid_id', 'long', 'lat']]
    with_coords = df_d.merge(centroids, on=['grid_id'], how="left")
    return with_coords[["grid_id", "date", var, "long", "lat"]]

def moran(df_concat, dist_d, var):
    """Run a local Moran analysis and store each row's quadrant.

    Spatial weights are built from the long/lat columns with ``dist_d`` as the
    distance threshold (in degrees; 0.001 degree = 111 m), re-keyed by
    ``grid_id`` and row-standardised. The ``quadrant`` column is written into
    ``df_concat`` itself.

    Returns the weights object and the same (mutated) frame.
    """
    coords = df_concat[["long", "lat"]].values
    weights = ps.threshold_continuousW_from_array(coords, dist_d)
    weights.remap_ids(df_concat["grid_id"])
    weights.transform = 'R'
    local_moran = ps.Moran_Local(df_concat[var].values, weights)
    df_concat.loc[:, 'quadrant'] = local_moran.q
    return weights, df_concat

def find_hh_max(w_s, df_concat, regular_mean, var):
    """Count, per grid cell, how often it is the peak of a quadrant-1 cluster.

    For every row whose ``quadrant`` is 1 the cell and its neighbours (the keys
    of ``w_s[grid_id]``) are collected, and the single cell with the highest
    ``var`` among them is kept when that value is at least ``regular_mean``.

    Parameters
    ----------
    w_s : mapping
        ``w_s[grid_id]`` must return a mapping whose keys are neighbour ids.
    df_concat : pandas.DataFrame
        Needs columns grid_id, quadrant and ``var``.
    regular_mean : float
        Minimum concentration for a peak to count.
    var : str
        Name of the concentration column.

    Returns
    -------
    pandas.DataFrame
        Columns grid_id and warning (number of clusters the cell peaked in),
        or an empty DataFrame when nothing qualifies.
    """
    cluster_cores = df_concat.loc[df_concat['quadrant'] == 1, "grid_id"]
    peaks = []
    for core_id in cluster_cores:
        members = list(w_s[core_id].keys())
        members.append(core_id)
        cluster = df_concat.loc[df_concat['grid_id'].isin(members),
                                ['grid_id', var]]
        # Select exactly the row holding the maximum. The previous label slice
        # `.loc[idxmax - 1:idxmax]` also picked up the row labelled idxmax - 1
        # whenever it was part of the cluster, so the threshold was tested on
        # (and the count credited to) the wrong cell.
        peak = cluster.loc[[cluster[var].idxmax()]]
        if peak[var].iloc[0] >= regular_mean:
            peaks.append(peak)
    if not peaks:
        return pd.DataFrame()
    return pd.concat(peaks, ignore_index=True).groupby(
        ["grid_id"]).size().reset_index(name='warning')


def run_cluster_outlier_detection(df, regular_mean, var='pm25'):
    """Detect 'a2' hotspots for one pollutant from a daily frame.

    The frame is cleaned, analysed with a local Moran statistic and reduced to
    the cluster peaks that reach ``regular_mean``. The neighbourhood threshold
    is 300 m for the particulate variables and 1500 m for the gaseous ones
    (converted to degrees with 0.001 degree = 111 m).

    Returns a frame with grid_id, warning, concentration, date, long, lat,
    pollutant and ag_id; an empty DataFrame when nothing is found; or ``None``
    (after printing a message) when ``var`` is not supported.
    """
    if var in ["pm25", 'pm10', 'pm25_ratio', 'pm10_ratio', 'pp_ratio']:
        dist_d = 300 / 111 * 0.001
    elif var in ["o3", "so2", "co", "no2", "o3_8h"]:
        dist_d = 1500 / 111 * 0.001
    else:
        print("pollutant not supported")
        return

    df_concat = clean_for_moran(df, var)
    w_s, lm_r_df = moran(df_concat, dist_d, var)
    peaks = find_hh_max(w_s, lm_r_df, regular_mean, var)
    if len(peaks) == 0:
        return pd.DataFrame()

    hs_df = peaks.merge(df_concat[['grid_id', var, 'date']],
                        on=["grid_id"],
                        how="left")
    hs_df = hs_df.merge(grid100_cen[["grid_id", "long", "lat"]],
                        on=["grid_id"],
                        how="left")
    hs_df = hs_df.rename(columns={var: 'concentration'})
    hs_df['pollutant'] = var
    hs_df['ag_id'] = 'a2'
    return hs_df

def get_regular_hourly_mean(regular_df, var):
    """Mean of ``var`` per (month, day, hour) over all regular rows.

    ``regular_df`` needs a parseable ``timestamp`` column; it is not modified.
    The result has columns month, day, hour and ``<var>_r``.
    """
    stamps = pd.to_datetime(regular_df['timestamp'])
    keyed = regular_df.assign(time=stamps,
                              hour=stamps.dt.hour,
                              day=stamps.dt.day,
                              month=stamps.dt.month)
    hourly_mean = keyed.groupby(['month', 'day', 'hour'])[var].mean()
    return hourly_mean.rename(var + "_r").reset_index()


def fill_max_hour(hs_df_pm25, df_h, regular_df_h, var):
    """Add a ``max_h`` column listing each hotspot's three worst hours.

    NOTE: a function with this same name and body is defined again in a later
    cell of this file; when the cells run in order that later definition is the
    one in effect.

    Parameters
    ----------
    hs_df_pm25 : pandas.DataFrame
        Hotspots; needs ``grid_id``. ``max_h`` is written into it in place.
    df_h : pandas.DataFrame
        Hourly data with grid_id, month, day, hour and ``var``.
    regular_df_h : pandas.DataFrame
        Hourly regular data passed to ``get_regular_hourly_mean``.
    var : str
        Pollutant column name.

    Returns
    -------
    pandas.DataFrame
        The hotspot frame, with ``district`` joined from ``grid100_cen`` when
        it was not already present.
    """
    # Hourly rows that belong to a hotspot cell.
    pm_df_h = df_h[df_h["grid_id"].isin(hs_df_pm25["grid_id"])]
    pm_reg_h = get_regular_hourly_mean(regular_df_h, var)
    pm_df_h = pd.merge(pm_df_h,
                       pm_reg_h,
                       on=['month', 'day', 'hour'],
                       how="left")
    # Excess of the cell over the regular mean of the same hour.
    pm_df_h["diff"] = pm_df_h[var] - pm_df_h[var + "_r"]
    # Per cell: the 3 hours with the largest excess, listed in ascending hour order.
    max_dic = pm_df_h.sort_values('diff', ascending=False).groupby('grid_id').head(3).sort_values(
            'hour',
            ascending=True).groupby('grid_id')["hour"].apply(list).to_dict()
    for k in list(max_dic.keys()):
        # Stored as a comma-separated string, e.g. "5,6,7".
        hs_df_pm25.loc[hs_df_pm25["grid_id"] == k,
                       "max_h"] = ",".join("{0}".format(n)
                                           for n in max_dic.get(k))
    if 'district' not in hs_df_pm25.columns:
        hs_df_pm25 = pd.merge(hs_df_pm25,
                              grid100_cen[["grid_id", "district"]],
                              on=["grid_id"],
                              how="left")
    return hs_df_pm25


def fill_fixed_info(hs_df, f_df_d0):
    """Annotate hotspots with the fixed stations located in the same grid cell.

    NOTE: a function with this same name is defined again in a later cell of
    this file and replaces this one when the cells run in order. The later
    copy reads './data/fixed/fixed_grid_inbox.csv' whereas this one reads
    './dataset/fixed/fixed_grid_inbox.csv' - NOTE(review): confirm which path
    is the intended one.

    Parameters
    ----------
    hs_df : pandas.DataFrame
        Hotspots; needs ``grid_id``. ``fixed_name`` and ``fixed_info`` are
        written into it in place.
    f_df_d0 : pandas.DataFrame
        Fixed-station data with a ``name`` column; the columns at positions
        4..10 are the ones that get averaged.

    Returns
    -------
    pandas.DataFrame
        ``hs_df`` with the two columns filled where stations were found.
    """
    fixed_grid100 = pd.read_csv('./dataset/fixed/fixed_grid_inbox.csv')
    g = fixed_grid100.groupby('grid_id')
    # grid_id -> list of station names in that cell.
    fixed_grid100_dic = g['name'].apply(lambda s: s.tolist()).to_dict()
    # Use display names for the pollutant columns.
    f_df_d = f_df_d0.rename(
        {
            'pm25': 'PM2.5',
            "pm10": 'PM10',
            "no2": 'NO2',
            "so2": 'SO2',
            "co": 'CO',
            "o3": 'O3',
            "o3_8h": "O3_8H"
        },
        axis=1)
    for idx, row in hs_df.iterrows():
        f_l = fixed_grid100_dic.get(row["grid_id"])
        if f_l is None:
            # No fixed station in this cell.
            continue
        else:
            # Column-wise mean over the matching stations, selected by position
            # (columns 4..10); columns that are entirely missing are dropped.
            aa = f_df_d[f_df_d["name"].isin(f_l)].iloc[:,
                                                       4:11].round(2).dropna(
                                                           axis='columns',
                                                           how="all").mean()
            if len(aa) == 0:
                continue
            else:
                d = aa.round(2).to_dict()
                hs_df.loc[idx, "fixed_name"] = ", ".join(
                    str(x) for x in list(f_df_d[f_df_d["name"].isin(
                        f_l)].dropna(axis='columns', how="all")["name"]))
                # Rendered as "PM2.5: 12.3, PM10: 45.6, ...".
                hs_df.loc[idx, "fixed_info"] = ", ".join(
                    ("{}: {}".format(*i) for i in d.items()))
    return hs_df


def limit_hs_per_district(hs_df, num_thresh=6):
    """Cap the number of hotspots kept per district.

    When any district has more than ``num_thresh`` hotspots, a message naming
    those districts is printed and only the ``num_thresh`` rows with the
    highest ``warning`` are kept in every district.

    Parameters
    ----------
    hs_df : pandas.DataFrame
        Hotspots with grid_id and warning. ``district`` is joined from the
        module-level ``grid100_cen`` table only when it is missing; merging it
        unconditionally produced district_x/district_y columns and a KeyError
        for frames that already carried the column.
    num_thresh : int
        Maximum number of hotspots per district.

    Returns
    -------
    pandas.DataFrame
        The (possibly trimmed) hotspot frame.
    """
    if 'district' not in hs_df.columns:
        hs_df = pd.merge(hs_df,
                         grid100_cen[["grid_id", "district"]],
                         on=["grid_id"],
                         how="left")
    per_district = hs_df.groupby(['district'])['grid_id'].count()
    if per_district.max() > num_thresh:
        crowded = per_district[per_district > num_thresh].index
        print(', '.join(crowded) +
              ' hotspots number exceed ' + str(num_thresh))
        hs_df = hs_df.sort_values(
            ['warning'],
            ascending=[False]).groupby('district').head(num_thresh)
    return hs_df


def sample_by_district(df, n_sample):
    """Randomly keep at most ``n_sample`` rows per district.

    ``district`` is joined from the module-level ``grid100_cen`` table when it
    is missing. Districts with fewer than ``n_sample`` rows are kept whole.
    The result has a fresh 0..n-1 index.
    """
    if 'district' not in df.columns:
        df = df.merge(grid100_cen[['grid_id', 'district']],
                      on=['grid_id'],
                      how='left')

    def _pick(group):
        # Sample only when the district has enough rows to sample from.
        if group.shape[0] >= n_sample:
            return group.sample(n=n_sample)
        return group

    return df.groupby("district").apply(_pick).reset_index(drop=True)


def filter_hs_to_push(hs_df_a2, hs_df_a3):
    """Combine a3 and a2 hotspots into the final list to publish.

    When a3 hotspots exist, cells already flagged by a3 are removed from a2 and
    a3 is capped per district. a2 is then sampled to at most 10 per district,
    the two sets are stacked (a3 first) and a uid is attached.
    """
    if len(hs_df_a3) > 0:
        already_a3 = hs_df_a2['grid_id'].isin(hs_df_a3['grid_id'])
        hs_df_a2 = hs_df_a2[~already_a3]
        hs_df_a3 = limit_hs_per_district(hs_df_a3)
    sampled_a2 = sample_by_district(hs_df_a2, 10)
    combined = pd.concat([hs_df_a3, sampled_a2], ignore_index=True)
    return fill_hotspots_uid(combined)

def fill_hotspots_uid(df):
    '''
    Attach a numeric ``uid`` to every hotspot row.

    The uid is determined by pollutant, ag_id, date and grid: it is the
    concatenation of the pollutant code (1 for PM2.5/pm25, 2 for O3_8H/o3_8h),
    the ag_id code (a1=1, a2=2, a3=3), the date as YYYYMMDD and the grid id
    with its 'T' and '_' characters removed.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs string columns pollutant, ag_id, date ('YYYY-MM-DD', surrounding
        blanks allowed) and grid_id. The input frame is not modified.

    Returns
    -------
    pandas.DataFrame
        A copy with ``uid`` (int64) added and ``date`` normalised to
        'YYYY-MM-DD'.

    Raises
    ------
    ValueError
        If a pollutant or ag_id has no code, or the uid is not numeric.
    '''
    p_dic = dict(zip(['PM2.5','O3_8H','pm25','o3_8h'], [1,2,1,2]))
    a_dic = dict(zip(['a1','a2','a3'], [1,2,3]))
    df = df.assign(date=pd.to_datetime(df['date'].str.strip(), format='%Y-%m-%d'))

    pollutant_code = df['pollutant'].map(p_dic)
    ag_code = df['ag_id'].map(a_dic)
    # An unmapped value would become 'nan' inside the uid string and fail later
    # with an unhelpful conversion error, so report it explicitly.
    if pollutant_code.isnull().any() or ag_code.isnull().any():
        raise ValueError('unknown pollutant or ag_id, cannot build hotspot uid')

    grid_digits = (df['grid_id'].str.replace('T', '', regex=False)
                   .str.replace('_', '', regex=False))
    uid_text = (pollutant_code.astype(int).astype(str) +
                ag_code.astype(int).astype(str) +
                df['date'].dt.strftime('%Y%m%d') +
                grid_digits)
    # Explicit 64-bit: plain `int` is platform dependent and these ids do not
    # fit in 32 bits.
    df['uid'] = uid_text.astype('int64')
    df['date'] = df['date'].dt.strftime('%Y-%m-%d')
    return df

In [None]:
def fill_max_hour(hs_df_pm25, df_h, regular_df_h, var):
    '''
    Add a ``max_h`` column listing each hotspot's three worst hours.

    For every hotspot cell the hourly value of ``var`` is compared with the
    regular mean of the same month/day/hour; the three hours with the largest
    excess are stored, in ascending hour order, as a comma-separated string.

    Dependency
    ----------
    get_regular_hourly_mean

    Parameters
    ----------
    hs_df_pm25    :    hotspots (needs grid_id); ``max_h`` is written in place
    df_h          :    merged hourly data
    regular_df_h  :    hourly regular data
    var           :    pollutant type

    Returns
    -------
    The hotspot frame, with ``district`` joined from ``grid100_cen`` when it
    was not already present.
    '''
    in_hotspot = df_h["grid_id"].isin(hs_df_pm25["grid_id"])
    hotspot_hours = df_h[in_hotspot]
    regular_hourly = get_regular_hourly_mean(regular_df_h, var)
    hotspot_hours = hotspot_hours.merge(regular_hourly,
                                        on=['month', 'day', 'hour'],
                                        how="left")
    hotspot_hours["diff"] = hotspot_hours[var] - hotspot_hours[var + "_r"]

    worst_three = hotspot_hours.sort_values(
        'diff', ascending=False).groupby('grid_id').head(3)
    hours_by_grid = worst_three.sort_values(
        'hour', ascending=True).groupby('grid_id')["hour"].apply(list).to_dict()

    for grid_id, hours in hours_by_grid.items():
        label = ",".join("{0}".format(hour) for hour in hours)
        hs_df_pm25.loc[hs_df_pm25["grid_id"] == grid_id, "max_h"] = label

    if 'district' not in hs_df_pm25.columns:
        hs_df_pm25 = hs_df_pm25.merge(grid100_cen[["grid_id", "district"]],
                                      on=["grid_id"],
                                      how="left")
    return hs_df_pm25


def fill_fixed_info(hs_df, f_df_d0):
    """Annotate hotspots with the fixed stations located in the same grid cell.

    For every hotspot whose cell hosts stations (per fixed_grid_inbox.csv),
    ``fixed_name`` receives the station names and ``fixed_info`` the mean of
    the station readings, rendered as "PM2.5: 12.3, PM10: 45.6, ...". The
    readings are taken by position (columns 4..10 of ``f_df_d0``) and columns
    that are entirely missing for those stations are left out.

    ``hs_df`` is modified in place and returned.

    NOTE(review): an earlier definition of this function in the file reads
    './dataset/fixed/...' instead of './data/fixed/...' - confirm the path.
    """
    station_table = pd.read_csv('./data/fixed/fixed_grid_inbox.csv')
    stations_by_grid = {
        grid_id: names.tolist()
        for grid_id, names in station_table.groupby('grid_id')['name']
    }
    display_names = {
        'pm25': 'PM2.5',
        "pm10": 'PM10',
        "no2": 'NO2',
        "so2": 'SO2',
        "co": 'CO',
        "o3": 'O3',
        "o3_8h": "O3_8H",
    }
    f_df_d = f_df_d0.rename(columns=display_names)

    for idx, row in hs_df.iterrows():
        station_names = stations_by_grid.get(row["grid_id"])
        if station_names is None:
            continue
        matched = f_df_d[f_df_d["name"].isin(station_names)]
        pollutant_means = matched.iloc[:, 4:11].round(2).dropna(
            axis='columns', how="all").mean()
        if len(pollutant_means) == 0:
            continue
        readings = pollutant_means.round(2).to_dict()
        present_names = matched.dropna(axis='columns', how="all")["name"]
        hs_df.loc[idx, "fixed_name"] = ", ".join(
            str(name) for name in present_names)
        hs_df.loc[idx, "fixed_info"] = ", ".join(
            "{}: {}".format(label, value) for label, value in readings.items())
    return hs_df


def reGeocode(e_df):
    """Reverse-geocode every row and store the result in an ``address`` column.

    Each row's long/lat is sent to the AMap (Gaode) regeo REST endpoint and the
    text of the ``formatted_address`` element of the XML answer is kept.
    Previously the address was only printed, so the ``address`` column that
    ``reformat_to_upload`` selects was never created.

    Parameters
    ----------
    e_df : pandas.DataFrame
        Needs ``long`` and ``lat`` columns. The input frame is not modified.
        NOTE(review): rows are written back by index label, so the index is
        assumed to be unique - confirm for callers other than the pipeline.

    Returns
    -------
    pandas.DataFrame
        A copy of ``e_df`` with ``address`` filled where the service returned
        one (left empty otherwise).
    """
    web = 'http://restapi.amap.com/v3/geocode/regeo?output=xml&location='
    key = '&key=your_key' #insert your gaode api key
    other = "&radius=100&batch=false&roadlevel=0&homeorcorp=0"

    e_df = e_df.copy()
    # Make sure the column exists even for empty input or failed look-ups.
    if 'address' not in e_df.columns:
        e_df['address'] = None

    for idx, row in e_df.iterrows():
        # One request per second; presumably to stay under the API rate limit.
        time.sleep(1)
        url = web + str(row["long"]) + "," + str(row["lat"]) + key + other
        response = requests.get(url)
        soup = BeautifulSoup(response.text, 'lxml')
        node = soup.find("formatted_address")
        if node is None:
            # The answer carried no address (e.g. an error payload); calling
            # .text on the missing node used to raise AttributeError here.
            continue
        address = node.text
        print(address)
        e_df.loc[idx, 'address'] = address
    return e_df


def reformat_to_upload(df):
    """Shape the hotspot frame into the layout that is exported.

    Concentrations are rounded to 2 decimals, rows are reverse-geocoded and
    sorted by long/lat, and each row gets a ``hotspotID`` of the form "R<k>"
    where k counts rows within its district. ``date`` is overwritten with the
    module-level ``date`` and the columns are reduced to the export set.
    """
    df = df.assign(concentration=df["concentration"].round(2))
    df = reGeocode(df)
    df = df.sort_values(['long', 'lat'], ascending=[True, True])
    rank_in_district = df.groupby('district').cumcount().add(1)
    df['hotspotID'] = ["R" + str(rank) for rank in rank_in_district]
    # Stamp every row with the day being processed (module-level `date`).
    df = df.assign(date=date)
    export_columns = [
        "hotspotID", 'date', 'district', 'grid_id', 'pollutant',
        'concentration', 'warning', 'address', 'ag_id', 'long', 'lat', 'uid'
    ]
    return df[export_columns]

In [None]:
def get_past_sev_dates(date):
    """Return the 7 consecutive days ending on ``date`` (inclusive).

    ``date`` and every returned item are 'YYYY-MM-DD' strings, oldest first.
    """
    last_day = datetime.strptime(date, '%Y-%m-%d')
    return [(last_day - timedelta(days=back)).strftime('%Y-%m-%d')
            for back in range(6, -1, -1)]


def fill_district_id(df, col="district"):
    """Translate a row's district code into its district id.

    ``df`` is a single row (anything indexable by ``col``). Returns 'd1'..'d5'
    for the known codes and ``None`` for anything else.
    """
    # called by nova api
    district_ids = {
        'cx': 'd1',
        'yhq': 'd2',
        'xhq': 'd3',
        'gxq': 'd4',
        'kfq': 'd5',
    }
    return district_ids.get(df[col])


def prep_hs_to_upload(date):
    """Collect the hotspot files of the 7 days ending on ``date`` for upload.

    The matching ``hotspot_20*.csv`` files under ``data_path``/results are
    stacked without their ag_id/uid columns, pollutant codes are replaced by
    display names, a ``district_id`` is added, a warning of 0 becomes 1 and
    hotspotID/date are renamed to hotspot_id/time.
    """
    candidates = sorted(glob.glob(data_path + "/results/" +
                                  "hotspot_20*.csv"))
    # File names carry the date with underscores, e.g. 2021_01_19.
    keywords = [d.replace("-", "_") for d in get_past_sev_dates(date)]
    filenames = [
        filename for filename in candidates for keyword in keywords
        if keyword in filename
    ]

    frames = []
    for path in filenames:
        frame = pd.read_csv(path)
        for internal_col in ("ag_id", "uid"):
            if internal_col in frame.columns:
                frame = frame.drop([internal_col], axis=1)
        frames.append(frame)
    df_merged = pd.concat(frames, ignore_index=True)

    display_names = {'pm25': 'PM2.5', 'o3_8h': 'O3_8H'}
    df_merged['pollutant'] = df_merged['pollutant'].map(display_names)
    df_merged['district_id'] = df_merged.apply(fill_district_id, axis=1)
    df_merged['warning'] = df_merged['warning'].replace(0, 1)
    return df_merged.rename(columns={
        'hotspotID': 'hotspot_id',
        'date': 'time'
    })


def update_nova_api(date):
    """Publish the last 7 days of hotspots to the API's SQLite database.

    Nothing is written unless the hotspot file for ``date`` exists under
    ``data_path``/results. Otherwise the ``daily_hotspots`` table of
    HotspotsDB.db is rebuilt from ``prep_hs_to_upload(date)``.

    Parameters
    ----------
    date : str
        Day being published, formatted 'YYYY-MM-DD'.
    """
    hotspot_file = (data_path + "/results/" + "hotspot_" +
                    date.replace('-', '_') + ".csv")
    if not os.path.isfile(hotspot_file):
        print('warning: Hotspots(' + str(date) + ') NOT generated ☠️, exist.')
        return

    print('Hotspots(' + str(date) +
          ') successfully generated, preparing data to upload...')
    df_merged = prep_hs_to_upload(date)
    conn = sqlite3.connect('../api/data/hotspots/HotspotsDB.db')
    try:
        c = conn.cursor()
        c.execute('DROP TABLE IF EXISTS daily_hotspots')
        c.execute(
            'CREATE TABLE daily_hotspots (hotspot_id text, time text,district text, grid_id text, pollutant text, concentration number, warning number, address text, fixed_name text, fixed_info text, max_h text, long number, lat number, district_id text)'
        )
        conn.commit()
        # NOTE: if_exists='replace' drops the table again and recreates it from
        # the frame, so the column list declared above is not what ends up in
        # the database. Kept as is to preserve the current behaviour.
        df_merged.to_sql('daily_hotspots',
                         conn,
                         if_exists='replace',
                         index=False)
        conn.commit()
    finally:
        # The connection used to be left open; always release the file handle.
        conn.close()
    print('Hotspots(' + str(date) + ') uploaded to cloud! 🍻')
    print("API data at " + str(date) + " was successfully updated! 🍻")

In [None]:
def hs_run(m_df, f_df=None, export=False):
    """Run the daily pm25 hotspot detection on freshly downloaded mobile data.

    Steps: reformat the raw readings, aggregate them hourly and daily, drop
    the known sources, detect 'a2' hotspots, sample at most 10 per district,
    attach uids, reformat for upload and refresh the API database.

    Relies on the module-level ``date`` and ``data_path`` set by
    ``daily_hs_detect``.

    Parameters
    ----------
    m_df : pandas.DataFrame
        Raw mobile download.
    f_df : pandas.DataFrame, optional
        Fixed-station data. Accepted for interface compatibility; not used.
    export : bool
        When true, the reformatted mobile data and the hotspot table are also
        written to CSV under ``data_path``.
    """
    m_df = reformat_mobile(m_df)
    date_tag = date.replace('-', '_')

    if export:
        m_df.to_csv(data_path + "/data/mobile/" + "mobile_" +
                    date_tag + ".csv",
                    index=False,
                    encoding="utf_8_sig")

    m_df_h = agg_mobile_hourly(m_df)

    print('Starting A2 on pm25...')
    # The original guarded this with `'reg_df_d' not in locals()`, which is
    # always true inside this function, so the alternative branch (regular
    # station mean + merged daily data, using an undefined `df_h`) could never
    # run. Only the reachable path is kept.
    df_d = calculate_mobile_daily(m_df_h)
    regular_mean = df_d['pm25'].mean()
    print(regular_mean)

    so_df = pd.read_csv('./dataset' + '/sources/sources_list.csv')
    df_d = df_d[~df_d['grid_id'].isin(so_df['grid_id'])]  # drop known sources
    if len(df_d) == 0:
        print('No daily data available for ' + str(date) +
              ', hotspot detection skipped.')
        return

    hs_df_a2 = run_cluster_outlier_detection(df_d, regular_mean, 'pm25')
    print(hs_df_a2)
    # An empty result has no columns at all, which made the following steps
    # fail with a KeyError; stop cleanly instead.
    if hs_df_a2 is None or len(hs_df_a2) == 0:
        print('No pm25 hotspots detected for ' + str(date) +
              ', nothing to upload.')
        return

    hs_df = sample_by_district(hs_df_a2, 10)
    hs_df = fill_hotspots_uid(hs_df)
    hs_df = reformat_to_upload(hs_df)
    if export:
        hs_df.to_csv(data_path + "/results/"+"hotspot_" +
                     date_tag+".csv",
                     index =False,
                     encoding ='utf_8_sig')
    update_nova_api(date)

In [None]:
def daily_hs_detect():
    """Scheduled entry point: detect and publish yesterday's hotspots.

    Sets the module-level ``date`` (yesterday, 'YYYY-MM-DD'), ``data_path`` and
    ``grid100_cen`` that the other functions read, downloads the mobile data
    and runs the pipeline with CSV export enabled.
    """
    global date, data_path, grid100_cen
    data_path = './dataset/daily_workflow'
    grid100_cen = pd.read_csv('./dataset/grid100_centroid.csv')
    yesterday = datetime.today() - timedelta(1)
    date = yesterday.strftime('%Y-%m-%d')
    print(date)
    # To reprocess a specific day, override `date` here, e.g. '2021-01-19'.
    print(date)
    mobile_raw = download_mobile(date)
    # Fixed-station data is not downloaded; the pipeline runs on mobile only.
    hs_run(mobile_raw, f_df=None, export=True)

In [None]:
# Run the detection as a recurring job. Executing this cell starts the
# scheduler; with a BlockingScheduler, start() is expected not to return, so
# nothing placed after it will run.
sched = BlockingScheduler()
# Cron trigger: every day at 05:05:10.
# misfire_grace_time=3600: a run that starts late is presumably still accepted
# up to an hour after its scheduled time - confirm against APScheduler docs.
# next_run_time=datetime.now(): presumably makes the first run fire right after
# the scheduler starts instead of waiting for the next 05:05:10.
sched.add_job(daily_hs_detect,
    'cron',
    day='*',hour=5,minute=5,second=10,
    misfire_grace_time=3600,
#     next_run_time=datetime(2020,7,6),
    next_run_time=datetime.now(),
    max_instances=6
    )
sched.start()