Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support asynchronous logging #40

Open
ablaom opened this issue Sep 24, 2023 · 15 comments
Open

Support asynchronous logging #40

ablaom opened this issue Sep 24, 2023 · 15 comments
Assignees
Labels
invalid This doesn't seem right

Comments

@ablaom
Copy link
Member

ablaom commented Sep 24, 2023

It seems one cannot log runs to a single experiment asynchronously:

using MLJModels
using MLJBase
using MLFlowClient
using MLJFlow

logger = MLFlowLogger("http://127.0.0.1:5000", experiment_name="white moon")

X, y = @load_iris

using .Threads
model = (@iload DecisionTreeClassifier pkg=DecisionTree)()
nthreads()
# 5

@sync for i in 1:5
    Threads.@spawn evaluate(model, X, y; logger)
end

    nested task error: HTTP.Exceptions.StatusError(400, "POST", "/api/2.0/mlflow/experiments/create", HTTP.Messages.Response:
    """
    HTTP/1.1 400 Bad Request
    Server: gunicorn
    Date: Sun, 24 Sep 2023 20:35:24 GMT
    Connection: close
    Content-Type: application/json
    Content-Length: 95
    
    {"error_code": "RESOURCE_ALREADY_EXISTS", "message": "Experiment 'white moon' already exists."}""")
    Stacktrace:
      [1] mlfpost(mlf::MLFlow, endpoint::String; kwargs::Base.Pairs{Symbol, Union{Missing, Nothing, String}, Tuple{Symbol, Symbol, Symbol}, NamedTuple{(:name, :artifact_location, :tags), Tuple{String, Nothing, Missing}}})
        @ MLFlowClient ~/.julia/packages/MLFlowClient/Szkbv/src/utils.jl:74
      [2] mlfpost
        @ ~/.julia/packages/MLFlowClient/Szkbv/src/utils.jl:66 [inlined]
      [3] createexperiment(mlf::MLFlow; name::String, artifact_location::Nothing, tags::Missing)                                                                                    
        @ MLFlowClient ~/.julia/packages/MLFlowClient/Szkbv/src/experiments.jl:21
      [4] createexperiment
        @ ~/.julia/packages/MLFlowClient/Szkbv/src/experiments.jl:16 [inlined]
      [5] #getorcreateexperiment#7
        @ ~/.julia/packages/MLFlowClient/Szkbv/src/experiments.jl:103 [inlined]
      [6] log_evaluation(logger::MLFlowLogger, performance_evaluation::PerformanceEvaluation
{MLJDecisionTreeInterface.DecisionTreeClassifier, Vector{LogLoss{Float64}}, Vector{Float64}, Vector{typeof(predict)}, Vector{Vector{Float64}}, Vector{Vector{Vector{Float64}}}, Vector{NamedTuple{(:tree, :raw_tree, :encoding, :features), Tuple{DecisionTree.InfoNode{Float64, UInt32}, DecisionTree.Root{Float64, UInt32}, Dict{UInt32, CategoricalArrays.CategoricalValue{String, UInt32}}, Vector{Symbol}}}}, Vector{NamedTuple{(:classes_seen, :print_tree, :features), Tuple{CategoricalArrays.CategoricalVector{String, UInt32, String, CategoricalArrays.CategoricalValue{String, UInt32}, Union{}}, MLJDecisionTreeInterface.TreePrinter{DecisionTree.Root{Float64, UInt32}}, Vector{Symbol}}}}, CV})
        @ MLJFlow ~/.julia/packages/MLJFlow/TqEtw/src/base.jl:2
      [7] evaluate!(mach::Machine{MLJDecisionTreeInterface.DecisionTreeClassifier, true}, resampling::Vector{Tuple{Vector{Int64}, UnitRange{Int64}}}, weights::Nothing, class_weights::Nothing, rows::Nothing, verbosity::Int64, repeats::Int64, measures::Vector{LogLoss{Float64}}, operations::Vector{typeof(predict)}, acceleration::CPU1{Nothing}, force::Bool, logger::MLFlowLogger, user_resampling::CV)                                                          
        @ MLJBase ~/.julia/packages/MLJBase/ByFwA/src/resampling.jl:1314
      [8] evaluate!(::Machine{MLJDecisionTreeInterface.DecisionTreeClassifier, true}, ::CV, ::Nothing, ::Nothing, ::Nothing, ::Int64, ::Int64, ::Vector{LogLoss{Float64}}, ::Vector{typeof(predict)}, ::CPU1{Nothing}, ::Bool, ::MLFlowLogger, ::CV)                           
        @ MLJBase ~/.julia/packages/MLJBase/ByFwA/src/resampling.jl:1335
      [9] evaluate!(mach::Machine{MLJDecisionTreeInterface.DecisionTreeClassifier, true}; resampling::CV, measures::Nothing, measure::Nothing, weights::Nothing, class_weights::Nothing, operations::Nothing, operation::Nothing, acceleration::CPU1{Nothing}, rows::Nothing, repeats::Int64, force::Bool, check_measure::Bool, verbosity::Int64, logger::MLFlowLogger)   
        @ MLJBase ~/.julia/packages/MLJBase/ByFwA/src/resampling.jl:1015
     [10] evaluate(::MLJDecisionTreeInterface.DecisionTreeClassifier, ::NamedTuple{(:sepal_length, :sepal_width, :petal_length, :petal_width), NTuple{4, Vector{Float64}}}, ::Vararg{Any}; cache::Bool, kwargs::Base.Pairs{Symbol, MLFlowLogger, Tuple{Symbol}, NamedTuple{(:logger,), Tuple{MLFlowLogger}}})         
        @ MLJBase ~/.julia/packages/MLJBase/ByFwA/src/resampling.jl:1029
     [11] (::var"#7#8")()
        @ Main ./threadingconstructs.jl:373

