This section installs dependencies, imports libraries, and defines some utility functions. Ideally, these utility functions would live in a common library in the Git repository. However, it is not trivial to sync the Colab workflow with the GitHub repository, so currently each Colab notebook is self-contained (i.e., it does not import scripts from the GitHub repo).

In [None]:
%%capture
# Libraries and Utility Functions
!pip install fastavro
from dask.distributed import Client, progress
import numpy as np
import dask.bag as db
import time
import os
import re
import random
import glob
from dask.distributed import Client, LocalCluster
import fastavro
import pandas as pd

# Simple stopwatch used instead of %timeit, which the author found does not work well with Dask.
class Timer:
    """Minimal start/stop timer that prints the elapsed time in seconds.

    Usage: call ``start()`` before the work and ``end()`` after it; ``end()``
    prints the elapsed time rounded to two decimals. ``start_time`` and
    ``end_time`` hold raw ``time.perf_counter()`` readings, which are only
    meaningful relative to each other (they are not epoch timestamps).
    """

    def start(self):
        """Record the starting counter reading."""
        # perf_counter is monotonic and high-resolution, so the measured duration
        # is not distorted by system clock adjustments the way time.time() can be.
        self.start_time = time.perf_counter()

    def end(self):
        """Record the ending counter reading and print the time elapsed since ``start()``."""
        self.end_time = time.perf_counter()
        print(f"Time elapsed: {self.end_time - self.start_time:.2f} seconds.")

# Read text into dask bag
def load_case_documents(files, npartitions=100):
    """Build a lazy Dask bag of ``{"bva_id": int, "text": str}`` records, one per file.

    Parameters
    ----------
    files : sequence of str
        Paths to decision text files. Each file's base name, up to its first ".",
        must parse as an integer (e.g. ``"all-bva-decisions/9221305.txt"`` -> ``9221305``).
    npartitions : int, default 100
        Number of partitions passed to ``dask.bag.from_sequence``.

    Returns
    -------
    dask.bag.Bag
        Bag produced by mapping the file loader over ``files``; nothing is read
        until the bag is computed or written out.

    Raises (when the bag is computed)
    ---------------------------------
    ValueError
        If a file's base-name stem is not an integer.
    OSError
        If a file cannot be opened.
    """
    def load_from_filename(file):
        # os.path.basename replaces the hand-rolled split("/"); the id is
        # everything before the first "." of the base name.
        bva_id = int(os.path.basename(file).split(".", 1)[0])
        # errors="ignore" drops undecodable bytes instead of failing the whole bag.
        with open(file, errors="ignore", encoding="utf-8") as f:
            return {"bva_id": bva_id, "text": f.read()}

    return db.from_sequence(files, npartitions=npartitions).map(load_from_filename)

# Init a module-level Timer instance for timing long-running cells
# (call timer.start() before the work and timer.end() after it).
timer = Timer()

Here we clean up the decision file names, load the BVACases and VACOLS metadata, and identify the files that are single-issue documents.

In [4]:
# Some preprocessing on the documents.
# Every decision file should be named "<bva_id>.txt"; show any stems that are not pure digits.
decision_stems = [name.split(".")[0] for name in os.listdir("all-bva-decisions")]
print([stem for stem in decision_stems if not stem.isdigit()])

# The clean-up steps below are guarded so the cell can be re-run without errors
# (the original `!mv` / `!rm` failed once the files had already been handled).

# One document has an "a" appended to its id; rename it to match the others.
# NOTE(review): os.replace, like `mv`, overwrites 9221305.txt if it already exists — confirm no such file.
misnamed_decision = "all-bva-decisions/9221305a.txt"
if os.path.exists(misnamed_decision):
    os.replace(misnamed_decision, "all-bva-decisions/9221305.txt")

# There is another tar.gz inside the extracted directory; remove it.
nested_archive = "all-bva-decisions/all-bva-decisions.tar.gz"
if os.path.exists(nested_archive):
    os.remove(nested_archive)

# File names and their integer ids, aligned by position.
documents = np.array(os.listdir("all-bva-decisions"))
documents_int = np.array([x.split(".")[0] for x in documents], dtype=np.int64)

['all-bva-decisions', '9221305a']


In [8]:
# Load BVACases and Vacols
dict_types = {"appeal_id": str,
              "tiread2": np.int64,
              "issue_id": str,
              "imgadtm": str,
              "issdc": str,
              "issseq": str,
              "issprog": str,
              "isscode": str,
              "isslev2": str,
              "isslev3": str,
              "cvdocket": str,
              "cvdisp": str,
              "appealed_CAVC": np.int32,
              "issue_count": np.int32}

# Both tables are sorted by appeal_id so that columns can be compared and copied
# row by row below. A stable sort keeps rows that share an appeal_id in file
# order, so tie ordering is deterministic instead of depending on quicksort.
bva = (
    pd.read_csv("BVACases.csv", dtype=dict_types)
    .sort_values("appeal_id", kind="mergesort")
    .reset_index(drop=True)
    .fillna("na")
)

