Skip to content

Commit

Permalink
add seen timings
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslandoga committed Feb 11, 2022
1 parent af27379 commit 00c245d
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 5 deletions.
10 changes: 9 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ if config_env() == :prod do
user_bucket: System.fetch_env!("AWS_S3_BUCKET"),
static_bucket: System.fetch_env!("AWS_S3_BUCKET_STATIC"),
static_cdn: System.fetch_env!("STATIC_CDN")

config :t, T.Events, bucket: System.fetch_env!("AWS_S3_BUCKET_EVENTS")
end

if config_env() == :dev do
Expand Down Expand Up @@ -199,6 +201,11 @@ if config_env() == :dev do
static_bucket: System.fetch_env!("AWS_S3_BUCKET_STATIC"),
static_cdn: System.fetch_env!("STATIC_CDN")

# if events_bucket = System.get_env("AWS_S3_BUCKET_EVENTS") do
# config :t, T.Events, bucket: events_bucket
# end

config :t, T.Events, disabled?: true
config :t, T.Media.Static, disabled?: !!System.get_env("DISABLE_MEDIA")
config :t, T.Periodics, disabled?: !!System.get_env("DISABLE_PERIODICS")
end
Expand Down Expand Up @@ -250,7 +257,7 @@ if config_env() == :test do
room_id: String.to_integer("-1234")

config :t, T.PushNotifications.APNS, default_topic: "app.topic"

config :t, T.Events, disabled?: true
config :t, T.Periodics, disabled?: true
config :t, Finch, disabled?: true
end
Expand All @@ -265,6 +272,7 @@ if config_env() == :bench do
config :t, T.PushNotifications.ScheduledPushes, disabled?: true
config :t, T.Matches.TimeslotPruner, disabled?: true
config :t, Finch, disabled?: true
config :t, T.Events, disabled?: true

config :t, T.Repo,
url: System.get_env("DATABASE_URL") || "ecto://postgres:postgres@localhost:5432/t_dev",
Expand Down
9 changes: 9 additions & 0 deletions lib/t/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule T.Application do
children =
[
{Task.Supervisor, name: T.TaskSupervisor},
maybe_events(),
APNS.Token,
# T.PromEx,
# TODO add apple keys endpoint and twilio (possibly aws as well)
Expand Down Expand Up @@ -84,6 +85,14 @@ defmodule T.Application do
end
end

defp maybe_events do
bucket = Keyword.get(Application.get_env(:t, T.Events), :bucket)

if bucket do
{T.Events.Buffer, bucket: bucket}
end
end

defp disabled?(mod) when is_atom(mod) do
get_in(Application.get_env(:t, mod), [:disabled?])
end
Expand Down
20 changes: 20 additions & 0 deletions lib/t/events.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule T.Events do
def bucket do
:t |> Application.fetch_env!(__MODULE__) |> Keyword.fetch!(:bucket)
end

# TODO
if Mix.env() == :prod do
def save_seen_timings(by_user_id, user_id, timings) do
alias T.Events.Buffer
alias NimbleCSV.RFC4180, as: CSV

row = CSV.dump_to_iodata([by_user_id, user_id, Jason.encode_to_iodata!(timings)])
GenServer.cast(Buffer, {:add, row})
end
else
def save_seen_timings(_by_user_id, _user_id, _timings) do
:ok
end
end
end
63 changes: 63 additions & 0 deletions lib/t/events/buffer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
defmodule T.Events.Buffer do
@moduledoc false
use GenServer

alias T.Events

def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: opts[:name])
end

@impl true
def init(_opts) do
# TODO upload when file size reaches some threshold like 10MB
Process.send_after(self(), :upload, :timer.hours(1))
{:ok, open_buffer()}
end

@impl true
def handle_cast({:add, row}, fd) do
:file.write(fd, row)
{:noreply, fd}
end

@impl true
def handle_info(:upload, fd) do
close_buffer(fd)
to_upload = "#{:rand.uniform(1000)}.csv"
:ok = File.rename("events-buffer.csv", to_upload)
async_upload(to_upload)
{:noreply, open_buffer()}
end

defp open_buffer do
File.open!("events-buffer.csv", [:raw, :append, {:delayed_write, 512_000, 10_000}])
end

defp close_buffer(fd) do
case File.close(fd) do
:ok -> :ok
_error -> :ok = File.close(fd)
end
end

