In [1]:
%matplotlib inline
import numpy as np
import pandas as pd
import py7zlib
from io import StringIO
import os
import tarfile
import time
import csv
import glob
import dask.dataframe as dd
from dask import delayed
from dask.distributed import Client
# Create a dask.distributed client with default settings; the delayed/dataframe
# computations below are presumably scheduled through it — TODO confirm that a
# local cluster is what is intended here.
client = Client()
# check status at: http://localhost:8787/status

In [38]:
# Root directory (relative to the notebook) holding the ARES input files.
data_dir = "data/ares"
# Sub-directory of one particular dump; the name looks like a snapshot date — TODO confirm.
dump_dir = f"{data_dir}/2018-01-10"

## Sources of information

In [3]:
# Code list of public registers: a header-less, semicolon-separated file whose
# first column is the register abbreviation and whose second is its full name.
vr_full_name = (
    pd.read_csv(
        f"{data_dir}/ares_ciselnik_VR.csv",
        sep=';',
        header=None,
    )
    .set_index(0)[1]  # abbreviation -> full name, as a Series
)
vr_full_name

0
NR                              Nadační rejstřík
OR                             Obchodní rejstřík
ROPS     Rejstřík obecně prospěšných společností
RSVJ    Rejstřík společenství vlastníků jednotek
RU                               Rejstřík ústavů
SR                             Spolkový rejstřík
Name: 1, dtype: object

## Available ICOs

In [43]:
def read_7z(data_dir, base_filename, read_func):
    """Read one text member out of a 7z archive and parse it with ``read_func``.

    The archive is looked up at ``{data_dir}/{base_filename}.7z`` and must
    contain a member named exactly ``base_filename``.

    Parameters
    ----------
    data_dir : str
        Directory containing the archive.
    base_filename : str
        Name of the archived file (the archive itself carries an extra ``.7z``).
    read_func : callable
        Called with an ``io.StringIO`` holding the decoded member content.

    Returns
    -------
    object
        Whatever ``read_func`` returns.

    Raises
    ------
    FileNotFoundError
        If the archive does not exist, or if it has no member named
        ``base_filename``.
    """
    archive_path = f"{data_dir}/{base_filename}.7z"

    with open(archive_path, 'rb') as file:
        archive = py7zlib.Archive7z(file)

        member = archive.getmember(base_filename)
        # py7zlib appears to hand back None for an unknown member name; fail
        # with a clear message instead of an AttributeError on None.read().
        if member is None:
            raise FileNotFoundError(
                f"member {base_filename!r} not found in archive {archive_path!r}"
            )

        # bytes.decode() defaults to UTF-8. The local name deliberately avoids
        # shadowing the `csv` module imported by the notebook.
        text_buffer = StringIO(member.read().decode())

    # The whole member is already in memory, so the archive can be closed
    # before parsing.
    return read_func(text_buffer)


def pd_read_csv_ares(csv, col_names, dtype=None):
    """Parse a header-less, semicolon-separated ARES CSV into a DataFrame.

    Parameters
    ----------
    csv : str or file-like
        Anything ``pd.read_csv`` accepts as its first argument.
    col_names : list of str
        Column names to assign, in file order.
    dtype : optional
        Forwarded to ``pd.read_csv``. The default ``None`` lets pandas infer
        dtypes; pass e.g. ``{'ico': str}`` to keep leading zeros.

    Returns
    -------
    pandas.DataFrame
    """
    return pd.read_csv(csv, sep=';', header=None, names=col_names, dtype=dtype)


def pd_read_csv_ares_7z(d, fn, col_names, dtype=None):
    """Load the ARES CSV stored as member ``fn`` of the archive ``{d}/{fn}.7z``.

    Combines ``read_7z`` with ``pd_read_csv_ares``; ``col_names`` and ``dtype``
    are forwarded to the latter.
    """
    return read_7z(d, fn, lambda buffer: pd_read_csv_ares(buffer, col_names, dtype=dtype))

In [47]:
# Contains the list of ICOs (company identification numbers) together with the
# date each one was last processed in the ARES information system.
ico_all = pd_read_csv_ares_7z(data_dir, "ares_seznamIC_VR.csv", ['ico', 'date_processed'])
ico_all.head()

Unnamed: 0,ico,date_processed
0,108,24.10.2015
1,124,24.10.2015
2,205,12.01.2017
3,248,24.10.2015
4,256,24.10.2015


In [48]:
# Contains the list of ICOs for which a package (bulk dump) was created.
ico_dump = pd_read_csv_ares_7z(dump_dir, "ares_seznamIC_VR_balik.csv", ['ico'])
ico_dump.head()

Unnamed: 0,ico
0,108
1,124
2,205
3,248
4,256


In [51]:
# ICOs present in both the dump list and the full ARES list
len(set(ico_dump['ico']).intersection(ico_all['ico']))

973595

In [53]:
# ICOs that are in the dump list but missing from the full ARES list
len(set(ico_dump['ico']).difference(ico_all['ico']))

1660

In [54]:
# ICOs that are in the full ARES list but missing from the dump list
len(set(ico_all['ico']).difference(ico_dump['ico']))

0

## The main dataset

In [10]:
# ico_dump holds one row per ICO listed for the dump, so its length is taken as
# the number of companies in the dataset (assumes the list has no duplicates).
print(f"number of companies in the dataset: {len(ico_dump)}")

