In [1]:
import os
import gzip
import glob
import time

import requests
import backoff
from requests.exceptions import ConnectionError

from joblib import Parallel, delayed

import pandas as pd
import numpy as np
from tqdm.notebook import tqdm

from slugify import slugify

In [2]:
YEAR = 2022
# NOTE(review): ANON_NEEDLE is not referenced anywhere in the visible notebook;
# apply_fixes() hard-codes the title-case string 'Małe gospodarstwo' instead.
ANON_NEEDLE = "MAŁE GOSPODARSTWO"
# All amounts in PLN
# AMOUNT_CUTOFF: band boundaries in descending order. AMOUNT_STEPS[i] is the
# bucket width used for the band [AMOUNT_CUTOFF[i], AMOUNT_CUTOFF[i-1]) by
# get_ranges(); AMOUNT_STEPS[0] is never used.
# NOTE(review): -1_0000 == -10_000, so the band paired with step 100 is empty and
# all of [-10_000, 0) is walked in steps of 10. -1_000 was possibly intended —
# confirm before changing, since bucket amounts are part of the cached file names.
AMOUNT_CUTOFF = [999_999_999, 10_000_000, 150_000, 125_000, 120_000, 67000, 62000, 50_000, 32_000, 31_000, 16_900, 10750, 3000,  0, -1_0000, -10_000, -9_999_999]
AMOUNT_STEPS  = [None       , 20_000_000,   5_000,   1_000,   1_000, 1_000,   100,    100,     10,     10,     10,    10,   10,  1,      10,     100, 100_000]

def get_ranges():
    """Yield search-bucket boundaries from the highest amount downwards.

    Each adjacent pair of AMOUNT_CUTOFF entries (upper, lower) defines the
    half-open band [lower, upper). The band is stepped with the AMOUNT_STEPS
    entry aligned with ``lower``, and its values are emitted in descending order.
    """
    bands = zip(AMOUNT_CUTOFF[1:], AMOUNT_CUTOFF[:-1], AMOUNT_STEPS[1:])
    for lower, upper, step in bands:
        yield from range(lower, upper, step)[::-1]

In [3]:
# A boundary of exactly 0 does not work with the site's search, so it is
# replaced by -1, which gives a bucket that spans zero instead.
RANGES = [-1 if boundary == 0 else boundary for boundary in get_ranges()]

In [4]:
# Boundaries just around zero: confirms 0 was swapped for -1.
[boundary for boundary in RANGES if -100 < boundary < 5]

[4, 3, 2, 1, -1, -10, -20, -30, -40, -50, -60, -70, -80, -90]

In [5]:
# Number of bucket boundaries; start() schedules one download task per consecutive pair.
len(RANGES)

11073

In [6]:
# Amounts shared by many recipients. start() downloads any bucket containing
# one of these per postcode prefix instead of with a single query.
SPECIALS = {
    2020: [120_000, 64_000, 48_000, 20_000, 12_000],
    2021: [
        125_000, 120_000, 68_000, 67_000, 66_000, 64_000, 48_000, 31_434,
        20_000, 12_000, 5_682, 5_680, 2_225, 2_010, 2_000, 1_990, 1_970,
        1_736, 1_243, 1_154, 1_114,
    ],
    2022: [
        120_000, 47_990, 11_990, 5_770,
        48_000, 1_629, 12_000, 1_083, 1_043, 1_222,
    ],
}

In [7]:
# CSV export endpoint of the beneficiaries search; get_url() appends the filter segments.
BASE_URL = "https://beneficjenciwpr.minrol.gov.pl/search/export/csv/"
# An HTML doctype in a response that should be CSV; download() raises TooMuchException on it.
BAD_SENTINEL = b"<!DOCTYPE"

In [8]:
def get_url(params):
    """Build the export URL from a dict of search filters.

    Each non-None entry becomes a ``key:value`` path segment, with '.' in the
    value replaced by ',' (decimal comma). Results are sorted ascending by total.
    """
    segments = []
    for key, value in params.items():
        if value is None:
            continue
        text = str(value).replace(".", ",")
        segments.append(f'{key}:{text}')
    return BASE_URL + '/'.join(segments) + '/sort:total/direction:asc'

In [None]:
# The site's range filter is exclusive, so lower bounds are nudged down by this much.
MIN_OFFSET = 1e-6


class TooMuchException(Exception):
    """Raised by download() when a bucket's response is not usable CSV.

    Attributes:
        amount: lower bound of the offending bucket.
        url: the export URL that was requested.
    """

    def __init__(self, amount, url):
        super().__init__(amount, url)
        self.url = url
        self.amount = amount

