-
Notifications
You must be signed in to change notification settings - Fork 5
/
inet_cidr.ex
278 lines (238 loc) · 8.98 KB
/
inet_cidr.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
defmodule InetCidr do
@external_resource "README.md"
@moduledoc "README.md"
|> File.read!()
|> String.split("<!-- MDOC !-->")
|> Enum.fetch!(1)
import Bitwise
@doc """
Parses an IPv4 or IPv6 CIDR string such as `192.168.0.0/16` or
`2001:abcd::/32` and returns a `{start_address, end_address, prefix_length}`
tuple.

When `adjust` is `true`, a start address that is not consistent with the
prefix length is adjusted to the first address of the block (for example,
`192.168.0.0/0` yields a start address of `0.0.0.0`). With the default
`adjust = false` the function is strict and raises an exception for such
input.
"""
@deprecated "Use `parse_cidr!/2` instead (or `parse_cidr/2` for {:ok, {start,end,prefix}} / {:error,msg} tuples)"
def parse(cidr_string, adjust \\ false) do
  {first_address, bits} = parse_cidr_block!(cidr_string, adjust)
  {first_address, calc_end_address!(first_address, bits), bits}
end
@doc since: "1.0.6"
@doc """
Parses a string containing either an IPv4 or IPv6 CIDR block using the
notation like `192.168.0.0/16` or `2001:abcd::/32`.

You can optionally pass `true` as the second argument to adjust the start IP
address if it is not consistent with the CIDR length. For example,
`192.168.0.0/0` would be adjusted to have a start IP of `0.0.0.0` instead of
`192.168.0.0`.

Returns `{:ok, {start_address, end_address, prefix_length}}` if the string
contains a valid CIDR block.

Returns `{:error, exception}` if it cannot be parsed, where `exception` is the
exception struct that `parse_cidr!/2` raised for the same input.
"""
def parse_cidr(cidr_string, adjust \\ false) do
  {:ok, parse_cidr!(cidr_string, adjust)}
rescue
  # `rescue` only ever binds exception structs, so there is never an
  # `{:error, _}` tuple to unwrap here; the exception itself is the reason.
  exception -> {:error, exception}
end
@doc since: "1.0.6"
@doc """
Parses a string containing either an IPv4 or IPv6 CIDR block using the
notation like `192.168.0.0/16` or `2001:abcd::/32`. It returns a
`{start_address, end_address, prefix_length}` tuple.

You can optionally pass `true` as the second argument to adjust the start IP
address if it is not consistent with the CIDR length. For example,
`192.168.0.0/0` would be adjusted to have a start IP of `0.0.0.0` instead of
`192.168.0.0`. The default behavior is to be more strict and raise an
exception when this occurs.

An exception is also raised when the string cannot be parsed as a CIDR block.
"""
def parse_cidr!(cidr_string, adjust \\ false) do
  # Implemented directly rather than by delegating to the deprecated
  # `parse/2`, so this supported entry point does not depend on it.
  {start_address, prefix_length} = parse_cidr_block!(cidr_string, adjust)
  end_address = calc_end_address!(start_address, prefix_length)
  {start_address, end_address, prefix_length}
end
@doc since: "1.0.6"
@doc """
Convenience function that takes an IPv4 or IPv6 address as a string and
returns the address.

Returns `{:ok, address}` if the string contains a valid IP address.

Returns `{:error, exception}` if it does not, where `exception` is the
exception struct that `parse_address!/1` raised for the same input.
"""
def parse_address(prefix) do
  {:ok, parse_address!(prefix)}
rescue
  # `rescue` only ever binds exception structs, so there is never an
  # `{:error, _}` tuple to unwrap here; the exception itself is the reason.
  exception -> {:error, exception}
end
@doc since: "1.0.6"
@doc """
Convenience function that converts a string holding an IPv4 or IPv6 address
into the corresponding address tuple.

Raises a `RuntimeError` with the message `"Invalid address: <input>"` when
`:inet.parse_address/1` rejects the string.
"""
def parse_address!(prefix) do
  # :inet works on charlists, not binaries.
  charlist = String.to_charlist(prefix)

  case :inet.parse_address(charlist) do
    {:error, _reason} -> raise "Invalid address: #{prefix}"
    {:ok, address} -> address
  end
