In [1]:
!pip install git+https://github.com/SeguinBe/dask_k8.git

Collecting git+https://github.com/SeguinBe/dask_k8.git
  Cloning https://github.com/SeguinBe/dask_k8.git to /tmp/pip-req-build-pkobgy6w
  Running command git clone -q https://github.com/SeguinBe/dask_k8.git /tmp/pip-req-build-pkobgy6w
Collecting dask>=2 (from distributed>=1.27->dask-k8==0.1.1)
[?25l  Downloading https://files.pythonhosted.org/packages/bd/c0/ac42b459c77223b503fbfcc030850524c95170d27a5bcbf9ffb6fe129b5a/dask-2.0.0-py3-none-any.whl (760kB)
[K     |████████████████████████████████| 768kB 3.4MB/s eta 0:00:01
Building wheels for collected packages: dask-k8
  Building wheel for dask-k8 (setup.py) ... [?25ldone
[?25h  Stored in directory: /tmp/pip-ephem-wheel-cache-gkd45j3i/wheels/a8/70/d2/570a9e139dc6e5b6bcf457aae80efd00f5a100e2dd60407627
Successfully built dask-k8
Installing collected packages: dask
  Found existing installation: dask 1.2.0
    Uninstalling dask-1.2.0:
      Successfully uninstalled dask-1.2.0
Successfully installed dask-2.0.0


## Gather the articles in even partitions

**Warning**: despite setting the number of partitions in the `repartition_evenly` call, the resulting number is not fixed, and the final number of partitions cannot be predicted in advance. Clear the output directory before reprocessing, so that partitions from a previous run do not remain in the output folder.

In [2]:
from dask_k8 import DaskCluster

In [3]:
# Cluster definition for the evenizing jobs. The pod spec below starts one
# dask-worker per pod with a single thread and a 90GB memory limit, and sets
# EXTRA_PIP_PACKAGES=s3fs in the worker environment.
# NOTE(review): the worker image is pinned to daskdev/dask:1.1.5 while the install
# cell upgraded the notebook kernel to dask 2.0.0; this version mismatch may be what
# produces the `KeyError: 'nthreads'` shown in the client cell -- TODO confirm.
cluster = DaskCluster(namespace="dhlab", cluster_id="maud-evenize-bnl", worker_pod_spec="""
  containers:
    - image: daskdev/dask:1.1.5
      args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '1', --no-bokeh, --memory-limit, 90GB, --death-timeout, '120']
      imagePullPolicy: Always
      name: dask-worker
      env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: EXTRA_PIP_PACKAGES
          value: s3fs
        - name: EXTRA_CONDA_PACKAGES
          value: 
      resources:
        requests:
          cpu: 1
          memory: "90G"
        limits:
          cpu: 1
          memory: "90G"
""")

In [4]:
cluster.close()

In [4]:
# Create the cluster, have a look at the dashboard to check that everything is working fine
cluster.create()

Scheduler: tcp://10.90.47.35:23145
Dashboard: http://10.90.47.35:29556


In [5]:
cluster.scale(3)

Currently 2 workers out of the 3 required, waiting...
Reached the desired 3 workers!


In [6]:
# Create the dask client
client = cluster.make_dask_client()
client

KeyError: 'nthreads'

KeyError: 'nthreads'

In [10]:
import dask

In [11]:
dask.__version__

'2.0.0'

In [None]:
from dask import bag as db
#from dask.diagnostics import ProgressBar
from impresso_images import settings, data_utils
from random import shuffle
import json

# Passed as `storage_options` to every dask bag read/write in this notebook:
# the S3 endpoint plus the credentials provided by the project settings module.
dask_storage_options = dict(
    client_kwargs={'endpoint_url': 'https://os.zhdk.cloud.switch.ch'},
    key=settings.get_access_key(),
    secret=settings.get_secret_key(),
)

In [None]:
def _filter(ci_json):
    light = dict()
    light = {k: ci_json[k] for k in ci_json if k == "id"
                                   or k == "s3v"
                                   or k == "ts"
                                   or k == "ft"
                                   or k == "tp"
                                   or k == "pp"
                                   or k == "lg"
                                   or k == "t"}
    return light

In [None]:
# Newspaper identifiers to process. Each one is used as a prefix in the
# `s3://canonical-rebuilt/<newspaper>/*.jsonl.bz2` glob of the evenizing cell.
newspapers = [
    "armeteufel",
    "actionfem",
    "avenirgdl",
    "buergerbeamten",
    "courriergdl",
    "deletz1893",
    "demitock",
    "diekwochen",
    "dunioun",
    "gazgrdlux",
    "indeplux",
    "kommmit",
    "landwortbild",
    "luxland",
    "luxzeit1844",
    "luxzeit1858",
    "luxwort",
    "obermosel",
    "onsjongen",
    "schmiede",
    "volkfreu1869"
]

### evenize from rebuilt

In [None]:
# Collect the rebuilt archives of every selected newspaper into ONE flat list of paths.
files = []
for newspaper in newspapers:  # not `np`, which would shadow the usual numpy alias
    pattern = f"s3://canonical-rebuilt/{newspaper}/*.jsonl.bz2"
    print(f"Getting files from {pattern}")
    # extend, not append: the glob result is itself a list of paths (it is shuffled
    # directly in the "light from rebuilt" cell), and both the file count below and
    # read_text need a flat list rather than one sub-list per newspaper.
    files.extend(data_utils.fixed_s3fs_glob(pattern))

print(f"Have {len(files)} files")
# Shuffle so that the archives are read in random order.
shuffle(files)

# for full version, delete the 2 maps, just load the text
data = db.read_text(files, storage_options=dask_storage_options).map(json.loads).map(json.dumps, ensure_ascii=False)

# Removes the \n kept when doing just read_text (full version). With the two maps
# above it has no effect, because json.dumps output carries no trailing whitespace.
data = data.str.rstrip()

# !! change bucket if necessary
data_utils.repartition_evenly(data, nb_partitions=1000).to_textfiles(
    "s3://evenized-canonical-rebuilt-new-bnl/*.jsonl.bz2",
    storage_options=dask_storage_options,
    encoding='utf-8',
)

### light from rebuilt

In [None]:
# List the rebuilt archives and randomise their order.
files = data_utils.fixed_s3fs_glob("s3://canonical-rebuilt/*.jsonl.bz2")
shuffle(files)

# Decode each line, reduce it with _filter, then serialise it back to JSON.
# (for full version, delete the maps, just load the text)
data = (
    db.read_text(files, storage_options=dask_storage_options)
    .map(json.loads)
    .map(_filter)
    .map(json.dumps, ensure_ascii=False)
)

# uncomment following line for full version
#data = data.str.rstrip()  # Remove the \n added when doing just read_text

# !! change bucket if necessary
data_utils.repartition_evenly(data, nb_partitions=1000).to_textfiles(
    "s3://evenized-light-canonical-rebuilt-pubrelease/*.jsonl.bz2",
    storage_options=dask_storage_options,
)

### light from evenized

In [None]:
# Read the already evenized archives; no shuffling or repartitioning in this variant.
files = data_utils.fixed_s3fs_glob("s3://evenized-canonical-rebuilt-pubrelease/*.jsonl.bz2")

# Decode each line, reduce it with _filter, then serialise it back to JSON.
data = (
    db.read_text(files, storage_options=dask_storage_options)
    .map(json.loads)
    .map(_filter)
    .map(json.dumps, ensure_ascii=False)
)

# !! change bucket if necessary
data.to_textfiles(
    "s3://evenized-light-canonical-rebuilt-pubrelease/*.jsonl.bz2",
    storage_options=dask_storage_options,
)

In [None]:
# After this is done, close the cluster
cluster.close()

## Process the data to get statistics

In [None]:
# Cluster definition for the statistics jobs. Same pod spec as the evenizing
# cluster except for the cluster id and the memory: one single-threaded
# dask-worker per pod with a 20GB memory limit, EXTRA_PIP_PACKAGES=s3fs.
# NOTE(review): the worker image is pinned to daskdev/dask:1.1.5 while the install
# cell upgraded the notebook kernel to dask 2.0.0 -- confirm the versions are compatible.
cluster = DaskCluster(namespace="dhlab", cluster_id="maud-test", worker_pod_spec="""
  containers:
    - image: daskdev/dask:1.1.5
      args: [dask-worker, $(DASK_SCHEDULER_ADDRESS), --nthreads, '1', --no-bokeh, --memory-limit, 20GB, --death-timeout, '120']
      imagePullPolicy: Always
      name: dask-worker
      env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: EXTRA_PIP_PACKAGES
          value: s3fs
        - name: EXTRA_CONDA_PACKAGES
          value: 
      resources:
        requests:
          cpu: 1
          memory: "20G"
        limits:
          cpu: 1
          memory: "20G"
""")

In [None]:
# Start the statistics cluster and request 200 workers.
cluster.create()
cluster.scale(200)

# Connect a client to THIS cluster. The client created earlier in the notebook
# was attached to the evenizing cluster, which has been closed in the meantime.
client = cluster.make_dask_client()

In [None]:
import json
# Read every matching archive as lines of text and decode each line from JSON.
data = db.read_text(
    data_utils.fixed_s3fs_glob("s3://rebuilt-dask-partitions-full/*.jsonl.bz2"),
    storage_options=dask_storage_options,
).map(json.loads)

def simplify_fn(d):
    """Reduce a decoded article to its identifier and its text.

    Args:
        d: dict for one article; must contain "id", may contain "ft".

    Returns:
        A dict with exactly the keys "id" and "ft"; "ft" is the empty string
        when the input has no "ft" entry.

    Raises:
        KeyError: if ``d`` has no "id" entry.
    """
    article_id = d["id"]
    text = d.get("ft", "")
    return dict(id=article_id, ft=text)

# Simplify the data by only keeping the text of the articles and keeping them in memory for faster processing
data = data.map(simplify_fn).persist()

In [None]:
data.count().compute()

In [None]:
import json
def stats_fn(d):
    """Derive per-article statistics from a simplified article dict.

    The identifier is split on "-": the first piece is reported as the
    newspaper id and the second piece, converted to int, as the year.

    Args:
        d: dict with an "id" string and optionally an "ft" text.

    Returns:
        A dict with the keys "id", "np_id", "year" and "length", where
        "length" is the number of characters of "ft" (0 when "ft" is absent).
    """
    article_id = d["id"]
    pieces = article_id.split("-")
    stats = {"id": article_id}
    stats["np_id"] = pieces[0]
    stats["year"] = int(pieces[1])
    stats["length"] = len(d.get("ft", ""))
    return stats
    
df = data.map(stats_fn).to_dataframe()

In [None]:
df.head()

In [None]:
# persist() returns a NEW collection backed by the in-memory results; it does not
# modify `df` in place. Rebind the name so the cells below reuse the persisted
# data instead of recomputing the whole graph each time.
df = df.persist()
df

In [None]:
from dask import array as da
s = df.length.values
h, bins = da.histogram(s, bins=100, range=[0, 20000])

In [None]:
h = h.compute()

In [None]:
len(bins), len(h)

In [None]:
%matplotlib inline
from matplotlib import pyplot as plt
# The first bin is left out of the plot. bins[1:-1] holds the LEFT edges of the
# remaining bins, so anchor each bar on its left edge; the default (centred)
# alignment would draw every bar half a bin width too far to the left.
plt.bar(bins[1:-1], h[1:], width=200, align="edge")

In [None]:
import dask
# Per newspaper and per year: number of articles, mean and standard deviation
# of the article length. (The former bare `dask.compute()` call had no arguments
# and therefore computed nothing; it has been dropped.)
gg = df.groupby(['np_id', 'year'])

results = gg.agg({'length': ['count', 'mean', 'std']}).compute()

In [None]:
results

In [None]:
cluster.close()

In [None]:
import matplotlib.style as style
plt.style.use('seaborn')

# Minimum number of yearly data points a newspaper needs in order to be drawn.
# The previous inline comment spoke of "10 years of presence", but the check only
# ever excluded empty series; 1 keeps that behaviour. Raise it to hide short-lived titles.
MIN_YEARS_WITH_DATA = 1

plt.figure(figsize=(14, 12))
palette = plt.get_cmap('tab20')

# Newspapers with the most rows in `results` come first.
labels_ordered = sorted(results.index.levels[0], key=lambda np_id: -len(results.loc[np_id]))

for i, np_id in enumerate(labels_ordered):
    yearly_counts = results.loc[np_id][('length', 'count')]
    if len(yearly_counts) >= MIN_YEARS_WITH_DATA:
        # The colour index wraps around after 20 newspapers, so the later ones
        # get a marker to stay distinguishable from the first 20.
        plt.plot(yearly_counts.index, yearly_counts.values, color=palette(i % 20),
                 label=np_id, marker='' if i < 20 else 'o')

# Every plotted line carries its own label, so no separate label list is needed.
plt.legend()
plt.xlabel('Year')
plt.ylabel("Nb articles")

In [None]:
# Minimum number of yearly data points a newspaper needs in order to be drawn.
# The previous inline comment spoke of "10 years of presence", but the check only
# ever excluded empty series; 1 keeps that behaviour. Raise it to hide short-lived titles.
MIN_YEARS_WITH_DATA = 1

plt.figure(figsize=(14, 12))
palette = plt.get_cmap('tab20')

# Newspapers with the most rows in `results` come first.
labels_ordered = sorted(results.index.levels[0], key=lambda np_id: -len(results.loc[np_id]))

for i, np_id in enumerate(labels_ordered):
    yearly_means = results.loc[np_id][('length', 'mean')]
    if len(yearly_means) >= MIN_YEARS_WITH_DATA:
        # The colour index wraps around after 20 newspapers, so the later ones
        # get a marker to stay distinguishable from the first 20.
        plt.plot(yearly_means.index, yearly_means.values, color=palette(i % 20),
                 label=np_id, marker='' if i < 20 else 'o')

# Every plotted line carries its own label, so no separate label list is needed.
plt.legend()
plt.xlabel('Year')
plt.ylabel("Avg length of article")