In [None]:
import pandas as pd
from tqdm.notebook import tqdm

import asyncio
import multiprocessing as mp
# Import the function itself; the call parentheses do not belong in an import statement.
from multiprocessing import cpu_count

import json

import tensorflow as tf
import numpy as np
# Last expression of the cell: displays the number of CPUs reported for this machine.
cpu_count()

In [None]:
## config stuff TODO: tidy this up
n_splits = 5
seed = 1
CHUNK_SIZE = 512
N_FEATURES = 543  # size of the feature axis used when ParquetReader reshapes a sequence
N_PART = 1
part = 0

n_procs = 4

import json
# Character -> integer index mapping used by encode_row to turn phrases into label ids.
# Opened through a context manager so the file handle is closed right after parsing.
with open('/kaggle/input/asl-fingerspelling/character_to_prediction_index.json') as char_map_file:
    WORD_DICT = json.load(char_map_file)
manifest = pd.read_csv('/kaggle/input/asl-fingerspelling/train.csv')
# Shuffle the rows inside each file_id group; passing the configured seed makes this
# shuffle repeatable across kernel restarts.
manifest = manifest.groupby('file_id').sample(frac=1, random_state=seed).reset_index(drop=True)

In [None]:
import random
import os
import re

class ParquetReader():
    """Per-example access to the rows of one parquet file.

    After ``open_parquet`` has been awaited, the reader can be indexed with an
    index label of the parquet table (the write loop passes sequence ids),
    tested with ``in`` and measured with ``len``.
    """
    def __init__(self, file_name):
        # Only the path is stored here; nothing is read until open_parquet() runs.
        self.file_name = file_name

    async def open_parquet(self):
        """Load the parquet file, set up a progress bar and return this reader.

        The read itself is an ordinary synchronous ``pd.read_parquet`` call.
        """
        table = pd.read_parquet(self.file_name)
        self.parquet = table
        # distinct index labels; one progress tick is counted per lookup
        self.examples = table.index.unique()
        self.progress = tqdm(desc=f'reader {self.file_name}', total=len(self.examples))
        return self

    def __getitem__(self, idx):
        """Return the rows stored under ``idx`` as an array, or None.

        Returns an array of shape (n_frames, N_FEATURES, 3) when ``.loc``
        yields a DataFrame, and None otherwise (the case hit when only a
        single row carries that label).
        """
        selection = self.parquet.loc[idx]
        self.progress.update(1)

        if type(selection) is not pd.DataFrame:
            return None

        landmarks = selection.drop(columns='frame')
        # Order columns by their name with the first two characters ignored
        # (presumably an axis prefix such as "x_" -- TODO confirm); the sort is
        # stable, so columns sharing a suffix keep their original relative order.
        ordered_columns = sorted(landmarks.columns, key=lambda name: name[2:])

        # we shape this into a frame x feature x axis tensor
        return landmarks[ordered_columns].values.reshape(len(landmarks), N_FEATURES, 3)

    def __contains__(self, idx):
        """True when ``idx`` is one of the index labels of the loaded file."""
        return idx in self.examples

    def __len__(self):
        """Number of distinct index labels in the loaded file."""
        return len(self.examples)
    
class ChunkWriter():
    """Writes encoded examples into one GZIP-compressed TFRecord file.

    The writer counts how many records it has written so the caller can tell
    when the chunk has reached ``chunk_size``. On ``close`` a chunk that does
    not hold exactly ``chunk_size`` records is renamed so that the trailing
    number of its file name states the real record count.
    """
    def __init__(self, file_name, chunk_size):
        self.file_name = file_name
        self.num_written = 0
        self.file = tf.io.TFRecordWriter(file_name, options='GZIP')
        self.chunk_size = chunk_size
        self.progress = tqdm(desc=f'writer {file_name}', total=self.chunk_size)

    async def write(self, row, coordinates):
        """Encode one manifest row plus its coordinates and append it to the file.

        Returns the writer itself, so the result of a finished write task is a
        writer that can be handed the next example.
        """
        m_bytes = encode_row(row, coordinates)
        self.file.write(m_bytes)
        self.num_written += 1
        self.progress.update(1)
        return self

    def is_full(self):
        """True once at least ``chunk_size`` records have been written."""
        return self.num_written >= self.chunk_size

    def close(self):
        """Close the TFRecord file and fix up the count in its name if needed."""
        self.file.close()
        if self.num_written != self.chunk_size:
            # Replace the last "-<digits>" of the final path component, whether or
            # not an extension follows it. The previous pattern demanded a literal
            # "." after the digits, so extension-less names were never renamed.
            new_filename = re.sub(
                r"-[0-9]+(?=(?:\.[^/\\]*)?$)",
                f"-{self.num_written}",
                self.file_name,
            )
            if new_filename != self.file_name:
                os.rename(self.file_name, new_filename)

