/
stream.ex
165 lines (129 loc) · 4.68 KB
/
stream.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
defmodule File.Stream do
@moduledoc """
Defines a `File.Stream` struct returned by `File.stream!/3`.

The following fields are public:

  * `path` - the file path
  * `modes` - the file modes
  * `raw` - a boolean indicating whether the binary (`IO.bin*`) functions should be used
  * `line_or_bytes` - whether the file is read line by line (`:line`) or in
    chunks of the given number of bytes

"""
# Struct defaults: no path, no modes, line-by-line reading, raw (binary) I/O.
defstruct path: nil, modes: [], line_or_bytes: :line, raw: true
# Struct type; the individual field types are left unspecified here.
@type t :: %__MODULE__{}
@doc false
# Builds the `File.Stream` struct from a path, a list of file modes and the
# read unit (`:line` or a number of bytes).
#
# The stream is "raw" when `modes` carries no `{:encoding, _}` entry. Raw
# streams get `:raw` prepended to their modes and, unless the caller said
# otherwise, `:read_ahead` as well:
#
#   * `{:read_ahead, false}` - read-ahead is disabled. The tuple is removed
#     because `false` is not a size `:file.open/2` accepts.
#   * `{:read_ahead, size}`  - the caller's size is kept as is; no bare
#     `:read_ahead` is added next to it.
#   * no `:read_ahead` entry - the bare `:read_ahead` flag is added.
#
# Non-raw streams keep `modes` untouched.
def __build__(path, modes, line_or_bytes) do
  raw = :lists.keyfind(:encoding, 1, modes) == false

  modes =
    case raw do
      true ->
        case :lists.keyfind(:read_ahead, 1, modes) do
          {:read_ahead, false} ->
            # The first entry wins, so drop every read_ahead tuple.
            [:raw | for(mode <- modes, not match?({:read_ahead, _}, mode), do: mode)]

          {:read_ahead, _size} ->
            [:raw | modes]

          false ->
            [:raw, :read_ahead | modes]
        end

      false ->
        modes
    end

  %File.Stream{path: path, modes: modes, raw: raw, line_or_bytes: line_or_bytes}
end
defimpl Collectable do
  # Opens the stream's file for writing and returns `{:ok, collector}`, where
  # `:ok` is the initial accumulator and `collector` is the collector function.
  #
  # Any `:read` mode is dropped and `:write` is prepended before opening.
  # Raises `File.Error` when the file cannot be opened.
  def into(%{path: path, modes: modes, raw: raw} = stream) do
    write_modes = [:write | Enum.filter(modes, &(&1 !== :read))]

    case :file.open(path, write_modes) do
      {:error, reason} ->
        raise File.Error, reason: reason, action: "stream", path: path

      {:ok, device} ->
        {:ok, collector(device, stream, raw)}
    end
  end

  # Builds the collector function bound to an open `device`.
  #
  #   * `{:cont, chunk}` - writes the chunk, with `IO.binwrite/2` for raw
  #     streams and `IO.write/2` otherwise
  #   * `:done`          - closes the device and returns the original stream
  #   * `:halt`          - closes the device
  defp collector(device, stream, raw) do
    fn
      :ok, {:cont, chunk} ->
        case raw do
          true -> IO.binwrite(device, chunk)
          false -> IO.write(device, chunk)
        end

      :ok, :done ->
        close!(device)
        stream

      :ok, :halt ->
        close!(device)
    end
  end

  # Closes the device, asserting success. When the `:delayed_write` option is
  # used and the last write failed, `:file.close/1` returns `{:error, _}` and
  # the match below raises a `MatchError`.
  defp close!(device) do
    :ok = :file.close(device)
  end
