In [1]:
import json
import os
import bz2
import io
from bz2 import BZ2File

### Transform the data


You can run shell commands from your notebook by prefixing them with `!`.

For example, let's check free space on the disk:

In [2]:
! df -h

Filesystem                                                   Size   Used  Avail Capacity iused      ifree %iused  Mounted on
/dev/disk1s5                                                466Gi   15Gi  211Gi     7%  485949 4881966891    0%   /
devfs                                                       347Ki  347Ki    0Bi   100%    1202          0  100%   /dev
/dev/disk1s1                                                466Gi  233Gi  211Gi    53% 3352173 4879100667    0%   /System/Volumes/Data
/dev/disk1s4                                                466Gi  5.0Gi  211Gi     3%       6 4882452834    0%   /private/var/vm
map auto_home                                                 0Bi    0Bi    0Bi   100%       0          0  100%   /System/Volumes/Data/home
/dev/disk1s3                                                466Gi  502Mi  211Gi     1%      38 4882452802    0%   /Volumes/Recovery
com.apple.TimeMachine.2020-02-28-075704.local@/dev/disk1s1  466Gi  221Gi  211Gi    52% 3330208 48791226

Without the `!`, the same command fails, because the notebook tries to evaluate it as Python:

In [3]:
df -h

NameError: name 'df' is not defined
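
If you would rather stay in Python than shell out, the standard library offers a rough equivalent of `df` for a single filesystem. The snippet below is only a sketch: the helper `to_gib` is a name introduced here for illustration, and the numbers it prints depend on your machine.

```python
import shutil

# Query disk usage for the filesystem that holds the current directory.
usage = shutil.disk_usage(".")

def to_gib(n_bytes):
    """Convert a byte count to gibibytes (hypothetical helper)."""
    return n_bytes / 2**30

print(f"total: {to_gib(usage.total):.1f} GiB")
print(f"used:  {to_gib(usage.used):.1f} GiB")
print(f"free:  {to_gib(usage.free):.1f} GiB")
```

Unlike `! df -h`, this reports on one filesystem only, the one containing the path you pass in.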

In [4]:
# rm data/*

In [8]:
!s3cmd get s3://impresso-public/sample-rebuilt-teaching/* data/

download: 's3://impresso-public/sample-rebuilt-teaching/GDL-1900.jsonl.bz2' -> 'data/GDL-1900.jsonl.bz2'  [1 of 2]
 72413580 of 72413580   100% in    0s    85.86 MB/s  done
download: 's3://impresso-public/sample-rebuilt-teaching/JDG-1900.jsonl.bz2' -> 'data/JDG-1900.jsonl.bz2'  [2 of 2]
 210016648 of 210016648   100% in    1s   101.29 MB/s  done


In [14]:
cd data/

/Users/hugo/Code/shs-dig-hum/data


In [15]:
ls -la

total 233664
drwxr-xr-x  4 hugo  staff        128 11 mar 09:49 ./
drwxr-xr-x  9 hugo  staff        288 11 mar 10:01 ../
-rw-r--r--  1 hugo  staff    2060288 11 mar 10:00 GDL-1969-reduced.jsonl.bz2
-rw-r--r--@ 1 hugo  staff  107227845 26 fév 16:21 GDL-1969.jsonl.bz2


The command below does the following:
- reads the `bz2` files in the current directory one by one
- for each file, calls `jq` to reshape the JSON documents, selecting and renaming some fields
- passes the reshaped documents on to `bzip2`, which writes a new file with the same name plus a `-reduced` suffix, containing the reshaped data

Why is it useful to do this before processing the data?

Our data contain quite a lot of information that won't be useful to you. Filtering it out reduces the size of the data and speeds up the processing.

In [17]:
%%time
!for f in *[0-9].jsonl.bz2; do bzcat $f | jq -c '{id: .id, type: .tp, date: .d, title: .t, fulltext: .ft}' | bzip2 > "${f%.jsonl.bz2}-reduced.jsonl.bz2" ; done

CPU times: user 790 ms, sys: 212 ms, total: 1 s
Wall time: 44.3 s
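
If `jq` is not available on your machine, the reshaping done by the shell loop above can be approximated in plain Python. This is a minimal sketch, not the method used to produce the files in this notebook: `FIELD_MAP` and `reduce_archive` are names introduced here, the field mapping is copied from the `jq` filter, and the toy record and file names are made up to exercise the function.

```python
import bz2
import json
import os
import tempfile

# New field name -> short key in the original records (copied from the jq filter).
FIELD_MAP = {"id": "id", "type": "tp", "date": "d", "title": "t", "fulltext": "ft"}

def reduce_archive(src_path, dst_path):
    """Rewrite a .jsonl.bz2 archive, keeping only the fields in FIELD_MAP."""
    n_docs = 0
    with bz2.open(src_path, "rt", encoding="utf-8") as src, \
         bz2.open(dst_path, "wt", encoding="utf-8") as dst:
        for line in src:
            if not line.strip():
                continue  # skip blank lines
            doc = json.loads(line)
            # As with jq, a key missing from the input becomes null (None).
            reduced = {new: doc.get(old) for new, old in FIELD_MAP.items()}
            dst.write(json.dumps(reduced, ensure_ascii=False) + "\n")
            n_docs += 1
    return n_docs

# Made-up record and file names, only to exercise the function.
tmp_dir = tempfile.mkdtemp()
src_path = os.path.join(tmp_dir, "toy.jsonl.bz2")
dst_path = os.path.join(tmp_dir, "toy-reduced.jsonl.bz2")
toy = {"id": "X-1", "tp": "ar", "d": "1900-01-01", "t": "Titre", "ft": "Texte", "extra": [1, 2]}
with bz2.open(src_path, "wt", encoding="utf-8") as f:
    f.write(json.dumps(toy) + "\n")

n_written = reduce_archive(src_path, dst_path)
with bz2.open(dst_path, "rt", encoding="utf-8") as f:
    reduced_docs = [json.loads(line) for line in f]
print(n_written, reduced_docs)
```

The function reads and writes one line at a time, so memory use stays small even for large archives.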


In [18]:
ls -la ./

total 264384
drwxr-xr-x  4 hugo  staff        128 11 mar 09:49 ./
drwxr-xr-x  9 hugo  staff        288 11 mar 10:01 ../
-rw-r--r--  1 hugo  staff   17183966 11 mar 10:02 GDL-1969-reduced.jsonl.bz2
-rw-r--r--@ 1 hugo  staff  107227845 26 fév 16:21 GDL-1969.jsonl.bz2


In [19]:
cd ../

/Users/hugo/Code/shs-dig-hum


### Reading newspaper archive data

Reminder: the data is already 'clean', and the files at hand contain only the following information:
- id
- date
- title
- type (article or advertisement)
- fulltext

In [20]:
pwd

'/Users/hugo/Code/shs-dig-hum'

In [21]:
input_dir = "data/" # update with your path 

In [22]:
# a helper function to get the lines from an archive
def read_jsonlines(bz2_file):
    text = bz2_file.read().decode('utf-8')
    for line in text.split('\n'):
        if line != '':
            yield line
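
The helper above decompresses the whole archive into memory before splitting it into lines. For larger files, a streaming variant may be preferable. The sketch below is one possible alternative, not a replacement used elsewhere in this notebook: `iter_jsonlines` is a name introduced here, it takes a file path rather than an open `BZ2File`, and the toy archive is made up.

```python
import bz2
import json
import os
import tempfile

def iter_jsonlines(path):
    """Yield the non-empty lines of a .jsonl.bz2 file one at a time."""
    # Text mode decompresses and decodes lazily, line by line.
    with bz2.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            line = line.rstrip("\n")
            if line:
                yield line

# Made-up archive with a blank line in the middle, to show it is skipped.
tmp_path = os.path.join(tempfile.mkdtemp(), "toy-reduced.jsonl.bz2")
with bz2.open(tmp_path, "wt", encoding="utf-8") as f:
    f.write('{"id": "a"}\n\n{"id": "b"}\n')

ids = [json.loads(line)["id"] for line in iter_jsonlines(tmp_path)]
print(ids)
```

Because it is a generator, you can stop after the first few articles without reading the rest of the file.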

### Reading data the classical way

In [23]:
for archive in os.listdir(input_dir):
    
    # take only the transformed archives
    if "reduced" in archive:
        
        # open the archive and get the list of articles it contains
        # (= a json object on each line); the `with` block closes the file
        with BZ2File(os.path.join(input_dir, archive), 'r') as f:
            articles = list(read_jsonlines(f))
        
        # load the first 100 articles as json and access their attributes
        for a in articles[:100]:
            
            # decode the json string into an object (dict)
            json_article = json.loads(a)
            print(
                json_article["date"],
                json_article["id"],
                json_article["title"]
            )

1969-07-07 GDL-1969-07-07-a-i0001 Privée de gouvernement l'Italie est inquiète
1969-07-07 GDL-1969-07-07-a-i0009 None
1969-07-07 GDL-1969-07-07-a-i0010 Cent évêques européens aujourd'hui à Coire
1969-07-07 GDL-1969-07-07-a-i0024 Meta Antenen: record du monde
1969-07-07 GDL-1969-07-07-a-i0035 GAÉTAN GANY A ESSAYÉ POUR VOUS Une Opel dans le vent: la GT 1,
1969-07-07 GDL-1969-07-07-a-i0041 None
1969-07-07 GDL-1969-07-07-a-i0054 FEUILLETON ERIC MALPASS Le Matin est servi
1969-07-07 GDL-1969-07-07-a-i0068 L'Institut suisse de recherches expérimentales sur le cancer prévoit la construction d'un nouveau bâtiment
1969-07-07 GDL-1969-07-07-a-i0089 Lundi 7 juillet 1969 SE JOUANT DE TOUTES LES DIFFICULTÉS DU PARCOURS Gazette de Lausanne Jacky Stewart remporte le deuxième Grand Prix de France à Clermont-Ferrand 9 MAGNIFIQUE BATAILLE ENTRE ICKX ET BELTOISE 625 . 819
1969-07-07 GDL-1969-07-07-a-i0098 Vive tension entre tribus au Kenya
1969-07-07 GDL-1969-07-07-a-i0002 Un point pour les syndicats
196

### Reading from s3

It's possible to read the data directly from S3.

We set up a public bucket (roughly, a folder) with some sample data that you can access without authentication.

In [29]:
import dask.bag as db
import json

ModuleNotFoundError: No module named 'dask'

In [14]:
_storage_options={
    'client_kwargs': {'endpoint_url':'https://os.zhdk.cloud.switch.ch'},
    'anon':True
}

ci_bag = db.read_text(
    's3://impresso-public/sample-rebuilt-teaching/*bz2',
    storage_options=_storage_options
).map(json.loads)

NameError: name 'db' is not defined

In [15]:
%%time
ci_bag.count().compute()

NameError: name 'ci_bag' is not defined

In [16]:
ci_bag.take(1)

NameError: name 'ci_bag' is not defined

In [17]:
%%time
ci_bag.filter(lambda ci: 'ft' in ci).map(lambda ci: len(ci['ft'])).sum().compute()

NameError: name 'ci_bag' is not defined

### Using dask and map
See the Dask documentation: http://dask.pydata.org/en/latest/docs.html

In [34]:
# make sure these libraries are installed in your environment ('conda install' or 'pip install')
from dask.diagnostics import ProgressBar
from dask.distributed import Client, progress
import dask.bag as db

#### Helper functions

In [35]:
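def get_archives(path):
    # collect the paths of the "reduced" archives found in `path`
    archives = []
    for archive in os.listdir(path):
        if "reduced" in archive:
            # join with `path` (the argument), not the global `input_dir`
            archives.append(os.path.join(path, archive))
    return archives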

In [36]:
def get_articles(archive_file):
    articles = []
    # open the archive; the `with` block closes it once it has been read
    with BZ2File(archive_file, 'r') as f:
        # get the list of articles it contains (= a json object on each line)
        lines = list(read_jsonlines(f))
    # decode each json line into a dict
    for a in lines:
        articles.append(json.loads(a))
    return articles

#### Read and filter articles in parallel

In [37]:
# fetch only "reduced" bz2 files
archives = get_archives(input_dir)

In [38]:
archives

['data/GDL-1969-reduced.jsonl.bz2']

In [39]:
bag_archives = db.from_sequence(archives)

In [40]:
bag_articles = bag_archives.map(get_articles)\
                        .flatten()\
                        .filter(lambda ar: ar['fulltext'] != '')\
                        .repartition(npartitions=100)
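
To see what the `map`, `flatten` and `filter` steps do to the data, here is a plain-Python sketch of the same logic on a made-up input. It runs sequentially and leaves out `repartition`, which only changes how dask splits the work; `toy_archives` and the other names are assumptions introduced for illustration.

```python
from itertools import chain

# Two hypothetical "archives", each already parsed into a list of dicts
# (this is what get_articles returns for one file).
toy_archives = [
    [{"id": "a", "fulltext": "un deux"}, {"id": "b", "fulltext": ""}],
    [{"id": "c", "fulltext": "trois"}],
]

# map: produce one list of articles per archive
per_archive = (articles for articles in toy_archives)
# flatten: merge the per-archive lists into a single stream of articles
flat = chain.from_iterable(per_archive)
# filter: drop the articles whose full text is the empty string
kept = [ar for ar in flat if ar["fulltext"] != ""]

print([ar["id"] for ar in kept])
```

Without the flatten step, each element would be a whole list of articles rather than a single article, and the filter on `ar['fulltext']` would fail.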

In [41]:
with ProgressBar():
    bag_articles = bag_articles.persist()

[########################################] | 100% Completed |  3min 29.0s


In [42]:
bag_articles.take(1, npartitions=10)

({'id': 'GDL-1969-07-07-a-i0001',
  'type': 'ar',
  'date': '1969-07-07',
  'title': "Privée de gouvernement l'Italie est inquiète",
  'fulltext': "Privée de gouvernement l'Italie est inquiète APRES LA DÉMISSION DU CABINET DE M. MARIANO RUMOR Depuis 18 mois, le pays est traversé d'une vague de désordres et de mécontentement Le Gouvernement Rumor avait été formé avec la plus grande difficulté en décembre dernier Avec l'éclatement du parti socialiste, la création d'une majorité est des plus hypothétiques ROME, 6 juillet. — Six mois après avoir été formé, le Gouvernement de centre-gauche de M. Mariano Rumor a démissionné samedi, faute de pouvoir compter maintenant sur une majorité cohérente à la suite de la scission intervenue au sein du parti socialiste. La démission de M. Rumor met fin au 29 Gouvernement italien de l'après-guerre et plonge le pays dans l'incertitude. La crise s'annonce grave, car elle intervient après six mois d'agitation estudiantine, syndicale et sociale. M. Mariano R

In [43]:
bag_articles.count().compute()

38510

In [44]:
articles = bag_articles.compute()

In [45]:
len(articles)

38510

#### Compute total number of tokens (parallel)

In [19]:
n_tokens = bag_articles.filter(lambda i: i['fulltext'] is not None).map(lambda ar: len(ar['fulltext'].split()))

In [20]:
total_tokens = n_tokens.sum().compute()

In [21]:
"{:,}".format(total_tokens)

'26,088,321'
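
The count above treats a token as any whitespace-separated chunk of text. The sketch below reproduces that definition sequentially on made-up articles, which can help when checking the parallel result on a small sample; `toy_articles` and `count_tokens` are names introduced here.

```python
# Made-up articles: one normal, one without text, one with irregular spacing.
toy_articles = [
    {"fulltext": "Privée de gouvernement"},
    {"fulltext": None},
    {"fulltext": "  deux   mots "},
]

def count_tokens(article):
    """Number of whitespace-separated tokens; 0 when there is no text."""
    text = article["fulltext"]
    # str.split() with no argument collapses runs of whitespace.
    return len(text.split()) if text is not None else 0

toy_total = sum(count_tokens(ar) for ar in toy_articles)
print("{:,}".format(toy_total))
```

Note that punctuation stays attached to words under this definition, so the figure is a rough measure of size rather than a linguistic token count.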