In [1]:
import json
import os
import bz2
import io
from bz2 import BZ2File

### Transform the data


You can run bash commands from your notebook: just prefix the command with `!`.

For example, let's check free space on the disk:

In [3]:
! df -h

Filesystem      Size  Used Avail Use% Mounted on
udev             16G     0   16G   0% /dev
tmpfs           3.2G  2.7M  3.2G   1% /run
/dev/sda2       234G  114G  108G  52% /
tmpfs            16G  321M   16G   2% /dev/shm
tmpfs           5.0M  4.0K  5.0M   1% /run/lock
tmpfs            16G     0   16G   0% /sys/fs/cgroup
/dev/loop0      256K  256K     0 100% /snap/jq/6
/dev/loop2      216M  216M     0 100% /snap/atom/236
/dev/loop1       55M   55M     0 100% /snap/core18/1144
/dev/loop3      4.2M  4.2M     0 100% /snap/gnome-calculator/406
/dev/loop4      220M  220M     0 100% /snap/atom/238
/dev/loop6      5.2M  5.2M     0 100% /snap/htop/1066
/dev/loop5      150M  150M     0 100% /snap/gnome-3-28-1804/67
/dev/loop7      150M  150M     0 100% /snap/slack/16
/dev/loop13     1.0M  1.0M     0 100% /snap/gnome-logs/73
/dev/loop8      141M  141M     0 100% /snap/gnome-3-26-1604/90
/dev/loop11      43M   43M     0 100% /snap/gtk-common-themes/1313
/dev/loop10     4.3M  4.3

whereas without `!` it won't work:

In [4]:
df -h

NameError: name 'df' is not defined

In [7]:
# rm data/*

In [8]:
!s3cmd get s3://impresso-public/sample-rebuilt-teaching/* data/

download: 's3://impresso-public/sample-rebuilt-teaching/GDL-1900.jsonl.bz2' -> 'data/GDL-1900.jsonl.bz2'  [1 of 2]
 72413580 of 72413580   100% in    0s    85.86 MB/s  done
download: 's3://impresso-public/sample-rebuilt-teaching/JDG-1900.jsonl.bz2' -> 'data/JDG-1900.jsonl.bz2'  [2 of 2]
 210016648 of 210016648   100% in    1s   101.29 MB/s  done


In [9]:
cd data/

/home/romanell/Documents/impresso/epfl-shs-class/data


In [27]:
ls -la

