Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear old version conditions #1143

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions lib/archethic/beacon_chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,7 @@ defmodule Archethic.BeaconChain do
@spec get_summary_slots(binary()) :: list(TransactionSummary.t())
def get_summary_slots(subset) when is_binary(subset) do
SummaryCache.stream_current_slots(subset)
|> Stream.map(fn
{slot, _} -> slot
slot -> slot
end)
|> Stream.map(fn {slot, _} -> slot end)
|> Stream.flat_map(fn %Slot{transaction_attestations: transaction_attestations} ->
transaction_summaries =
transaction_attestations
Expand Down
32 changes: 0 additions & 32 deletions lib/archethic/beacon_chain/slot.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ defmodule Archethic.BeaconChain.Slot do
alias Archethic.P2P
alias Archethic.P2P.Node

alias Archethic.TaskSupervisor

alias Archethic.TransactionChain.TransactionSummary

alias Archethic.Utils
Expand Down Expand Up @@ -630,34 +628,4 @@ defmodule Archethic.BeaconChain.Slot do
do: true

def empty?(%__MODULE__{}), do: false

@doc """
Apply a tranformation of a transaction summary based on the blockchain version
"""
@spec transform(binary(), t()) :: t()
def transform(
"1.1.0",
slot = %__MODULE__{transaction_attestations: attestations}
) do
if Enum.any?(attestations, fn %ReplicationAttestation{version: version} -> version == 1 end) do
new_attestations =
Task.Supervisor.async_stream_nolink(
TaskSupervisor,
attestations,
fn attestation = %ReplicationAttestation{transaction_summary: summary} ->
new_summary = TransactionSummary.transform("1.1.0", summary)
%ReplicationAttestation{attestation | transaction_summary: new_summary}
end,
max_concurrency: System.schedulers_online() * 10
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.map(fn {:ok, attestation} -> attestation end)

%__MODULE__{slot | transaction_attestations: new_attestations}
else
slot
end
end

def transform(_, slot), do: slot
end
8 changes: 1 addition & 7 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ defmodule Archethic.BeaconChain.Subset do
end

def handle_call(:get_current_slot, _from, state = %{current_slot: current_slot}) do
current_slot = Slot.transform("1.1.0", current_slot)
{:reply, current_slot, state}
end

Expand Down Expand Up @@ -356,8 +355,6 @@ defmodule Archethic.BeaconChain.Subset do
defp broadcast_beacon_slot(subset, next_time, slot) do
nodes = P2P.authorized_and_available_nodes(next_time, true)

slot = Slot.transform("1.1.0", slot)

subset
|> Election.beacon_storage_nodes(next_time, nodes)
|> P2P.broadcast_message(%NewBeaconSlot{slot: slot})
Expand All @@ -367,10 +364,7 @@ defmodule Archethic.BeaconChain.Subset do
beacon_slots =
subset
|> SummaryCache.stream_current_slots()
|> Stream.map(fn
{slot, _} -> slot
slot -> slot
end)
|> Stream.map(fn {slot, _} -> slot end)

if Enum.empty?(beacon_slots) do
:ok
Expand Down
28 changes: 6 additions & 22 deletions lib/archethic/beacon_chain/subset/summary_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,8 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do
defp clean_previous_summary_cache(subset, previous_summary_time) do
subset
|> stream_current_slots()
|> Stream.filter(fn
{%Slot{slot_time: slot_time}, _} ->
DateTime.compare(slot_time, previous_summary_time) != :gt

%Slot{slot_time: slot_time} ->
DateTime.compare(slot_time, previous_summary_time) != :gt
|> Stream.filter(fn {%Slot{slot_time: slot_time}, _} ->
DateTime.compare(slot_time, previous_summary_time) != :gt
end)
|> Stream.each(fn item ->
:ets.delete_object(@table_name, {subset, item})
Expand Down Expand Up @@ -133,13 +129,8 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do
content = File.read!(backup_file_path)

deserialize(content, [])
|> Enum.each(fn
{slot = %Slot{subset: subset}, node_public_key} ->
true = :ets.insert(@table_name, {subset, {slot, node_public_key}})

# Backward compatibility
{slot = %Slot{subset: subset}} ->
true = :ets.insert(@table_name, {subset, slot})
|> Enum.each(fn {slot = %Slot{subset: subset}, node_public_key} ->
true = :ets.insert(@table_name, {subset, {slot, node_public_key}})
end)
else
:ok
Expand All @@ -159,14 +150,7 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do
{slot_size, rest} = VarInt.get_value(rest)
<<slot_bin::binary-size(slot_size), rest::binary>> = rest
{slot, _} = Slot.deserialize(slot_bin)

# Backward compatibility
try do
{node_public_key, rest} = Utils.deserialize_public_key(rest)
deserialize(rest, [{slot, node_public_key} | acc])
rescue
_ ->
deserialize(rest, [{slot} | acc])
end
{node_public_key, rest} = Utils.deserialize_public_key(rest)
deserialize(rest, [{slot, node_public_key} | acc])
end
end
258 changes: 0 additions & 258 deletions priv/migration_tasks/prod/1.1.0@migrate_old_tx_summaries.exs

This file was deleted.