end
@doc """
Renders a `{start_address, end_address, prefix_length}` CIDR block as a
`"start_address/prefix_length"` string, i.e. in the notation this module
parses.
"""
def to_string({start_address, _end_address, cidr_length}) do
  printable_address = :inet.ntoa(start_address)
  "#{printable_address}/#{cidr_length}"
end
@doc """
The number of IP addresses included in a CIDR block whose addresses look like
`ip` and whose prefix length is `len`.

Computed as 2 raised to the number of bits left over after the prefix, where
the address width comes from `bit_count/1`.
"""
def address_count(ip, len) do
  host_bits = bit_count(ip) - len
  bsl(1, host_bits)
end
@doc """
The number of bits in the address family of the given address tuple:
32 for a 4-element (IPv4) tuple and 128 for an 8-element (IPv6) tuple.
"""
def bit_count(address) when tuple_size(address) == 4, do: 32
def bit_count(address) when tuple_size(address) == 8, do: 128
@doc """
Returns true if the CIDR block contains the IP address, false otherwise.

The block is a `{start_address, end_address, prefix_length}` tuple. Every
component of the address is checked against the range spanned by the matching
components of the start and end addresses. The result is `false` when the
block and the address do not share the same tuple size (4 or 8), or when the
arguments are not shaped like a block and an address at all.
"""
def contains?({first, last, _prefix_length}, address)
    when tuple_size(first) == tuple_size(address) and
           tuple_size(last) == tuple_size(address) and
           (tuple_size(address) == 4 or tuple_size(address) == 8) do
  [first, last, address]
  |> Enum.map(&Tuple.to_list/1)
  |> Enum.zip()
  |> Enum.all?(fn {low, high, part} -> part in low..high end)
end

def contains?(_, _), do: false
@doc """
Returns true if the value passed in is an IPv4 address, i.e. a 4-element
tuple whose elements are all integers in `0..255`; false otherwise.
"""
def v4?({_, _, _, _} = address) do
  address
  |> Tuple.to_list()
  |> Enum.all?(fn octet -> octet in 0..255 end)
end

def v4?(_), do: false
@doc """
Returns true if the value passed in is an IPv6 address, i.e. an 8-element
tuple whose elements are all integers in `0..65535`; false otherwise.
"""
def v6?({_, _, _, _, _, _, _, _} = address) do
  address
  |> Tuple.to_list()
  |> Enum.all?(fn group -> group in 0..65535 end)
end

def v6?(_), do: false
# internal functions
# Splits `cidr_string` at the first "/" into an address and a prefix length
# and returns `{start_address, prefix_length}`, where `start_address` has been
# masked with the prefix.
#
# Raises when there is no "/", when the address is invalid, when the prefix
# length is not an integer, or (unless `adjust` is true) when the address has
# bits set outside the prefix.
defp parse_cidr_block!(cidr_string, adjust) do
  [prefix, prefix_length_str] = String.split(cidr_string, "/", parts: 2)
  start_address = parse_address!(prefix)

  # The whole suffix must be an integer: matching on "" rejects input such as
  # "10.0.0.0/8abc" or "10.0.0.0/8.5" (MatchError, like any other malformed
  # prefix length) instead of silently reading it as /8. Trailing whitespace
  # is trimmed first so that it keeps being tolerated.
  {prefix_length, ""} = prefix_length_str |> String.trim_trailing() |> Integer.parse()

  # if something 'nonsensical' is passed in like 192.168.0.0/0
  # we have three choices:
  # a) leave it alone (we do NOT allow this)
  # b) adjust the start ip (to 0.0.0.0 in this case) - when adjust == true
  # c) raise an exception - when adjust != true
  masked = band_with_mask(start_address, start_mask(start_address, prefix_length))

  if not adjust and masked != start_address do
    raise "Invalid CIDR: #{cidr_string}"
  end

  {masked, prefix_length}
