## Count words (naive)

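A small helper that measures how long a function takes to run, in microseconds:

```elixir
defmodule Timemeasure do
  # Runs `func` and returns the elapsed time in microseconds.
  # System.monotonic_time/1 is used instead of Time.utc_now/0 because the
  # monotonic clock never goes backwards and does not wrap at midnight.
  def count_time(func) do
    start_time = System.monotonic_time(:microsecond)
    func.()
    end_time = System.monotonic_time(:microsecond)
    end_time - start_time
  end
end
```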

The naive implementation below is not very efficient: it reads the whole file with `File.read!/1`, and `Enum.flat_map/2` then builds a list with all the words in the document before reducing it. If the document is, for example, 2 GB, we load at least 2 GB of data into memory.

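```elixir
defmodule CountWord do
  def naive_version do
    # Read the whole file into memory and split it into lines.
    File.read!("/home/demouser/Notes/shakespeare.txt")
    |> String.split("\n")
    # Build one list holding every word of the document.
    |> Enum.flat_map(fn line -> String.split(line, " ") end)
    # Count the occurrences of each word.
    |> Enum.reduce(%{}, fn word, acc ->
      Map.update(acc, word, 1, &(&1 + 1))
    end)
    |> Enum.to_list()
  end
end

Timemeasure.count_time(&CountWord.naive_version/0)
# => 15164215 (microseconds, about 15.2 s)
```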

A better approach is to stream the file: instead of loading the whole data set into memory, we only keep the current line in memory while we process it. This lets us process the whole data set efficiently, but it does not leverage concurrency. Flow, used after the streaming version, solves that.
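To make the eager/lazy difference concrete, here is a small sketch (the module `EvaluationOrder` and its helpers are hypothetical, written only for illustration) that logs when each line is split and when each word is consumed, using messages sent to the current process as the log. With `Enum`, every line is split before the first word moves on; with `Stream`, one line is split and its words are consumed before the next line is read.

```elixir
defmodule EvaluationOrder do
  # Eager pipeline: Enum.flat_map/2 splits every line into one list of
  # words before Enum.each/2 sees the first word.
  def eager(lines) do
    lines
    |> Enum.flat_map(fn line ->
      record({:split, line})
      String.split(line, " ")
    end)
    |> Enum.each(fn word -> record({:word, word}) end)

    collect([])
  end

  # Lazy pipeline: Stream.flat_map/2 splits one line, hands its words to the
  # next step, and only then pulls the following line.
  def lazy(lines) do
    lines
    |> Stream.flat_map(fn line ->
      record({:split, line})
      String.split(line, " ")
    end)
    |> Stream.each(fn word -> record({:word, word}) end)
    |> Stream.run()

    collect([])
  end

  # Records an event by sending it to the current process's mailbox.
  defp record(event), do: send(self(), {:event, event})

  # Reads back all recorded events, oldest first.
  defp collect(acc) do
    receive do
      {:event, event} -> collect([event | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end

EvaluationOrder.eager(["a b", "c d"])
# returns [{:split, "a b"}, {:split, "c d"}, {:word, "a"}, {:word, "b"}, {:word, "c"}, {:word, "d"}]

EvaluationOrder.lazy(["a b", "c d"])
# returns [{:split, "a b"}, {:word, "a"}, {:word, "b"}, {:split, "c d"}, {:word, "c"}, {:word, "d"}]
```

In the lazy case only the words of the current line exist at any moment, which is what keeps the memory use of the streaming version bounded.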

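As a quick check, `Time.diff/3` returns the difference between two `Time` values in the requested unit:

```elixir
a = Time.utc_now()
# => ~T[13:23:06.153086]

b = Time.utc_now()
# => ~T[13:23:06.229813]

Time.diff(b, a, :microsecond)
# => 76727
```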

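```elixir
# Redefining CountWord replaces the earlier module, so naive_version is no
# longer available after this cell runs.
defmodule CountWord do
  def stream_version do
    # Read the file lazily, one line at a time.
    File.stream!("/home/demouser/Notes/shakespeare.txt")
    |> Stream.flat_map(fn line ->
      # Streamed lines keep their trailing "\n"; strip it so the last word
      # of a line matches the same word elsewhere in the text.
      line |> String.trim_trailing("\n") |> String.split(" ")
    end)
    # Enum.reduce/3 pulls words from the stream one at a time.
    |> Enum.reduce(%{}, fn word, acc ->
      Map.update(acc, word, 1, &(&1 + 1))
    end)
    |> Enum.to_list()
  end
end
```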

```elixir
Timemeasure.count_time(&CountWord.stream_version/0)
# => 10087096 (microseconds, about 10.1 s)
```

`File.stream!` also accepts a `line_or_bytes` argument that configures how the file is read when streaming: by `:line` (the default) or by a given number of bytes.
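One detail of the default `:line` mode matters for word counting: each streamed element keeps its trailing newline, so splitting on `" "` alone would count `"be\n"` and `"be"` as different words. The sketch below shows this with a temporary file; the module `LineModeDemo` and the file name `line_mode_demo.txt` are hypothetical, chosen only for the demonstration.

```elixir
defmodule LineModeDemo do
  # Writes `contents` to a temporary file (hypothetical name) and returns the
  # elements File.stream!/1 yields in its default :line mode.
  def stream_lines(contents) do
    path = Path.join(System.tmp_dir!(), "line_mode_demo.txt")
    File.write!(path, contents)

    try do
      path |> File.stream!() |> Enum.to_list()
    after
      File.rm(path)
    end
  end
end

lines = LineModeDemo.stream_lines("to be\nor not to be\n")
# lines == ["to be\n", "or not to be\n"]

# Strip the newline before splitting so "be" is counted consistently.
Enum.flat_map(lines, fn line -> line |> String.trim_trailing("\n") |> String.split(" ") end)
# returns ["to", "be", "or", "not", "to", "be"]
```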

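To convert from Stream to Flow, we have made the following changes:

1. We start the pipeline with `Flow.from_enumerable/1`, which turns the file stream into a flow.
2. We have replaced the calls to `Stream` (and `Enum.reduce/3`) with their `Flow` counterparts. `Flow.reduce/3` takes a function that returns the initial accumulator, because each stage starts its own accumulator.
3. We call `Flow.partition/1` so words are properly partitioned between stages: every occurrence of a given word is routed to the same stage, which keeps the per-stage counts correct.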
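Why partitioning matters: since each stage reduces into its own map, a word that reached two different stages would end up counted in two maps. Flow decides the target stage by hashing each event (the details are in the `Flow.partition/2` documentation). The sketch below is not Flow's implementation; it only imitates hash-based routing with `:erlang.phash2/2`, and the module `PartitionSketch` is hypothetical. It shows that hashing sends all occurrences of a word to exactly one bucket, so the partial counts can be combined without adding anything up.

```elixir
defmodule PartitionSketch do
  # Hypothetical helper, not part of Flow: route each word to one of `n`
  # buckets by hashing it, then count the words inside each bucket.
  def count(words, n) do
    words
    |> Enum.group_by(&:erlang.phash2(&1, n))
    |> Enum.map(fn {_bucket, bucket_words} -> Enum.frequencies(bucket_words) end)
  end
end

words = ~w(to be or not to be)
partials = PartitionSketch.count(words, 2)

# Each word lives in exactly one bucket, so a plain merge loses no counts.
Enum.reduce(partials, %{}, &Map.merge/2)
# returns %{"be" => 2, "not" => 1, "or" => 1, "to" => 2}
```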

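```elixir
# Flow is a separate package: add {:flow, "~> 1.0"} to your dependencies
# (or, in a script or notebook on Elixir 1.12+, run Mix.install([:flow])).
File.stream!("/home/demouser/Notes/shakespeare.txt")
|> Flow.from_enumerable()
# Strip the trailing newline kept by File.stream!/1 before splitting.
|> Flow.flat_map(fn line -> line |> String.trim_trailing("\n") |> String.split(" ") end)
# Route every occurrence of a word to the same stage.
|> Flow.partition()
# Each stage counts the words it receives in its own map.
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, &(&1 + 1))
end)
|> Enum.to_list()
```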

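In this notebook run the cell above failed with `UndefinedFunctionError: 1`. Elixir raises `UndefinedFunctionError` when a module such as `Flow` is not available, most likely because the `flow` dependency was not installed.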

850000

The example above will use all available cores and keeps an ongoing flow of data instead of traversing it line by line. Once all data is computed, it is sent to the process that invoked `Enum.to_list/1`.
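Flow is the tool used here; purely to illustrate the same shape ("compute in parallel, merge in the caller") with the standard library alone, the sketch below swaps in `Task.async_stream/3`. It is not how Flow works internally, and `ParallelCount` and its `chunk_size` parameter are hypothetical. Each chunk of lines is counted in its own task, at most one task per online scheduler, and the partial maps are merged in the calling process.

```elixir
defmodule ParallelCount do
  # Counts words in `lines` by processing chunks of `chunk_size` lines in
  # concurrent tasks, then merging the partial counts in the calling process.
  def count(lines, chunk_size \\ 1_000) do
    lines
    |> Stream.chunk_every(chunk_size)
    |> Task.async_stream(&count_chunk/1,
      max_concurrency: System.schedulers_online(),
      ordered: false,
      timeout: :infinity
    )
    |> Enum.reduce(%{}, fn {:ok, partial}, acc ->
      # Chunks are not partitioned by word, so the same word may appear in
      # several partial maps; add the counts together.
      Map.merge(acc, partial, fn _word, a, b -> a + b end)
    end)
  end

  # Counts the words of one chunk of lines.
  defp count_chunk(lines) do
    lines
    |> Enum.flat_map(&String.split(&1, " "))
    |> Enum.frequencies()
  end
end

ParallelCount.count(["to be", "or not to be"], 1)
# returns %{"be" => 2, "not" => 1, "or" => 1, "to" => 2}
```

Unlike the Flow version, there is no partition step here, so the final merge has to sum counts for words that appear in more than one chunk.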