This repository has been archived by the owner on May 10, 2022. It is now read-only.

support option to enable compression and hide details of compression from users #123

Open
foreverneverer opened this issue Aug 4, 2020 · 1 comment
Labels
enhancement New feature or request

Comments

@foreverneverer
Contributor

This issue builds on #121 and gives the details of "Support auto-compress".

This feature is simple:

class ZstdWrapper {
  // .......

  public static byte[] tryDecompress(byte[] src) {
    byte[] decompressedValue;
    try {
      decompressedValue = decompress(src);
    } catch (PException e) {
      // Decompression failed: assume the value was stored uncompressed.
      decompressedValue = src;
    }
    return decompressedValue;
  }
}


class PegasusTable {
  // set
  value = autoCompress ? ZstdWrapper.compress(value) : value;
  // get
  byte[] value = autoCompress ? ZstdWrapper.tryDecompress(value) : value;
}
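
The fallback in tryDecompress can be tried out with a minimal, self-contained sketch. It assumes the JDK's built-in zlib classes (java.util.zip.Deflater and Inflater) as a stand-in for zstd, so it runs without extra dependencies. CompressionSketch and its method bodies are hypothetical and are not the ZstdWrapper from the client.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical stand-in for ZstdWrapper, using zlib from the JDK instead of zstd.
final class CompressionSketch {

  static byte[] compress(byte[] src) {
    Deflater deflater = new Deflater();
    deflater.setInput(src);
    deflater.finish();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    while (!deflater.finished()) {
      int n = deflater.deflate(buf);
      out.write(buf, 0, n);
    }
    deflater.end();
    return out.toByteArray();
  }

  static byte[] decompress(byte[] src) throws DataFormatException {
    Inflater inflater = new Inflater();
    inflater.setInput(src);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    try {
      while (!inflater.finished()) {
        int n = inflater.inflate(buf);
        // No progress and no more input means the stream is truncated or empty.
        if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
          throw new DataFormatException("truncated or unsupported input");
        }
        out.write(buf, 0, n);
      }
    } finally {
      inflater.end();
    }
    return out.toByteArray();
  }

  // Mirrors tryDecompress above: if the bytes are not a valid compressed
  // stream, return them unchanged on the assumption they were stored raw.
  static byte[] tryDecompress(byte[] src) {
    try {
      return decompress(src);
    } catch (DataFormatException e) {
      return src;
    }
  }
}
```

With this fallback, values written before compression was enabled stay readable. One caveat of the approach is that a raw value which happens to form a valid compressed stream would be decompressed by mistake.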
@neverchanje neverchanje added the enhancement New feature or request label Aug 5, 2020
@neverchanje neverchanje changed the title Add auto-compress function support option to enable compression and hide details of compression from users Aug 5, 2020
@neverchanje

neverchanje commented Aug 5, 2020

"Auto-compression" is not an accurate name for this feature.

Background

Design

There's something more to discuss:

1. API Design

How should the API be designed? Can we make this feature backward compatible with the previous API? Does every type of Pegasus RPC need one more new method?


Since the feature must be backward compatible and change the API as little as possible, we provide it as a table-level option rather than a call-level option.

class TableOptions {
  // Compression is one of the options.
  boolean enableCompression() {}
  boolean enableAutoRetry() {}
  boolean enableBackupRequest() {}

  TableOptions withZstdCompression() {}
  TableOptions withAutoRetry(int maxRetries, BackoffPolicy backoff) {}
  TableOptions withBackupRequest(int backupRequestDelayMs) {}
}

class PegasusClientInterface {
  PegasusTableInterface openTable(String tableName, TableOptions opts);
}
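
A table-level options object of this shape could be sketched as a small fluent builder. This is only an illustration under assumptions: TableOptionsSketch is a hypothetical name, the BackoffPolicy parameter is left out because its definition is not given in this issue, and a value of 0 is assumed to mean "feature off".

```java
// Hypothetical sketch of a fluent, table-level options object.
final class TableOptionsSketch {
  private boolean compression;
  private int maxRetries;           // assumption: 0 means auto-retry is off
  private int backupRequestDelayMs; // assumption: 0 means backup request is off

  TableOptionsSketch withZstdCompression() {
    this.compression = true;
    return this;
  }

  TableOptionsSketch withAutoRetry(int maxRetries) {
    if (maxRetries <= 0) {
      throw new IllegalArgumentException("maxRetries must be positive");
    }
    this.maxRetries = maxRetries;
    return this;
  }

  TableOptionsSketch withBackupRequest(int backupRequestDelayMs) {
    if (backupRequestDelayMs <= 0) {
      throw new IllegalArgumentException("backupRequestDelayMs must be positive");
    }
    this.backupRequestDelayMs = backupRequestDelayMs;
    return this;
  }

  boolean enableCompression() { return compression; }
  boolean enableAutoRetry() { return maxRetries > 0; }
  boolean enableBackupRequest() { return backupRequestDelayMs > 0; }

  int maxRetries() { return maxRetries; }
  int backupRequestDelayMs() { return backupRequestDelayMs; }
}
```

Because every option defaults to off, an openTable overload that takes no options can keep its current behavior, which is what keeps the API backward compatible.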

However, there may be cases where users need finer-grained control over individual RPCs.
For example:

  1. Within one table, writes (multiset) use auto-retry but reads (multiget) do not retry.
  2. Within one table, one type of data is compressed but the others are not.

2. Implementation Design

How can we design the API with as little modification of TableHandler as possible? In other words, can we design TableHandler to be more extensible, for features like compression or auto-retry?


One possible solution is to refactor TableHandler first. The same refactoring suits go-client as well. It is inspired by gRPC's UnaryInterceptor and CallOption.

// TableInterceptor is a plugin to control the behavior of an RPC to a Pegasus table.
// 
//        request
//  client ----> Pegasus Table
//           |
//           | ---- TableInterceptor is a middle layer to control the RPC process.
//           |
//         <---- 
//        response 
//
interface TableInterceptor {
  // The behavior before the ReplicaSession sends the RPC.
  void interceptBefore(ClientRequestRound rpc, TableHandler table, TableOptions opts) throws Exception;
  // The behavior after the ReplicaSession gets a reply or a failure for the RPC.
  void interceptAfter(ClientRequestRound rpc, error_types errno, TableHandler table, TableOptions opts) throws Exception;
}

This is a simple implementation of an interceptor for backup requests.

class BackupRequestInterceptor implements TableInterceptor {
  void interceptBefore(ClientRequestRound rpc, TableHandler table, TableOptions opts) throws Exception {
    if (opts.enableBackupRequest()) {
      rpc.backupRequestTask = executor.schedule();
    }
  }

  void interceptAfter(ClientRequestRound rpc, error_types errno, TableHandler table, TableOptions opts) throws Exception {}

  private int backupRequestDelayMs;
}
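
The timing side of a backup request can be sketched with the JDK's ScheduledExecutorService: arm a timer before the primary request is sent, and cancel it once the primary replies. This is a minimal sketch under assumptions. BackupRequestSketch, the sendToSecondary callback, and the backupsSent counter are hypothetical names, and the real interceptor would work with ClientRequestRound and ReplicaSession rather than a plain Runnable.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: fire a backup request if the primary is slow.
final class BackupRequestSketch {
  // A daemon timer thread, so an idle timer never keeps the JVM alive.
  private final ScheduledExecutorService timer =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "backup-request-timer");
        t.setDaemon(true);
        return t;
      });
  private final int backupRequestDelayMs;
  final AtomicInteger backupsSent = new AtomicInteger();

  BackupRequestSketch(int backupRequestDelayMs) {
    this.backupRequestDelayMs = backupRequestDelayMs;
  }

  // Before the primary request is sent: schedule the backup request.
  ScheduledFuture<?> interceptBefore(Runnable sendToSecondary) {
    return timer.schedule(() -> {
      backupsSent.incrementAndGet();
      sendToSecondary.run();
    }, backupRequestDelayMs, TimeUnit.MILLISECONDS);
  }

  // After the primary replies: the backup is no longer needed.
  void interceptAfter(ScheduledFuture<?> backupTask) {
    backupTask.cancel(false);
  }

  void shutdown() {
    timer.shutdownNow();
  }
}
```

If the primary replies within the delay, the scheduled task is cancelled and no backup request is sent. Otherwise the task runs once and the caller can take whichever reply arrives first.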

To implement compression:

class CompressionInterceptor implements TableInterceptor {
  void interceptBefore(ClientRequestRound rpc, TableHandler table, TableOptions opts) throws Exception {
    if (opts.enableCompression()) {
      rpc.operator.compress();
    }
  }

  void interceptAfter(ClientRequestRound rpc, error_types errno, TableHandler table, TableOptions opts) throws Exception {
    if (opts.enableCompression()) {
      rpc.operator.decompress();
    }
  }
}

Where are these interceptors registered? One possible way is:

class TableHandler {
  public TableHandler(ClusterManager mgr, String name, TableOptions options) {
    ...
    interceptors.add(new CompressionInterceptor());
    interceptors.add(new BackupRequestInterceptor());
  }

  // Interceptor arguments are omitted for brevity.
  void call() {
    for (TableInterceptor interceptor : interceptors) {
      interceptor.interceptBefore();
    }
    primarySession.send(() -> {
      for (TableInterceptor interceptor : interceptors) {
        interceptor.interceptAfter();
      }
    });
  }

  private List<TableInterceptor> interceptors;
}
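
The call flow above can be exercised in isolation with a small runnable sketch. All names here are hypothetical: RpcSketch stands in for ClientRequestRound, the transport callback stands in for primarySession.send, and TracingInterceptor only records the order of calls instead of doing real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ClientRequestRound; it only records a call trace.
final class RpcSketch {
  final List<String> trace = new ArrayList<>();
}

interface InterceptorSketch {
  void interceptBefore(RpcSketch rpc);
  void interceptAfter(RpcSketch rpc);
}

// Records when it is invoked, so the ordering of the chain is visible.
final class TracingInterceptor implements InterceptorSketch {
  private final String name;

  TracingInterceptor(String name) {
    this.name = name;
  }

  @Override
  public void interceptBefore(RpcSketch rpc) {
    rpc.trace.add(name + ":before");
  }

  @Override
  public void interceptAfter(RpcSketch rpc) {
    rpc.trace.add(name + ":after");
  }
}

final class TableHandlerSketch {
  private final List<InterceptorSketch> interceptors = new ArrayList<>();

  TableHandlerSketch register(InterceptorSketch interceptor) {
    interceptors.add(interceptor);
    return this;
  }

  // transport receives a completion callback and runs it when the reply arrives.
  void call(RpcSketch rpc, Consumer<Runnable> transport) {
    for (InterceptorSketch interceptor : interceptors) {
      interceptor.interceptBefore(rpc);
    }
    transport.accept(() -> {
      for (InterceptorSketch interceptor : interceptors) {
        interceptor.interceptAfter(rpc);
      }
    });
  }
}
```

As in the design above, the after-hooks run in registration order. Whether they should instead run in reverse order, as nested middleware usually does, is a design choice this sketch does not settle.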
