In [1]:
using Plots, Statistics, StatsBase, DataFrames, DataFramesMeta
using CoinbasePro

In [2]:
# Fetch the current ticker for the BTC-USD product through CoinbasePro.
# The cell output below shows the result as a one-row table with the columns
# ask, bid, price, size, time, trade_id and volume.
tickerStats = CoinbasePro.ticker("BTC-USD")

Row,ask,bid,price,size,time,trade_id,volume
Unnamed: 0_level_1,Float64,Float64,Float64,Float64,String,Int64,Float64
1,16081.3,16079.2,16081.3,0.00087516,2022-11-21T12:56:14.164764Z,458588513,28633.3


In [3]:
"""
    Trade

A single trade record as produced by `parse_coinbase_data`.

# Fields
- `id`: trade identifier, stored as a string; the empty string marks an empty record.
- `time`: timestamp string exactly as received from the feed.
- `price`: traded price.
- `size`: traded quantity.
- `side`: `1` for a buy, `-1` for anything else; `0` in the empty record.
- `ask`, `bid`: quote prices that accompany the trade.
- `exchange`: name of the venue the trade came from.
"""
struct Trade
    id::String
    time::String
    price::Float64
    size::Float64
    side::Int64
    ask::Float64
    bid::Float64
    exchange::String
end

"""
    Trade()

Return the sentinel "empty" trade: empty strings, `NaN` for every floating-point field
and `0` for `side`. One value is supplied for each of the eight fields, in field order.
"""
Trade() = Trade("", "", NaN, NaN, 0, NaN, NaN, "")

"""
    isempty(x::Trade)

Return `true` when `x` is the sentinel record, i.e. when its `id` is the empty string.
"""
Base.isempty(x::Trade) = isempty(x.id)

In [4]:
using Distributed

# Buffer that hands trades from the websocket reader to the database writer:
# `save_coinbase_trades` does `put!` on it and `save_trades_quest` does `take!`.
# The underlying Channel holds at most 500 `Trade` values.
const trades = RemoteChannel(()->Channel{Trade}(500));

In [10]:
using JSON
using Dates
using WebSockets

# Websocket endpoint that `save_coinbase_trades` connects to.
coinbase_url = "wss://ws-feed.pro.coinbase.com"
# JSON text of the subscribe request sent right after connecting: it asks for the
# "ticker" and "heartbeat" channels of the BTC-USD product.
coinbase_subscribe_string = JSON.json(Dict(:type=>"subscribe", 
                         :product_ids=>["BTC-USD"], 
                         :channels=>["ticker", "heartbeat"]))

"""
    parse_coinbase_data(x)

Convert one decoded Coinbase websocket message into a `Trade`.

# Arguments
- `x`: the message as a dictionary with string keys (the result of `JSON.parse`).

# Returns
- The empty `Trade()` for heartbeat messages and for any message that carries a
  `channels` key (presumably the subscription acknowledgement).
- Otherwise a `Trade` tagged with the exchange name `"Coinbase"`. Numeric fields that
  are absent from the message become `NaN`; `side` is `1` when the message says
  `"buy"` (case-insensitive) and `-1` in every other case, including a missing side.

# Throws
- `ArgumentError` when a numeric field is present but is not a parsable number.
"""
function parse_coinbase_data(x)
    if (get(x, "type", "") == "heartbeat") || haskey(x, "channels")
        println("Worker $(myid()): Coinbase Heartbeat")
        return Trade()
    end

    ts = get(x, "time", "")
    side = get(x, "side", "")
    id = get(x, "trade_id", "")

    # The feed sends numbers as strings, and `Trade` stores Float64 values, so every
    # numeric field has to be parsed; "NaN" is the stand-in for a missing key.
    tradedprice = parse(Float64, get(x, "price", "NaN"))
    lastsize = parse(Float64, get(x, "last_size", "NaN"))

    # The ticker channel publishes the quote as best_ask / best_bid; the plain
    # ask / bid keys are kept as a fallback for messages that use them.
    ask = parse(Float64, get(x, "best_ask", get(x, "ask", "NaN")))
    bid = parse(Float64, get(x, "best_bid", get(x, "bid", "NaN")))

    direction = lowercase(side) == "buy" ? 1 : -1
    return Trade(string(id), ts, tradedprice, lastsize, direction, ask, bid, "Coinbase")
end