number of companies in the dataset: 975255


Untar the archive into batched CSV files (one record per row) so that dask can read them quickly in parallel.

In [16]:
dir_untarred = f"{dump_dir}/untarred"
batch_size = 5000


def _open_batch_writer(out_dir, batch_index):
    """Open ``{out_dir}/{batch_index:03}.csv`` for writing and wrap it in a csv writer.

    Returns the ``(file, writer)`` pair; the caller must close the file.
    """
    file_out = open(f"{out_dir}/{batch_index:03}.csv", 'w',
                    newline='\n', encoding='utf8')
    # The quote character has to match the one used when the batches are read
    # back with pandas (quotechar='|').
    writer = csv.writer(file_out, delimiter=',',
                        quotechar='|', quoting=csv.QUOTE_MINIMAL,
                        lineterminator='\n')
    return file_out, writer


def untar_to_csv_batches(tar_path, out_dir, batch_size, report_every=50000):
    """Copy every file of a ``.tar.gz`` archive into batched single-column CSVs.

    Each archived file is decoded as UTF-8 and written as one CSV row (one
    field). Rows are spread over files ``000.csv``, ``001.csv``, ... in
    ``out_dir``, ``batch_size`` rows per file.

    Parameters
    ----------
    tar_path : str
        Path of the gzip-compressed tar archive.
    out_dir : str
        Output directory; created if missing.
    batch_size : int
        Number of rows per output CSV; must be positive.
    report_every : int, optional
        Print the elapsed time after every ``report_every`` rows.

    Returns
    -------
    int
        Number of rows written.

    Raises
    ------
    ValueError
        If ``batch_size`` is not positive.

    Notes
    -----
    Existing CSV files in ``out_dir`` that are not overwritten by this run
    (e.g. left over from a run with a smaller ``batch_size``) are NOT removed.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    os.makedirs(out_dir, exist_ok=True)

    n_written = 0
    file_out = None
    writer = None
    start = time.time()
    try:
        with tarfile.open(tar_path, "r:gz") as tar:
            # Iterate the archive as a stream instead of calling getmembers()
            # first, which would need an extra full pass over the gzip data.
            for member in tar:
                extracted = tar.extractfile(member)
                # extractfile() returns None for members that are neither
                # regular files nor links (e.g. directories); skip those.
                if extracted is None:
                    continue

                # Start a new batch file lazily, only when there is a row to
                # put in it, so no empty trailing CSV is ever created.
                if n_written % batch_size == 0:
                    if file_out is not None:
                        file_out.close()
                    file_out, writer = _open_batch_writer(out_dir, n_written // batch_size)

                with extracted:
                    writer.writerow([extracted.read().decode()])
                n_written += 1

                if n_written % report_every == 0:
                    print(f"{n_written}-th: {(time.time()-start)/60} min")
    finally:
        # Close the current batch file even if extraction failed midway.
        if file_out is not None:
            file_out.close()

    return n_written


# Assigned (not left as the last expression) so the cell shows only the progress lines.
n_untarred = untar_to_csv_batches(f"{dump_dir}/ares_vreo_all.tar.gz", dir_untarred, batch_size)

50000-th: 0.2488145112991333 min
100000-th: 0.5087166468302409 min
150000-th: 0.733437716960907 min
200000-th: 1.008519951502482 min
250000-th: 1.6550072073936462 min
300000-th: 2.36258620818456 min
350000-th: 3.267474893728892 min
400000-th: 4.182306981086731 min
450000-th: 4.94613067706426 min
500000-th: 5.712887322902679 min
550000-th: 6.377229917049408 min
600000-th: 6.982297758261363 min
650000-th: 7.6355587641398115 min
700000-th: 8.250639641284943 min
750000-th: 8.939453113079072 min
800000-th: 9.70247150659561 min
850000-th: 10.478009335199992 min
900000-th: 10.94570183356603 min
950000-th: 11.216181480884552 min


In [13]:
# read into a dask dataframe, standard dd.read_csv doesn't work with \n inside the entries
# Sorted so that the partition order is reproducible; glob.glob itself returns
# files in arbitrary, filesystem-dependent order.
filenames = sorted(glob.glob(f"{dir_untarred}/*.csv"))
if not filenames:
    raise FileNotFoundError(f"no batch CSV files found in {dir_untarred!r}")

def csv_reader(filename):
    """Load one batch CSV (header-less, '|' as quote character) into a pandas DataFrame."""
    return pd.read_csv(filename, quotechar='|', header=None)

# One delayed pandas read per batch file; each becomes one dask partition.
dfs = [delayed(csv_reader)(fn) for fn in filenames]
# The batch files have a single column (label 0); keep it as a dask Series.
data = dd.from_delayed(dfs)[0]
data

Dask Series Structure:
npartitions=196
    object
       ...
     ...  
       ...
       ...
Name: 0, dtype: object
Dask Name: getitem, 588 tasks

In [14]:
# Length of every record in characters (the records are decoded strings, so
# this is not a byte count), followed by the total over the whole dataset.
x = data.apply(len, meta=('x', 'i8')).compute()
x.sum()

5161064672

In [15]:
# Number of records read back from the batch files; expected to match len(ico_dump).
len(x)

975255