end
defimpl Enumerable do
  # Chunk size (64 KiB) used when scanning a file to count its lines.
  @read_ahead_size 64 * 1024

  # Reduces over the file's contents lazily.
  #
  # The file is opened with the stream's modes minus the write-only and
  # stream-only ones (see `read_modes/1`). When `:trim_bom` is set, a leading
  # byte order mark is skipped first. Raw streams are read with
  # `IO.each_binstream/2`, others with `IO.each_stream/2`. The device is
  # closed by `Stream.resource/3` when the reduction ends.
  #
  # Raises `File.Error` when the file cannot be opened.
  def reduce(%{path: path, modes: modes, line_or_bytes: line_or_bytes, raw: raw}, acc, fun) do
    start_fun = fn ->
      case :file.open(path, read_modes(modes)) do
        {:ok, device} ->
          if :trim_bom in modes, do: trim_bom(device), else: device

        {:error, reason} ->
          raise File.Error, reason: reason, action: "stream", path: path
      end
    end

    next_fun =
      case raw do
        true -> &IO.each_binstream(&1, line_or_bytes)
        false -> &IO.each_stream(&1, line_or_bytes)
      end

    Stream.resource(start_fun, next_fun, &:file.close/1).(acc, fun)
  end

  # Counts the elements of a line stream by scanning the file in
  # `@read_ahead_size` chunks and counting newlines. A final line without a
  # trailing newline is counted too, since the stream yields it as an element.
  # The BOM is skipped when `:trim_bom` is set, mirroring `reduce/3`.
  #
  # Raises `File.Error` when the file cannot be opened or read.
  def count(%{path: path, modes: modes, line_or_bytes: :line} = stream) do
    pattern = :binary.compile_pattern("\n")
    read = read_function(stream)

    counter = fn device ->
      device = if :trim_bom in modes, do: trim_bom(device), else: device
      count_lines(device, path, pattern, read, 0, false)
    end

    case File.open(path, read_modes(modes), counter) do
      {:ok, count} ->
        {:ok, count}

      {:error, reason} ->
        raise File.Error, reason: reason, action: "stream", path: path
    end
  end

  # Counts the elements of a raw byte-chunk stream from the file size alone.
  #
  # Returns `{:error, __MODULE__}` (so the caller falls back to `reduce/3`)
  # when the size cannot be trusted:
  #
  #   * `:trim_bom` is set, because the streamed data is then shorter than
  #     the file by the length of the BOM
  #   * the reported size is 0 (presumably to cope with files whose size is
  #     not reported accurately - confirm)
  #
  # Raises `File.Error` when the file cannot be stat'ed.
  def count(%{path: path, modes: modes, line_or_bytes: bytes, raw: true}) do
    if :trim_bom in modes do
      {:error, __MODULE__}
    else
      case File.stat(path) do
        {:ok, %{size: 0}} ->
          {:error, __MODULE__}

        {:ok, %{size: size}} ->
          {:ok, div(size, bytes) + if(rem(size, bytes) == 0, do: 0, else: 1)}

        {:error, reason} ->
          raise File.Error, reason: reason, action: "stream", path: path
      end
    end
  end

  # Non-raw (encoded) chunk streams are read through `IO.each_stream/2`, so
  # the size in bytes does not give the number of chunks. Fall back to
  # `reduce/3`.
  def count(_stream) do
    {:error, __MODULE__}
  end

  # Membership cannot be answered without reading the file; use `reduce/3`.
  def member?(_stream, _term) do
    {:error, __MODULE__}
  end

  # Slicing is not supported directly; use `reduce/3`.
  def slice(_stream) do
    {:error, __MODULE__}
  end

  # Reads the first four bytes and repositions the device right after the
  # byte order mark, or at the start of the file if there is none.
  # A non-binary read result (such as `:eof` for an empty file) counts as
  # "no BOM".
  defp trim_bom(device) do
    header = IO.binread(device, 4)
    {:ok, _new_pos} = :file.position(device, bom_length(header))
    device
  end

  # Length in bytes of the BOM at the start of the given header.
  # The UTF-32 LE clause must come before the UTF-16 LE one, because the
  # UTF-32 LE BOM (FF FE 00 00) starts with the UTF-16 LE BOM (FF FE).
  # UTF-8
  defp bom_length(<<239, 187, 191, _rest::binary>>), do: 3
  # UTF-32 LE
  defp bom_length(<<255, 254, 0, 0, _rest::binary>>), do: 4
  # UTF-32 BE
  defp bom_length(<<0, 0, 254, 255, _rest::binary>>), do: 4
  # UTF-16 BE
  defp bom_length(<<254, 255, _rest::binary>>), do: 2
  # UTF-16 LE
  defp bom_length(<<255, 254, _rest::binary>>), do: 2
  defp bom_length(_binary), do: 0

  # Modes passed to the file when reading: everything except the write
  # modes and the stream-only `:trim_bom` flag.
  defp read_modes(modes) do
    for mode <- modes, mode not in [:write, :append, :trim_bom], do: mode
  end

  # Reads the device chunk by chunk, accumulating the number of newlines.
  # `pending?` is true when the data read so far ends with bytes that are
  # not terminated by a newline; at end of file those bytes form one more line.
  defp count_lines(device, path, pattern, read, count, pending?) do
    case read.(device) do
      "" ->
        count_lines(device, path, pattern, read, count, pending?)

      data when is_binary(data) ->
        count = count + count_lines(data, pattern)
        count_lines(device, path, pattern, read, count, :binary.last(data) != ?\n)

      :eof ->
        if pending?, do: count + 1, else: count

      {:error, reason} ->
        raise File.Error, reason: reason, action: "stream", path: path
    end
  end

  # Number of newline matches in a single chunk.
  defp count_lines(data, pattern), do: length(:binary.matches(data, pattern))

  # Chunk reader matching the stream's I/O mode.
  defp read_function(%{raw: true}), do: &IO.binread(&1, @read_ahead_size)
  defp read_function(%{raw: false}), do: &IO.read(&1, @read_ahead_size)
end
end