vacols = pd.read_csv("updated_vacols.csv", dtype=dict_types)
# Rename the first column by assigning a new column list. Writing into
# `vacols.columns.values` mutates the (immutable) Index in place, which pandas
# does not support and which can leave its cached label lookup stale.
vacols.columns = ["citation_num", *vacols.columns[1:]]
vacols = (
    vacols
    .sort_values("appeal_id", kind="mergesort")
    .reset_index(drop=True)
    .fillna("na")
)

# The comparison and the issue_count copy below align rows by index, which is
# only meaningful if both tables have the same number of rows.
assert len(bva) == len(vacols), "BVACases.csv and updated_vacols.csv have different row counts"

# Check equality between bva and vacols (columns sorted for a deterministic report order).
# When last run, all overlapping columns were identical.
overlapping_cols = sorted(set(vacols.columns) & set(bva.columns))
any_diff = False
for col in overlapping_cols:
    diff = np.sum(bva[col] != vacols[col])
    if diff > 0:
        print(f"{col}: {diff} rows differ.")
        any_diff = True
if not any_diff:
    print("All overlapping columns between vacols and bva are identical.")

# Append issue_count to vacols (aligned on the shared RangeIndex after reset_index).
vacols["issue_count"] = bva["issue_count"]

All overlapping columns between vacols and bva are identical.


In [9]:
# Restrict the metadata to appeals with exactly one issue, then keep only the
# decision files whose integer id appears in those rows' tiread2 column.
is_single_issue = vacols.issue_count == 1
vacols_single_issue = vacols[is_single_issue]
single_issue_citations = vacols_single_issue["tiread2"].to_numpy()
single_issue_documents = documents[np.isin(documents_int, single_issue_citations)]
print(f"There are {len(single_issue_documents):,} single issue documents in corpus.")

There are 346,915 single issue documents in corpus.


In [48]:
# There are some rows with duplicate tiread2.
# Keep only the row with the earliest imgadtm date for each tiread2.
# NOTE(review): imgadtm is read as str and missing values were filled with "na";
# string order matches chronological order only if the dates are zero-padded
# (e.g. ISO 8601) — confirm. "na" sorts after digit-leading strings, so rows
# without a date are never preferred over dated ones.
vacols_single_issue = vacols_single_issue.sort_values(["tiread2", "imgadtm"])

# True for every row after the first (earliest) one sharing its tiread2.
is_later_duplicate = vacols_single_issue.tiread2.duplicated(keep="first")

# Rows being discarded: the later duplicates.
dups = vacols_single_issue[is_later_duplicate]
print(f"There are {dups.tiread2.nunique()} tiread2 values with more than 1 row in metadata.")

# Keep the first row per tiread2. The earlier version concatenated the unique
# rows with `dups`, which retained the later rows rather than the earliest ones.
vacols_dedup = vacols_single_issue[~is_later_duplicate].reset_index(drop=True)
assert vacols_dedup.tiread2.is_unique

There are 12 tiread2 values with more than 1 row in metadata.


In [49]:
# Write the de-duplicated single-issue metadata to a local CSV (no pandas index column).
# NOTE(review): the recorded output shows a gsutil copy, but this cell contains no
# upload command — confirm where the upload to GCP actually happens.
processed_vacols_path = "vacols_processed.csv"
vacols_dedup.to_csv(processed_vacols_path, index=False)

Copying file://vacols_processed.csv [Content-Type=text/csv]...
-
Operation completed over 1 objects/25.2 MiB.                                     


This section writes all the single-issue BVA decision documents into Avro format and uploads them to GCP. Avro is a compact, compressible file format that Dask can read and process in parallel.

In [51]:
# Start a local Dask cluster: 12 single-threaded workers running inside this process.
# NOTE(review): newer `distributed` releases deprecate `diagnostics_port` in favour of
# `dashboard_address`; passing None here may not disable the dashboard — confirm
# against the installed version before changing it.
cluster = LocalCluster(processes=False, n_workers=12, threads_per_worker=1, diagnostics_port=None)
client = Client(cluster)

# Avro schema for storing documents: one record per decision file.
# NOTE(review): the Avro spec restricts names to [A-Za-z_][A-Za-z0-9_]*, so
# 'all-bva-decisions' is not a spec-valid record name. fastavro accepted it when
# this cell was last run, but stricter Avro readers may not.
schema = {'name': 'all-bva-decisions',
          'namespace': 'Documents',
          'doc': 'Full case documents for all BVA decisions',
          'type': 'record',
          'fields': [{'name': 'text', 'type': 'string'},
                     {'name': 'bva_id', 'type': 'int'}]}

# Write documents to Avro (deflate-compressed), timing the load + write.
timer = Timer()
timer.start()
folder = "all-bva-decisions"
list_files = [f"{folder}/{x}" for x in single_issue_documents]
loaded_files = load_case_documents(list_files)
# exist_ok keeps the cell re-runnable; `!mkdir` errored when the directory already existed.
os.makedirs("single-issue-decisions-avro", exist_ok=True)
loaded_files.to_avro("single-issue-decisions-avro/decisions.*.avro", schema=schema, codec='deflate')
timer.end()

mkdir: cannot create directory ‘single-issue-decisions-avro’: File exists
Time elapsed: 437.37 seconds.