def async_upload(filename) do
Task.Supervisor.start_child(T.TaskSupervisor, fn ->
bucket = Events.bucket()

%DateTime{year: y, month: m, day: d, hour: h, minute: m, second: s} = DateTime.utc_now()
s3_key = "seen/#{y}/#{m}/#{d}/#{h}/#{m}/#{s}/" <> filename

filename
|> ExAws.S3.Upload.stream_file()
|> ExAws.S3.upload(bucket, s3_key)
|> ExAws.request!(region: "eu-north-1")
end)
end

@impl true
def terminate(_reason, fd) do
close_buffer(fd)
{:ok, fd}
end
end
12 changes: 9 additions & 3 deletions lib/t_web/channels/feed_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule TWeb.FeedChannel do
import TWeb.ChannelHelpers

alias TWeb.{FeedView, MatchView, ErrorView}
alias T.{Feeds, Calls, Matches, Accounts}
alias T.{Feeds, Calls, Matches, Accounts, Events}

@impl true
def join("feed:" <> user_id, params, socket) do
Expand Down Expand Up @@ -111,8 +111,14 @@ defmodule TWeb.FeedChannel do
end

# TODO possibly batch
def handle_in("seen", %{"user_id" => user_id}, socket) do
Feeds.mark_profile_seen(user_id, by: me_id(socket))
def handle_in("seen", %{"user_id" => user_id} = params, socket) do
me = me_id(socket)

if timings = params["timings"] do
Events.save_seen_timings(me, user_id, timings)
end

Feeds.mark_profile_seen(user_id, by: me)
{:reply, :ok, socket}
end

Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ defmodule T.MixProject do
{:cloud_watch, github: "getsince/cloud_watch", branch: "drop-httpoison"},
{:benchee, "~> 1.0", only: :bench},
{:dialyxir, "~> 1.1", only: :dev, runtime: false},
{:jose, "~> 1.11"}
{:jose, "~> 1.11"},
{:nimble_csv, "~> 1.2"}
]
end

Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"mime": {:hex, :mime, "2.0.2", "0b9e1a4c840eafb68d820b0e2158ef5c49385d17fb36855ac6e7e087d4b1dcc5", [:mix], [], "hexpm", "e6a3f76b4c277739e36c2e21a2c640778ba4c3846189d5ab19f97f126df5f9b7"},
"mint": {:hex, :mint, "1.4.0", "cd7d2451b201fc8e4a8fd86257fb3878d9e3752899eb67b0c5b25b180bde1212", [:mix], [{:castore, "~> 0.1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "10a99e144b815cbf8522dccbc8199d15802440fc7a64d67b6853adb6fa170217"},
"mox": {:hex, :mox, "1.0.1", "b651bf0113265cda0ba3a827fcb691f848b683c373b77e7d7439910a8d754d6e", [:mix], [], "hexpm", "35bc0dea5499d18db4ef7fe4360067a59b06c74376eb6ab3bd67e6295b133469"},
"nimble_csv": {:hex, :nimble_csv, "1.2.0", "4e26385d260c61eba9d4412c71cea34421f296d5353f914afe3f2e71cce97722", [:mix], [], "hexpm", "d0628117fcc2148178b034044c55359b26966c6eaa8e2ce15777be3bbc91b12a"},
"nimble_options": {:hex, :nimble_options, "0.4.0", "c89babbab52221a24b8d1ff9e7d838be70f0d871be823165c94dd3418eea728f", [:mix], [], "hexpm", "e6701c1af326a11eea9634a3b1c62b475339ace9456c1a23ec3bc9a847bca02d"},
"nimble_pool": {:hex, :nimble_pool, "0.2.4", "1db8e9f8a53d967d595e0b32a17030cdb6c0dc4a451b8ac787bf601d3f7704c3", [:mix], [], "hexpm", "367e8071e137b787764e6a9992ccb57b276dc2282535f767a07d881951ebeac6"},
"oban": {:hex, :oban, "2.10.1", "202a90f2aed0130b7d750bdbfea8090c8321bce255bade10fd3699733565add0", [:mix], [{:ecto_sql, "~> 3.6", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "161cdd01194147cd6a3efdb1d6c3d9689309991412f799c1e242c18912e307c3"},
Expand Down

0 comments on commit 00c245d

Please sign in to comment.