<a href="https://colab.research.google.com/github/cyyeh/kaggle/blob/master/google-qa/google_qa_preprocessing_optimization_testing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preprocessing Optimization Testing

### TODOs

- [ ] Optimize reading a large file (16.26 GB)
- [ ] Optimize computation on dataframes

## Import Libraries and Environment Setup

In [1]:
# Make sure Colab uses TF 2.x.
# `%tensorflow_version` is a Colab magic; the try/except lets this cell run as a
# no-op wherever the magic is unavailable.
# NOTE(review): newer Colab runtimes may no longer provide this magic — confirm.
try:
  %tensorflow_version 2.x
except Exception:
  pass

TensorFlow 2.x selected.


In [0]:
import json
import os
import pandas as pd
import numpy as np

In [3]:
!pip install transformers # BertModel



In [0]:
from transformers import AlbertTokenizer

### Import `Preprocessor.py`

In [0]:
from Preprocessor import has_long_answer, data_cleaning_for_short_answer

# Training data file (JSONL, one example per line). The download cell below
# fetches it into the working directory when it does not exist yet.
path = 'simplified-nq-train.jsonl'

## Download Training Dataset (16.26 GB)

In [0]:
if not os.path.exists(path):
  import os
  os.environ['KAGGLE_USERNAME'] = "chihyuyeh" # username from the json file
  os.environ['KAGGLE_KEY'] = "f21b340fc8082977cbf954c80ad69ae1" # key from the json file
  !kaggle competitions download -c tensorflow2-question-answering -f simplified-nq-train.jsonl
  !unzip simplified-nq-train.jsonl.zip
  !rm simplified-nq-train.jsonl.zip

## Optimization for reading a large file (16.26 GB)

### Method 1: using `multiprocessing`

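Issues:
- the code is messy and hard to read
- unclear whether the GIL limits performance here. Each worker is a separate process with its own interpreter and GIL, so CPU-bound parsing should not be serialized; the main overhead is pickling payloads and results between processes.
- how to combine sub-results into one end result. Each worker process appends to its own copy of a global list, so the parent never sees those appends. Results have to be returned from each job and merged in the parent.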

In [0]:
import multiprocessing as mp
import os
import dill

# Cleaned records from every chunk, gathered in the *parent* process.
# Workers must return their records: anything a worker appends to a
# module-level list only changes that worker's private copy, so the parent
# would see nothing.
short_answer_dataset = []

def process_data(line):
  """Parse one JSONL line and clean it when it has a long answer.

  Args:
    line: one line of the JSONL file, as `str` or `bytes`.

  Returns:
    The value of `data_cleaning_for_short_answer(json_obj)` when the first
    annotation has a long answer, otherwise None.
  """
  json_obj = json.loads(line)
  if has_long_answer(json_obj['annotations'][0]['long_answer']):
    return data_cleaning_for_short_answer(json_obj)
  return None

def process_wrapper(chunk_start, chunk_size):
  """Process the byte range [chunk_start, chunk_start + chunk_size) of `path`.

  The file is opened in binary mode because `chunkify` measures offsets in
  bytes. In text mode, `seek()` only accepts `tell()` cookies and `read(n)`
  counts characters, so multi-byte UTF-8 text would make chunks overlap.

  Returns:
    List of cleaned records for the lines in this chunk, in file order.
  """
  with open(path, 'rb') as f:
    f.seek(chunk_start)
    lines = f.read(chunk_size).splitlines()
  results = []
  for line in lines:
    if not line.strip():  # tolerate blank lines instead of failing in json.loads
      continue
    record = process_data(line)
    if record is not None:
      results.append(record)
  return results

def chunkify(path, size=1024*1024):
  """Split `path` into line-aligned byte ranges of roughly `size` bytes.

  Args:
    path: file to split.
    size: approximate chunk size in bytes; each chunk is extended to the end
      of the line it would otherwise cut.

  Yields:
    (chunk_start, chunk_size) tuples in bytes that exactly cover the file,
    never extending past EOF. An empty file yields nothing.
  """
  file_end = os.path.getsize(path)
  with open(path, 'rb') as f:
    chunk_end = f.tell()
    while chunk_end < file_end:
      chunk_start = chunk_end
      f.seek(size, 1)
      f.readline()  # advance to the next line boundary so no record is split
      # Seeking past EOF is allowed, so clamp to the real file size.
      chunk_end = min(f.tell(), file_end)
      yield chunk_start, chunk_end - chunk_start

def run_dill_encoded(payload):
  """Worker-side entry point: unpickle (fun, args) with dill and call fun(*args)."""
  fun, args = dill.loads(payload)
  return fun(*args)

def apply_async(pool, fun, args):
  """Schedule fun(*args) on `pool`, serializing the call with dill.

  Returns:
    The multiprocessing AsyncResult; `.get()` returns fun's return value.
  """
  payload = dill.dumps((fun, args))
  return pool.apply_async(run_dill_encoded, (payload,))

# The `with` block terminates the worker pool on exit, including when a job raises.
with mp.Pool() as pool:
  # Create one job per line-aligned byte range of the file.
  jobs = [apply_async(pool, process_wrapper, (chunk_start, chunk_size))
          for chunk_start, chunk_size in chunkify(path)]
  # Wait for all jobs and merge their results in file order.
  for job in jobs:
    short_answer_dataset.extend(job.get())

raw_df = pd.DataFrame(short_answer_dataset)

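### Method 2: using Dask

- bypasses the GIL: `dask.bag` uses the multiprocessing scheduler by default, so partitions are processed in separate processes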

In [0]:
import dask.bag as db
import json
import pprint

In [0]:
def get_short_ans_clean_df(path, task='both', example_id=False,
                           blocksize=64 * 2**20):
  """Build a lazy dask DataFrame of cleaned short-answer examples.

  Only examples whose first annotation has a long answer are kept.

  Args:
    path: path of the JSONL training file (one JSON example per line).
    task: forwarded unchanged to `data_cleaning_for_short_answer`.
    example_id: forwarded unchanged to `data_cleaning_for_short_answer`.
    blocksize: bytes per partition, passed to `dask.bag.read_text`.
      Dask's own default (`None`) makes a single file ONE partition, so the
      whole file would be processed by a single worker with no parallelism.
      Pass `None` to get that old behavior.

  Returns:
    A lazy dask DataFrame; nothing is read until it is computed (e.g. `len`).
    With several partitions, each partition's index restarts at 0. Use
    `.compute().reset_index(drop=True)` if a unique index is needed.
  """
  return (db.read_text(path, blocksize=blocksize)
           .map(json.loads)
           .filter(lambda json_obj: has_long_answer(
             json_obj['annotations'][0]['long_answer']
           ))
           .map(lambda json_obj: data_cleaning_for_short_answer(
             json_obj,
             task,
             example_id
           ))
           .to_dataframe()
         )

In [0]:
%%timeit -n1
raw_df = get_short_ans_clean_df(path)

print(len(raw_df))
print(raw_df.columns)

152148
Index(['question_text', 'long_answer_text', 'short_answer_start_token',
       'short_answer_end_token', 'yes_no_answer'],
      dtype='object')
152148
Index(['question_text', 'long_answer_text', 'short_answer_start_token',
       'short_answer_end_token', 'yes_no_answer'],
      dtype='object')


In [0]:
# Preview the first 10 cleaned examples.
# `task` and `example_id` only exist as parameters of `get_short_ans_clean_df`,
# so pass that function's defaults ('both', False) explicitly here.
data = (db.read_text(path)
          .map(json.loads)
          .filter(lambda json_obj: has_long_answer(
            json_obj['annotations'][0]['long_answer']
          ))
          .map(lambda json_obj: data_cleaning_for_short_answer(
            json_obj,
            'both',
            False
          ))
          .take(10)
        )

for instance in data:
  pprint.pprint(instance)