Permalink
Browse files

Simplify redis pubsub

  • Loading branch information...
masonforest committed Oct 30, 2018
1 parent 479935c commit 4a39f21c1d52daea4cf8cc71c5ddfbfe1f6e4eee
@@ -7,6 +7,7 @@ defmodule Blacksmith.Application do

def start(_type, _args) do
import Supervisor.Spec
Process.flag(:trap_exit, true)
# List all child processes to be supervised
:pg2.create("websocket::blocks")

@@ -34,6 +35,17 @@ defmodule Blacksmith.Application do
Supervisor.start_link(children, opts)
# Do the work you desire here
end
def handle_info(payload, state) do
IO.inspect payload
{:noreply, state}
end
def handle_call(payload, _from, state) do
IO.inspect payload
{:noreply, state}
end




defp dispatch do
[

This file was deleted.

Oops, something went wrong.
@@ -30,7 +30,7 @@ defmodule Models.Block do
def hash(block), do: Crypto.hash(to_binary(block))

def forge() do
TransactionProccessor.proccess_transactions()
TransactionProccessor.proccess_transactions(1)
end


@@ -24,6 +24,17 @@ defmodule Redis do
GenServer.cast(Redis, {:delete, key})
end

def publish(channel, value) when is_list(value) do
value = value
|> Enum.map(fn item -> "#{item}" end)
|> Enum.join(" ")
publish(channel, value)
end

def publish(channel, value) do
GenServer.cast(Redis, {:publish, channel, value})
end

def set_binary(key, value) do
GenServer.cast(Redis, {:set_binary, key, value})
end
@@ -84,6 +95,16 @@ defmodule Redis do
{:noreply, redis}
end

def handle_cast({:publish, channel, value}, redis) do
Redix.command(redis, [
"PUBLISH",
channel,
value,
])

{:noreply, redis}
end

def handle_cast({:delete, key}, redis) do
Redix.command(redis, [
"DEL",
@@ -13,7 +13,8 @@ defmodule TransactionPool do
subscribers: %{},
processes: %{},
redis: redis,
results: %{}
results: %{},
auto_forge: false,
})}
end

@@ -42,11 +43,32 @@ defmodule TransactionPool do
{:add, transaction},
{_pid, _reference},
state = %{
redis: _redis
redis: _redis,
auto_forge: auto_forge,
}
) do
Redis.push("transactions::queued", transaction)

if auto_forge do
TransactionProccessor.proccess_transactions(1000)
end

{:reply, {:ok, nil}, state}
end

def enable_auto_forging() do
GenServer.cast(__MODULE__, {:enable_auto_forging})
end

def disable_auto_forging() do
GenServer.cast(__MODULE__, {:disable_auto_forging})
end

def handle_cast({:enable_auto_forging}, state) do
{:noreply, %{state | auto_forge: true}}
end

def handle_cast({:disable_auto_forging}, state) do
{:noreply, %{state | auto_forge: false}}
end
end
@@ -4,15 +4,17 @@ defmodule TransactionProccessor do
@crate "transaction_processor"

def start_link(opts) do
Port.open({:spawn_executable, path_to_executable},
args: ["redis://127.0.0.1/"]
)

GenServer.start_link(__MODULE__, %{}, opts)
end



def init(state) do
{:ok, redis} = Redix.start_link()
Port.open({:spawn_executable, path_to_executable()},
args: ["redis://127.0.0.1/"]
)
# Redis.subscribe("transaction_processor", "done", [__MODULE__, :done, []])

{:ok,
Map.merge(state, %{
@@ -21,29 +23,21 @@ defmodule TransactionProccessor do
}
end

def proccess_transactions() do
start_proccessing_transactions()
:timer.apply_after(15000, __MODULE__, :start_proccessing_transactions, [])
end

def start_proccessing_transactions do
GenServer.cast(__MODULE__, {:start_proccessing_transactions})
def proccess_transactions(duration) do
GenServer.cast(__MODULE__, {:proccess_transactions, duration})
end

def stop_proccessing_transactions do
GenServer.cast(__MODULE__, {:start_proccessing_transactions})
end

def handle_cast({:start_proccessing_transactions}, state) do
Redis.set_binary("continue_processing_transactions", true)
def handle_cast({:proccess_transactions, duration}, state) do
Redis.publish("transaction_processor", ["proccess_transactions", duration])
{:noreply, state}
end

def handle_cast({:stop_proccessing_transactions}, state) do
Redis.set_binary("continue_processing_transactions", false)
def handle_info({_port, {:data, message}}, state) do
IO.write message
{:noreply, state}
end


def path_to_executable() do
Path.expand("../priv/native/#{@crate}", __DIR__)
end

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.
Oops, something went wrong.

0 comments on commit 4a39f21

Please sign in to comment.