class ParallelFoldWriterGroup():
    """A processing class that reads parquet files and writes tfrecord chunks, fold by fold.

    It is a little bit tedious because we don't have 1:1 correspondence between parquet
    files and training examples. Yeah this might be a bit over-engineered. And yes, I know
    that it should use parallel processing instead of coroutines. That will be addressed
    when I am done with the project and have time to worry about these things.

    Per fold, a pool of up to ``max_writers`` chunk writers is kept open. Every example is
    handed to a writer through an asyncio task; a finished task returns its writer, which
    is then reused, replaced by a fresh chunk, or closed.
    """
    def __init__(self, manifest, write_dir, n_folds, chunk_size, max_writers):
        """Build the folds and store the writer settings.

        manifest: DataFrame with at least ``file_id``, ``path`` and ``sequence_id`` columns.
        write_dir: directory the chunk files are created in.
        n_folds: number of folds to produce.
        chunk_size: number of examples per chunk file.
        max_writers: upper bound on simultaneously open writers.
        """
        # generate fold groups: shuffle whole files, then cut the concatenated rows into
        # consecutive runs of fold_size rows
        file_groups = [df for _, df in manifest.groupby('file_id')]
        fold_size = len(manifest) // n_folds
        random.shuffle(file_groups)
        self.folds = pd.concat(file_groups).reset_index()
        self.folds = [df.reset_index(drop=True) for _, df in self.folds.groupby(lambda x: x // fold_size)]
        if len(self.folds) > n_folds:
            # the division left a short remainder group: merge it into the last full fold
            self.folds[-1] = pd.concat(self.folds[-2:]).reset_index(drop=True)
            del self.folds[-2]

        self.chunk_size = chunk_size
        # Never fewer than one writer, otherwise a fold smaller than a chunk could not be written.
        self.max_writers = max(1, min(fold_size // chunk_size, max_writers))
        self.write_dir = write_dir
        self.current_reader, self.next_reader = None, None

    async def main_write_loop(self):
        """Write every fold to chunk files, one example at a time.

        Raises RuntimeError when an example is found neither in the current parquet
        reader nor in the one swapped in after it.
        """
        for current_fold, fold in enumerate(self.folds):
            current_chunk = 0
            # debugging aid: uncomment to process only the first few chunks of each fold
#             fold = fold.iloc[:self.chunk_size*3]
            # reversed so that swap_readers can take the next file with list.pop()
            read_files = fold.path.unique().tolist()[::-1]
            read_files = [os.path.join('/kaggle/input/asl-fingerspelling/', x) for x in read_files]
            # capacity (in examples) of all chunks opened so far in this fold
            examples_to_write = 0
            await self.swap_readers(read_files, True)
            # done: finished write tasks whose writers are idle; pending: writes in flight
            done, pending, n_writers = set(), set(), 0

            for idx in fold.index:
                # read row from the current parquet
                row = fold.loc[idx]
                if row.sequence_id not in self.current_reader:
                    # current reader has expired. Need to swap to the new one
                    await self.swap_readers(read_files)

                # assert that we don't have this error that I am anticipating being a big issue
                if row.sequence_id not in self.current_reader:
                    raise RuntimeError('This should not happen since sequence ids are stored in the same order that the files are opened.')

                coordinates = self.current_reader[row.sequence_id]

                # sometimes coordinates will be a single-row dataframe. In this case, just skip the row
                if not isinstance(coordinates, np.ndarray):
                    continue

                # pick the writer that receives this example
                can_grow_pool = n_writers < self.max_writers and len(fold) - examples_to_write > self.chunk_size
                if n_writers == 0 or can_grow_pool:
                    # we are filling up the writers pool. The first example always opens a
                    # writer: with none open there is no task to wait for, and asyncio.wait
                    # rejects an empty set.
                    writer = self.open_new_writer(current_fold, current_chunk)
                    n_writers += 1
                    current_chunk += 1
                    examples_to_write += self.chunk_size
                else:
                    new_done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
                    done = done.union(new_done)
                    writer = done.pop().result()
                    if writer.is_full() and len(fold) - examples_to_write < self.chunk_size:
                        while done and writer.is_full():
                            # we can write the entire fold with the currently-open writers, close the current writer
                            writer.close()
                            n_writers -= 1
                            writer = done.pop().result()

                    elif writer.is_full():
                        # we can open a new writer
                        writer.close()
                        writer = self.open_new_writer(current_fold, current_chunk)
                        examples_to_write += self.chunk_size
                        current_chunk += 1

                new_task = asyncio.create_task(writer.write(row, coordinates))
                pending.add(new_task)

            # close up shop. All pending writing tasks should finish
            if pending:
                finished, _ = await asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)
                done = done.union(finished)
            # Writers that were idle in `done` need closing as well as the ones that were
            # still writing; every open writer is held by exactly one task in this set.
            for task in done:
                task.result().close()

    async def swap_readers(self, files, new_fold=False):
        """Do this to keep a background reader that can be hot swapped with the current one when it is done.

        files: remaining parquet paths of the fold, consumed from the end with ``pop()``
            (the list passed in is modified).
        new_fold: True for the call made at the start of a fold.

        NOTE(review): ``next_reader`` holds the un-awaited ``open_parquet()`` coroutine, so
        the next file is only read when it is awaited in the swap branch -- confirm whether
        real background loading was intended.
        """
        if new_fold:
            if not self.current_reader:
                # very first fold: open the first file now and stage the second
                assert not self.next_reader
                self.current_reader = await ParquetReader(files.pop()).open_parquet()
                if files:
                    self.next_reader = ParquetReader(files.pop()).open_parquet()
            else:
                # keep the reader left over from the previous fold; drop its file from the
                # list if this fold also uses it, then stage the following file
                last_filename = self.current_reader.file_name
                if last_filename in files:
                    files.remove(last_filename)
                if files:
                    self.next_reader = ParquetReader(files.pop()).open_parquet()
        else:
            # promote the staged reader and stage the one after it, if any
            self.current_reader = await self.next_reader
            if files:
                self.next_reader = ParquetReader(files.pop()).open_parquet()
            else:
                self.next_reader = None

    def open_new_writer(self, current_fold, current_chunk):
        """Create the writer for chunk ``current_chunk`` of fold ``current_fold``."""
        # logic for next chunk name
        writer_name = os.path.join(self.write_dir, f'fold{current_fold}-{current_chunk}-{self.chunk_size}')
        return ChunkWriter(writer_name, self.chunk_size)
    
def encode_row(row, coordinates):
    """Encode a row from the train manifest and the coordinates in a numpy array into a tfrecord bytes string.

    row: manifest row providing ``participant_id``, ``sequence_id`` and ``phrase``.
    coordinates: numpy array; stored as its raw bytes, so shape and dtype are not
        recorded in the example.

    Returns the serialized ``tf.train.Example`` with the features ``coordinates``,
    ``participant_id``, ``sequence_id`` and ``sequence``.
    Raises KeyError when the phrase contains a character missing from WORD_DICT.
    """
    # Map each character of the phrase to its index. The dtype is pinned to int64 so the
    # raw label bytes have the same width everywhere instead of following the platform's
    # default integer type.
    label_ids = np.array([WORD_DICT[char] for char in row.phrase], dtype=np.int64)

    feature = {
        'coordinates': tf.train.Feature(bytes_list=tf.train.BytesList(value=[coordinates.tobytes()])),
        'participant_id': tf.train.Feature(int64_list=tf.train.Int64List(value=[int(row.participant_id)])),
        'sequence_id': tf.train.Feature(int64_list=tf.train.Int64List(value=[int(row.sequence_id)])),
        'sequence': tf.train.Feature(bytes_list=tf.train.BytesList(value=[label_ids.tobytes()])),
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    return example.SerializeToString()

In [None]:
# Delete anything matching fold* directly under /tmp (leftovers from earlier runs).
# NOTE(review): the writer group created further down is pointed at /tmp/{DATASET_NAME},
# so this glob may not cover its output -- confirm which location is meant.
!rm -rf /tmp/fold*

In [None]:
### TODO: move this to the colab notebook

# # from IPython.display import HTML
# import matplotlib.animation as animation
# from matplotlib.animation import FuncAnimation

# fig, ax = plt.subplots()

# def filter_nans(frames):
#     return 

# def plot_frame(frame, edges=[], indices=[]):
#     frame[np.isnan(frame)] = 0
#     x = list(frame[...,0])
#     y = list(frame[...,1])
#     if len(indices) == 0:
#         indices = list(range(len(x)))
#     ax.clear()
#     ax.scatter(x, y, color='blue')
#     for i in range(len(x)):
#         ax.text(x[i], y[i], indices[i])
    
#     ax.set_xticks([])
#     ax.set_yticks([])
#     ax.set_xticklabels([])
#     ax.set_yticklabels([])
    
# def animate_frames(frames, edges=[], indices=[]):
#         anim = FuncAnimation(fig, lambda frame: plot_frame(frame, edges, indices), frames=frames, interval=100)
#         return HTML(anim.to_jshtml())

In [None]:
DATASET_NAME = f'ASLF-{n_splits}fold'

os.makedirs(f'/tmp/{DATASET_NAME}', exist_ok=True)

with open('/kaggle/input/kaggleapi/kaggle.json') as f:
    kaggle_creds = json.load(f)
    
os.environ['KAGGLE_USERNAME'] = kaggle_creds['username']
os.environ['KAGGLE_KEY'] = kaggle_creds['key']

!kaggle datasets init -p /tmp/{DATASET_NAME}

with open(f'/tmp/{DATASET_NAME}/dataset-metadata.json') as f:
    dataset_meta = json.load(f)

dataset_meta['id'] = f'jonathanpick/{DATASET_NAME}'
dataset_meta['title'] = DATASET_NAME

with open(f'/tmp/{DATASET_NAME}/dataset-metadata.json', 'w') as output:
    json.dump(dataset_meta, output)
print(dataset_meta)

!cp /tmp/{DATASET_NAME}/dataset-metadata.json /tmp/{DATASET_NAME}/meta.json
!ls /tmp/{DATASET_NAME}

In [None]:
writeGroup = ParallelFoldWriterGroup(manifest, f'/tmp/{DATASET_NAME}', 5, 256, 10)
await writeGroup.main_write_loop()

In [None]:
from datetime import datetime

# Timestamp of the current local time (YYYYmmdd-HHMMSS); it becomes part of the bucket name.
version_name = f"{datetime.now():%Y%m%d-%H%M%S}"
print(version_name)

In [None]:
import boto3

# Credentials are read from the environment instead of being written into the notebook,
# where they would be saved and shared together with the code.
# A variable that is not set yields None; boto3 then falls back to its own default
# credential lookup.
settings = {
    'id': os.environ.get('AWS_ACCESS_KEY_ID'),
    'secret': os.environ.get('AWS_SECRET_ACCESS_KEY')
}

session = boto3.Session(
    aws_access_key_id=settings['id'],
    aws_secret_access_key=settings['secret'],
    region_name='us-west-1'
)

# Resource handle used below to create the bucket.
s3 = session.resource('s3')

In [None]:
# One bucket per run: lower-cased dataset name followed by the version timestamp.
bucket_name = f"{DATASET_NAME.lower()}-{version_name}"
bucket = s3.create_bucket(
    Bucket=bucket_name,
    CreateBucketConfiguration={'LocationConstraint': 'us-west-1'},
)

In [None]:
def upload_directory(bucket, directory):
    """Upload every file below ``directory`` to ``bucket``.

    bucket: object with an ``upload_file(local_path, key)`` method.
    directory: root directory to walk.

    The object key is the file's path relative to ``directory`` with "/" separators.
    Files directly inside ``directory`` keep their bare name as key, and files in
    sub-directories no longer overwrite same-named files from other directories.
    """
    for root, _dirs, files in os.walk(directory):
        for file in files:
            local_path = os.path.join(root, file)
            key = os.path.relpath(local_path, directory).replace(os.sep, '/')
            bucket.upload_file(local_path, key)
        

In [None]:
# Upload the contents of the dataset staging directory to the bucket created above.
upload_directory(bucket, f'/tmp/{DATASET_NAME}')