total 312892
drwxr-xr-x 2 romanell DHLAB-unit      4096 Sep 25 13:59 [0m[01;34m.[0m/
drwxr-xr-x 5 romanell DHLAB-unit      4096 Sep 25 14:30 [01;34m..[0m/
-rw-r--r-- 1 romanell DHLAB-unit  72413580 Sep 25 10:33 [01;31mGDL-1900.jsonl.bz2[0m
-rw-r--r-- 1 romanell DHLAB-unit  11701319 Sep 25 13:59 [01;31mGDL-1900-reduced.jsonl.bz2[0m
-rw-r--r-- 1 romanell DHLAB-unit 210016648 Sep 25 10:33 [01;31mJDG-1900.jsonl.bz2[0m
-rw-r--r-- 1 romanell DHLAB-unit  26243380 Sep 25 14:00 [01;31mJDG-1900-reduced.jsonl.bz2[0m


The command here below does the following things:
- reads the `bz2` files in the current directory one by one
- for each file, calls `jq` and uses it to reshape the JSON documents, selecting and renaming some fields
- passes the reshaped documents on to `bzip2`, which creates a new file with the same name plus a `-reduced` suffix, containing the reshaped data

Why is it useful to do this before processing the data?

Our data contain quite a lot of information that won't be useful to you. By filtering it out you reduce the size of data and speed up the processing.

In [13]:
%%time
!for f in *[0-9].jsonl.bz2; do bzcat $f | jq -c '{id: .id, type: .tp, date: .d, title: .t, fulltext: .ft}' | bzip2 > "${f%.jsonl.bz2}-reduced.jsonl.bz2" ; done

CPU times: user 1.87 s, sys: 326 ms, total: 2.19 s
Wall time: 1min 30s


In [20]:
ls -la ./

total 312888
drwxr-xr-x 2 romanell DHLAB-unit      4096 Sep 25 13:16 [0m[01;34m.[0m/
drwxr-xr-x 5 romanell DHLAB-unit      4096 Sep 25 13:18 [01;34m..[0m/
-rw-r--r-- 1 romanell DHLAB-unit  72413580 Sep 25 10:33 [01;31mGDL-1900.jsonl.bz2[0m
-rw-r--r-- 1 romanell DHLAB-unit  11701319 Sep 25 13:16 [01;31mGDL-1900-reduced.jsonl.bz2[0m
-rw-r--r-- 1 romanell DHLAB-unit 210016648 Sep 25 10:33 [01;31mJDG-1900.jsonl.bz2[0m
-rw-r--r-- 1 romanell DHLAB-unit  26243380 Sep 25 13:17 [01;31mJDG-1900-reduced.jsonl.bz2[0m


In [21]:
cd ../

/home/romanell/Documents/impresso/epfl-shs-class


### Reading newspaper archive data

Reminder: the data is already 'clean' and the files at hand contain only the following information:
- id
- date
- title
- type (article or advertisement)
- fulltext

In [22]:
pwd

'/home/romanell/Documents/impresso/epfl-shs-class'

In [2]:
input_dir = "data/" # update with your path 

In [3]:
# a helper function to get the lines from an archive
def read_jsonlines(bz2_file):
    """Yield the non-empty lines of a JSON-lines archive as strings.

    The file is consumed line by line, so only the current line is decoded
    and held in memory instead of the whole decompressed archive.

    Args:
        bz2_file: a binary file-like object opened for reading (for example
            a ``BZ2File``) whose content is UTF-8 encoded text.

    Yields:
        str: each non-empty line, without its trailing newline character.

    Raises:
        UnicodeDecodeError: if a line is not valid UTF-8.
    """
    # iterating a binary file splits on b'\n', which is safe for UTF-8
    # because no multi-byte sequence contains that byte
    for raw_line in bz2_file:
        line = raw_line.decode('utf-8').rstrip('\n')
        if line != '':
            yield line

### reading data the classical way

In [25]:
for archive in os.listdir(input_dir):

    # skip everything but the transformed ("reduced") archives
    if "reduced" not in archive:
        continue

    # open the archive; the `with` block closes the file handle as soon as
    # its lines have been read (it was previously left open)
    with BZ2File(os.path.join(input_dir, archive), 'r') as f:

        # get the list of articles it contains (= a json object on each line)
        articles = list(read_jsonlines(f))

    # load the first 100 articles as json and access their attributes
    for a in articles[:100]:

        # decode the json string into an object (dict)
        json_article = json.loads(a)
        print(
            json_article["date"],
            json_article["id"],
            json_article["title"]
        )

1900-10-10 JDG-1900-10-10-a-i0001 BULLETIN GENÈVE, 9 octobre 1900
1900-10-10 JDG-1900-10-10-a-i0057 None
1900-10-10 JDG-1900-10-10-a-i0058 None
1900-10-10 JDG-1900-10-10-a-i0059 None
1900-10-10 JDG-1900-10-10-a-i0004 None
1900-10-10 JDG-1900-10-10-a-i0013 Angleterre
1900-10-10 JDG-1900-10-10-a-i0030 RÉUNiONS.-CONVOCATIONS.-CONCERTS
1900-10-10 JDG-1900-10-10-a-i0060 None
1900-10-10 JDG-1900-10-10-a-i0005 NOUVELLES Df.S CANTONS
1900-10-10 JDG-1900-10-10-a-i0031 Comité électoral démocratique
1900-10-10 JDG-1900-10-10-a-i0061 None
1900-10-10 JDG-1900-10-10-a-i0032 DERNIERES DÉPÊCHES
1900-10-10 JDG-1900-10-10-a-i0062 None
1900-10-10 JDG-1900-10-10-a-i0007 None
1900-10-10 JDG-1900-10-10-a-i0016 CHRONIQUE LOCALE
1900-10-10 JDG-1900-10-10-a-i0033 Turquie CONSTANTINOPLE (via Sofia), 9. — Le sultan a ordonné au conseil des ministres de mettre de
1900-10-10 JDG-1900-10-10-a-i0063 None
1900-10-10 JDG-1900-10-10-a-i0017 DOCTEUR GERMAINE
1900-10-10 JDG-1900-10-10-a-i0034 Les affaires de Chine
1900-1

### Reading from s3

It's possible to read the data directly from S3.

We set up a public bucket (~ folder) with some sample data that you can access without need for authentication. 

In [14]:
import dask.bag as db
import json

In [24]:
# Settings handed to the S3 backend: the endpoint to contact, and `anon`
# to request anonymous (unauthenticated) access.
_storage_options = dict(
    client_kwargs=dict(endpoint_url='https://os.zhdk.cloud.switch.ch'),
    anon=True,
)

# Build a bag from the bz2 files of the sample; every element of the text
# bag is then parsed with `json.loads`.
ci_bag = (
    db.read_text(
        's3://impresso-public/sample-rebuilt-teaching/*bz2',
        storage_options=_storage_options,
    )
    .map(json.loads)
)

In [25]:
%%time
ci_bag.count().compute()

CPU times: user 88 ms, sys: 67.1 ms, total: 155 ms
Wall time: 1min 38s


45260

In [28]:
ci_bag.take(1)

({'id': 'GDL-1900-12-12-a-i0001',
  'pp': [1],
  'd': '1900-12-12',
  'olr': True,
  'ts': '2019-06-17T10:02:03Z',
  'lg': 'fr',
  'tp': 'ar',
  's3v': None,
  'ppreb': [{'id': 'GDL-1900-12-12-a-p0001',
    'n': 1,
    't': [{'c': [471, 1240, 406, 113], 's': 0, 'l': 12},
     {'c': [113, 1233, 15, 54], 's': 13, 'l': 1},
     {'c': [127, 1233, 44, 54], 's': 15, 'l': 2},
     {'c': [235, 1233, 27, 54], 's': 18, 'l': 1},
     {'c': [262, 1233, 135, 54], 's': 20, 'l': 6},
     {'c': [204, 1304, 67, 40], 's': 27, 'l': 4},
     {'c': [300, 1304, 135, 40], 's': 32, 'l': 7},
     {'c': [106, 1365, 56, 40], 's': 40, 'l': 3},
     {'c': [175, 1365, 42, 40], 's': 44, 'l': 2},
     {'c': [225, 1365, 75, 40], 's': 47, 'l': 4},
     {'c': [315, 1365, 48, 40], 's': 52, 'l': 2},
     {'c': [377, 1365, 48, 40], 's': 55, 'l': 2},
     {'c': [438, 1365, 169, 40], 's': 58, 'l': 8},
     {'c': [606, 1365, 10, 40], 's': 66, 'l': 1},
     {'c': [635, 1365, 63, 40], 's': 68, 'l': 3},
     {'c': [710, 1365, 17

In [20]:
%%time
# total length of the 'ft' field, summed over the items that have one
(
    ci_bag
    .filter(lambda item: 'ft' in item)
    .map(lambda item: len(item['ft']))
    .sum()
    .compute()
)

CPU times: user 84.3 ms, sys: 69.2 ms, total: 154 ms
Wall time: 1min 26s


152542922

### using dask and map
see http://dask.pydata.org/en/latest/docs.html 

In [4]:
# make sure of having these libraries in your environment ('conda install' / or 'pip install')
from dask.diagnostics import ProgressBar
from dask.distributed import Client, progress
import dask.bag as db

#### Helper functions

In [5]:
def get_archives(path):
    """Return the paths of the "reduced" archives found in a directory.

    Args:
        path: directory to scan.

    Returns:
        list of str: `path` joined with the name of every directory entry
        whose name contains "reduced", in the order given by `os.listdir`.
    """
    # join with `path` itself rather than a notebook-level variable, so the
    # returned paths are correct for whichever directory is passed in
    return [
        os.path.join(path, archive)
        for archive in os.listdir(path)
        if "reduced" in archive
    ]

In [6]:
def get_articles(archive_file):
    """Load every article of a bz2-compressed JSON-lines archive.

    Args:
        archive_file: path of the archive to read.

    Returns:
        list: one decoded JSON object per non-empty line of the archive.
    """
    # the `with` block guarantees the archive is closed once it has been read,
    # even if decoding fails (the handle was previously never closed)
    with BZ2File(archive_file, 'r') as f:
        # each line holds one json object; decode them all while the file is open
        return [json.loads(line) for line in read_jsonlines(f)]

#### Read and filter articles in parallel

In [7]:
# fetch only "reduced" bz2 files
archives = get_archives(input_dir)

In [8]:
archives

['data/JDG-1900-reduced.jsonl.bz2', 'data/GDL-1900-reduced.jsonl.bz2']

In [9]:
bag_archives = db.from_sequence(archives)

In [10]:
bag_articles = (
    bag_archives
    # each archive path becomes the list of articles it contains
    .map(get_articles)
    # merge the per-archive lists into a single bag of articles
    .flatten()
    # drop empty-string full texts (a fulltext of None still passes this test)
    .filter(lambda ar: ar['fulltext'] != '')
    .repartition(npartitions=100)
)

In [11]:
with ProgressBar():
    bag_articles = bag_articles.persist()

[########################################] | 100% Completed |  1min 59.0s


In [22]:
bag_articles.take(1, npartitions=10)

({'id': 'JDG-1900-10-10-a-i0001',
  'type': 'ar',
  'date': '1900-10-10',
  'title': 'BULLETIN GENÈVE, 9 octobre 1900',
  'fulltext': 'BULLETIN GENÈVE, 9 octobre 1900 Nul n\'a jamais occupé le télégraphe plus que S. E. Li-Hung-Chang. Tous ses faits et gestes sont transmis au monde en attendant qu\'ils soient conservés par l\'histoire. Et il arrive que, s\'il change d\'avis en quelques minutes, deux dépêches contraires se suivent, pour nous l\'apprendre, dans les colonnes des journaux. C\'est ainsi qu\'hier nos lecteurs ont pu lire coup sur coup que le départ de ce grand homme pour Pékin était ajourné définiment et qu\'il était parti pour cette ville. Et une autre dépêche, datée du 8, ù-dire de deux jours plus tard, nous annonce que M de Giers est parti pour Tieh-Tsin afin de conférer avec cet encombrant mandarin. Est-ce que vraiment les actes et les paroles de ce personnage dont on ne Sait s\'il est encore quelqu\'un ou s\'il ne l\'est plus ont tant rêt pour le monde ? Il s\'entoure d\

In [16]:
bag_articles.count().compute()

41908

In [17]:
articles = bag_articles.compute()

In [18]:
len(articles)

41908

#### Compute total number of tokens (parallel)

In [19]:
# whitespace-separated token count of every article whose fulltext is not None
n_tokens = (
    bag_articles
    .filter(lambda article: article['fulltext'] is not None)
    .map(lambda article: len(article['fulltext'].split()))
)

In [20]:
total_tokens = n_tokens.sum().compute()

In [21]:
"{:,}".format(total_tokens)

'26,088,321'