def download(year, amount, total_to, postal=None):
    """Download one amount bucket (optionally one postcode prefix) as gzipped CSV.

    The bucket runs from ``amount`` (made inclusive by subtracting MIN_OFFSET)
    up to ``total_to``. The response is cached at
    ``data/{year}_{amount}_{total_to}[_{postal}].csv.gz``; if that file already
    exists, nothing is fetched.

    Args:
        year: payment year to query.
        amount: lower bound of the bucket in PLN.
        total_to: upper bound of the bucket in PLN.
        postal: optional postcode prefix; when given, the query is restricted
            to postcodes matching ``{postal}-*``.

    Raises:
        TooMuchException: the response contains BAD_SENTINEL (an HTML doctype)
            instead of CSV data. No cache file is written in that case, so the
            bucket is attempted again on the next run.
        requests.exceptions.RequestException: network errors that persist after
            up to 60 s of exponential backoff.
    """
    # Example search URL with the same kind of filters (HTML search view):
    # http://beneficjenciwpr.minrol.gov.pl/search/index/year:2020/postal:01-*/totalfrom:119999,999/totalto:120000,001/#outrec
    params = {'year': year, 'totalfrom': amount - MIN_OFFSET, 'totalto': total_to}
    if postal is not None:
        params['postal'] = '{}-*'.format(postal)
    url = get_url(params)

    filename = 'data/{year}_{amount}_{total_to}.csv.gz'.format(year=year, amount=amount, total_to=total_to)
    if postal is not None:
        filename = filename.replace('.csv.gz', '_{}.csv.gz'.format(postal))
    if os.path.exists(filename):
        return

    @backoff.on_exception(backoff.expo, requests.exceptions.RequestException, max_time=60)
    def get_with_backoff(url):
        return requests.get(url)

    response = get_with_backoff(url)
    # Check before creating the cache file. Previously an empty file was created
    # first, and its existence made every later run skip this bucket.
    if BAD_SENTINEL in response.content:
        raise TooMuchException(amount, url)

    os.makedirs(os.path.dirname(filename), exist_ok=True)
    # Write under a temporary name and rename. An interrupted write then never
    # leaves a truncated file that the exists() check would accept as complete.
    partial = filename + '.part'
    with gzip.open(partial, 'wb') as f:
        f.write(response.content)
    os.replace(partial, filename)
            
def download_with_postal(year, amount, total_to):
    """Download one amount bucket split over the postcode prefixes '00' to '99'."""
    prefixes = (f'{n:02d}' for n in range(100))
    for prefix in prefixes:
        download(year, amount, total_to, postal=prefix)

                
def start(year, extra_specials=None, n_jobs=2):
    """Schedule the download of every amount bucket for ``year`` and run them with joblib.

    Consecutive RANGES entries form a bucket [amount, total_to). A bucket
    containing any special amount (SPECIALS[year] plus ``extra_specials``) is
    fetched per postcode prefix via download_with_postal. Every other bucket is
    fetched with a single download call.

    Args:
        year: payment year to download.
        extra_specials: extra amounts to treat as special, e.g. bucket lower
            bounds that raised TooMuchException. Not modified.
        n_jobs: number of joblib workers (default 2, as before).
    """
    # Build a fresh list. Extending SPECIALS[year] in place made the global list
    # grow with duplicates on every retry and every re-run of the cell.
    specials = list(SPECIALS.get(year, [])) + list(extra_specials or [])
    tasks = []
    total_to = None
    for amount in RANGES:
        if total_to is not None:
            if any(amount <= s < total_to for s in specials):
                tasks.append(delayed(download_with_postal)(year, amount, total_to))
            else:
                tasks.append(delayed(download)(year, amount, total_to))
        total_to = amount
    Parallel(n_jobs=n_jobs, verbose=10, batch_size=5)(tasks)

# Run the full download, retrying on network failures. When a bucket raises
# TooMuchException, retry once with its lower bound treated as special
# (split by postcode). Finished buckets are cached on disk, so retries are cheap.
extra_specials = []
done = False
while not done:
    try:
        start(YEAR, extra_specials=extra_specials)
    except ConnectionError as err:
        print(err)
        time.sleep(10)
    except TooMuchException as err:
        print("too much at", err.amount)
        if err.amount not in extra_specials:
            extra_specials.append(err.amount)
            print("retrying!")
            time.sleep(1)
        else:
            # This amount was already retried with the postcode split; stop.
            print("we give up")
            print(err.url)
            done = True
    else:
        done = True
