This notebook downloads the raw documents from the GCP bucket `bva-appeal-raw-data` and keeps only the single-issue documents. It then writes this smaller dataset in `.avro` format, which is suitable for processing with Dask. Finally, the processed data is uploaded to the GCP bucket `bva-appeal-processed-data`.

In [1]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Authenticate to GCP
from google.colab import auth
project_id = 'bva-appeal'
auth.authenticate_user()
!gcloud config set project {project_id}
raw_bucket_name = "bva-appeal-raw-data"
processed_bucket_name = "bva-appeal-processed-data"

Mounted at /content/drive
Updated property [core/project].


This section imports libraries and defines some utility functions. Ideally, these utility functions would live in a common library in the git repo. However, syncing the Colab workflow with the GitHub repository is not trivial, so for now each Colab notebook is self-contained (i.e. it does not import scripts from the GitHub repo).

In [0]:
%%capture
# Libraries and Utility Functions
!pip install fastavro
from dask.distributed import Client, LocalCluster, progress
import numpy as np
import dask.bag as db
import time
import os
import re
import random
import glob
import fastavro
import pandas as pd

# For some reason, Dask does not work well with the inline %timeit function, so use a simple timer
class Timer():
    def start(self):
        self.start_time = time.time()

    def end(self):
        self.end_time = time.time()
        print(f"Time elapsed: {self.end_time - self.start_time:.2f} seconds.")

# Read text into dask bag
def load_case_documents(files, npartitions=100):
    def load_from_filename(file):
        with open(file, errors="ignore", encoding="utf-8") as f:
            filename = file.split("/")[-1].split(".")[0]                # Get filename between parent_directory/ and .txt
            return {"bva_id": int(filename), "text" : f.read()}
    b = db.from_sequence(files, npartitions=npartitions).map(load_from_filename)
    return b

# Init timer
timer = Timer()

In [3]:
# Download all-bva-decisions from GCP and untar it for processing
if "all-bva-decisions" not in os.listdir():
    timer.start()
    !gsutil -m cp gs://$raw_bucket_name/all-bva-decisions.tar.gz .
    !tar -xf all-bva-decisions.tar.gz
    timer.end()

Copying gs://bva-appeal-raw-data/all-bva-decisions.tar.gz...
\ [1/1 files][  4.6 GiB/  4.6 GiB] 100% Done  67.9 MiB/s ETA 00:00:00           
Operation completed over 1 objects/4.6 GiB.                                      
Time elapsed: 423.72 seconds.
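
The start/end timing pattern used in the cell above can also be written as a context manager, which reports the elapsed time even if the timed block raises. This is a minimal sketch and not part of the original notebook; the name `timed` is hypothetical.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label="Time elapsed"):
    """Hypothetical helper: print the wall-clock time spent inside the with-block."""
    result = {}
    start = time.perf_counter()
    try:
        yield result
    finally:
        # Runs even if the block raises, so the timing is always reported
        result["seconds"] = time.perf_counter() - start
        print(f"{label}: {result['seconds']:.2f} seconds.")

# Usage: wrap any slow step, e.g. a download or an untar
with timed() as t:
    time.sleep(0.01)
```

The measured duration stays available afterwards as `t["seconds"]`, which is convenient for logging several steps in one place.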


Here we identify the files that are single-issue documents.

In [4]:
# Some preprocessing on the documents
documents = [x.split(".")[0] for x in os.listdir("all-bva-decisions")]
print(list(filter(lambda x: not str.isdigit(x), documents)))  
!mv all-bva-decisions/9221305a.txt all-bva-decisions/9221305.txt  # There is one document with "a" appended
!rm all-bva-decisions/all-bva-decisions.tar.gz                    # There is another tar.gz inside
documents = np.array(os.listdir("all-bva-decisions"))
documents_int = np.array([x.split(".")[0] for x in documents], dtype=np.int64)

['all-bva-decisions', '9221305a']
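
The output above lists the two directory entries whose names are not purely numeric. As an alternative to renaming files by hand, the ID could be parsed defensively in Python. This sketch assumes that a valid document name is a run of digits, optionally followed by stray letters (as in `9221305a.txt`), with a `.txt` extension; the helper `parse_bva_id` is hypothetical and not part of the original notebook.

```python
import re

# Assumed pattern: leading digits, optional trailing letters, then ".txt"
_ID_PATTERN = re.compile(r"^(\d+)[A-Za-z]*\.txt$")

def parse_bva_id(filename):
    """Return the integer ID encoded in a filename, or None if it does not match."""
    match = _ID_PATTERN.match(filename)
    return int(match.group(1)) if match else None

names = ["9221305a.txt", "0123456.txt", "all-bva-decisions.tar.gz"]
ids = {name: parse_bva_id(name) for name in names}
print(ids)
```

Note that converting to `int` drops leading zeros, which matches how `load_case_documents` builds `bva_id` from the filename.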


In [6]:
# Download BVACases.csv and updated_vacols.csv
timer.start()
if "BVACases.csv" not in os.listdir():
    !gsutil -m cp gs://$raw_bucket_name/BVACases.csv .
if "updated_vacols.csv" not in os.listdir():
    !gsutil -m cp gs://$raw_bucket_name/updated_vacols.csv .
timer.end()

Copying gs://bva-appeal-raw-data/BVACases.csv...
\ [1/1 files][134.7 MiB/134.7 MiB] 100% Done                                    
Operation completed over 1 objects/134.7 MiB.                                    
Copying gs://bva-appeal-raw-data/updated_vacols.csv...
| [1/1 files][151.7 MiB/151.7 MiB] 100% Done                                    
Operation completed over 1 objects/151.7 MiB.                                    
Time elapsed: 8.86 seconds.


In [8]:
# Load BVACases and Vacols
dict_types = {"appeal_id": str, 
              "tiread2": np.int64, 
              "issue_id": str, 
              "imgadtm": str,
              "issdc": str,
              "issseq": str,
              "issprog": str,
              "isscode": str,
              "isslev2": str,
              "isslev3": str,
              "cvdocket": str,
              "cvdisp": str,
              "appealed_CAVC": np.int32,
              "issue_count": np.int32}

bva = pd.read_csv("BVACases.csv", dtype=dict_types)
bva = bva.sort_values("appeal_id").reset_index(drop=True)
bva.fillna("na", inplace=True)

vacols = pd.read_csv("updated_vacols.csv", dtype=dict_types)
vacols = vacols.rename(columns={vacols.columns[0]: "citation_num"})  # Rename the first column without mutating the Index in place
vacols = vacols.sort_values("appeal_id").reset_index(drop=True)
vacols.fillna("na", inplace=True)

# Check equality between bva and vacols
# Yes, all overlapping columns are identical 
overlapping_cols = list(set(vacols.columns) & set(bva.columns))
any_diff = False
for col in overlapping_cols:
    diff = np.sum(bva[col] != vacols[col])
    if diff > 0:
        print(f"{col}: {diff} rows differ.")
        any_diff = True
if not any_diff:
    print("All overlapping columns between vacols and bva are identical.")

# Append issue_count to vacols
vacols["issue_count"] = bva["issue_count"]

All overlapping columns between vacols and bva are identical.
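
The check above compares columns row by row, which is only meaningful when both frames have the same number of rows in the same order. A minimal sketch on toy data follows; the frames and the helper `count_column_diffs` are illustrative assumptions, not the notebook's data.

```python
import pandas as pd

def count_column_diffs(left, right):
    """Count differing rows for each column shared by two equally long frames."""
    if len(left) != len(right):
        raise ValueError("Frames must have the same number of rows to compare row by row.")
    shared = sorted(set(left.columns) & set(right.columns))
    # Compare raw values so that a mismatch in index labels cannot raise
    return {col: int((left[col].to_numpy() != right[col].to_numpy()).sum()) for col in shared}

left = pd.DataFrame({"appeal_id": ["a1", "a2", "a3"], "issdc": ["1", "3", "4"], "extra": [0, 1, 2]})
right = pd.DataFrame({"appeal_id": ["a1", "a2", "a3"], "issdc": ["1", "3", "5"]})
diffs = count_column_diffs(left, right)
print(diffs)
```

Columns that exist in only one frame (here `extra`) are skipped, mirroring the set intersection used in the notebook.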


In [9]:
# Find set of documents which are single issue
single_issue_citations = np.array(vacols[vacols.issue_count == 1].tiread2)
single_issue_documents = documents[np.isin(documents_int, single_issue_citations)]
print(f"There are {len(single_issue_documents):,} single issue documents in corpus.")
vacols_single_issue = vacols[vacols.issue_count == 1]

There are 346,915 single issue documents in corpus.
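
The filtering step relies on `np.isin`, which returns a boolean mask marking which document IDs appear among the single-issue citation numbers. A toy illustration, with made-up IDs:

```python
import numpy as np

# Made-up file names and IDs, standing in for the contents of all-bva-decisions
documents = np.array(["101.txt", "102.txt", "103.txt", "104.txt"])
documents_int = np.array([int(name.split(".")[0]) for name in documents], dtype=np.int64)

# Made-up citation numbers flagged as single-issue in the metadata
single_issue_citations = np.array([102, 104, 999], dtype=np.int64)

# Boolean mask: True where the document ID is among the single-issue citations
mask = np.isin(documents_int, single_issue_citations)
single_issue_documents = documents[mask]
print(single_issue_documents)
```

A citation number with no matching file (999 here) is ignored, so the count printed by the notebook reflects documents actually present in the corpus.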


In [48]:
# There are some rows with duplicate tiread2
# Keep the row with the earliest imgadtm date
vacols_single_issue = vacols_single_issue.sort_values(["tiread2", "imgadtm"])
# After sorting, duplicated(keep="first") flags every row except the earliest one per tiread2
extra_rows = vacols_single_issue.tiread2.duplicated(keep="first")
print(f"There are {extra_rows.sum()} tiread2 values with more than 1 row in metadata.")

# Drop the flagged rows so that only the earliest row per tiread2 remains
vacols_dedup = vacols_single_issue[~extra_rows].reset_index(drop=True)

There are 12 tiread2 values with more than 1 row in metadata.
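
The comment in the cell above states the intent: for each `tiread2`, keep the row with the earliest `imgadtm`. One way to express that in pandas is to sort and then call `drop_duplicates(keep="first")`. The sketch below uses made-up rows and assumes `imgadtm` is stored in a format whose string order matches date order (such as ISO `YYYY-MM-DD`).

```python
import pandas as pd

# Made-up metadata rows; tiread2 == 20 appears twice with different dates
rows = pd.DataFrame({
    "tiread2": [20, 10, 20, 30],
    "imgadtm": ["2005-03-01", "2001-07-15", "2004-11-30", "2010-01-02"],
})

deduped = (
    rows.sort_values(["tiread2", "imgadtm"])          # earliest date first within each tiread2
        .drop_duplicates(subset="tiread2", keep="first")  # keep only that earliest row
        .reset_index(drop=True)
)
print(deduped)
```

If the dates were stored in another format (for example `MM/DD/YYYY`), they would need to be parsed with `pd.to_datetime` before sorting for this to pick the earliest row.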


In [49]:
# Upload vacols to GCP
vacols_dedup.to_csv("vacols_processed.csv", index=False)
!gsutil -m cp vacols_processed.csv gs://$processed_bucket_name

Copying file://vacols_processed.csv [Content-Type=text/csv]...
-
Operation completed over 1 objects/25.2 MiB.                                     


This section writes all the single-issue BVA decision documents to Avro format and uploads them to GCP. Avro format is suitable for reading and processing with Dask.

In [51]:
# Start Dask Client
cluster = LocalCluster(processes=False, n_workers=12, threads_per_worker=1, diagnostics_port=None)
client = Client(cluster)
client

# Avro Schema for Storing Documents
schema = {'name': 'all-bva-decisions',
          'namespace': 'Documents',
          'doc': 'Full case documents for all BVA decisions',
          'type': 'record',
          'fields': [{'name': 'text', 'type': 'string'},
                     {'name': 'bva_id', 'type': 'int'}]}

# Write documents to Avro (compressed format)
timer = Timer()
timer.start()
folder = "all-bva-decisions"
list_files = [f"{folder}/{x}" for x in single_issue_documents]
loaded_files = load_case_documents(list_files)
!mkdir -p single-issue-decisions-avro  # -p avoids an error if the folder already exists
loaded_files.to_avro("single-issue-decisions-avro/decisions.*.avro", schema=schema, codec='deflate')
timer.end()

Time elapsed: 437.37 seconds.
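
The schema above declares `bva_id` as an Avro `int`, which is a 32-bit signed integer; IDs beyond that range would need the `long` type. A pure-Python pre-check of records against these two field types could look like the sketch below. The helper `check_record` is hypothetical, and the check is deliberately narrower than a full Avro validation.

```python
AVRO_INT_MIN, AVRO_INT_MAX = -2**31, 2**31 - 1

def check_record(record):
    """Return a list of problems found in one {"text", "bva_id"} record."""
    problems = []
    if not isinstance(record.get("text"), str):
        problems.append("text must be a string")
    bva_id = record.get("bva_id")
    # bool is a subclass of int in Python, so exclude it explicitly
    if not isinstance(bva_id, int) or isinstance(bva_id, bool):
        problems.append("bva_id must be an integer")
    elif not AVRO_INT_MIN <= bva_id <= AVRO_INT_MAX:
        problems.append("bva_id does not fit in a 32-bit Avro int")
    return problems

print(check_record({"text": "decision text", "bva_id": 9221305}))
print(check_record({"text": "decision text", "bva_id": 2**31}))
```

Running such a check on a small sample before the full write can surface type problems early, since the full conversion takes several minutes.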


In [0]:
%%capture
# Upload to GCP
!gsutil -m cp -r single-issue-decisions-avro/ gs://$processed_bucket_name