CUDNN_STATUS_MAPPING_ERROR when using fl::Module from a different thread #73

Closed

yutkin opened this issue Nov 11, 2019 · 2 comments


yutkin commented Nov 11, 2019

I'm using wav2letter, which is based on Flashlight.

I'm trying to embed the wav2letter decoder (fl::Sequential) in a gRPC service and use it from concurrent requests. Everything works fine in the sequential version (i.e., only one request at a time), but when I run a decoding request in a separate thread I get a CUDNN_STATUS_MAPPING_ERROR.


Normal output

root@1c4f49d5bc9f:/app# ./asr --flagsfile configs/decode.cfg  &
[1] 7609
root@1c4f49d5bc9f:/app# I1111 08:53:14.180243  7609 main.cpp:31]
ArrayFire v3.6.4 (CUDA, 64-bit Linux, build 1b8030c5)
Platform: CUDA Toolkit 10.1, Driver: 430.14
[0] GeForce GTX TITAN X, 12213 MB, CUDA Compute 5.2
-1- GeForce GTX TITAN X, 12213 MB, CUDA Compute 5.2
I1111 08:53:14.214509  7609 main.cpp:33] gRPC thread pool size=8
I1111 08:53:14.214536  7609 main.cpp:34] URLs decoder thread pool size=16
I1111 08:53:14.214545  7609 main.cpp:35] PPUSERAPI_PROXY_ADDR=psv4.userapi.com
I1111 08:53:14.214565  7609 model.cc:14] Reading flags from file configs/decode.cfg
I1111 08:53:14.214838  7609 model.cc:24] [Network] Reading acoustic model from configs/am.binary
I1111 08:53:14.761054  7609 model.cc:29] [Network] Number of params: 3920452
I1111 08:53:14.761106  7609 model.cc:41] [Network] Updating flags from config file: configs/am.binary
I1111 08:53:14.761644  7609 model.cc:58] Reading lexicon and token dictionary
I1111 08:53:15.147934  7609 model.cc:77] Building a language model
I1111 08:53:15.189512  7609 model.cc:83] LM constructed
I1111 08:53:15.743183  7609 model.cc:105] [Decoder] Trie planted
I1111 08:53:15.743968  7609 main.cpp:43] Service started at 0.0.0.0:5000

root@1c4f49d5bc9f:/app# ./test_client

Batch size: 101
I1111 08:56:02.010625  7609 server.cc:186] Got request with batch of 101 urls
E1111 08:56:02.176126  7631 convert.cpp:110] Unable to open: https://bad-url.com/kek.ogg: Input/output error
I1111 08:56:04.473172  7609 server.cc:122] Total time spent for downloading 101 URLs: 2.462 sec. Per URL: 0.0243762 sec.
I1111 08:56:04.622124  7609 W2lListFilesDataset.cpp:113] 100 files found.
I1111 08:56:04.622190  7609 Utils.cpp:98] Filtered 0/100 samples
I1111 08:56:04.622239  7609 W2lListFilesDataset.cpp:45] Total batches (i.e. iters): 100
I1111 08:56:04.622254  7609 model.cc:162] [Serialization] Running forward pass ...
I1111 08:56:08.849159  7609 model.cc:194] Total time spent for decoding preparation: 4.37159 sec. Per sample: 0.043716 sec.
I1111 08:56:08.849213  7609 model.cc:197] Decoding begin
I1111 08:56:08.849226  7609 model.cc:204] [Dataset] Number of samples per thread: 7
I1111 08:56:11.228459  7609 model.cc:237] Total time spent for decoding: 2.36044 sec. Per sample: 0.0118022 sec.
I1111 08:56:11.304963  7609 server.cc:220] Total time spent for processing batch of 101 URLs: 9.294 sec. Per URL: 0.0920198 sec.
Response size: 101

Output when running the forward pass in a different thread

export CUDNN_LOGDEST_DBG=stderr
root@1c4f49d5bc9f:/app# ./asr --flagsfile configs/decode.cfg  &
[1] 7337
root@1c4f49d5bc9f:/app# I1111 08:40:17.504963  7337 main.cpp:31]
ArrayFire v3.6.4 (CUDA, 64-bit Linux, build 1b8030c5)
Platform: CUDA Toolkit 10.1, Driver: 430.14
[0] GeForce GTX TITAN X, 12213 MB, CUDA Compute 5.2
-1- GeForce GTX TITAN X, 12213 MB, CUDA Compute 5.2
I1111 08:40:17.542042  7337 main.cpp:33] gRPC thread pool size=8
I1111 08:40:17.542078  7337 main.cpp:34] URLs decoder thread pool size=16
I1111 08:40:17.542104  7337 main.cpp:35] PPUSERAPI_PROXY_ADDR=psv4.userapi.com
I1111 08:40:17.542136  7337 model.cc:14] Reading flags from file configs/decode.cfg
I1111 08:40:17.542382  7337 model.cc:24] [Network] Reading acoustic model from configs/am.binary
I1111 08:40:18.092175  7337 model.cc:29] [Network] Number of params: 3920452
I1111 08:40:18.092224  7337 model.cc:41] [Network] Updating flags from config file: configs/am.binary
I1111 08:40:18.092670  7337 model.cc:58] Reading lexicon and token dictionary
I1111 08:40:18.477126  7337 model.cc:77] Building a language model
I1111 08:40:18.518196  7337 model.cc:83] LM constructed
I1111 08:40:19.078462  7337 model.cc:105] [Decoder] Trie planted
I1111 08:40:19.079236  7337 main.cpp:43] Service started at 0.0.0.0:5000

root@1c4f49d5bc9f:/app# ./test_client

Batch size: 101
I1111 08:40:22.176491  7356 server.cc:186] Got request with batch of 101 urls
E1111 08:40:22.233448  7360 convert.cpp:110] Unable to open: https://bad-url.com/kek.ogg: Input/output error
I1111 08:40:24.565629  7356 server.cc:122] Total time spent for downloading 101 URLs: 2.389 sec. Per URL: 0.0236535 sec.
I1111 08:40:24.733465  7356 W2lListFilesDataset.cpp:113] 100 files found.
I1111 08:40:24.733520  7356 Utils.cpp:98] Filtered 0/100 samples
I1111 08:40:24.733575  7356 W2lListFilesDataset.cpp:45] Total batches (i.e. iters): 100
I1111 08:40:24.733597  7356 model.cc:162] [Serialization] Running forward pass ...

I! CuDNN (v7603) function cudnnCreateTensorDescriptor() called:
i! Time: 2019-11-11T08:40:26.721171 (0d+0h+0m+9s since start)
i! Process=7337; Thread=7356; GPU=NULL; Handle=NULL; StreamId=NULL.


I! CuDNN (v7603) function cudnnSetTensorNdDescriptor() called:
i!     dataType: type=cudnnDataType_t; val=CUDNN_DATA_FLOAT (0);
i!     nbDims: type=int; val=4;
i!     dimA: type=int; val=[1,40,1,411];
i!     strideA: type=int; val=[16440,411,411,1];
i! Time: 2019-11-11T08:40:26.721292 (0d+0h+0m+9s since start)
i! Process=7337; Thread=7356; GPU=NULL; Handle=NULL; StreamId=NULL.


I! CuDNN (v7603) function cudnnCreateFilterDescriptor() called:
i! Time: 2019-11-11T08:40:26.721361 (0d+0h+0m+9s since start)
i! Process=7337; Thread=7356; GPU=NULL; Handle=NULL; StreamId=NULL.


I! CuDNN (v7603) function cudnnSetFilterNdDescriptor() called:
i!     dataType: type=cudnnDataType_t; val=CUDNN_DATA_FLOAT (0);
i!     format: type=cudnnTensorFormat_t; val=CUDNN_TENSOR_NCHW (0);
i!     nbDims: type=int; val=4;
i!     filterDimA: type=int; val=[256,40,1,8];
i! Time: 2019-11-11T08:40:26.721400 (0d+0h+0m+9s since start)
i! Process=7337; Thread=7356; GPU=NULL; Handle=NULL; StreamId=NULL.


I! CuDNN (v7603) function cudnnCreateConvolutionDescriptor() called:
i! Time: 2019-11-11T08:40:26.721460 (0d+0h+0m+9s since start)
i! Process=7337; Thread=7356; GPU=NULL; Handle=NULL; StreamId=NULL.


I! CuDNN (v7603) function cudnnSetConvolutionNdDescriptor() called:
i!     arrayLength: type=int; val=2;
i!     padA: type=int; val=[0,4];
i!     strideA: type=int; val=[1,2];
i!     dilationA: type=int; val=[1,1];
i!     mode: type=cudnnConvolutionMode_t; val=CUDNN_CROSS_CORRELATION (1);
i!     dataType: type=cudnnDataType_t; val=CUDNN_DATA_FLOAT (0);
i! Time: 2019-11-11T08:40:26.721501 (0d+0h+0m+9s since start)
i! Process=7337; Thread=7356; GPU=NULL; Handle=NULL; StreamId=NULL.


I! CuDNN (v7603) function cudnnSetConvolutionGroupCount() called:
i!     groupCount: type=int; val=1;
i! Time: 2019-11-11T08:40:26.721550 (0d+0h+0m+9s since start)
i! Process=7337; Thread=7356; GPU=NULL; Handle=NULL; StreamId=NULL.


I! CuDNN (v7603) function cudnnGetConvolutionNdForwardOutputDim() called:
i!     convDesc: type=cudnnConvolutionDescriptor_t:
i!         mode: type=cudnnConvolutionMode_t; val=CUDNN_CROSS_CORRELATION (1);
i!         dataType: type=cudnnDataType_t; val=CUDNN_DATA_FLOAT (0);
i!         mathType: type=cudnnMathType_t; val=CUDNN_DEFAULT_MATH (0);
i!         reorderType: type=int; val=0;
i!         arrayLength: type=int; val=2;
i!         padA: type=int; val=[0,4];
i!         strideA: type=int; val=[1,2];
i!         dilationA: type=int; val=[1,1];
i!         groupCount: type=int; val=1;
i!     inputTensorDesc: type=cudnnTensorDescriptor_t:
i!         dataType: type=cudnnDataType_t; val=CUDNN_DATA_FLOAT (0);
i!         nbDims: type=int; val=4;
i!         dimA: type=int; val=[1,40,1,411];
i!         strideA: type=int; val=[16440,411,411,1];
i!     filterDesc: type=cudnnFilterDescriptor_t:
i!         dataType: type=cudnnDataType_t; val=CUDNN_DATA_FLOAT (0);
i!         nbDims: type=int; val=4;
i!         dimA: type=int; val=[256,40,1,8];
i!         format: type=cudnnTensorFormat_t; val=CUDNN_TENSOR_NCHW (0);
i!     nbDims: type=int; val=4;
i! Time: 2019-11-11T08:40:26.721595 (0d+0h+0m+9s since start)
i! Process=7337; Thread=7356; GPU=NULL; Handle=NULL; StreamId=NULL.


I! CuDNN (v7603) function cudnnCreateTensorDescriptor() called:
i! Time: 2019-11-11T08:40:26.721730 (0d+0h+0m+9s since start)
i! Process=7337; Thread=7356; GPU=NULL; Handle=NULL; StreamId=NULL.


I! CuDNN (v7603) function cudnnSetTensorNdDescriptor() called:
i!     dataType: type=cudnnDataType_t; val=CUDNN_DATA_FLOAT (0);
i!     nbDims: type=int; val=4;
i!     dimA: type=int; val=[1,256,1,206];
i!     strideA: type=int; val=[52736,206,206,1];
i! Time: 2019-11-11T08:40:26.721770 (0d+0h+0m+9s since start)
i! Process=7337; Thread=7356; GPU=NULL; Handle=NULL; StreamId=NULL.


I! CuDNN (v7603) function cudnnCreate() called:
i! Time: 2019-11-11T08:40:26.721837 (0d+0h+0m+9s since start)
i! Process=7337; Thread=7356; GPU=NULL; Handle=NULL; StreamId=NULL.


I! CuDNN (v7603) function cudnnSetStream() called:
i!     handle: type=cudnnHandle_t; streamId=(nil) (defaultStream);
i!     streamId: type=cudaStream_t; streamId=0x55fad1cf1290;
i! Time: 2019-11-11T08:40:27.808954 (0d+0h+0m+10s since start)
i! Process=7337; Thread=7356; GPU=0; Handle=0x7f9aa09d3940; StreamId=(nil) (defaultStream).

terminate called after throwing an instance of 'std::runtime_error'
  what():  CUDNN_STATUS_MAPPING_ERROR
*** Aborted at 1573461627 (unix time) try "date -d @1573461627" if you are using GNU date ***
PC: @     0x7f9b5f725e97 gsignal
*** SIGABRT (@0x1ca9) received by PID 7337 (TID 0x7f9af5efe700) from PID 7337; stack trace: ***
    @     0x7f9ba6d39890 (unknown)
    @     0x7f9b5f725e97 gsignal
    @     0x7f9b5f727801 abort
    @     0x7f9b5fd7c957 (unknown)
    @     0x7f9b5fd82ab6 (unknown)
    @     0x7f9b5fd82af1 std::terminate()
    @     0x7f9b5fd82d79 __cxa_rethrow
    @     0x55facf353640 fl::getCudnnHandle()
    @     0x55facf350ab1 fl::conv2d()
    @     0x55facf314031 fl::Conv2D::forward()
    @     0x55facf328f3e fl::UnaryModule::forward()
    @     0x55facf312f7a fl::Sequential::forward()
    @     0x55facf0ef86c _ZN8AsrModel6decodeB5cxx11Ev
    @     0x55facf0cd529 AsrAsync::CallData::asyncHandle()
    @     0x55facefc29e8 AsrAsync::CallData::Proceed()
    @     0x7f9b5fdad66f (unknown)
    @     0x7f9ba6d2e6db start_thread
    @     0x7f9b5f80888f clone

***
14: Socket closed

MWE sources

server.h

#ifndef ASR_SERVER_H
#define ASR_SERVER_H

#include "model.h"

class AsrAsync final {
public:
    AsrAsync(std::shared_ptr<AsrModel> asrModel, uint64 threadPoolSize): model(asrModel), threadPool(threadPoolSize) {}

    ~AsrAsync() {
        server_->Shutdown();
        cq_->Shutdown();
    }

    void Run(const std::string& addr) {

        ServerBuilder builder;
        builder.AddListeningPort(addr, grpc::InsecureServerCredentials());
        builder.RegisterService(&service_);
        cq_ = builder.AddCompletionQueue();
        server_ = builder.BuildAndStart();

        HandleRpcs();
    }

private:
    class CallData {
    public:
        Status asyncHandle(const AsrRequest& request_, AsrResult& reply_, ServerContext& ctx_);

        CallData(asr::AsrServicer::AsyncService* service, ServerCompletionQueue* cq, std::shared_ptr<AsrModel> asrModel)
                : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE), model(asrModel) {
            Proceed();
        }

        void Proceed() {
            if (status_ == CREATE) {
                status_ = PROCESS;

                service_->RequestRecognizeSpeech(&ctx_, &request_, &responder_, cq_, cq_, this);
            } else if (status_ == PROCESS) {

                new CallData(service_, cq_, this->model);

                auto status = asyncHandle(request_, reply_, ctx_);
                
                status_ = FINISH;
                responder_.Finish(reply_, status, this);
            } else {
                GPR_ASSERT(status_ == FINISH);
                delete this;
            }
        }

    private:
        asr::AsrServicer::AsyncService* service_;
        ServerCompletionQueue* cq_;
        ServerContext ctx_;

        AsrRequest request_;
        AsrResult reply_;

        ServerAsyncResponseWriter<AsrResult> responder_;

        std::shared_ptr<AsrModel> model;

        enum CallStatus { CREATE, PROCESS, FINISH };
        CallStatus status_;
    };

    // This can be run in multiple threads if needed.
    void HandleRpcs() {
        new CallData(&service_, cq_.get(), this->model);
        void* tag;
        bool ok;

        while (cq_->Next(&tag, &ok)) {
            GPR_ASSERT(ok);

            auto t = std::thread(std::bind(&CallData::Proceed, static_cast<CallData*>(tag)));
            t.detach();

            // Uncomment to process all RPCs on a single thread instead:
            // static_cast<CallData*>(tag)->Proceed();
        }
    }

private:
    std::unique_ptr<grpc::ServerCompletionQueue> cq_;
    asr::AsrServicer::AsyncService service_;
    std::unique_ptr<Server> server_;

    std::shared_ptr<AsrModel> model;

    fl::ThreadPool threadPool;
};


#endif //ASR_SERVER_H

server.cc

#include "server.h"


Status AsrAsync::CallData::asyncHandle(const AsrRequest &request_, AsrResult &reply_, ServerContext &ctx_)
{
    RepeatedPtrField<std::string> urls = request_.url();
    LOG(INFO) << "Got request with batch of " << urls.size() << " urls";

    // URLs downloading....

    auto result = this->model->decode();

    for (int i = 0; i < urls.size(); ++i)
    {
    	reply_.add_text();
        reply_.set_text(i, result[i]);
    }

    return Status::OK;
}

model.h

#ifndef ASR_MODEL_H
#define ASR_MODEL_H

#include <flashlight/flashlight.h>

class AsrModel {
public:
    AsrModel(int argc, char *argv[]);
    ~AsrModel() {}
    std::vector<std::string> decode();
    void runDecoder(int tid, int start, int end,
    	const w2l::EmissionSet& emissionSet,
    	std::vector<int>& shuffle_order,
    	std::vector<std::string>& result
    );

private:
    std::mutex decodeMutex;

    std::shared_ptr<fl::Module> network_;
    std::shared_ptr<w2l::LM> lm_;
    std::shared_ptr<w2l::Trie> trie_;

    w2l::DecoderOptions decoderOpt_;
    int unkWordIdx_;
    int blankIdx_;
    int silIdx_;

    w2l::Dictionary tokenDict_;
    w2l::LexiconMap lexicon_;
    w2l::Dictionary wordDict_;
    w2l::DictionaryMap dicts_;
};


#endif //ASR_MODEL_H

model.cc

#include "model.h"


AsrModel::AsrModel(int argc, char *argv[])
{
    auto flagsfile = w2l::FLAGS_flagsfile;
    if (!flagsfile.empty())
    {
        LOG(INFO) << "Reading flags from file " << flagsfile;
        gflags::ReadFromFlagsFile(flagsfile, argv[0], true);
        // Re-parse command line flags to override values in the flag file.
        gflags::ParseCommandLineFlags(&argc, &argv, false);
    }

    std::unordered_map<std::string, std::string> cfg;

    if (!w2l::FLAGS_am.empty())
    {
        LOG(INFO) << "[Network] Reading acoustic model from " << w2l::FLAGS_am;
        af::setDevice(0);
        w2l::W2lSerializer::load(w2l::FLAGS_am, cfg, this->network_);
        this->network_->eval();
        DLOG(INFO) << "[Network] " << this->network_->prettyString();
        LOG(INFO) << "[Network] Number of params: " << w2l::numTotalParams(this->network_);
    }
    else
    {
        LOG(FATAL) << "[Network] Fail to load network. Flag --am is not provided.";
    }

    auto flags = cfg.find(w2l::kGflags);
    if (flags == cfg.end())
    {
        LOG(FATAL) << "[Network] Invalid config loaded from " << w2l::FLAGS_am;
    }
    LOG(INFO) << "[Network] Updating flags from config file: " << w2l::FLAGS_am;
    gflags::ReadFlagsFromString(flags->second, gflags::GetArgv0(), true);

    // override with user-specified flags
    gflags::ParseCommandLineFlags(&argc, &argv, false);
    if (!flagsfile.empty())
    {
        gflags::ReadFromFlagsFile(flagsfile, argv[0], true);
        // Re-parse command line flags to override values in the flag file.
        gflags::ParseCommandLineFlags(&argc, &argv, false);
    }

    this->decoderOpt_ = w2l::DecoderOptions(
        w2l::FLAGS_beamsize, static_cast<float>(w2l::FLAGS_beamthreshold), static_cast<float>(w2l::FLAGS_lmweight),
        static_cast<float>(w2l::FLAGS_wordscore), static_cast<float>(w2l::FLAGS_unkweight), w2l::FLAGS_logadd,
        static_cast<float>(w2l::FLAGS_silweight), w2l::CriterionType::CTC);

    LOG(INFO) << "Reading lexicon and token dictionary";

    w2l::Dictionary tokenDict(w2l::pathsConcat(w2l::FLAGS_tokensdir, w2l::FLAGS_tokens));

    for (int64_t r = 1; r <= w2l::FLAGS_replabel; ++r)
    {
        tokenDict.addEntry(std::to_string(r));
    }
    tokenDict.addEntry(w2l::kBlankToken);
    this->tokenDict_ = tokenDict;

    this->lexicon_ = w2l::loadWords(w2l::FLAGS_lexicon, w2l::FLAGS_maxword);
    this->wordDict_ = w2l::createWordDict(this->lexicon_);
    this->dicts_ = {{w2l::kTargetIdx, this->tokenDict_}, {w2l::kWordIdx, this->wordDict_}};

    this->unkWordIdx_ = this->wordDict_.getIndex(w2l::kUnkToken);
    this->blankIdx_ = w2l::FLAGS_criterion == w2l::kCtcCriterion ? this->tokenDict_.getIndex(w2l::kBlankToken) : -1;
    this->silIdx_ = this->tokenDict_.getIndex(w2l::FLAGS_wordseparator);

    LOG(INFO) << "Building a language model";

    this->lm_ = std::make_shared<w2l::KenLM>(w2l::FLAGS_lm, this->wordDict_);
    if (!this->lm_) {
        LOG(FATAL) << "[LM constructing] Failed to load LM: " << w2l::FLAGS_lm;
    }
    LOG(INFO) << "LM constructed";

    int silIdx = this->tokenDict_.getIndex(w2l::FLAGS_wordseparator);
    this->trie_ = std::make_shared<w2l::Trie>(this->tokenDict_.indexSize(), silIdx);

    auto startState = this->lm_->start(false);

    for (auto &it : this->lexicon_)
    {
        const std::string &word = it.first;
        int usrIdx = this->wordDict_.getIndex(word);
        float score = -1;
        w2l::LMStatePtr dummyState;
        std::tie(dummyState, score) = this->lm_->score(startState, usrIdx);

        for (auto &tokens : it.second)
        {
            auto tokensTensor = tkn2Idx(tokens, this->tokenDict_, w2l::FLAGS_replabel);
            this->trie_->insert(tokensTensor, usrIdx, score);
        }
    }
    this->trie_->smear(w2l::SmearingMode::MAX);
    LOG(INFO) << "[Decoder] Trie planted";
}

void repair_order(std::vector<std::string> &result, const std::vector<int> &shuffle_order)
{
    // Restore the original sample order: result[i] belongs at index shuffle_order[i].
    std::vector<std::string> ordered(result.size());
    for (size_t i = 0; i < result.size(); ++i)
    {
        ordered[shuffle_order[i]] = std::move(result[i]);
    }
    result = std::move(ordered);
}

void AsrModel::runDecoder(int tid, int start, int end, const w2l::EmissionSet &emissionSet,
                          std::vector<int> &shuffle_order, std::vector<std::string> &result)
{
    try
    {
        auto decoder = std::make_unique<w2l::WordLMDecoder>(this->decoderOpt_, this->trie_, this->lm_, this->silIdx_,
                                                            this->blankIdx_, this->unkWordIdx_, emissionSet.transition);

        for (int s = start; s < end; s++)
        {
            auto emission = emissionSet.emissions[s];
            auto sampleId = emissionSet.sampleIds[s];
            auto T = emissionSet.emissionT[s];
            auto N = emissionSet.emissionN;
            auto results = decoder->decode(emission.data(), T, N);
            auto &rawWordPrediction = results[0].words;

            rawWordPrediction = w2l::validateIdx(rawWordPrediction, this->wordDict_.getIndex(w2l::kUnkToken));
            auto wordPrediction = wrdIdx2Wrd(rawWordPrediction, this->wordDict_);

            std::lock_guard<std::mutex> guard(this->decodeMutex);

            auto wordPredictionStr = w2l::join(" ", wordPrediction);
            shuffle_order.push_back(std::stoi(sampleId) - 1);
            result.push_back(wordPredictionStr);
        }
    }
    catch (const std::exception &exc)
    {
        LOG(FATAL) << "Exception in thread " << tid << "\n" << exc.what();
    }
}

std::vector<std::string> AsrModel::decode()
{
    auto decodePreparationTimer = fl::TimeMeter();
    decodePreparationTimer.resume();

    int worldRank = 0;
    int worldSize = 1;
    int batchSize = 1;

    auto ds = w2l::createDataset(w2l::FLAGS_test, this->dicts_, this->lexicon_, batchSize, worldRank, worldSize);
    int cnt = 0;
    w2l::EmissionSet emissionSet;

    LOG(INFO) << "[Serialization] Running forward pass ...";

    for (auto &sample : *ds)
    {
//        af::print("Input array", sample[w2l::kInputIdx]);
        auto rawEmission = this->network_->forward({fl::input(sample[w2l::kInputIdx])}).front();
        int N = rawEmission.dims(0);
        int T = rawEmission.dims(1);

        auto emission = w2l::afToVector<float>(rawEmission);
        auto tokenTarget = w2l::afToVector<int>(sample[w2l::kTargetIdx]);
        auto wordTarget = w2l::afToVector<int>(sample[w2l::kWordIdx]);

        std::vector<std::string> wordTargetStr = w2l::wrdIdx2Wrd(wordTarget, this->wordDict_);

        emissionSet.emissions.emplace_back(emission);
        emissionSet.wordTargets.emplace_back(wordTargetStr);
        emissionSet.tokenTargets.emplace_back(tokenTarget);
        emissionSet.emissionT.emplace_back(T);
        emissionSet.emissionN = N;
        emissionSet.sampleIds.emplace_back(w2l::readSampleIds(sample[w2l::kSampleIdx]).front());

        ++cnt;
        if (cnt == w2l::FLAGS_maxload)
        {
            break;
        }
    }

    int nSample = emissionSet.emissions.size();

    auto timeForDecodePreparation = static_cast<float>(decodePreparationTimer.value());
    LOG(INFO) << "Total time spent for decoding preparation: " << timeForDecodePreparation
              << " sec. Per sample: " << timeForDecodePreparation / nSample << " sec.";

    LOG(INFO) << "Decoding begin";

    // Decoding
    std::vector<int> shuffle_order(nSample);
    std::vector<std::string> result(nSample);

    int nSamplePerThread = std::ceil(nSample / static_cast<float>(w2l::FLAGS_nthread_decoder));
    LOG(INFO) << "[Dataset] Number of samples per thread: " << nSamplePerThread;

    af::deviceGC();

    auto startThreads = [&]() {
        using namespace std::placeholders; // for _1, _2, _3 used in std::bind below
        auto decodeFunc = std::bind(&AsrModel::runDecoder, this, _1, _2, _3, std::cref(emissionSet),
                                    std::ref(shuffle_order), std::ref(result));

        if (w2l::FLAGS_nthread_decoder == 1)
        {
            decodeFunc(0, 0, nSample);
        }
        else if (w2l::FLAGS_nthread_decoder > 1)
        {
            fl::ThreadPool threadPool(w2l::FLAGS_nthread_decoder);
            for (int i = 0; i < w2l::FLAGS_nthread_decoder; i++)
            {
                int start = i * nSamplePerThread;
                if (start >= nSample)
                {
                    break;
                }
                int end = std::min((i + 1) * nSamplePerThread, nSample);
                threadPool.enqueue(decodeFunc, i, start, end);
            }
        }
    };

    auto timer = fl::TimeMeter();
    timer.resume();
    startThreads();
    repair_order(result, shuffle_order);
    auto timeForDecode = static_cast<float>(timer.value());
    LOG(INFO) << "Total time spent for decoding: " << timeForDecode
              << " sec. Per sample: " << timeForDecode / result.size() << " sec.";

    return result;
}

main.cpp

#include <arrayfire.h>
#include <glog/logging.h>
#include <iostream>
#include <thread>

#include "health.h"
#include "model.h"
#include "server.h"

DEFINE_string(host, "0.0.0.0", "What host to listen on");
DEFINE_uint64(port, 5000, "What port to listen on");
DEFINE_uint64(grpc_threads, 8, "Max size of gRPC ThreadPool");
DEFINE_uint64(urls_threads, 16, "Size URL decoder ThreadPool");

std::string getEnv(const std::string &key, const std::string &def)
{
    const char *httpHost = std::getenv(key.c_str());
    return httpHost ? httpHost : def;
}

int main(int argc, char *argv[])
{
    FLAGS_alsologtostderr = 1;
    google::InitGoogleLogging(argv[0]);
    gflags::SetUsageMessage("Auto speech recognition (ASR) gRPC service");
    google::InstallFailureSignalHandler();
    gflags::ParseCommandLineFlags(&argc, &argv, false);

    auto httpHost = getEnv("PPUSERAPI_PROXY_ADDR", DEFAULT_HOST);

    LOG(INFO) << "";
    af::info();
    LOG(INFO) << "gRPC thread pool size=" << FLAGS_grpc_threads;
    LOG(INFO) << "URLs decoder thread pool size=" << FLAGS_urls_threads;
    LOG(INFO) << "PPUSERAPI_PROXY_ADDR=" << httpHost;

    auto model = std::make_shared<AsrModel>(argc, argv);
    auto serverAddr = std::string(FLAGS_host) + ":" + std::to_string(FLAGS_port);

    AsrAsync server(model, FLAGS_grpc_threads);

    DLOG(INFO) << "Gflags after parsing \n" << w2l::serializeGflags("; ");
    LOG(INFO) << "Service started at " << serverAddr;

    server.Run(serverAddr);
}

How can I run the forward pass from different threads? Is there a way to share an fl::Sequential between threads?


yutkin commented Nov 12, 2019

Just noticed that this simple code causes the same error:

#include "flashlight/autograd/backend/cuda/CudnnUtils.h"

int main(int argc, char *argv[])
{
    auto handle = fl::getCudnnHandle();
}

Error:

root@0a7fa3d60766:/app# ./asr
terminate called after throwing an instance of 'std::runtime_error'
  what():  CUDNN_STATUS_MAPPING_ERROR
Aborted (core dumped)

It fails on cudnnSetStream(handle, fl::cuda::getActiveStream()).

System info:

root@b712fc25d5c3:/app# nvidia-smi
Tue Nov 12 14:21:59 2019
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 430.14       Driver Version: 430.14       CUDA Version: 10.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  GeForce GTX TIT...  Off  | 00000000:0B:00.0 Off |                  N/A |
| 22%   33C    P8    17W / 250W |      0MiB / 12212MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
|   1  GeForce GTX TIT...  Off  | 00000000:13:00.0 Off |                  N/A |
| 22%   33C    P8    15W / 250W |      0MiB / 12212MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+

+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|  No running processes found                                                 |
+-----------------------------------------------------------------------------+
root@b712fc25d5c3:/app# nvcc --version
nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2019 NVIDIA Corporation
Built on Sun_Jul_28_19:07:16_PDT_2019
Cuda compilation tools, release 10.1, V10.1.243


yutkin commented Nov 14, 2019

Invoking af::setDevice(af::getDevice()); in each thread of the thread pool solved this problem.
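
A minimal sketch of how that looks, assuming a worker setup like the MWE above (decodeWorker and the hard-coded pool size and sample ranges are illustrative, not from the MWE):

#include <arrayfire.h>
#include <flashlight/flashlight.h>

// Workaround sketch: bind the CUDA device in every worker thread before
// anything in the forward pass reaches cuDNN.
void decodeWorker(int tid, int start, int end /*, ... */)
{
    // ArrayFire appears to track the active device per thread, so a freshly
    // spawned thread has no device/context bound yet; that is presumably what
    // makes cudnnSetStream() fail with CUDNN_STATUS_MAPPING_ERROR.
    af::setDevice(af::getDevice());

    // ... forward pass / decoding for samples [start, end) ...
}

int main()
{
    fl::ThreadPool threadPool(4);
    for (int i = 0; i < 4; ++i)
    {
        threadPool.enqueue(decodeWorker, i, i * 25, (i + 1) * 25);
    }
    return 0; // fl::ThreadPool joins its workers on destruction
}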

yutkin closed this as completed Nov 14, 2019