Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 151 additions & 23 deletions lib/mix/lib/mix/sync/lock.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ defmodule Mix.Sync.Lock do
# successfully link to lock_N. At that point we can clean up all
# the files, so we perform these steps:
#
# * move our port_P to lock_0
# * remove all the other port_P files
# * replace lock_0 content with port P
# * remove all port_P files
# * remove all lock_1+ files
#
# It is important to perform these steps in this order, to avoid
Expand All @@ -44,14 +44,14 @@ defmodule Mix.Sync.Lock do
# the port_P file will no longer exist (once lock_N is removed).
#
# Finally, note that we do not remove the lock file in `unlock/1`.
# If we did that, another process could try to connect and fail
# because the file would not exist, in such case the process would
# assume the file is stale and needs to be replaced, therefore
# possibly replacing another process who successfully links at the
# empty spot. This means we effectively always leave a stale file,
# however, in order to shortcut the port check for future processes,
# we atomically replace the file content with port 0, to indicate
# the file is stale.
# If we did that, another process could read it before removal,
# then it may connect and fail (once the socket is closed), in such
# case the process would assume the file is stale and needs to be
# replaced, therefore possibly replacing another process who
# successfully links at the empty spot. This means we effectively
# always leave a stale file, however, in order to shortcut the port
# check for future processes, we atomically replace the file content
# with port 0, to indicate the file is stale.
#
# The main caveat of using ephemeral TCP ports is that they are not
# unique. This creates a theoretical scenario where the lock holder
Expand All @@ -72,6 +72,7 @@ defmodule Mix.Sync.Lock do
@probe_data "mixlock"
@probe_data_size byte_size(@probe_data)
@probe_timeout_ms 5_000
@version 2