end
@doc since: "1.0.7"
@doc """
Calculates the end of a CIDR block given the start address (tuple) and prefix
length.

Returns `{:ok, end_address}` if the start address and prefix length are valid.

Returns `{:error, exception}` if either is invalid, where `exception` is the
exception struct that `calc_end_address!/2` raised for the same input.
"""
def calc_end_address(start_address, prefix_length) do
  {:ok, calc_end_address!(start_address, prefix_length)}
rescue
  # `rescue` only ever binds exception structs, so there is never an
  # `{:error, _}` tuple to unwrap here; the exception itself is the reason.
  exception -> {:error, exception}
end
@doc since: "1.0.7"
@doc """
Calculates the last address of a CIDR block from its start address (tuple)
and prefix length by OR-ing the start address with the end mask for that
prefix length.

Assumes a valid start address and prefix length and raises an exception if
either is invalid.
"""
def calc_end_address!(start_address, prefix_length) do
  host_bits_mask = end_mask(start_address, prefix_length)
  bor_with_mask(start_address, host_bits_mask)
end
# Mask used to derive the start of a block: the component-wise bitwise
# complement of `end_mask/2` for the same address family and prefix length.
defp start_mask({_, _, _, _} = address, len) when len in 0..32 do
  address |> end_mask(len) |> complement_components()
end

defp start_mask({_, _, _, _, _, _, _, _} = address, len) when len in 0..128 do
  address |> end_mask(len) |> complement_components()
end

# Applies `bnot/1` to every element of a mask tuple.
defp complement_components(mask) do
  mask
  |> Tuple.to_list()
  |> Enum.map(&bnot/1)
  |> List.to_tuple()
end
# Mask that is OR-ed onto a start address to obtain the end of the block.
# IPv4 masks have four 8-bit components, IPv6 masks eight 16-bit components.
defp end_mask({_, _, _, _}, len) when len in 0..32 do
  0..3
  |> Enum.map(fn index -> end_mask_component(len, index, 8, 0xFF) end)
  |> List.to_tuple()
end

defp end_mask({_, _, _, _, _, _, _, _}, len) when len in 0..128 do
  0..7
  |> Enum.map(fn index -> end_mask_component(len, index, 16, 0xFFFF) end)
  |> List.to_tuple()
end

# Value of the mask component at `index` (0 = leftmost) for prefix length
# `len`, with `width` bits per component:
#   * 0 when the prefix extends to or past the end of the component,
#   * `bmask/2` for the component in which the prefix ends,
#   * `full` (all ones) for every component after that one.
defp end_mask_component(len, index, width, _full) when len >= (index + 1) * width, do: 0

defp end_mask_component(len, index, width, _full) when len >= index * width,
  do: bmask(len, width)

defp end_mask_component(_len, _index, _width, full), do: full
# All-ones value of an 8- or 16-bit component shifted right by the prefix
# length modulo the component width.
defp bmask(i, 8) when i in 0..32, do: bsr(0xFF, rem(i, 8))
defp bmask(i, 16) when i in 0..128, do: bsr(0xFFFF, rem(i, 16))
# Bitwise-ORs each component of a 4- or 8-element address tuple with the
# component at the same position in a mask tuple of the same size.
defp bor_with_mask(address, mask)
     when tuple_size(address) == tuple_size(mask) and
            (tuple_size(address) == 4 or tuple_size(address) == 8) do
  address
  |> Tuple.to_list()
  |> Enum.zip(Tuple.to_list(mask))
  |> Enum.map(fn {part, bits} -> bor(part, bits) end)
  |> List.to_tuple()
end
# Bitwise-ANDs each component of a 4- or 8-element address tuple with the
# component at the same position in a mask tuple of the same size.
defp band_with_mask(address, mask)
     when tuple_size(address) == tuple_size(mask) and
            (tuple_size(address) == 4 or tuple_size(address) == 8) do
  address
  |> Tuple.to_list()
  |> Enum.zip(Tuple.to_list(mask))
  |> Enum.map(fn {part, bits} -> band(part, bits) end)
  |> List.to_tuple()
end
end