<repeats several times>
@ablaom
Copy link
Member Author

ablaom commented Sep 24, 2023

@deyandyankov Could this possibly originate from a limitation of MLFlowClient.jl or mlflow itself?

@deyandyankov
Copy link
Collaborator

@ablaom was at least one experiment recorded? Seems to me that the first time evaluate was spawned, an experiment was created (white moon), and then the other executions try to create an experiment with the same name, which mlflow is denying:

    {"error_code": "RESOURCE_ALREADY_EXISTS", "message": "Experiment 'white moon' already exists."}""")

There is no strict limitation in MLFlowClient to explicitly disable concurrency as far as I can remember.

@pebeto
Copy link
Member

pebeto commented Sep 28, 2023

@deyandyankov Could it be related with the way mlflow handles this pseudo-random names? Maybe they use the unix timestamp.

@ablaom
Copy link
Member Author

ablaom commented Sep 28, 2023

| was at least one experiment recorded?

Yes.

If we send a message to the service to create a new experiment, don t' we need to block logging until both the experiment is created and an experiment name is allocated? I don't see any blocking happening at present. https://docs.julialang.org/en/v1/manual/asynchronous-programming/#Communicating-with-Channels

@ablaom
Copy link
Member Author

ablaom commented Oct 5, 2023

Looks like mlflow does not (or at least at one point did not) support asynchronous actions: mlflow/mlflow#1550 (comment)

@pebeto
Copy link
Member

pebeto commented Oct 5, 2023

In that case, we can generate the random names by ourselves, solving the naming problem you identified.

@ablaom
Copy link
Member Author

ablaom commented Oct 5, 2023

I'm not sure I follow. Perhaps I misunderstand the problem. Be great if you can post a PR to test your theory.

@pebeto
Copy link
Member

pebeto commented Nov 4, 2023

mlflow 2.8.0 was released with an experimental async logging for metrics, params and tags. Maybe we can take this again.

https://mlflow.org/news/2023/10/29/2.8.0-release/index.html

@ablaom
Copy link
Member Author

ablaom commented Nov 5, 2023

Thanks for flagging the update!

I'm guessing this won't "just work" and that we need to buy into some new messaging, or something?

@pebeto
Copy link
Member

pebeto commented Dec 31, 2023

This is not already fixed by mlflow. Including the response you posted, I'm getting four experiments with the same name (that must be impossible).

I suggest that we can handle something like a queue in MLFlowClient to avoid this kind of issues, or simply disallowing concurrency in our project. Below are the code we need to be aware of.

function mlfget(mlf, endpoint; kwargs...)
    apiuri = uri(mlf, endpoint, kwargs)
    apiheaders = headers(mlf, Dict("Content-Type" => "application/json"))
    try
        response = HTTP.get(apiuri, apiheaders)
        return JSON.parse(String(response.body))
    catch e
        throw(e)
    end
end

function mlfpost(mlf, endpoint; kwargs...)
    apiuri = uri(mlf, endpoint)
    apiheaders = headers(mlf, Dict("Content-Type" => "application/json"))
    body = JSON.json(kwargs)
    try
        response = HTTP.post(apiuri, apiheaders, body)
        return JSON.parse(String(response.body))
    catch e
        throw(e)
    end
end

function getexperiment(mlf::MLFlow, experiment_id::Integer)
    try
        result = _getexperimentbyid(mlf, experiment_id)
        return MLFlowExperiment(result)
    catch e
        if isa(e, HTTP.ExceptionRequest.StatusError) && e.status == 404
            return missing
        end
        throw(e)
    end
end

function createexperiment(mlf::MLFlow; name=missing, artifact_location=missing, tags=missing)
    endpoint = "experiments/create"
    if ismissing(name)
        name = string(UUIDs.uuid4())
    end
    result = mlfpost(mlf, endpoint; name=name, artifact_location=artifact_location, tags=tags)
    experiment_id = parse(Int, result["experiment_id"])
    getexperiment(mlf, experiment_id)
end

function getorcreateexperiment(mlf::MLFlow, experiment_name::String; artifact_location=missing, tags=missing)
    exp = getexperiment(mlf, experiment_name)
    if ismissing(exp)
        exp = createexperiment(mlf, name=experiment_name, artifact_location=artifact_location, tags=tags)
    end
    exp
end

I don't know if we have something like Python async/await in Julia. Do you know someone who can help us with that? @ablaom

@pebeto pebeto self-assigned this Dec 31, 2023
@ablaom
Copy link
Member Author

ablaom commented Dec 31, 2023

Julia fully supports ansynchronous programming: https://docs.julialang.org/en/v1/manual/asynchronous-programming/

What you call a "queue" is called a Channel.

@ablaom
Copy link
Member Author

ablaom commented Jan 3, 2024

I have asked @OkonSamuel to have a look into this. He has expertise in this area (but is also quite busy),

@pebeto
Copy link
Member

pebeto commented Jan 3, 2024

Adding more information:

mlflow is not fully accepting async operations. I can't say this is completely true, but sometimes it reports three experiments with the same name. This must be impossible by its own documentation. This could be not something related to us, but can be solved using channels (not sure).

@ablaom
Copy link
Member Author

ablaom commented Jan 16, 2024

mlflow is not fully accepting async operations

Did you mean "`mlflow is now fully accepting async operations?

@ablaom ablaom transferred this issue from JuliaAI/MLJFlow.jl Mar 6, 2024
@ablaom ablaom added invalid This doesn't seem right and removed bug Something isn't working labels Mar 6, 2024
@ablaom
Copy link
Member Author

ablaom commented Mar 6, 2024

The proposal referenced above may resolve this issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
invalid This doesn't seem right
Projects
None yet
Development

No branches or pull requests

3 participants