print(extra_specials)

[Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.
[Parallel(n_jobs=2)]: Done  12 tasks      | elapsed:    0.3s
[Parallel(n_jobs=2)]: Done  27 tasks      | elapsed:    0.3s
[Parallel(n_jobs=2)]: Done  52 tasks      | elapsed:    0.3s
[Parallel(n_jobs=2)]: Done  77 tasks      | elapsed:    0.3s
[Parallel(n_jobs=2)]: Done 109 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done 144 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done 189 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done 234 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done 289 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done 344 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done 409 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done 474 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done 549 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done 624 tasks      | elapsed:    0.4s
[Parallel(n_jobs=2)]: Done 709 tasks      | elapsed:    0.4s
[Parallel(

In [None]:
def get_year(year):
    """Yield one DataFrame per downloaded CSV file for ``year``.

    Files matching ``data/{year}_*.csv.gz`` are read in sorted order as
    ';'-separated UTF-8 (BOM-tolerant) CSV. Files of at most 50 bytes are
    skipped (presumably empty/failed downloads — TODO confirm the threshold).
    Each frame gets an ``fn`` column holding its source filename. The name of
    every 1000th file in the sorted listing is printed as a progress marker.
    """
    filenames = sorted(glob.glob('data/{}_*.csv.gz'.format(year)))
    for i, filename in enumerate(tqdm(filenames)):
        if os.stat(filename).st_size <= 50:
            continue
        # Only every 1000th file: the former unconditional print(filename)
        # printed every file (and this one twice), flooding the output.
        if i % 1000 == 0:
            print(filename)
        df = pd.read_csv(filename, compression='gzip', encoding='utf-8-sig', sep=';',
                         engine="python", on_bad_lines="warn")
        df['fn'] = filename
        yield df

In [None]:
# Combine all per-file frames. ignore_index=True because every frame read by
# read_csv starts its index at 0, which would otherwise repeat labels across files.
df = pd.concat(get_year(YEAR), ignore_index=True)
df.head()

In [None]:
# Flag rows that are identical in every data column. The 'fn' (source file)
# column is excluded by name rather than as "the last column", so this stays
# correct if columns are added or the cell is re-run after apply_fixes.
dupes = df.duplicated(subset=df.columns.drop('fn').tolist(), keep=False)

In [None]:
# First 50 rows that occur more than once (ignoring the source-file column).
df.loc[dupes].head(50)

In [None]:
# All duplicated rows, sorted by first name.
df.loc[dupes].sort_values(by='Imię')

In [None]:
# 'Suma' uses a decimal comma; swap it for a dot and convert to float.
df['Suma'] = df['Suma'].replace(to_replace=',', value='.', regex=True).astype(float)

In [None]:
# Recipients with a strictly positive total.
df_pos = df.loc[df['Suma'].gt(0)]

In [None]:
# Shape of the positive-total subset.
df_pos.shape

In [None]:
# Shape of the full combined frame (all downloaded files).
df.shape

In [None]:
# Grand total in PLN, including negative totals.
df['Suma'].sum()

In [None]:
# Total in PLN over positive totals only.
df_pos['Suma'].sum()

In [None]:
# Rows whose total is exactly zero.
df.loc[df['Suma'].eq(0)]

In [None]:
# Spot check: rows whose surname field is the literal string "1"
# (presumably a data-quality oddity in the export — TODO confirm).
df[df['Nazwisko'] == "1"]

In [None]:
# Spot check: rows whose surname field is the numeric-looking string "141098"
# (presumably a data-quality oddity — TODO confirm).
df[df['Nazwisko'] == "141098"]

In [None]:
# Spot check: rows whose surname field is the numeric-looking string "497873"
# (presumably a data-quality oddity — TODO confirm).
df[df['Nazwisko'] == "497873"]

In [None]:
# Non-null counts per column for rows with a total below -1. Every column shows
# the row count unless that column has missing values.
df[df['Suma'] < -1].count()

In [None]:
# Inspect one row: position 101 among totals below -1, ordered from the highest
# such total downward.
# NOTE(review): raises IndexError when there are fewer than 102 such rows.
df[df['Suma'] < -1].sort_values('Suma', ascending=False).iloc[101]

In [None]:
# Raw column names as read from the CSV export (the input to apply_fixes below).
list(df.columns)

In [None]:
def apply_fixes(df):
    """Normalise a raw export frame into the recipient/scheme layout used below.

    - strips leading UTF-8 BOMs from column names;
    - renames the Polish recipient columns to ``recipient_*`` and ``Rok`` to ``year``;
    - drops the ``Suma`` total column;
    - fills ``recipient_name`` with "first last" where ``Nazwa`` is missing;
    - removes the label 'Małe gospodarstwo' (in any letter case) from names;
    - adds ``recipient_id`` = ``PL-<postcode>-<slugified name>``.

    Returns a new DataFrame; the input frame is not modified.
    """
    import re

    # The first header carried several BOMs (previously matched literally as
    # '\ufeff\ufeff\ufeffRok'). Strip any number of them so the rename does not
    # silently miss the column when the count differs.
    df = df.rename(columns=lambda c: c.lstrip('\ufeff') if isinstance(c, str) else c)
    df = df.rename(columns={
        'Imię': 'recipient_firstname',
        'Nazwisko': 'recipient_lastname',
        'Nazwa': 'recipient_name',
        'Gmina': 'recipient_location',
        'Kod pocztowy': 'recipient_postcode',
        'Rok': 'year',
    })
    df = df.drop(columns=['Suma'])
    first = df['recipient_firstname'].fillna('').apply(str)
    last = df['recipient_lastname'].fillna('').apply(str)
    df['recipient_name'] = df['recipient_name'].where(df['recipient_name'].notnull(),
                                                      first + ' ' + last)

    # Anonymised recipients carry this label. Match case-insensitively: the
    # notebook's ANON_NEEDLE constant spells it in upper case.
    anonymous = re.compile(re.escape('Małe gospodarstwo'), re.IGNORECASE)
    df['recipient_name'] = df['recipient_name'].str.replace(anonymous, '', regex=True).str.strip()
    df = df.drop(columns=['recipient_firstname', 'recipient_lastname'])
    # A comprehension rather than a row-wise df.apply: same values, and it also
    # works on an empty frame.
    df['recipient_id'] = [
        'PL-%s-%s' % (postcode, slugify(name))
        for postcode, name in zip(df['recipient_postcode'], df['recipient_name'])
    ]
    return df

In [None]:
# Normalise column names / recipient fields and build recipient_id.
# NOTE: overwrites df and is not idempotent — a second run raises KeyError
# because 'Suma' has already been dropped. Re-run from the concat cell instead.
df = apply_fixes(df)
df.head()

In [None]:
# Number of recipient rows after apply_fixes.
len(df)

In [None]:
cols = 'recipient_name|recipient_location|recipient_id|recipient_postcode|year'.split('|')
# Every other column is a payment scheme to unpivot, except 'fn' (the source
# file name added by get_year). Including 'fn' produced bogus 'fn' scheme rows
# whose amount is a file path. A comprehension keeps the frame's column order;
# the previous set difference had a run-dependent order (hash randomisation).
scheme_cols = [c for c in df.columns if c not in cols and c != 'fn']

In [None]:
# Number of scheme columns to unpivot.
len(scheme_cols)

In [None]:
# The scheme column names themselves.
scheme_cols

In [None]:
# df['Suma'] = pd.to_numeric(df['Suma'].str.replace(',', '.'))
# for c in scheme_cols:
#     df[c] = pd.to_numeric(df[c].str.replace(',', '.'))

In [None]:
# mismatch = np.isclose(df[scheme_cols].sum(1), df['Suma'] , atol=1)
# len(df) - mismatch.sum()

In [None]:
# Unpivot to long format: one row per recipient and scheme, value in 'amount'.
df_final = df.melt(id_vars=cols, value_vars=scheme_cols, var_name='scheme', value_name='amount')
df_final.head()

In [None]:
# Keep only non-missing, non-zero amounts and tag country/currency.
# .assign returns a new frame; writing columns into the boolean-filtered frame
# directly triggered pandas' SettingWithCopyWarning.
# NOTE(review): only 'Suma' is converted from decimal commas (and then dropped).
# If scheme columns are read as strings, '0,00' != 0.0 keeps zero rows — confirm dtypes.
has_amount = df_final['amount'].notnull() & (df_final['amount'] != 0.0)
df_final = df_final.loc[has_amount].assign(country='PL', currency='PLN')
df_final.head()

In [None]:
# NOTE(review): this repeats len(df) from before melting; len(df_final) may have been intended.
len(df)

In [None]:
# Write the long-format result (one row per recipient and scheme) for the year.
output_path = 'pl_{}.csv.gz'.format(YEAR)
df_final.to_csv(output_path, index=False, compression='gzip')