In [1]:
%load_ext autoreload

In [2]:
%autoreload 2

In [3]:
import os, sys
import pandas as pd
from dask import bag as db
from dask_k8 import DaskCluster
from dask.distributed import Client

In [4]:
sys.path.append('../')

In [5]:
from sanity_check.contents.s3_data import S3_CANONICAL_DATA_BUCKET
from sanity_check.contents.checks import run_checks_canonical

In [6]:
from sanity_check.contents.s3_data import list_files_rebuilt, fetch_issue_ids_rebuilt
from sanity_check.contents.s3_data import S3_CANONICAL_DATA_BUCKET, fetch_issue_ids
from sanity_check.contents.s3_data import S3_REBUILT_DATA_BUCKET, fetch_issues
from sanity_check.contents.sync import check_sync_db, configure_db_ingestion
from sanity_check.contents.sync import check_sync_rebuilt
from sanity_check.contents.checks import run_checks_canonical, find_duplicated_content_item_IDs

In [155]:
kube_cfg = """
  containers:
    - image: daskdev/dask:1.2.0
      args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '1', --no-bokeh, --memory-limit, 1G, --death-timeout, '120']
      imagePullPolicy: Always
      name: dask-worker
      env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: EXTRA_PIP_PACKAGES
          value: https://github.com/impresso/impresso-pycommons/archive/master.zip
        - name: EXTRA_CONDA_PACKAGES
          value:
        - name: SE_ACCESS_KEY
          value: {}
        - name: SE_SECRET_KEY
          value: {}
      resources:
        requests:
          cpu: 1
          memory: "1G"
        limits:
          cpu: 1
          memory: "1G"
""".format(
    os.environ["SE_ACCESS_KEY"],
    os.environ["SE_SECRET_KEY"]
)

In [156]:
cluster = DaskCluster(
    namespace="dhlab",
    cluster_id="romanell-impresso-sanitycheck",
    worker_pod_spec=kube_cfg
)

In [157]:
cluster.create()
cluster.scale(100, blocking=False)

Scheduler: tcp://10.90.47.26:17903
Dashboard: http://10.90.47.26:8121


In [9]:
cluster.close()

In [158]:
dask_client = cluster.make_dask_client()

In [159]:
# dask_client = Client()

In [160]:
dask_client

0,1
Client  Scheduler: tcp://10.90.47.26:17903  Dashboard: http://10.90.47.26:8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


In [20]:
!s3cmd ls 

2018-12-14 08:44  s3://TRANSFER
2019-09-24 08:34  s3://ca-canonical
2019-09-24 08:34  s3://ca-clef-canonical
2019-09-24 08:34  s3://ca-clef-rebuilt
2019-09-24 08:34  s3://ca-rebuilt
2018-11-05 08:32  s3://canonical-rebuilt
2019-06-28 09:27  s3://evenized-canonical-rebuilt-new-bnl
2019-06-10 19:34  s3://evenized-canonical-rebuilt-pubrelease
2019-06-11 10:19  s3://evenized-light-canonical-rebuilt-pubrelease
2019-05-01 19:24  s3://impresso-image-labels
2019-03-25 12:19  s3://impresso-images
2019-01-30 14:47  s3://impresso-logs
2018-11-22 10:05  s3://impresso-public
2018-11-20 15:11  s3://nzz-data-test-temporary
2019-01-22 15:15  s3://original-canonical-compressed
2018-06-06 08:17  s3://original-canonical-data
2019-06-07 08:33  s3://original-canonical-data-publrelease
2019-06-16 09:39  s3://original-canonical-fixed
2019-09-12 07:59  s3://original-canonical-staging
2019-03-28 12:38  s3://passim-rebuilt
2018-06-06 08:17  s3://processed-canonical-data
2019-05-10 07:49  s3://snapshots


## Check canonical (content item IDs)

In [23]:
duplicates_df = run_checks_canonical(
    "s3://original-canonical-staging/",
    "../reports/contents/"
)

Fetching list of newspapers from s3://original-canonical-staging/
original-canonical-staging contains 33 newspapers
s3://original-canonical-staging/ contains 1674 .bz2 files
Fetching issue ids from 1674 .bz2 files (compute=False)
Found 1 duplicated content item IDs, belonging to 1 journals.


In [None]:
duplicates_df.head()

In [27]:
pd.DataFrame(columns=['id', 'issue_id', 'newspaper_id', 'year'])

Unnamed: 0,id,issue_id,newspaper_id,year


## Check canonical (page IDs)

### Pages from issue JSON

