# Analysis of OA-availability of subscribed journals
December 2020

#### Dataset components
1. Unpaywall [database snapshot](https://unpaywall.org/products/snapshot), updated on 10-09-2020; accessed 11-25-2020. 
2. [ISSN-L](https://www.issn.org/understanding-the-issn/assignment-rules/the-issn-l-for-publications-on-multiple-media/) mapping table, accessed on 11-23-2020.
3. GW Elsevier/ScienceDirect subscribed journals (FY 2020) as of 12-01-2020.

#### Methodology
1. Reduce the Unpaywall database to the following fields:
   - `journal_issn_l`  (for linking on library subscriptions)
   - `doi` (unique article identifier) 
   - `year` (of publication) 
   - `is_oa` (is this article available OA?)
2. Match library journal titles to their ISSN-L.
3. Match the latter to the corresponding rows in the Unpaywall database.
4. Compute metrics per title.

In [1]:
import json
from dask.distributed import Client, progress
import dask
import dask.bag as db
import dask.dataframe
import pandas as pd

The Unpaywall database is a single JSON-L (line-break-delimited JSON) file. The current snapshot is approximately 175 GB, far too large to load into memory.

But we can use `json.loads` to inspect a single row to determine the fields we need.

In [None]:
f = open('./unpaywall_snapshot_2020-10-09T153852.jsonl')

In [None]:
json.loads(next(f))
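
As a minimal sketch of this single-row inspection, the snippet below parses only the first line of a JSON Lines stream inside a `with` block, so the handle is closed afterwards. The in-memory `io.StringIO` sample and all of its field values are invented stand-ins for the real snapshot file.

```python
import io
import json

# Hypothetical two-line stand-in for the snapshot file; the values are invented.
sample_jsonl = io.StringIO(
    '{"doi": "10.1000/example.1", "year": 2019, "is_oa": false, "journal_issn_l": "1234-5678"}\n'
    '{"doi": "10.1000/example.2", "year": 2020, "is_oa": true, "journal_issn_l": "1234-5678"}\n'
)

with sample_jsonl as f:
    first_record = json.loads(next(f))  # parse only the first line

# List the available field names to decide which ones to keep
field_names = sorted(first_record)
print(field_names)
```

With the real file, the same pattern would apply to `open(...)` in place of the `StringIO` object.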

We're using the [Dask distributed](https://distributed.dask.org/en/latest/) module to process the Unpaywall JSON in parallel without persisting it in memory.

The `client` allows us to monitor performance as well as use distributed processing by default.

In [2]:
client = Client()
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://127.0.0.1:55681 | Workers: 4 |
| Dashboard: http://127.0.0.1:8787/status | Cores: 8 |
| | Memory: 17.18 GB |


A simple function will extract the fields we want from the JSON data. 

In [None]:
def flatten(record):
    return {'issn_l': record.get('journal_issn_l'),
           'doi': record.get('doi'),
           'year': record.get('year'),
           'is_oa': record.get('is_oa'),
           'genre': record.get('genre'),
           'is_paratext': record.get('is_paratext')}
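
To illustrate what `flatten` returns, here is a small usage sketch with an invented record; the function is repeated so the snippet runs on its own. Because it relies on `dict.get`, a field that is missing from a record comes back as `None` instead of raising a `KeyError`, and any field not listed is dropped.

```python
def flatten(record):
    """Keep only the fields needed for the analysis."""
    return {'issn_l': record.get('journal_issn_l'),
            'doi': record.get('doi'),
            'year': record.get('year'),
            'is_oa': record.get('is_oa'),
            'genre': record.get('genre'),
            'is_paratext': record.get('is_paratext')}

# Invented record: it carries an extra field and lacks 'is_paratext'
record = {'journal_issn_l': '1234-5678', 'doi': '10.1000/example.1', 'year': 2019,
          'is_oa': True, 'genre': 'journal-article', 'publisher': 'Example Press'}

flat = flatten(record)
print(flat)
```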

#### Dask pipeline: data prep

1. Load the JSON file as text and map it to `json.loads`. (We don't need to use `jsonlines` because the Dask worker will automatically read the file line by line.)

**Note**: Setting the `blocksize` parameter is important: without it, dask allocates the whole file to a single worker, which defeats the parallelization (and on my machine, caused the task to restart before finishing every time).


In [None]:
b = db.read_text('./unpaywall_snapshot_2020-10-09T153852.jsonl', blocksize='128MiB').map(json.loads)
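
For a rough sense of what `blocksize` does to the workload, the arithmetic below estimates how many partitions a 128 MiB block size would produce. It assumes the snapshot is exactly 175 GB in decimal gigabytes, which is only the approximate figure quoted earlier, so the result is an order-of-magnitude estimate rather than the exact partition count Dask will report.

```python
import math

snapshot_bytes = 175 * 10**9   # assumption: "approximately 175 GB", taken as decimal GB
block_bytes = 128 * 2**20      # 128 MiB, matching blocksize='128MiB'

# Each block becomes one partition that a worker can process independently
n_partitions = math.ceil(snapshot_bytes / block_bytes)
print(n_partitions)
```

Under these assumptions the file splits into roughly 1,300 partitions, which is what lets the work spread across workers.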

2. Map the custom function to the JSON and then convert to a DataFrame, setting the `dtype` for each column.

In [None]:
df = b.map(flatten).to_dataframe(meta={'issn_l': 'object',
                                      'doi': 'object',
                                      'year': 'object',
                                      'is_oa': 'object',
                                      'genre': 'object',
                                      'is_paratext': 'object'})

3. Save the DataFrame to disk as parquet files. (The conversion from `object` to `str` seems to be necessary for `pyarrow`, at least. Try next time with `fastparquet`?)

In [None]:
#df.to_parquet('./unpaywall_parquet', engine='fastparquet')
df.astype(str).to_parquet('./unpaywall_parquet', engine='fastparquet')

**Restart the notebook kernel here.** (We're using a differently configured dask client for subsequent operations, so it's easiest just to start with a new kernel at this point.)

#### Prepping the local subscription data

1. Read a CSV of journals we subscribe to. (These are from a report run in Alma Analytics.) 
   - Each journal occupies a single line. 
   - ISSN's are included for matching.
   - We also include the Alma unique title identifier (MMS Id). (This number is unique per title, whereas a single title may have multiple ISSN's.)

In [2]:
elsevier_jrnls = pd.read_csv('./elsevier-subs.csv')

In [None]:
elsevier_jrnls.head()

2. Exclude those rows where the ISSN field is null.

In [3]:
elsevier_jrnls = elsevier_jrnls.loc[~elsevier_jrnls.ISSN.isnull()].copy()

3. Reindex the DataFrame on the unique identifier (this will make reshaping it easier).

In [4]:
elsevier_jrnls = elsevier_jrnls.set_index('MMS Id')

4. Split the semicolon-delimited ISSN field into multiple columns.

In [5]:
elsevier_issns = elsevier_jrnls.ISSN.str.split('; ', expand=True)\
                        .rename(columns={n: f'issn{n}' for n in range(3)})

In [6]:
elsevier_issns.head()

| MMS Id | issn0 | issn1 | issn2 |
|---|---|---|---|
| 9927679463604107 | 0360-8557 | 0742-1974 | |
| 99185880941204107 | 0740-624X | 0740-624X | |
| 99185890753304107 | 0747-5632 | 0747-5632 | |
| 9969962313604107 | 0953-9611 | 0954-0504 | |
| 99185890742504107 | 1090-2147 | 0278-2626 | |


5. Load the ISSN-to-ISSN-L mapping table from the ISSN authority.

In [7]:
with open('issnltables/20201123.ISSN-to-ISSN-L.txt', 'r') as f:
    issn_tbl = f.read()

6. Convert this tab-delimited table to a dictionary. Keys are ISSN's; values are ISSN-L's.

In [8]:
issn_tbl=issn_tbl.split('\n')

In [9]:
# Last row is a null
issn_tbl = dict(((l.split('\t') for l in issn_tbl[:-1])))
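
An alternative sketch of the same conversion uses the standard-library `csv` module, which handles the tab splitting and lets blank lines be skipped explicitly rather than by slicing off the last row. The two-column layout (ISSN, then ISSN-L) follows the description above; the ISSN values themselves are invented and `issn_to_issn_l` is a hypothetical name.

```python
import csv
import io

# Invented rows in a two-column, tab-delimited layout (ISSN, then ISSN-L)
sample_table = io.StringIO(
    "1111-1111\t1111-1111\n"
    "2222-2222\t1111-1111\n"
    "\n"  # trailing blank line, like the null last row mentioned above
)

# Keep only rows that have exactly two fields; blank lines yield an empty row
issn_to_issn_l = {row[0]: row[1]
                  for row in csv.reader(sample_table, delimiter='\t')
                  if len(row) == 2}
print(issn_to_issn_l)
```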

7. Reshape our DataFrame of ISSN's into a Series where each row consists of a single ISSN.

In [11]:
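# Run this cell only once: after stacking, elsevier_issns is already a Series,
# and re-running raises AttributeError: 'Series' object has no attribute 'stack'.
elsevier_issns = elsevier_issns.stack()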
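
The split-then-stack reshaping can be sketched on a toy frame as follows. The title IDs and ISSNs are invented. The explicit `.dropna()` is an addition here, not part of the notebook: whether `stack` discards missing values by itself depends on the pandas version, so the sketch drops them explicitly.

```python
import pandas as pd

# Invented subscription rows: the index is a title ID, ISSNs are '; '-delimited
jrnls = pd.DataFrame({'ISSN': ['1111-1111; 2222-2222', '3333-3333']},
                     index=pd.Index([101, 102], name='MMS Id'))

wide = jrnls.ISSN.str.split('; ', expand=True)  # one column per ISSN
wide = wide.rename(columns={n: f'issn{n}' for n in range(wide.shape[1])})

# One row per (title, column) pair; drop the empty slots explicitly
long = wide.stack().dropna()
print(long)
```

The result is a Series with a two-level index (title ID, source column), which is why a later `reset_index()` produces a `level_1` column.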

8. Here we add a second column to our Series (creating a DataFrame) that maps every ISSN from our list of subscriptions to its ISSN-L.

In [12]:
elsevier_issns = pd.concat([elsevier_issns, elsevier_issns.apply(lambda x: issn_tbl.get(x))], axis=1)\
                                        .rename(columns={0: 'issn_alma', 1: 'issn_l'})

In [13]:
# Get rid of any duplicates
elsevier_issns = elsevier_issns.reset_index().drop_duplicates(subset='issn_l')
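
A toy version of the mapping and de-duplication, using `Series.map` with a dictionary, may make the effect easier to see. All ISSNs are invented. Note what happens to an ISSN that is absent from the lookup table: it gets a null `issn_l`, and `drop_duplicates` treats nulls as equal to one another, so only the first such row would survive.

```python
import pandas as pd

# Invented lookup: two ISSNs (say, print and online) share one ISSN-L
issn_tbl = {'1111-1111': '1111-1111', '2222-2222': '1111-1111'}

issns = pd.Series(['1111-1111', '2222-2222', '9999-9999'], name='issn_alma')

mapped = pd.DataFrame({'issn_alma': issns,
                       'issn_l': issns.map(issn_tbl)})  # unmatched ISSNs become NaN

# Keep the first row for each ISSN-L
deduped = mapped.drop_duplicates(subset='issn_l')
print(deduped)
```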

In [14]:
elsevier_issns.head()

| | MMS Id | level_1 | issn_alma | issn_l |
|---|---|---|---|---|
| 0 | 9927679463604107 | issn0 | 0360-8557 | 0360-8557 |
| 1 | 9927679463604107 | issn1 | 0742-1974 | 0742-1974 |
| 2 | 99185880941204107 | issn0 | 0740-624X | 0740-624X |
| 4 | 99185890753304107 | issn0 | 0747-5632 | 0747-5632 |
| 6 | 9969962313604107 | issn0 | 0953-9611 | 0953-9611 |


#### Dask pipeline: Analysis

The objective here is to merge our small DataFrame of subscriptions with the Unpaywall database in order to retrieve OA-information about the contents of each journal over time. Since the Unpaywall database, even in its reduced form, does not fit into memory, we're again using dask to handle this part.

1. Restart the dask client. Here we set the `processes` parameter to `False`, which runs a single worker inside the notebook process. (On my machine, distributed processing did not work for merging, though I'm not sure why; it is a [documented use case](https://docs.dask.org/en/latest/dataframe-joins.html).)

In [15]:
client = Client(processes=False)
client

| Client | Cluster |
|---|---|
| Scheduler: inproc://192.168.0.19/40324/1 | Workers: 1 |
| Dashboard: http://192.168.0.19:8787/status | Cores: 8 |
| | Memory: 17.18 GB |


2. Create a dask `DataFrame` from the parquet files saved above.

In [16]:
df = dask.dataframe.read_parquet('./unpaywall_parquet', engine='fastparquet')

3. This is the computationally intensive part. On my MacBook Pro 2019 (2.8 GHz Intel Core i7), it took about 30 minutes.

In [17]:
elsevier_merged = dask.dataframe.merge(df, elsevier_issns, how='right', on='issn_l')

In [18]:
elsevier_merged = elsevier_merged.compute()
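
The semantics of the `how='right'` merge can be checked on a small example in plain pandas (no dask needed at this scale). The frames below are invented: every subscription row is kept, even when no article matches it, while articles from unsubscribed journals are discarded.

```python
import pandas as pd

# Invented article-level rows (stand-in for the Unpaywall data)
articles = pd.DataFrame({'issn_l': ['1111-1111', '1111-1111', '5555-5555'],
                         'doi': ['10.1000/a', '10.1000/b', '10.1000/c']})

# Invented subscription rows; the second title has no rows in `articles`
subs = pd.DataFrame({'issn_l': ['1111-1111', '7777-7777'],
                     'MMS Id': [101, 102]})

merged = pd.merge(articles, subs, how='right', on='issn_l')
print(merged)
```

The unmatched subscription appears once with a null `doi`, which is one reason null values need cleaning up after the merge.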



In [19]:
elsevier_merged.head()

| | issn_l | doi | year | is_oa | genre | is_paratext | MMS Id | level_1 | issn_alma |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 0022-0965 | 10.1016/j.jecp.2019.104675 | 2019.0 | False | journal-article | False | 99185890742304107 | issn0 | 1096-0457 |
| 1 | 0022-0965 | 10.1016/0022-0965(76)90079-5 | 1976.0 | False | journal-article | False | 99185890742304107 | issn0 | 1096-0457 |
| 2 | 0022-0965 | 10.1016/j.jecp.2020.104944 | 2020.0 | False | journal-article | False | 99185890742304107 | issn0 | 1096-0457 |
| 3 | 0022-0965 | 10.1016/0022-0965(88)90060-4 | 1988.0 | False | journal-article | False | 99185890742304107 | issn0 | 1096-0457 |
| 4 | 0022-0965 | 10.1016/j.jecp.2019.104708 | 2020.0 | True | journal-article | False | 99185890742304107 | issn0 | 1096-0457 |


In [21]:
elsevier_merged.to_csv('./elsevier_merged.csv', index=False)

For 137 unique ISSN-L's, we retrieved about 561K unique DOI's.

In [20]:
elsevier_merged.nunique()

issn_l            137
doi            561009
year               66
is_oa               2
genre               3
is_paratext         2
MMS Id            135
level_1             2
issn_alma         138
dtype: int64

4. Clean up the merged dataset.
   - Drop rows where the publication year is null. 
   - Drop the ISSN columns; we have those in our original dataset, to which we can match using the unique title ID's.
   - Convert data types from string to the appropriate types.

In [22]:
elsevier_merged = elsevier_merged.loc[~elsevier_merged.year.isnull() & (elsevier_merged.year != 'nan')].copy()

In [23]:
elsevier_merged = elsevier_merged.drop(['level_1', 'issn_alma'], axis=1).drop_duplicates()

In [24]:
elsevier_merged.is_oa = elsevier_merged.is_oa == 'True'

In [25]:
elsevier_merged.is_paratext = elsevier_merged.is_paratext == 'True'

In [26]:
elsevier_merged.year = elsevier_merged.year.astype(float).astype(int)
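
The reason for comparing against the text `'True'` rather than casting directly is that Python treats any non-empty string as truthy, so `'False'` would silently become `True`. The sketch below shows this on invented rows, along with the two-step year conversion: a string such as `'2019.0'` cannot be parsed by `int` directly, so it goes through `float` first.

```python
import pandas as pd

# Invented rows mimicking columns that were stored as strings
df = pd.DataFrame({'is_oa': ['True', 'False'], 'year': ['2019.0', '1976.0']})

naive = [bool(x) for x in df.is_oa]   # wrong: any non-empty string is truthy
parsed = df.is_oa == 'True'           # right: compare against the literal text

# '2019.0' -> 2019.0 -> 2019
years = df.year.astype(float).astype(int)
print(naive, list(parsed), list(years))
```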

5. As a validation step/sanity check, I created a random sample of DOI's from our merged dataset to check against results from the [Unpaywall Simple Query Tool](https://unpaywall.org/products/simple-query-tool). (You can paste in up to 1,000 DOI's and receive the Unpaywall data for those as a CSV.)

In [None]:
elsevier_merged.sample(999).doi.to_clipboard(index=False)

In [None]:
sample = pd.read_csv('./unpaywall_sample.csv')

In [None]:
result = sample[['doi', 'is_oa']].merge(elsevier_merged, suffixes=['_sample', '_actual'], on='doi')

The Unpaywall snapshot we're using here is a couple of months old. So the 0.6% mismatch rate probably reflects DOI's whose status has changed since the snapshot was created.

In [None]:
len(result.loc[result.is_oa_sample != result.is_oa_actual]) / len(sample)
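
The mismatch calculation can be sketched on invented data as below. Here `sample` stands for the CSV returned by the query tool and `snapshot` for the merged dataset; both frames and their values are made up. Taking the mean of the boolean comparison gives the share of matched DOI's that disagree, which equals the `len(...) / len(sample)` form whenever every sampled DOI finds exactly one match.

```python
import pandas as pd

# Invented results from the query tool
sample = pd.DataFrame({'doi': ['10.1000/a', '10.1000/b', '10.1000/c', '10.1000/d'],
                       'is_oa': [True, False, False, True]})

# Invented values from the snapshot-based dataset; one status differs
snapshot = pd.DataFrame({'doi': ['10.1000/a', '10.1000/b', '10.1000/c', '10.1000/d'],
                         'is_oa': [True, False, True, True]})

result = sample.merge(snapshot, on='doi', suffixes=('_sample', '_actual'))
mismatch_rate = (result.is_oa_sample != result.is_oa_actual).mean()
print(mismatch_rate)
```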

6. Use the `genre` and `is_paratext` fields to exclude DOI's for non-journal articles.

In [36]:
articles_merged = elsevier_merged.loc[(elsevier_merged.genre == 'journal-article') \
                                      & ~elsevier_merged.is_paratext].copy()
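
As a small illustration of this kind of filter, assuming the goal is to keep journal articles and drop paratext (covers, tables of contents, and similar), the mask below combines the genre test with the negated `is_paratext` flag. The rows are invented.

```python
import pandas as pd

# Invented items: one regular article, one paratext item, one non-article record
items = pd.DataFrame({'doi': ['10.1000/a', '10.1000/b', '10.1000/c'],
                      'genre': ['journal-article', 'journal-article', 'journal-issue'],
                      'is_paratext': [False, True, False]})

# Keep rows that are journal articles AND are not flagged as paratext
articles = items.loc[(items.genre == 'journal-article') & ~items.is_paratext]
print(articles)
```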

7. Calculate the number of DOI's per title per year in each OA category (_i.e._, where `is_oa` is `True` or `False`). 

In [39]:
oa_counts = articles_merged.groupby(['MMS Id', 'year', 'is_oa']).doi.count().unstack(level=2)
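
On a toy frame, the group-count-unstack pattern looks like this. The IDs and DOI's are invented. Each (title, year) pair becomes one row, the two `is_oa` values become columns, and a combination with no articles shows up as a missing value rather than zero.

```python
import pandas as pd

# Invented article rows for two titles
articles = pd.DataFrame({'MMS Id': [101, 101, 101, 102],
                         'year': [2019, 2019, 2020, 2019],
                         'is_oa': [True, False, False, False],
                         'doi': ['10.1000/a', '10.1000/b', '10.1000/c', '10.1000/d']})

# Count DOI's per (title, year, OA status), then move OA status into columns
counts = articles.groupby(['MMS Id', 'year', 'is_oa']).doi.count().unstack(level=2)
print(counts)
```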

8. Merge with the original subscriptions dataset to include the full title information.

In [40]:
elsevier_oa = oa_counts.reset_index().merge(elsevier_jrnls, on='MMS Id')

In [41]:
# Convert our unique title ID's to strings so that when reading from CSV later, 
# they won't get put into scientific notation by helpful algorithms in Excel, etc.
elsevier_oa['MMS Id'] = elsevier_oa['MMS Id'].astype(str)

9. Convert the raw counts from `is_oa` to a total number of DOI's and a percentage for which `is_oa` is `True`.

In [42]:
elsevier_oa[False] = elsevier_oa[False].fillna(0)
elsevier_oa[True] = elsevier_oa[True].fillna(0)
elsevier_oa['Total Articles'] = elsevier_oa[False] + elsevier_oa[True]
elsevier_oa['Percent OA'] = elsevier_oa[True] / elsevier_oa['Total Articles']
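
The arithmetic in this step, worked through on invented counts: missing counts are first set to zero so that the sum and the ratio are defined for every row. The column names `n_closed` and `n_oa` are hypothetical stand-ins for the `False` and `True` columns used in the notebook.

```python
import pandas as pd

# Invented counts; None marks a title-year with no OA articles
counts = pd.DataFrame({'n_closed': [1.0, 3.0], 'n_oa': [1.0, None]},
                      index=['Journal A 2019', 'Journal B 2019'])

counts = counts.fillna(0)                                   # no articles means zero
counts['Total Articles'] = counts.n_closed + counts.n_oa
counts['Percent OA'] = counts.n_oa / counts['Total Articles']
print(counts)
```

Note that `Percent OA` is stored as a fraction between 0 and 1, not multiplied by 100.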

In [43]:
elsevier_oa = elsevier_oa.drop([True, False], axis=1)

In [46]:
elsevier_oa.to_csv('elsevier_oa_counts.csv', index=False)

10. To create a more readable and manageable subset, I did the following:
   - Limit to those rows where the publication year is 2015 or later.
   - Pivot the years from rows to columns.

In [None]:
group_cols = [c for c in elsevier_oa.columns if c not in ['Total Articles', 'Percent OA']]
elsevier_metrics = elsevier_oa.loc[elsevier_oa.year >= 2015]\
                        .groupby(group_cols).agg({'Percent OA': 'sum',
                                          'Total Articles': 'sum'}) \
                        .unstack(level=1)\
                        .stack(level=0)\
                        .sort_index(level=[3,5], ascending=False)
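
The core of the pivot, moving years from rows to columns, can be sketched on a toy frame as follows. The titles and numbers are invented, and this simplified version uses `set_index` plus `unstack` without the additional `stack` and sorting applied in the notebook.

```python
import pandas as pd

# Invented per-title, per-year metrics; the 2010 row should be filtered out
oa = pd.DataFrame({'Title': ['Journal A', 'Journal A', 'Journal B', 'Journal B'],
                   'year': [2019, 2020, 2020, 2010],
                   'Percent OA': [0.5, 0.25, 0.0, 1.0],
                   'Total Articles': [2, 4, 3, 1]})

metrics = (oa.loc[oa.year >= 2015]        # limit to 2015 or later
             .set_index(['Title', 'year'])
             .unstack(level='year'))      # years become a column level
print(metrics)
```

The columns of `metrics` form a two-level index of (metric, year); a title with no articles in a given year gets a missing value in that cell.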

In [None]:
elsevier_metrics.head()

In [None]:
elsevier_metrics.to_csv('elsevier_metrics.csv')