In [None]:
import io
import os
import time
import pathlib
from urllib.parse import unquote
from collections import Counter

import pandas as pd

import requests
from concurrent.futures import ThreadPoolExecutor
from requests_futures.sessions import FuturesSession

## GLOBAL OPTIONS

In [None]:
# Size of the thread pool that runs the HTTP downloads concurrently.
WORKER_THREADS = 50
# The download loop restarts the whole session when fewer than this many
# downloads are active while work is still outstanding.
MIN_WORKER_THREADS = 5
# Seconds to sleep between two status checks of the download loop.
CHECK_INTERVAL = 120
# A download seen running in this many status checks counts as stalled and
# is resubmitted.
STALLED_LIMIT = 10

---
## INIT

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Site root; start() fetches it once before wrapping the session.
base = 'https://emir.palyazat.gov.hu/nyertes/'
# Endpoint that every export request (see `query`) is sent to.
url = base + 'index.php'

In [None]:
# Template parameters of the export request. generate_query() overwrites the
# four filter fields (forras, op_type, op_nev, eupik_nev) for each code-mapping
# row; every other field is sent unchanged.
query = dict(
    node='export',
    forras='1420',        # funding source (SZ2020)
    op_type='op_nev',     # op_type (value names the op_nev field)
    op_nev='1382',        # operational programme (OP)
    eupik_nev='180100',   # grant scheme ("tamogatasi konstrukcio")
    palyazo_nev='',
    regio='0',
    megye='0',
    kisterseg='0',
    helyseg='0',
    ttipus='',
    tkod='',
    ttype='',
    print='0',
    export='1',
    id_szerv='0',
    sort='asc',
    order='NEV',
    page='1',
    rows='10000',         # presumably the page size - TODO confirm
)

In [None]:
# Code mapping: one row per (source, OP, grant scheme) combination to download.
codes = pd.read_excel('./data/CODEMAPPING.xlsx')
codes.columns = ['source_code', 'source_name', 'op_type', 'op_code',
                 'op_name', 'eupik_code', 'eupik_name']

# The codes end up in query strings and file paths, so keep them as text.
for text_column in ('source_code', 'op_code', 'op_type', 'eupik_code'):
    codes[text_column] = codes[text_column].astype(str)

# Row key "<source_code>_<op_code>_<eupik_code>", compared by filter_finished.
codes['index'] = codes['source_code'].str.cat(
    [codes['op_code'], codes['eupik_code']], sep='_')

In [None]:
def generate_query(query, source_code, op_code, op_type, eupik_code):
    """Return a copy of ``query`` with the four filter parameters filled in.

    The template dict itself is left untouched; all other keys are carried
    over unchanged and keep their position.
    """
    return {
        **query,
        'forras': source_code,
        'op_nev': op_code,
        'op_type': op_type,
        'eupik_nev': eupik_code,
    }


def download_one(session, index, row):
    """Submit the export request for a single code-mapping row.

    ``row`` must expose ``source_code``, ``op_code``, ``op_type`` and
    ``eupik_code`` attributes (e.g. a row of ``codes``). The GET is issued
    through ``session`` with export_result passed as ``background_callback``.

    Returns ``{'index': index, 'resp': <future of the request>}``.
    """
    params = generate_query(query, row.source_code, row.op_code,
                            row.op_type, row.eupik_code)
    # NOTE(review): no timeout is passed, so a hung connection is only noticed
    # by the stalled-download bookkeeping of the main loop.
    pending = session.get(url, params=params, verify=False,
                          background_callback=export_result)
    return {'index': index, 'resp': pending}


def download_all(session, codes):
    """Submit one export request per row of ``codes``.

    Returns the ``{'index', 'resp'}`` records in row order.
    """
    submitted = []
    for row_label, row in codes.iterrows():
        submitted.append(download_one(session, row_label, row))
    return submitted


def check_status(responses, stalled_list):
    """Count finished/running downloads and update the stalled-download tally.

    ``stalled_list`` holds one entry per (status check, running download), so
    ``Counter(stalled_list)[index]`` is the number of checks during which that
    download has been seen running. Finished downloads are dropped from the
    tally, and every download running now gets one more entry.

    Each future's state is read exactly once, so the returned counts and the
    tally describe the same snapshot even while worker threads progress.

    Args:
        responses: list of ``{'index': ..., 'resp': future}`` records.
        stalled_list: tally from the previous check; it is not modified.

    Returns:
        ``(numfinished, numrunning, new_stalled_list)``.
    """
    numfinished = 0
    finished_indices = set()
    running_indices = []
    for response in responses:
        future = response['resp']
        if future.done():
            numfinished += 1
            finished_indices.add(response['index'])
        elif future.running():
            running_indices.append(response['index'])

    # Drop finished downloads in a single pass, then record this check's
    # running ones.
    still_tracked = [index for index in stalled_list if index not in finished_indices]
    return numfinished, len(running_indices), still_tracked + running_indices


def restart_download(session, codes, download):
    """Cancel ``download`` and resubmit the same code-mapping row.

    Returns the fresh ``{'index', 'resp'}`` record that replaces ``download``.

    NOTE(review): ``Future.cancel()`` only succeeds while the request is still
    queued; a request that is already running keeps its worker thread busy
    until it returns.
    """
    label = download['index']
    source_row = codes.loc[label]
    download['resp'].cancel()
    return download_one(session, label, source_row)


def handle_stalled(responses, stalled_list, session, codes):
    """Resubmit downloads seen running in at least STALLED_LIMIT status checks.

    Returns ``(responses, stalled_list)``. In ``responses``, stalled records are
    replaced by restart_download results. In ``stalled_list``, every entry of
    the restarted indices is removed.
    """
    tally = Counter(stalled_list)
    targets = [idx for idx, seen in tally.items() if seen >= STALLED_LIMIT]
    target_set = set(targets)
    remaining = [idx for idx in stalled_list if idx not in target_set]
    print('Handling stalled downloads: {}'.format(len(targets)))

    refreshed = []
    for response in responses:
        if response['index'] in target_set:
            refreshed.append(restart_download(session, codes, response))
        else:
            refreshed.append(response)
    return refreshed, remaining


def export_result(sess, resp):
    """Background callback: save one export response as CSV or log it as empty.

    The filter values are read back from the request's query string.

    A response with at least one data row is written to
    ``./data/projects/<forras>/<op_nev>/<eupik_nev>.csv``. The ``op_nev``
    path segment stays URL-encoded; the decoded value goes into the
    ``op_code`` column.

    An empty result (a header-only CSV or a completely empty body) is appended
    to ``./errors.txt`` as ``<forras>_<op_type>_<op_nev>_<eupik_nev>``.

    Args:
        sess: session passed in by the callback machinery (unused).
        resp: the finished response object.
    """
    from urllib.parse import unquote_plus

    # Recover the filters from the sent query string. Values stay URL-encoded,
    # so an encoded '/' in op_nev cannot create an extra directory level.
    wanted = {'forras', 'op_nev', 'op_type', 'eupik_nev'}
    query_param = {}
    for pair in resp.request.path_url.partition('?')[2].split('&'):
        key, _, value = pair.partition('=')
        if key in wanted:
            query_param[key] = value

    source_code = query_param['forras']
    op_code = query_param['op_nev']
    op_type = query_param['op_type']
    eupik_code = query_param['eupik_nev']
    # requests form-encodes query values (space -> '+'), so plain unquote
    # would leave '+' in place of spaces.
    op_code_decoded = unquote_plus(op_code)

    # NOTE(review): latin1 never fails to decode, but it maps Hungarian ő/ű
    # wrongly if the server sends ISO-8859-2/cp1250; confirm the encoding.
    try:
        df = pd.read_csv(io.StringIO(resp.content.decode('latin1')), sep=';')
    except pd.errors.EmptyDataError:
        # Empty body: there is no header to parse. Treat it like a header-only
        # export so the query is logged instead of failing the future.
        df = pd.DataFrame()

    if len(df) > 0:
        df['source_code'] = source_code
        df['op_code'] = op_code_decoded
        df['op_type'] = op_type
        df['eupik_code'] = eupik_code

        dirname = f'./data/projects/{source_code}/{op_code}'
        # exist_ok: several worker threads may create the same directory at
        # once; a separate exists() check before makedirs() is racy.
        os.makedirs(dirname, exist_ok=True)
        df.to_csv(f'{dirname}/{eupik_code}.csv', index=False)
    else:
        with open('./errors.txt', 'a') as f:
            f.write(f'{source_code}_{op_type}_{op_code_decoded}_{eupik_code}\n')

---
## DOWNLOAD

In [None]:
def filter_finished(codes, data_dir):
    """Return the rows of ``codes`` that still have to be downloaded.

    export_result stores each non-empty export as
    ``<data_dir>/<source_code>/<op_code>/<eupik_code>.csv``, with ``op_code``
    URL-encoded. A row counts as finished when a file matching its ``index``
    key (``<source_code>_<op_code>_<eupik_code>``) exists. Only the directory
    tree is inspected; the CSVs themselves are not loaded.

    A missing or empty ``data_dir`` (e.g. on the very first run) means nothing
    has finished yet, so all of ``codes`` is returned. Entries that do not fit
    the ``*/*/*.csv`` layout are ignored.

    Args:
        codes: code-mapping frame with an ``index`` key column.
        data_dir: root of the per-query CSV tree (trailing slash optional).

    Returns:
        The subset of ``codes``, with original row labels, that has no saved export.
    """
    from urllib.parse import unquote_plus

    print('Filtering finished downloads from codes df... ', end='')
    root = pathlib.Path(data_dir)
    finished = set()
    if root.is_dir():
        for csv_path in root.glob('*/*/*.csv'):
            source_code = csv_path.parent.parent.name
            # The directory name is the form-encoded query value (space -> '+').
            op_code = unquote_plus(csv_path.parent.name)
            finished.add(f'{source_code}_{op_code}_{csv_path.stem}')

    remaining = codes.loc[~codes['index'].isin(finished)]
    print('done.')
    return remaining

In [None]:
def start(codes):
    """Open a fresh download session and submit one request per row of ``codes``.

    A plain requests session first GETs ``base`` and discards the response
    (presumably to obtain cookies; TODO confirm). It is then wrapped in a
    FuturesSession backed by a ThreadPoolExecutor with WORKER_THREADS workers.

    Returns:
        ``(futures_session, responses)``, where ``responses`` is the
        download_all result.
    """
    print('session init... ', end='')
    plain_session = requests.session()
    plain_session.get(base, verify=False)
    pool = ThreadPoolExecutor(max_workers=WORKER_THREADS)
    futures_session = FuturesSession(session=plain_session, executor=pool)

    print('done. starting downloads... ', end='')
    submitted = download_all(futures_session, codes)
    print('started.')
    return futures_session, submitted


def restart(session, codes, responses, data_dir):
    """Abandon the current session and start over with the unfinished rows.

    Every future is asked to cancel and the session is closed. Requests that
    are already running cannot be cancelled and finish in the background.
    Then start() is called on the rows that filter_finished reports as still
    missing under ``data_dir``.

    Returns:
        ``(new_session, new_responses)`` from start().
    """
    print('shutting down running downloads... ', end='')
    for response in responses:
        response['resp'].cancel()
    session.close()
    # start() hands the FuturesSession an explicit executor.
    # NOTE(review): FuturesSession.close() appears to shut down only executors
    # it created itself, so release this pool here. wait=False keeps stuck
    # requests from blocking the restart.
    executor = getattr(session, 'executor', None)
    if executor is not None:
        executor.shutdown(wait=False)
    print('shutdown complete, starting new session...')
    return start(filter_finished(codes, data_dir))

In [None]:
# Resume support: only rows without a saved CSV under ./data/projects/ are requested.
pending_codes = filter_finished(codes, './data/projects/')
session, responses = start(pending_codes)

In [None]:
# Poll the futures every CHECK_INTERVAL seconds until all submitted requests
# are done. A future that ended with an exception also counts as done; it is
# only resubmitted if a later restart() finds its CSV still missing.
finished = False
stalled = []
while not finished:
    numfinished, numrunning, stalled = check_status(responses, stalled)
    if numfinished == len(responses):
        finished = True
        continue

    responses, stalled = handle_stalled(responses, stalled, session, codes)

    print(f'{numfinished / len(responses) * 100:.2f}% done. '
          f'[{numfinished}/{len(responses)}] '
          f'- {numrunning} active downloads')

    # Too few active downloads for the remaining work: start a fresh session.
    if numrunning < min(len(responses) - numfinished, MIN_WORKER_THREADS):
        session, responses = restart(session, codes, responses, './data/projects/')
        # The tally counts checks against the old futures. Every download was
        # just resubmitted, so stale counts must not carry over and trigger
        # premature restarts of the new requests.
        stalled = []

    time.sleep(CHECK_INTERVAL)

---
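## MERGING

Combine the per-query CSV exports with the code mapping into one table and save it as `merged.csv` and `merged.xlsx`.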

In [None]:
import os
import pandas as pd
from urllib.parse import unquote

In [None]:
from pathlib import Path
from urllib.parse import unquote_plus

data_dir = './data/'
# The download cells write one CSV per query to
# ./data/projects/<source_code>/<op_code>/<eupik_code>.csv, with op_code
# URL-encoded. ./data/ itself also holds CODEMAPPING.xlsx and the merged
# outputs, so the export tree is read from the projects/ subdirectory only.
projects_dir = Path(data_dir) / 'projects'

files = []
for csv_path in sorted(projects_dir.glob('*/*/*.csv')):
    source_code = csv_path.parent.parent.name
    # Directory names are form-encoded query values (space -> '+').
    op_code = unquote_plus(csv_path.parent.name)
    eupik_code = csv_path.stem
    files.append({'index': f'{source_code}_{op_code}_{eupik_code}',
                  'source_code': source_code,
                  'op_code': op_code,
                  'eupik_code': eupik_code,
                  'df': pd.read_csv(csv_path)})

assert files, f'no exported CSVs found under {projects_dir}'

In [None]:
# Same code mapping as in the INIT section, read from the same location
# (./data/CODEMAPPING.xlsx). The previous './CODEMAPPING.xlsx' pointed outside
# the data directory used everywhere else in the notebook.
codes = pd.read_excel(os.path.join(data_dir, 'CODEMAPPING.xlsx'))
codes.columns = ['source_code', 'source_name', 'op_type', 'op_code',
                 'op_name', 'eupik_code', 'eupik_name']

# The merge keys must be text to match the values taken from file paths.
for key_column in ('source_code', 'op_code', 'eupik_code'):
    codes[key_column] = codes[key_column].astype(str)

codes['index'] = codes['source_code'].str.cat(
    [codes['op_code'], codes['eupik_code']], sep='_')

In [None]:
# Strip stray whitespace from the CSV headers, then tag every export with the
# key columns derived from its file path. These overwrite the same-named
# columns written by export_result.
dfs = []
for entry in files:
    frame = entry['df']
    frame.columns = [name.strip() for name in frame.columns]
    for key in ('source_code', 'op_code', 'eupik_code'):
        frame[key] = str(entry[key])
    dfs.append(frame)

In [None]:
# Attach the code-mapping descriptions to every exported row. The right join
# keeps all downloaded rows, including those whose key is absent from codes.
merge_keys = ['source_code', 'op_code', 'eupik_code']
all_exports = pd.concat(dfs, ignore_index=True)
merged = codes.merge(all_exports, on=merge_keys, how='right')

In [None]:
# Quick size check of the merged table: (rows, columns).
(len(merged.index), len(merged.columns))

In [None]:
# Flat CSV export of the merged table (row labels are not written).
merged.to_csv(os.path.join(data_dir, 'merged.csv'), index=False)

In [None]:
# Excel copy of the same table (row labels are not written).
merged.to_excel(os.path.join(data_dir, 'merged.xlsx'), index=False)