In [81]:
from collections import Counter

In [82]:
issue_bag = fetch_issues(compute=False)

Fetching list of newspapers from s3://original-canonical-fixed
original-canonical-fixed contains 48 newspapers
s3://original-canonical-fixed contains 1919 .bz2 files
Fetching issue ids from 1919 .bz2 files (compute=False)


In [83]:
page_ids_from_issues = issue_bag.map(lambda i: i['pp']).flatten().compute()

In [84]:
len(page_ids_from_issues)

4634371

In [85]:
len(set(page_ids_from_issues))

3812590

In [23]:
duplicates = []
for page_id, count in Counter(page_ids_from_issues).items():
    if count > 1:
        duplicates.append(page_id)

In [27]:
len(duplicates)

323011

In [28]:
duplicates

['NZZ-1780-10-25-a-p0001',
 'NZZ-1780-10-25-a-p0002',
 'NZZ-1780-10-25-a-p0003',
 'NZZ-1780-10-25-a-p0004',
 'NZZ-1780-10-25-b-p0001',
 'NZZ-1780-10-25-b-p0002',
 'NZZ-1780-10-25-c-p0001',
 'NZZ-1780-11-15-a-p0001',
 'NZZ-1780-11-15-a-p0002',
 'NZZ-1784-12-29-a-p0001',
 'NZZ-1784-12-29-a-p0002',
 'NZZ-1784-12-29-a-p0003',
 'NZZ-1784-12-29-a-p0005',
 'NZZ-1798-10-31-a-p0001',
 'NZZ-1798-10-31-a-p0002',
 'NZZ-1798-10-31-a-p0003',
 'NZZ-1798-10-31-a-p0004',
 'NZZ-1802-04-30-a-p0001',
 'NZZ-1802-04-30-a-p0002',
 'NZZ-1804-05-29-a-p0001',
 'NZZ-1804-05-29-a-p0002',
 'NZZ-1804-05-29-a-p0004',
 'NZZ-1806-04-11-a-p0001',
 'NZZ-1806-04-11-a-p0002',
 'NZZ-1806-04-11-a-p0003',
 'NZZ-1806-04-11-a-p0004',
 'NZZ-1806-04-11-a-p0005',
 'NZZ-1806-04-11-a-p0006',
 'NZZ-1822-12-30-a-p0001',
 'NZZ-1822-12-30-a-p0002',
 'NZZ-1822-12-30-a-p0003',
 'NZZ-1822-12-30-a-p0004',
 'NZZ-1822-12-30-a-p0007',
 'NZZ-1823-03-03-a-p0001',
 'NZZ-1823-03-03-a-p0002',
 'NZZ-1823-03-03-a-p0003',
 'NZZ-1823-03-03-a-p0004',
 

### Pages from pages JSON

In [16]:
import json
from sanity_check.contents.s3_data import list_pages, list_newspapers
from impresso_commons.utils.s3 import IMPRESSO_STORAGEOPT, fixed_s3fs_glob
from impresso_commons.utils import chunk

In [14]:
def list_pages(bucket_name=S3_CANONICAL_DATA_BUCKET):
    if bucket_name:
        newspapers = list_newspapers(bucket_name)
    else:
        newspapers = list_newspapers()

    page_files = db.from_sequence(newspapers).map(
        lambda np: fixed_s3fs_glob(
            f"{os.path.join(bucket_name, f'{np}/pages/*')}"
        )
    ).persist()
    print(f'{bucket_name} contains {page_files.count().compute()} .bz2 files')
    return page_files

In [22]:
type(page_files_bag)

list

In [17]:
page_files_bag = list_pages()

Fetching list of newspapers from s3://original-canonical-fixed
original-canonical-fixed contains 48 newspapers
s3://original-canonical-fixed contains 455884 .bz2 files


In [24]:
len(page_files_bag)

455884

In [28]:
def read_pages(page_files):
    # the read text could be wrapped in a try/except clause
    bag_ids = db.read_text(
        page_files,
        storage_options=IMPRESSO_STORAGEOPT
    ).map(
        json.loads
    ).filter(lambda i: len(i) > 0).pluck('id')
    return bag_ids.compute()

In [31]:
from random import shuffle

In [34]:
shuffle(page_files_bag)

In [35]:
page_files_bag[:10]

['s3://original-canonical-fixed/GDL/pages/GDL-1897/GDL-1897-02-11-a-pages.jsonl.bz2',
 's3://original-canonical-fixed/JDG/pages/JDG-1959/JDG-1959-04-18-a-pages.jsonl.bz2',
 's3://original-canonical-fixed/JDG/pages/JDG-1971/JDG-1971-01-22-a-pages.jsonl.bz2',
 's3://original-canonical-fixed/NZZ/pages/NZZ-1785/NZZ-1785-07-13-a-pages.jsonl.bz2',
 's3://original-canonical-fixed/NZZ/pages/NZZ-1912/NZZ-1912-08-17-d-pages.jsonl.bz2',
 's3://original-canonical-fixed/NZZ/pages/NZZ-1910/NZZ-1910-02-08-d-pages.jsonl.bz2',
 's3://original-canonical-fixed/NZZ/pages/NZZ-1926/NZZ-1926-10-27-c-pages.jsonl.bz2',
 's3://original-canonical-fixed/LSE/pages/LSE-1954/LSE-1954-01-15-a-pages.jsonl.bz2',
 's3://original-canonical-fixed/LSE/pages/LSE-1969/LSE-1969-07-18-a-pages.jsonl.bz2',
 's3://original-canonical-fixed/luxwort/pages/luxwort-1907/luxwort-1907-01-10-a-pages.jsonl.bz2']

In [42]:
filtered_pages = [
    page
    for page in page_files_bag
    if "NZZ-" not in page
]

In [43]:
len(filtered_pages)

343208

In [48]:
%%time
page_ids = []
batch_counter = 0

# divide the list of bz2 files (one per newspaper issue)
# into even chunks of size n. Otherwise, if we keep the
# partitions per newspaper everything blows up for newspapers
# that have hundreds of thousands of issues
for batch in chunk(filtered_pages, 2500):
    batch_counter += 1
    print(f'Processing batch {batch_counter} of size {len(batch)}')
    page_ids += read_pages(batch)
    print(f'{len(page_ids)} page ids collected')
    

Processing batch 1 of size 2500
23461 page ids collected
Processing batch 2 of size 2500
47378 page ids collected
Processing batch 3 of size 2500
70107 page ids collected
Processing batch 4 of size 2500
93229 page ids collected
Processing batch 5 of size 2500
117256 page ids collected
Processing batch 6 of size 2500
140961 page ids collected
Processing batch 7 of size 2500
164512 page ids collected
Processing batch 8 of size 2500
188640 page ids collected
Processing batch 9 of size 2500
212501 page ids collected
Processing batch 10 of size 2500
236536 page ids collected
Processing batch 11 of size 2500
259883 page ids collected
Processing batch 12 of size 2500
283405 page ids collected
Processing batch 13 of size 2500
306632 page ids collected
Processing batch 14 of size 2500
329474 page ids collected
Processing batch 15 of size 2500
352821 page ids collected
Processing batch 16 of size 2500
376209 page ids collected
Processing batch 17 of size 2500
400047 page ids collected
Processing

3250296 page ids collected
CPU times: user 20min 16s, sys: 1min 14s, total: 21min 31s
Wall time: 3h 10min 7s


In [51]:
assert len(page_ids) == len(set(page_ids))

In [52]:
import pickle

In [54]:
page_pickle_fname = os.path.join(
    "../reports/contents/",
    "pages_ids_from_pages.pkl"
)

with open(page_pickle_fname, "wb") as outfile:
    pickle.dump(page_ids, outfile)

In [72]:
df_page_ids_from_pages = db.from_sequence(page_ids).map(
    lambda id: {"id": id, "from_pages": True}
).to_dataframe().set_index('id').persist()

In [86]:
df_page_ids_from_issues = db.from_sequence(set(page_ids_from_issues)).map(
    lambda id: {"id": id, "from_issues": True}
).to_dataframe().set_index('id').persist()

In [87]:
df_page_ids_from_pages.head()

Unnamed: 0_level_0,from_pages
id,Unnamed: 1_level_1
BDC-1839-01-20-a-p0001,True
BDC-1839-01-20-a-p0002,True
BDC-1839-01-20-a-p0003,True
BDC-1839-01-20-a-p0004,True
BDC-1839-01-20-a-p0005,True


In [88]:
df_page_ids_from_issues.head()

Unnamed: 0_level_0,from_issues
id,Unnamed: 1_level_1
BDC-1839-01-20-a-p0001,True
BDC-1839-01-20-a-p0002,True
BDC-1839-01-20-a-p0003,True
BDC-1839-01-20-a-p0004,True
BDC-1839-01-20-a-p0005,True


In [89]:
df_pages = df_page_ids_from_pages.join(df_page_ids_from_issues, how='outer').compute()

In [96]:
df_pages['newspaper_id'] = df_pages.index.map(lambda z: z.split('-')[0])

In [97]:
df_pages.head()

Unnamed: 0_level_0,from_pages,from_issues,newspaper_id
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
BDC-1839-01-20-a-p0001,True,True,BDC
BDC-1839-01-20-a-p0002,True,True,BDC
BDC-1839-01-20-a-p0003,True,True,BDC
BDC-1839-01-20-a-p0004,True,True,BDC
BDC-1839-01-20-a-p0005,True,True,BDC


In [151]:
df_pages[~(df_pages.from_pages == df_pages.from_issues)].shape

(562294, 3)

In [109]:
df_pages[~(df_pages.from_pages == df_pages.from_issues) & ~(df_pages.newspaper_id=='NZZ')].newspaper_id.unique()

array(['indeplux', 'luxembourg1935', 'luxwort', 'tageblatt'], dtype=object)

In [120]:
df_pages[~(df_pages.from_pages == df_pages.from_issues) & (df_pages.newspaper_id=='tageblatt')]

                             from_pages  from_issues newspaper_id
id                                                               
tageblatt-1913-10-11-a-p0007        NaN         True    tageblatt
tageblatt-1913-10-11-a-p0008        NaN         True    tageblatt
tageblatt-1913-10-11-a-p0009        NaN         True    tageblatt
tageblatt-1913-10-11-a-p0010        NaN         True    tageblatt
tageblatt-1920-05-05-a-p0003        NaN         True    tageblatt
...                                 ...          ...          ...
tageblatt-1947-10-06-a-p0005        NaN         True    tageblatt
tageblatt-1949-05-21-a-p0008        NaN         True    tageblatt
tageblatt-1949-05-21-a-p0009        NaN         True    tageblatt
tageblatt-1949-05-21-a-p0010        NaN         True    tageblatt
tageblatt-1950-10-16-a-p0007        NaN         True    tageblatt

[808 rows x 3 columns]


In [107]:
df_pages[~(df_pages.from_pages == df_pages.from_issues) & ~(df_pages.newspaper_id=='NZZ')].head(20)

Unnamed: 0_level_0,from_pages,from_issues,newspaper_id
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
indeplux-1873-01-10-a-p0006,,True,indeplux
indeplux-1876-02-16-a-p0008,,True,indeplux
indeplux-1878-11-15-a-p0008,,True,indeplux
indeplux-1881-05-05-a-p0006,,True,indeplux
indeplux-1886-04-15-a-p0001,,True,indeplux
indeplux-1886-04-15-a-p0002,,True,indeplux
indeplux-1886-04-16-a-p0001,,True,indeplux
indeplux-1886-04-16-a-p0002,,True,indeplux
indeplux-1886-04-17-a-p0001,,True,indeplux
indeplux-1886-04-17-a-p0002,,True,indeplux


In [177]:
df_pages.shape

(3812590, 3)

In [123]:
df_pages_no_nzz = df_pages[
    ~(df_pages.from_pages == df_pages.from_issues) & 
    ~(df_pages.newspaper_id=='NZZ')
]

In [181]:
df_pages_no_nzz.groupby(by="newspaper_id").agg({"newspaper_id": len})

Unnamed: 0_level_0,newspaper_id
newspaper_id,Unnamed: 1_level_1
indeplux,22
luxembourg1935,2
luxwort,3
tageblatt,808


In [180]:
", ".join(list(df_pages_no_nzz.newspaper_id.unique()))

'indeplux, luxembourg1935, luxwort, tageblatt'

In [125]:
df_pages_no_nzz.to_csv("../reports/contents/inconsistent_page_ids.csv")

In [104]:
df_pages.to_pickle("../reports/contents/pages_ids_df.pkl")

In [105]:
cluster.close()

In [41]:
# this remains a mystery...
db.read_text(
    "s3://original-canonical-fixed/NZZ/pages/NZZ-1926/NZZ-1926-09-08-e-pages.jsonl.bz2",
    storage_options=IMPRESSO_STORAGEOPT
).map(json.loads).pluck('id').compute()

FileNotFoundError: original-canonical-fixed/NZZ/pages/NZZ-1926/NZZ-1926-09-08-e-pages.jsonl.bz2

In [34]:
cluster.close()

## Check canonical (duplicated issue IDs)

In [128]:
from dask.distributed import Client

In [129]:
dask_client = Client()

In [148]:
from sanity_check.contents.s3_data import fetch_issue_ids
from sanity_check.contents.checks import check_duplicated_issues_IDs

In [None]:
duplicate_issue_ids = check_duplicated_issues_IDs(issue_bag)

## Sync rebuilt/canonical

In [161]:
issues_to_ingest, issues_to_rebuild = check_sync_rebuilt(
    S3_CANONICAL_DATA_BUCKET, S3_REBUILT_DATA_BUCKET
)

Fetching list of newspapers from s3://original-canonical-fixed
original-canonical-fixed contains 48 newspapers
s3://original-canonical-fixed contains 1919 .bz2 files
Fetching issue ids from 1919 .bz2 files (compute=False)
Fetching list of newspapers from s3://canonical-rebuilt
canonical-rebuilt contains 63 newspapers
s3://canonical-rebuilt contains 2280 .bz2 files
Joining the two dataframes
Joining the two dataframes... done


In [163]:
issues_to_ingest.to_pickle('../reports/contents/issues_to_ingest.pkl')

In [164]:
issues_to_rebuild.to_pickle('../reports/contents/issues_to_rebuild.pkl')

In [165]:
issues_to_ingest.head()

Unnamed: 0_level_0,in_rebuilt,in_canonical,newspaper_id,year
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
BLB-1845-12-28-a,True,,BLB,1845
BLB-1846-01-08-a,True,,BLB,1846
BLB-1846-01-13-a,True,,BLB,1846
BLB-1846-01-17-a,True,,BLB,1846
BLB-1846-01-24-a,True,,BLB,1846


In [166]:
issues_to_rebuild.head()

Unnamed: 0_level_0,in_rebuilt,in_canonical,newspaper_id,year
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
EXP-2017-05-02-a,,True,EXP,2017
luxwort-1852-07-02-a,,True,luxwort,1852
luxwort-1860-01-01-a,,True,luxwort,1860
luxwort-1860-01-05-a,,True,luxwort,1860
luxwort-1860-01-08-a,,True,luxwort,1860


## Configure rebuild

In [167]:
from sanity_check.contents.sync import configure_db_ingestion, configure_ingestion, configure_rebuild

In [168]:
configure_rebuild(
    S3_CANONICAL_DATA_BUCKET,
    S3_REBUILT_DATA_BUCKET,
    issues_to_rebuild=issues_to_rebuild
)

Watch out: there are 146 issues that are already ingested (s3://original-canonical-fixed) but not yet rebuilt (s3://canonical-rebuilt).


[{'EXP': [2017, 2018]},
 {'luxwort': [1852, 1853]},
 {'luxwort': [1860, 1861]},
 {'luxwort': [1861, 1862]}]

In [169]:
ingestion_config = configure_ingestion(
    S3_CANONICAL_DATA_BUCKET,
    S3_REBUILT_DATA_BUCKET,
    issues_to_ingest=issues_to_ingest
)

Watch out: there are 70583 issues that are already rebuilt (s3://canonical-rebuilt) but not yet ingested (s3://original-canonical-fixed).


In [172]:
cfg_path = '/home/romanell/Documents/impresso/impresso-processing/staging.json'
with open(cfg_path, 'w') as cfg_file:
    json.dump(ingestion_config, cfg_file, indent=4)

In [173]:
newspapers_to_ingest = set([
    list(x.keys())[0] for x in ingestion_config
])

In [174]:
newspapers_to_ingest

{'BLB',
 'BNN',
 'DFS',
 'DVF',
 'FZG',
 'HRV',
 'LAB',
 'LLE',
 'MGS',
 'NTS',
 'NZG',
 'NZZ',
 'SGZ',
 'SRT',
 'WHD',
 'ZBT'}

In [175]:
cluster.close()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.client -

## Sync with DB

In [17]:
config = configure_db_ingestion(S3_CANONICAL_DATA_BUCKET, 'dev')

Fetching list of newspapers from s3://original-canonical-fixed
original-canonical-fixed contains 48 newspapers
s3://original-canonical-fixed contains 1919 .bz2 files
Fetching issue ids from 1919 .bz2 files (compute=False)
Connecting to MySQL DB dev
Fetched 441861 content item IDs from DB
There are 13963 issues from s3://original-canonical-fixed missing from MySQL (dev).


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  lambda i: i.split('-')[0]
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  lambda i: int(i.split('-')[1])


In [61]:
config

Available objects for config:
     AliasManager
     DisplayFormatter
     HistoryManager
     IPCompleter
     IPKernelApp
     LoggingMagics
     MagicsManager
     OSMagics
     PrefilterManager
     ScriptMagics
     StoreMagics
     ZMQInteractiveShell