"""
    save_coinbase_trades(coinbase_url, coinbase_subscribe_string)

Stream messages from a Coinbase websocket and queue the trades they contain.

Opens a websocket to `coinbase_url`, sends `coinbase_subscribe_string`, then reads
messages in a loop. Each message is decoded with `JSON.parse` and converted by
`parse_coinbase_data`; every non-empty result is `put!` onto the global `trades`
channel. The loop ends as soon as a read reports failure, which lets the websocket
be closed instead of parsing an unusable frame over and over.

# Arguments
- `coinbase_url`: address of the websocket endpoint.
- `coinbase_subscribe_string`: text of the subscribe request to send after connecting.
"""
function save_coinbase_trades(coinbase_url, coinbase_subscribe_string)
    WebSockets.open(coinbase_url) do ws
        write(ws, coinbase_subscribe_string)
        # The first message after subscribing is read and deliberately discarded.
        _, success = readguarded(ws)
        println("Entering Loop")
        while success
            data, success = readguarded(ws)
            # `success == false` means the read did not yield a usable message.
            success || break
            trade = parse_coinbase_data(JSON.parse(String(data)))
            isempty(trade) || put!(trades, trade)
        end
    end
end

save_coinbase_trades (generic function with 1 method)

In [11]:
using Printf

"""
    parse_timestamp(ts::AbstractString)

Convert an ISO-8601 UTC timestamp such as `"2022-11-21T12:56:14.164764Z"` into the
number of nanoseconds since the Unix epoch, returned as a decimal string.

The trailing `Z` and the fractional seconds are both optional. Fractional digits
beyond the ninth (sub-nanosecond) are dropped.

All arithmetic is done on `Int64`: epoch nanoseconds are around 1.7e18, far above the
range in which `Float64` represents integers exactly, so floating-point maths would
silently round the result to a multiple of 256 ns.

# Throws
- `ArgumentError` when the date part or the fractional digits cannot be parsed.
"""
function parse_timestamp(ts::AbstractString)
    # Strip the UTC designator only when it is actually there.
    stamp = endswith(ts, "Z") ? chop(ts) : ts
    parts = split(stamp, "."; limit=2)

    # The date part has whole-second resolution, so the Float64 that
    # `datetime2unix` returns holds an exact integer.
    secs = round(Int64, datetime2unix(DateTime(parts[1])))

    # Right-pad the fraction to nine digits so it reads as a nanosecond count.
    fraction = length(parts) == 2 ? parts[2] : ""
    nanos = parse(Int64, rpad(first(fraction, 9), 9, '0'))

    return string(secs * 1_000_000_000 + nanos)
end

"""
    build_payload(x::Trade)

Render `x` as one newline-terminated text line for the `coinbase_trades2` table.

The line is made of the table name, the `exchange` value, the `id`, `price`, `size`
and `side` values as `name=value` pairs, and finally the trade time converted by
`parse_timestamp`. The `ask` and `bid` fields are not part of the line.
"""
function build_payload(x::Trade)
    # id, price and size are written in this fixed order, comma-separated.
    pairs = join(("$(name)=$(getfield(x, name))" for name in (:id, :price, :size)), ",")
    stamp = parse_timestamp(x.time)

    return string(
        "coinbase_trades2,",
        "exchange=", x.exchange, ", ",
        pairs, ",",
        "side=", x.side, " ",
        stamp,
        "\n",
    )
end

build_payload (generic function with 1 method)

In [12]:
using Sockets
"""
    save_trades_quest(trades; host="localhost", port=9000)

Forward trades from a channel to a TCP socket, one text line per trade.

Connects to `host:port`, then loops forever: each iteration blocks on `take!(trades)`,
renders the trade with `build_payload` and writes the result to the socket. The loop
only ends through an exception (for example a closed channel or a broken connection);
the `finally` clause makes sure the socket is closed in that case before the exception
propagates to the caller.

# Arguments
- `trades`: channel-like object supporting `take!` that yields `Trade` values.

# Keywords
- `host`: host name to connect to.
- `port`: TCP port to connect to.
"""
function save_trades_quest(trades; host="localhost", port=9000)
    # NOTE(review): the function name suggests QuestDB is the target; confirm that
    # 9000 is the port its line-protocol listener is configured on in this setup.
    cs = connect(host, port)
    try
        while true
            payload = build_payload(take!(trades))
            write(cs, payload)
        end
    finally
        close(cs)
    end
end

save_trades_quest (generic function with 1 method)

In [16]:
# Start the producer (websocket reader) and the consumer (socket writer) as two
# background tasks; both communicate through the `trades` channel.
# NOTE(review): an exception raised inside an `@async` task is not displayed unless
# the task is waited on or fetched, so keep the returned Task objects if failures
# need to be inspected.
@async save_coinbase_trades(coinbase_url, coinbase_subscribe_string)
@async save_trades_quest(trades)

Task (runnable) @0x000001b0d3271f10

Entering Loop
Worker 1: Coinbase Heartbeat