@typedoc """
Options for `with_lock/3`.
Expand Down Expand Up @@ -132,7 +133,11 @@ defmodule Mix.Sync.Lock do
defp base_path do
# We include user in the dir to avoid permission conflicts across users
user = System.get_env("USER", "default")
Path.join(System.tmp_dir!(), "mix_lock_#{Base.url_encode64(user, padding: false)}")

Path.join(
System.tmp_dir!(),
"mix_lock_#{@version}_#{Base.url_encode64(user, padding: false)}"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I bumped a version to make sure we don't try to read old lock files, since the expected format changed.

)
end

defp lock_disabled?(), do: System.get_env("MIX_OS_CONCURRENCY_LOCK") in ~w(0 false)
Expand Down Expand Up @@ -177,7 +182,7 @@ defmodule Mix.Sync.Lock do
port_path = Path.join(path, "port_#{port}")
os_pid = System.pid()

File.write!(port_path, <<port::unsigned-integer-32, os_pid::binary>>, [:raw])
switch_file_create!(port_path, encode_lock_info(port, os_pid))

case grab_lock(path, port_path, 0) do
{:ok, 0} ->
Expand All @@ -186,7 +191,7 @@ defmodule Mix.Sync.Lock do

{:ok, _n} ->
# We grabbed lock_1+, so we need to replace lock_0 and clean up
take_over(path, port_path)
take_over(path, port, os_pid)
%{socket: socket, path: path}

{:taken, probe_socket, os_pid} ->
Expand Down Expand Up @@ -253,10 +258,15 @@ defmodule Mix.Sync.Lock do
end

defp fetch_probe_port(port_path) do
case File.read(port_path) do
{:ok, <<0::unsigned-integer-32>>} -> {:error, :ignore}
{:ok, <<port::unsigned-integer-32, os_pid::binary>>} -> {:ok, port, os_pid}
{:error, reason} -> {:error, reason}
case switch_file_read(port_path) do
{:ok, data} ->
case decode_lock_info(data) do
{0, _os_pid} -> {:error, :ignore}
{port, os_pid} -> {:ok, port, os_pid}
end

{:error, reason} ->
{:error, reason}
end
end

Expand Down Expand Up @@ -305,15 +315,14 @@ defmodule Mix.Sync.Lock do
end
end

defp take_over(path, port_path) do
defp take_over(path, port, os_pid) do
# The operations here must happen in precise order, so if anything
# fails, we keep the files as is and the next process that grabs
# the lock will do the cleanup

lock_path = Path.join(path, "lock_0")

# We linked to lock_N successfully, so port_path should exist
File.rename!(port_path, lock_path)
switch_file_replace!(lock_path, encode_lock_info(port, os_pid))

names = File.ls!(path)

Expand Down Expand Up @@ -342,14 +351,133 @@ defmodule Mix.Sync.Lock do
end

defp unlock(lock) do
port_path = Path.join(lock.path, "port_0")
lock_path = Path.join(lock.path, "lock_0")

File.write!(port_path, <<0::unsigned-integer-32>>, [:raw])
File.rename!(port_path, lock_path)
switch_file_replace!(lock_path, encode_lock_info(0, ""))
after
# Closing the socket will cause the accepting process to finish
# and all accepted sockets (tied to that process) will get closed
:gen_tcp.close(lock.socket)
end

defp encode_lock_info(port, os_pid) do
os_pid_size = byte_size(os_pid)

if os_pid_size > 32 do
Mix.raise("unexpectedly long PID: #{inspect(os_pid)}")
end
Comment on lines +366 to +368
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I picked an arbitrary limit. Ideally we would store OS PID it as an integer, but my understanding is that it's not guaranteed to be one.


# The info needs to have fixed size, so we pad os_pid to maximum
# of 32 bytes (we expect it to be a few bytes).
padding_size = 32 - os_pid_size
padding = :binary.copy(<<0>>, padding_size)

<<
port::unsigned-integer-32,
padding_size::unsigned-integer-8,
padding::binary,
os_pid::binary
>>
end

defp decode_lock_info(data) do
<<
port::unsigned-integer-32,
padding_size::unsigned-integer-8,
_padding::binary-size(padding_size),
os_pid::binary
>> = data

{port, os_pid}
end

# We need a mechanism to atomically replace file content. Typically,
# we could use File.rename/2 to do that, however File.rename/2 is
# not atomic on Windows, if the destination exists [1].
#
# As an alternative approach we use a switch-file. The file content
# consists of 1 switch byte (either 0 or 1) and two content segments
# with fixed, equal lengths. The switch byte indicates which segment
# is currently active. To replace the file content, we write to the
# non-active segment and call :file.sync/1 to ensure the segment is
# persisted, then we toggle the switch byte. While we cannot write
# multiple bytes atomically (since they may reside in multiple disk
# sectors), if we toggle only a single byte, there is no intermediate
# invalid state, which gives us the atomic replace we need.
#
# Note that file content can be replaced only by a single process
# at a time.
#
# [1]: https://github.com/elixir-lang/elixir/pull/14793#issuecomment-3338665065
defp switch_file_create!(path, content) do
data = <<0, content::binary, content::binary>>
File.write!(path, data, [:raw])
end

defp switch_file_replace!(path, new_content) do
file = File.open!(path, [:read, :write, :binary, :raw])

content_size = byte_size(new_content)

<<switch_byte>> = read_bytes!(file, 1)

try do
inactive_content_position =
case switch_byte do
0 -> 1 + content_size
1 -> 1
end

# Write new data
file_pwrite_sync!(file, inactive_content_position, new_content)

# Toggle switch byte - it's a single byte so the content changes
# atomically
file_pwrite_sync!(file, 0, <<1 - switch_byte>>)
after
File.close(file)
end
end

defp switch_file_read(path) do
with {:ok, data} <- File.read(path) do
<<switch_byte, rest::binary>> = data
content_size = rest |> byte_size() |> div(2)
<<content1::binary-size(^content_size), content2::binary-size(^content_size)>> = rest

case switch_byte do
0 -> {:ok, content1}
1 -> {:ok, content2}
end
end
end

defp read_bytes!(file, bytes) do
case :file.read(file, bytes) do
{:ok, data} ->
data

:eof ->
raise "unexpected EOF of file when reading file"

{:error, reason} ->
raise File.Error, reason: reason, action: "read file"
end
end

defp file_pwrite_sync!(file, position, bytes) do
case :file.pwrite(file, position, bytes) do
:ok ->
case :file.sync(file) do
:ok ->
:ok

{:error, reason} ->
raise File.Error, reason: reason, action: "sync file"
end

{:error, {_n, reason}} ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file:pwrite/3 documentation and specs say it returns ok or an {error, atom()} tuple so dialyzer warns this pattern will never match

raise File.Error, reason: reason, action: "write to file at position"
end
end
end
Loading