-
Notifications
You must be signed in to change notification settings - Fork 6
/
migration.ex
1059 lines (844 loc) · 35.2 KB
/
migration.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
defmodule EctoTablestore.Migration do
@moduledoc """
Migrations are used to create your tables.
Supports an autoincrementing partition key via this library's wrapper; for this use
case, the migration automatically creates another, separate table to generate
the serial value when calling `:insert` (viz `ExAliyunOts.put_row/5`) or `:batch_write` (viz
`ExAliyunOts.batch_write/3`) with the `:put` option.
In practice, we don't create migration files by hand either, we typically use `mix
ecto.ots.gen.migration` to generate the file with the proper timestamp and then we just fill in
its contents:
$ mix ecto.ots.gen.migration create_posts_table
And then we can fill the table definition details:
defmodule EctoTablestore.TestRepo.Migrations.CreatePostsTable do
use EctoTablestore.Migration
def change do
create table("ecto_ots_test_posts") do
add :post_id, :integer, partition_key: true, auto_increment: true
end
end
end
After we filled the above migration content, you can run the migration above by going to the
root of your project and typing:
$ mix ecto.ots.migrate
Finally, we successfully created the "ecto_ots_test_posts" table. Since the above definition
added an autoincrementing column for the partition key, an
"ecto_ots_test_posts_seq" table is created automatically to generate a serial integer for the `:post_id` field when inserting a
new record.
"""
require ExAliyunOts.Const.PKType, as: PKType
require Logger
alias EctoTablestore.{Sequence, Migration.Runner}
alias Ecto.MigrationError
alias ExAliyunOts.Var.Search
defmodule Table do
  @moduledoc false

  # Built by `table/2`, consumed by `create/2` and `drop/2`.
  # `partition_key: true` (the default) makes `__create__/2` inject an
  # autoincrementing `:id` partition key when none is defined explicitly;
  # `meta` is merged into the options passed to `ExAliyunOts.create_table`.
  defstruct name: nil,
            prefix: nil,
            partition_key: true,
            meta: []

  @type t :: %__MODULE__{
          name: String.t(),
          prefix: String.t() | nil,
          partition_key: boolean(),
          meta: Keyword.t()
        }
end
defmodule SecondaryIndex do
  @moduledoc false

  # Built by `secondary_index/3`, consumed by `create/2` and `drop/2`.
  # `index_type` is `:global` or `:local`; `include_base_data` controls whether
  # existing base-table rows are copied into the new index.
  defstruct [
    table_name: nil,
    index_name: nil,
    prefix: nil,
    include_base_data: true,
    index_type: :global
  ]

  @type t :: %__MODULE__{
          table_name: String.t(),
          index_name: String.t(),
          prefix: String.t() | nil,
          include_base_data: boolean(),
          index_type: :global | :local
        }
end
defmodule SearchIndex do
  @moduledoc false

  # Built by `search_index/3`, consumed by `create/2` and `drop/2`.
  defstruct table_name: nil,
            index_name: nil,
            prefix: nil

  @type t :: %__MODULE__{
          table_name: String.t(),
          index_name: String.t(),
          prefix: String.t() | nil
        }
end
@doc false
defmacro __using__(_) do
  quote location: :keep do
    # Expose the migration DSL in the using module: the struct builders
    # (table/secondary_index/search_index), create/drop, and the add_*
    # primary-key/column/index helpers.
    import EctoTablestore.Migration,
      only: [
        table: 1,
        table: 2,
        secondary_index: 2,
        secondary_index: 3,
        search_index: 2,
        search_index: 3,
        create: 2,
        drop: 1,
        drop_if_exists: 1,
        add: 2,
        add: 3,
        add_pk: 1,
        add_pk: 2,
        add_pk: 3,
        add_column: 1,
        add_column: 2,
        add_index: 3,
        add_index: 4
      ]

    # Field-schema and sort helpers (field_schema_*, field_sort, ...) used
    # inside `create search_index(...)` blocks.
    import ExAliyunOts.Search

    # Marker function so tooling can detect that this module is a migration.
    def __migration__, do: :ok
  end
end
@doc """
Returns a table struct that can be given to `create/2`.

Since Tablestore is a NoSQL service, up to 4 primary key(s) can be added on
creation; the first added key is the partition key when the `:partition_key` option is set
to `false`.

## Examples

    create table("products") do
      add :name, :string
      add :price, :integer
    end

    create table("products", partition_key: false) do
      add :name, :string
      add :price, :integer
    end

## Options

  * `:partition_key` - `true` by default, which adds an `:id` field as the partition key
    with a large autoincrementing integer type (as `bigserial`). Tablestore does not support
    the `bigserial` type for primary keys, but the `ex_aliyun_ots` lib's wrapper - Sequence - can
    implement it; when `false`, no partition key field is generated on table creation.
  * `:prefix` - the prefix for the table.
  * `:meta` - the meta information used when creating the table, see Tablestore's documentation for details:
    * `:reserved_throughput_write` - reserved write throughput, an integer,
      the default value is 0;
    * `:reserved_throughput_read` - reserved read throughput, an integer,
      the default value is 0;
    * `:time_to_live` - the survival time of the saved data, a.k.a TTL; an integer, unit as second,
      the default value is -1 (permanent preservation);
    * `:deviation_cell_version_in_sec` - maximum version deviation, the default value is 86400
      seconds, which is 1 day;
    * `:stream_spec` - set the stream specification of Tablestore:
      - `:is_enabled`, open or close stream
      - `:expiration_time`, the expiration time of the table's stream
    * `:enable_local_txn` - specifies whether to enable the local transaction feature,
      a `:boolean`, default value: `false`. Set this parameter to `true` to enable
      local transactions when the data table is created.
"""
def table(name, opts \\ [])

# Atom names are normalized to strings before building the struct.
def table(name, opts) when is_atom(name), do: table(Atom.to_string(name), opts)

def table(name, opts) when is_binary(name) and is_list(opts) do
  struct(%Table{name: name}, opts)
end
@doc """
Returns a secondary index struct that can be given to `create/2`.

For more information see the [Chinese Docs](https://help.aliyun.com/document_detail/91947.html) | [English Docs](https://www.alibabacloud.com/help/doc-detail/91947.html)

## Examples

    create secondary_index("posts", "posts_owner") do
      add_pk(:owner_id)
      add_pk(:id)
      add_column(:title)
      add_column(:content)
    end

    create secondary_index("posts", "posts_title", index_type: :global) do
      add_pk(:title)
      add_pk(:id)
      add_column(:content)
    end

## Options

  * `:include_base_data`, specifies whether the index table includes the existing data in the base table;
    `true` means the index includes the existing data, `false` means it excludes the existing data.
    Optional, `true` by default.
  * `:index_type`, the type of the index, optional. Valid values: `:global` and `:local`. By default it is `:global`.
    - If `:index_type` is not specified or is set to `:global`, the global secondary index feature is used.
      Tablestore then automatically synchronizes the columns to be indexed and the primary key columns
      from the data table to the index table in asynchronous mode, with a latency within a few milliseconds.
    - If `:index_type` is set to `:local`, the local secondary index feature is used.
      Tablestore then automatically synchronizes data from the indexed columns and the primary key
      columns of the data table to the index table in synchronous mode, so the data can be queried
      from the index table as soon as it is written to the data table.
"""
def secondary_index(table_name, index_name, opts \\ [])
    when is_binary(table_name) and is_binary(index_name) and is_list(opts) do
  %SecondaryIndex{table_name: table_name, index_name: index_name}
  |> struct(opts)
end
@doc """
Returns a search index struct that can be given to `create/2`.

For more information see the [Chinese Docs](https://help.aliyun.com/document_detail/117452.html) | [English Docs](https://www.alibabacloud.com/help/doc-detail/117452.html)

## Examples

    create search_index("posts", "posts_owner") do
      field_schema_keyword("title")
      field_schema_keyword("content")
      field_sort("title")
    end

Please refer the [link](https://hexdocs.pm/ex_aliyun_ots/ExAliyunOts.Search.html#define-field-schema) to the available field schema definition options,
and the [link](https://hexdocs.pm/ex_aliyun_ots/ExAliyunOts.Search.html#sort) to the available sort options.
"""
def search_index(table_name, index_name, opts \\ [])
    when is_binary(table_name) and is_binary(index_name) and is_list(opts) do
  %SearchIndex{table_name: table_name, index_name: index_name}
  |> struct(opts)
end
@doc """
Adds a primary key when creating a secondary index.
"""
# Fix: the binary clause previously expanded to `{:pk, column}` WITHOUT
# `unquote/1`, so the generated code referenced an undefined `column` variable
# at the call site instead of injecting the literal column name.
defmacro add_pk(column) when is_binary(column), do: quote(do: {:pk, unquote(column)})

defmacro add_pk(column) when is_atom(column),
  do: quote(do: {:pk, unquote(Atom.to_string(column))})

# Any other argument shape is rejected at compile time.
defmacro add_pk(column) do
  raise ArgumentError,
        "error type when defining pk column: #{inspect(column)} for secondary_index, only supported one of type: [:binary, :atom]"
end
@doc """
Adds a column what already be pre-defined column when creating a secondary index.
"""
# Fix: the binary clause previously expanded to `{:column, column}` WITHOUT
# `unquote/1`, so the generated code referenced an undefined `column` variable
# at the call site instead of injecting the literal column name.
defmacro add_column(column) when is_binary(column), do: quote(do: {:column, unquote(column)})

defmacro add_column(column) when is_atom(column),
  do: quote(do: {:column, unquote(Atom.to_string(column))})

# Any other argument shape is rejected at compile time.
defmacro add_column(column) do
  raise ArgumentError,
        "error type when defining pre-defined column: #{inspect(column)} for secondary_index, only supported one of type: [:binary, :atom]"
end
@doc """
Define the primary key(s) of the table to create.

By default, the table will also include an `:id` primary key field (it is also partition key)
that has a type of `:integer` which is an autoincrementing column. Check the `table/2` docs for
more information.

There are up to 4 primary key(s) can be added when creation.

## Example

    create table("posts") do
      add :title, :string
    end

    # The above is equivalent to

    create table("posts") do
      add :id, :integer, partition_key: true, auto_increment: true
      add :title, :string
    end
"""
defmacro create(object, do: block), do: expand_create(object, block, __CALLER__)

# Expands a `create ... do ... end` block at compile time: the inner add_* /
# field_schema_* calls are macro-expanded into plain data, validated via
# `__create__/2`, and the resulting command is queued on the migration Runner.
defp expand_create(object, block, caller) do
  # Keep the original source text of the block; it is logged by the
  # search-index `do_create/2` clause (as `:columns_str`).
  columns_str = Macro.to_string(block)

  columns =
    Macro.prewalk(block, &Macro.expand(&1, caller))
    |> unwrap_block()
    |> List.wrap()
    |> List.flatten()

  quote do
    map =
      unquote(__MODULE__).__create__(unquote(object), unquote(columns))
      |> Map.put(:columns_str, unquote(columns_str))

    Runner.push_command(&unquote(__MODULE__).do_create(&1, map))
  end
end

# A multi-expression `do` block arrives as `{:__block__, _, exprs}`, while a
# single expression arrives bare — normalize both to a (possibly nested) list.
defp unwrap_block({:__block__, _, columns}) when is_list(columns) do
  Enum.map(columns, &unwrap_block/1)
end

defp unwrap_block(block), do: block
# Validates the columns collected from a `create table(...) do ... end` block
# and builds the command payload consumed by `do_create/2`.
# Raises `Ecto.MigrationError` when the partition-key / primary-key-count /
# increment-count constraints are violated.
def __create__(%Table{} = table, columns) do
  # `add_index/4` expands to tuples; `add/3`, `add_pk/3` and `add_column/2`
  # expand to maps — split them apart first.
  {index_metas, columns} = Enum.split_with(columns, &is_tuple(&1))

  # Group the column maps; the merge guarantees both keys exist even when a
  # group is empty.
  %{primary_key: pk_columns, pre_defined_column: pre_defined_columns} =
    Map.merge(
      %{primary_key: [], pre_defined_column: []},
      Enum.group_by(columns, & &1.column_type)
    )

  partition_key_count = Enum.count(pk_columns, & &1.partition_key)

  pk_columns =
    cond do
      partition_key_count == 1 ->
        pk_columns

      # Make the partition key as `:id` and in an increment integer sequence;
      # `:migration_primary_key` repo config may override the name/type.
      partition_key_count == 0 and table.partition_key ->
        opts = Runner.repo_config(:migration_primary_key, [])
        {name, opts} = Keyword.pop(opts, :name, "id")
        {type, _opts} = Keyword.pop(opts, :type, :integer)

        [
          %{
            name: name,
            type: type,
            column_type: :primary_key,
            partition_key: true,
            auto_increment: true
          }
          | pk_columns
        ]

      # No partition key defined
      partition_key_count == 0 ->
        raise MigrationError,
              "Please define at least one partition primary keys for table: #{table.name}"

      # The partition key only can define one
      true ->
        raise MigrationError,
              "The maximum number of partition primary keys is 1, now is #{partition_key_count} defined on " <>
                "table: #{table.name} columns:\n" <> inspect(pk_columns)
    end

  case Enum.count(pk_columns) do
    # The number of primary keys can not be more than 4
    pk_count when pk_count > 4 ->
      raise MigrationError,
            "The maximum number of primary keys is 4, now is #{pk_count} defined on " <>
              "table: #{table.name} columns:\n" <> inspect(pk_columns)

    # Only support to define one primary key as auto_increment integer
    _pk_count ->
      # `:hashids` columns are counted separately (that clause matches first,
      # so a hashids column is never double-counted as auto_increment).
      %{hashids: hashids_count, auto_increment: auto_increment_count} =
        Enum.reduce(pk_columns, %{hashids: 0, auto_increment: 0, none: 0}, fn
          %{type: :hashids}, acc -> Map.update!(acc, :hashids, &(&1 + 1))
          %{auto_increment: true}, acc -> Map.update!(acc, :auto_increment, &(&1 + 1))
          _, acc -> Map.update!(acc, :none, &(&1 + 1))
        end)

      if (total_increment_count = auto_increment_count + hashids_count) > 1 do
        raise MigrationError,
              "The maximum number of [auto_increment & hashids] primary keys is 1, " <>
                "but now find #{total_increment_count} primary keys defined on table: #{table.name}"
      else
        %{
          table: table,
          pk_columns: pk_columns,
          pre_defined_columns: pre_defined_columns,
          index_metas: index_metas,
          # A sequence table is needed whenever a key value is generated by
          # this library (see `create_seq_table!/3`).
          create_seq_table?: auto_increment_count > 0 or hashids_count > 0
        }
      end
  end
end
# Validates the columns collected from a `create secondary_index(...) do ... end`
# block and builds the command payload consumed by `do_create/2`.
# Raises `Ecto.MigrationError` when `add_pk/1` or `add_column/1` definitions
# are missing.
def __create__(%SecondaryIndex{} = secondary_index, columns) do
  # Columns arrive as `{:pk, name}` / `{:column, name}` tuples emitted by
  # `add_pk/1` and `add_column/1`.
  g_columns = Enum.group_by(columns, &elem(&1, 0), &elem(&1, 1))

  # Fix: compute the MISSING groups as `required -- present`. The previous
  # `Map.keys(g_columns) -- [:pk, :column]` computed the EXTRA keys instead,
  # so missing definitions passed validation and crashed below with a
  # KeyError on `g_columns.pk` / `g_columns.column`.
  case [:pk, :column] -- Map.keys(g_columns) do
    [] ->
      :ok

    [missing] ->
      raise MigrationError,
            "Missing #{missing} definition when creating: #{inspect(secondary_index)}, please use add_#{missing}/1 when creating secondary index."

    # Both groups are absent.
    _ ->
      raise MigrationError,
            "Missing pk & column definition when creating: #{inspect(secondary_index)}, please use add_pk/1 and add_column/1 when creating secondary index."
  end

  %{
    secondary_index: secondary_index,
    primary_keys: g_columns.pk,
    defined_columns: g_columns.column
  }
end
# Validates the definitions collected from a `create search_index(...) do ... end`
# block and builds the command payload consumed by `do_create/2`.
def __create__(%SearchIndex{} = search_index, columns) do
  # Sort definitions are distinguished from field schemas by struct type.
  sort_struct_types = [
    Search.PrimaryKeySort,
    Search.FieldSort,
    Search.GeoDistanceSort,
    Search.ScoreSort
  ]

  g_columns =
    Enum.group_by(columns, fn column ->
      if column.__struct__ in sort_struct_types, do: :index_sorts, else: :field_schemas
    end)

  # At least one field schema is mandatory; index sorts are optional.
  if g_columns[:field_schemas] == nil do
    raise MigrationError,
          "Missing field_schemas definition when creating: #{inspect(search_index)}, please use field_schema_* functions when creating search index."
  end

  %{
    search_index: search_index,
    field_schemas: g_columns.field_schemas,
    index_sorts: g_columns[:index_sorts] || []
  }
end
@doc false
# create table
# Runs against the repo at migration time: logs the definition, calls
# `ExAliyunOts.create_table`, and ensures the sequence table exists when the
# table uses a library-generated key.
def do_create(repo, %{
      table: table,
      pk_columns: pk_columns,
      pre_defined_columns: pre_defined_columns,
      index_metas: index_metas,
      create_seq_table?: create_seq_table?
    }) do
  table_name = get_table_name(table, repo.config())
  table_name_str = IO.ANSI.format([:green, table_name, :reset])
  repo_meta = Ecto.Adapter.lookup_meta(repo)
  instance = repo_meta.instance

  # Convert the column maps into the tuple forms ExAliyunOts expects.
  primary_keys = Enum.map(pk_columns, &transform_table_column/1)
  defined_columns = Enum.map(pre_defined_columns, &transform_table_column/1)

  # Only log the non-empty sections of the definition.
  print_description =
    [
      primary_keys: primary_keys,
      defined_columns: defined_columns,
      index_metas: index_metas
    ]
    |> Enum.reject(&match?({_, []}, &1))
    |> inspect(pretty: true, limit: :infinity)

  Logger.info(">> creating table: #{table_name_str} by #{print_description}")

  # The user-supplied `:meta` options are kept, with these entries overriding.
  options =
    Keyword.merge(table.meta,
      max_versions: 1,
      defined_columns: defined_columns,
      index_metas: index_metas
    )

  case ExAliyunOts.create_table(instance, table_name, primary_keys, options) do
    :ok ->
      result_str = IO.ANSI.format([:green, "ok", :reset])
      Logger.info(">>>> create table: #{table_name_str} result: #{result_str}")
      create_seq_table!(create_seq_table?, table_name, instance)
      :ok

    {:error, error} ->
      raise MigrationError, "create table: #{table_name} error: #{error.message}"
  end
end
# create secondary_index
# Runs against the repo at migration time: logs the definition and creates a
# global/local secondary index via `ExAliyunOts.create_index`.
def do_create(repo, %{
      secondary_index: secondary_index,
      primary_keys: primary_keys,
      defined_columns: defined_columns
    }) do
  {table_name, index_name} = get_index_name(secondary_index, repo.config())
  table_name_str = IO.ANSI.format([:green, table_name, :reset])
  index_name_str = IO.ANSI.format([:green, index_name, :reset])
  index_type = secondary_index.index_type
  include_base_data = secondary_index.include_base_data
  repo_meta = Ecto.Adapter.lookup_meta(repo)

  print_description =
    inspect(
      [
        primary_keys: primary_keys,
        defined_columns: defined_columns,
        include_base_data: include_base_data
      ],
      pretty: true,
      limit: :infinity
    )

  Logger.info(
    ">> creating #{to_string(index_type)}_secondary_index: #{index_name_str} for table: #{table_name_str} by #{print_description}"
  )

  case ExAliyunOts.create_index(
         repo_meta.instance,
         table_name,
         index_name,
         primary_keys,
         defined_columns,
         index_type: index_type,
         include_base_data: include_base_data
       ) do
    :ok ->
      result_str = IO.ANSI.format([:green, "ok", :reset])

      Logger.info(
        ">>>> create #{to_string(index_type)}_secondary_index: #{index_name_str} for table: #{table_name_str} result: #{result_str}"
      )

      :ok

    {:error, error} ->
      raise MigrationError,
            "create #{to_string(index_type)}_secondary index: #{index_name} for table: #{table_name} error: #{error.message}"
  end
end
# create search_index
# Runs against the repo at migration time: logs the original `create` block
# source (`columns_str`, captured by `expand_create/3`) and creates the search
# index. Note `ExAliyunOts.create_search_index` returns `{:ok, _}` on success,
# unlike the `:ok` returned by table/secondary-index creation.
def do_create(repo, %{
      search_index: search_index,
      field_schemas: field_schemas,
      index_sorts: index_sorts,
      columns_str: columns_str
    }) do
  {table_name, index_name} = get_index_name(search_index, repo.config())
  table_name_str = IO.ANSI.format([:green, table_name, :reset])
  index_name_str = IO.ANSI.format([:green, index_name, :reset])
  repo_meta = Ecto.Adapter.lookup_meta(repo)

  Logger.info(
    ">> creating search index: #{index_name_str} for table: #{table_name_str} by defines:\n#{columns_str}"
  )

  case ExAliyunOts.create_search_index(
         repo_meta.instance,
         table_name,
         index_name,
         field_schemas: field_schemas,
         index_sorts: index_sorts
       ) do
    {:ok, _} ->
      result_str = IO.ANSI.format([:green, "ok", :reset])

      Logger.info(
        ">>>> create search index: #{index_name_str} for table: #{table_name_str} result: #{result_str}"
      )

      :ok

    {:error, error} ->
      raise MigrationError,
            "create search index: #{index_name} for table: #{table_name} error: #{error.message}"
  end
end
# Nothing to do when the table uses no library-generated key.
defp create_seq_table!(false, _table_name, _instance), do: :ignore

# Ensures the shared sequence table (used to generate serial key values)
# exists, creating it only when it is not already listed on the instance.
defp create_seq_table!(true, table_name, instance) do
  seq_table_name = Sequence.default_table()

  case ExAliyunOts.list_table(instance) do
    {:ok, %{table_names: table_names}} ->
      if seq_table_name in table_names do
        :already_exists
      else
        sequence = %ExAliyunOts.Var.NewSequence{name: seq_table_name}

        case ExAliyunOts.Sequence.create(instance, sequence) do
          :ok ->
            Logger.info(">> auto create table: #{seq_table_name} for table: #{table_name}")
            :ok

          {:error, error} ->
            raise MigrationError, "create table: #{seq_table_name} error: #{error.message}"
        end
      end

    {:error, error} ->
      raise MigrationError, "list_table error: #{error.message}"
  end
end
# Resolves the full table name: the struct's own `:prefix`, falling back to
# the repo's `:migration_default_prefix` config, is prepended when present.
# (Fix: replaced `@doc false` with a comment — Elixir discards, and warns
# about, `@doc` attributes on private functions.)
defp get_table_name(%{prefix: prefix, name: name}, repo_config) do
  prefix = prefix || Keyword.get(repo_config, :migration_default_prefix)

  if prefix do
    prefix <> name
  else
    name
  end
end
# Resolves the {table_name, index_name} pair for an index struct, applying the
# struct's `:prefix` or the repo's `:migration_default_prefix` to both names.
defp get_index_name(
       %{prefix: prefix, table_name: table_name, index_name: index_name},
       repo_config
     ) do
  effective_prefix = prefix || Keyword.get(repo_config, :migration_default_prefix)

  if effective_prefix,
    do: {effective_prefix <> table_name, effective_prefix <> index_name},
    else: {table_name, index_name}
end
# Converts a pre-defined column map into the {name, type} tuple ExAliyunOts expects.
defp transform_table_column(%{column_type: :pre_defined_column, name: field_name, type: type}) do
  {field_name, type}
end

# Converts a primary-key column map into the ExAliyunOts tuple form; an
# auto-incrementing non-partition integer key carries the extra
# `PKType.auto_increment()` option.
defp transform_table_column(%{
       column_type: :primary_key,
       name: field_name,
       type: type,
       partition_key: partition_key?,
       auto_increment: auto_increment?
     }) do
  if type == :integer and auto_increment? and not partition_key? do
    {field_name, PKType.integer(), PKType.auto_increment()}
  else
    # `:hashids` is stored as a string column; the serial value is hashed.
    pk_type =
      %{
        hashids: PKType.string(),
        integer: PKType.integer(),
        string: PKType.string(),
        binary: PKType.binary()
      }[type]

    {field_name, pk_type}
  end
end
@doc """
Adds a primary key when creating a table.

This function only accepts types as `:string` | `:binary` | `:integer` | `:hashids` | `:id`.

About `:auto_increment` option:

  * set `:auto_increment` as `true` and its field is primary key of non-partitioned key, there
    will use Tablestore's auto-increment column to process it.
  * set `:auto_increment` as `true` and its field is partition key, there will use
    `ex_aliyun_ots`'s built-in Sequence function, the actual principle behind it is to use the
    atomic update operation though another separate table when generate serial integer, by default
    there will add an `:id` partition key as `:integer` type, the initial value of the sequence is
    0, and the increment step is 1.

Tablestore can only have up to 4 primary keys, meanwhile the first defined primary key is the
partition key, Please know that the order of the primary key definition will be directly mapped
to the created table.

About `:hashids` type to define the partition key:

  * set `partition_key` as `true` is required.
  * set `auto_increment` as `true` is required.

## Examples

The auto generated serial integer for partition key:

    create table("posts") do
      add :title, :string
    end

    # The above is equivalent to

    create table("posts", partition_key: false) do
      add :id, :integer, partition_key: true, auto_increment: true
      add :title, :string
    end

The explicitly defined field with `partition_key`:

    create table("posts") do
      add :title, :string
    end

    # The above is equivalent to

    create table("posts") do
      add :id, :integer, partition_key: true, auto_increment: true
      add :title, :string
    end

The `:auto_increment` integer for primary key of non-partitioned key:

    create table("posts") do
      add :tag, :integer, auto_increment: true
    end

    # The above is equivalent to

    create table("posts", partition_key: false) do
      add :id, :integer, partition_key: true, auto_increment: true
      add :tag, :integer, auto_increment: true
    end

The `:hashids` type for the partition key with the built-in sequence feature:

    create table("posts") do
      add :id, :hashids, auto_increment: true, partition_key: true
    end

The `:id` type for the partition key with the built-in sequence feature:

    create table("posts") do
      add :id, :id
    end

    # The above is equivalent to

    create table("posts", partition_key: false) do
      add :id, :integer, partition_key: true, auto_increment: true
    end

## Options

  * `:partition_key` - when `true`, marks this field as the partition key, only the first
    explicitly defined field is available for this option.
  * `:auto_increment` - when `true` and this field is non-partitioned key, Tablestore
    automatically generates the primary key value, which is unique in the partition key, and which
    increases progressively, when `true` and this field is a partition key, use `ex_aliyun_ots`'s
    Sequence to build a serial number for this field, the `auto_increment: true` option only
    allows binding of one primary key.
"""
# Doc fix: the non-partitioned auto_increment example previously defined
# `add :tag, ...` but its "equivalent" snippet showed `add :version, ...` —
# the field names are now consistent.
defmacro add(column, type, opts \\ []), do: _add_pk(column, type, opts)
@doc """
Adds a primary key when creating a table.

Same as `add/2`, see `add/2` for more information.
"""
defmacro add_pk(column, type, opts \\ []), do: _add_pk(column, type, opts)

# Shared implementation of `add/3` and `add_pk/3`: validates the type at
# compile time, then expands to the plain column map consumed by `__create__/2`.
defp _add_pk(column, type, opts)
     when (is_atom(column) or is_binary(column)) and is_list(opts) do
  validate_pk_type!(column, type)

  if type == :id do
    # `:id` is shorthand for an autoincrementing integer partition key;
    # any `opts` are ignored for this type.
    quote location: :keep do
      %{
        name: unquote(to_string(column)),
        type: :integer,
        column_type: :primary_key,
        partition_key: true,
        auto_increment: true
      }
    end
  else
    quote location: :keep do
      %{
        name: unquote(to_string(column)),
        type: unquote(type),
        column_type: :primary_key,
        partition_key: Keyword.get(unquote(opts), :partition_key, false),
        auto_increment: Keyword.get(unquote(opts), :auto_increment, false)
      }
    end
  end
end
# Ensures `type` is one of the primary-key types this library supports; raises
# `ArgumentError` otherwise. More information can be found in the
# [documentation](https://help.aliyun.com/document_detail/106536.html).
defp validate_pk_type!(column, type) do
  valid? = type in [:integer, :string, :binary, :hashids, :id]

  if valid? do
    :ok
  else
    raise ArgumentError,
          "#{inspect(type)} is not a valid primary key type for column: `#{inspect(column)}`, " <>
            "please use an atom as :integer | :string | :binary | :hashids | :id."
  end
end
@doc """
Adds a pre-defined column when creating a table.

This function only accepts types as `:integer` | `:double` | `:boolean` | `:string` | `:binary`.

For more information see the [Chinese Docs](https://help.aliyun.com/document_detail/91947.html) | [English Docs](https://www.alibabacloud.com/help/doc-detail/91947.html)

## Examples

    create table("posts") do
      add_pk(:id, :integer, partition_key: true)
      add_pk(:owner_id, :string)
      add_column(:title, :string)
      add_column(:content, :string)
    end
"""
defmacro add_column(column, type), do: _add_column(column, type)

# Validates the type at compile time, then expands to the plain column map
# consumed by `__create__/2`.
defp _add_column(column, type) when is_atom(column) or is_binary(column) do
  validate_pre_defined_col_type!(column, type)

  quote location: :keep do
    %{
      name: unquote(to_string(column)),
      type: unquote(type),
      column_type: :pre_defined_column
    }
  end
end
# Ensures `type` is one of the pre-defined column types Tablestore supports;
# raises `ArgumentError` otherwise. More information can be found in the
# [documentation](https://help.aliyun.com/document_detail/106536.html).
defp validate_pre_defined_col_type!(column, type) do
  valid? = type in [:integer, :double, :boolean, :string, :binary]

  if valid? do
    :ok
  else
    raise ArgumentError,
          "#{inspect(type)} is not a valid pre-defined column type for column: `#{inspect(column)}`, " <>
            "please use an atom as :integer | :double | :boolean | :string | :binary ."
  end
end
@doc """
Adds a secondary index when creating a table.

For more information see the [Chinese Docs](https://help.aliyun.com/document_detail/91947.html) | [English Docs](https://www.alibabacloud.com/help/doc-detail/91947.html)

## Examples

    create table("posts") do
      add_pk(:id, :integer, partition_key: true)
      add_pk(:owner_id, :string)
      add_column(:title, :string)
      add_column(:content, :string)
      add_index("posts_owner", [:owner_id, :id], [:title, :content])
      add_index("posts_title", [:title, :id], [:content], type: :global)
    end

## Options

  * `:index_type`, the type of the index, optional. Valid values: `:global` and `:local`. By default it is `:global`.
    - If `:index_type` is not specified or is set to `:global`, the global secondary index feature is used.
      If you use the global secondary index feature, Tablestore automatically synchronizes the columns to be indexed and data in primary key columns from a data table to an index table in asynchronous mode.
      The synchronization latency is within a few milliseconds.
    - If `:index_type` is set to `:local`, the local secondary index feature is used.
      If you use the local secondary index feature, Tablestore automatically synchronizes data from the indexed columns and the primary key columns of a data table to the columns of an index table in synchronous mode.
      After the data is written to the data table, you can query the data from the index table.
"""
defmacro add_index(index_name, primary_keys, defined_columns, options \\ [])
         when is_binary(index_name) and is_list(primary_keys) and is_list(defined_columns) and
                is_list(options) do
  # Expand each element in the caller's context, then normalize atoms to
  # strings; any other element type is rejected at compile time.
  check_and_transform_columns = fn columns ->
    columns
    |> Macro.prewalk(&Macro.expand(&1, __CALLER__))
    |> Enum.map(fn
      column when is_binary(column) ->
        column

      column when is_atom(column) ->
        Atom.to_string(column)

      column ->
        raise ArgumentError,
              "error type when defining column: #{inspect(column)} for add_index: #{index_name}, " <>
                "only supported one of type: [:binary, :atom]"
    end)
  end

  # Expands to the index-meta tuple that `__create__/2` splits out (via
  # `is_tuple/1`) and forwards to table creation as `:index_metas`.
  quote location: :keep do
    {
      unquote(index_name),
      unquote(check_and_transform_columns.(primary_keys)),
      unquote(check_and_transform_columns.(defined_columns)),
      unquote(options)
    }
  end
end
@doc """
Drops a table or index if it exists.

Does not raise an error if the specified table or index does not exist.

## Examples

    drop_if_exists table("posts")

    drop_if_exists secondary_index("posts", "posts_owner")

    drop_if_exists search_index("posts", "posts_index")
"""
def drop_if_exists(obj) do
  # Delegate with `if_exists` enabled so "does not exist" service errors are
  # tolerated instead of raised.
  drop(obj, true)
end
@doc """
Drops one of the following:

  * a table
  * a secondary index
  * a search index

## Examples

    drop table("posts")

    drop secondary_index("posts", "posts_owner")

    drop search_index("posts", "posts_index")
"""
def drop(obj) do
  # Delegate with `if_exists` disabled: a missing object raises.
  drop(obj, false)
end
# Queues the table-drop command on the migration Runner. When `if_exists` is
# true, an "OTSObjectNotExist" error from the service is treated as success.
def drop(%Table{} = table, if_exists) do
  Runner.push_command(fn repo ->
    table_name = get_table_name(table, repo.config())
    table_name_str = IO.ANSI.format([:green, table_name, :reset])
    repo_meta = Ecto.Adapter.lookup_meta(repo)
    instance = repo_meta.instance
    Logger.info(">> dropping table: #{table_name_str}")

    case ExAliyunOts.delete_table(instance, table_name) do
      :ok ->
        result_str = IO.ANSI.format([:green, "ok", :reset])
        Logger.info(">>>> dropping table: #{table_name_str} result: #{result_str}")
        :ok

      # Missing table tolerated only for `drop_if_exists/1`.
      {:error, %{code: "OTSObjectNotExist"}} when if_exists ->
        result_str = IO.ANSI.format([:green, "not exists", :reset])
        Logger.info(">>>> dropping table: #{table_name_str} result: #{result_str}")
        :ok

      {:error, error} ->
        raise MigrationError, "dropping table: #{table_name} error: #{error.message}"
    end
  end)
end
def drop(%SecondaryIndex{} = secondary_index, if_exists) do
Runner.push_command(fn repo ->
{table_name, index_name} = get_index_name(secondary_index, repo.config())
table_name_str = IO.ANSI.format([:green, table_name, :reset])
index_name_str = IO.ANSI.format([:green, index_name, :reset])
repo_meta = Ecto.Adapter.lookup_meta(repo)
Logger.info(
">> dropping secondary_index table: #{table_name_str}, index: #{index_name_str}"
)
case ExAliyunOts.delete_index(repo_meta.instance, table_name, index_name) do
:ok ->
result_str = IO.ANSI.format([:green, "ok", :reset])
Logger.info(
">>>> dropping secondary_index table: #{table_name_str}, index: #{index_name_str} result: #{result_str}"
)
:ok
{:error, %{code: "OTSObjectNotExist"}} when if_exists ->
result_str = IO.ANSI.format([:green, "not exists", :reset])
Logger.info(
">>>> dropping secondary_index table: #{table_name_str}, index: #{index_name_str} result: #{result_str}"
)
:ok
{:error, %{code: "OTSParameterInvalid", message: "Index does not exist" <> _}}
when if_exists ->
result_str = IO.ANSI.format([:green, "not exists", :reset])
Logger.info(
">>>> dropping secondary_index table: #{table_name_str}, index: #{index_name_str} result: #{result_str}"
)
:ok
{:error,
%{
code: "OTSParameterInvalid",
message:
<<"table [", ^table_name::binary-size(byte_size(table_name)), "] does not exist",
_::binary>>
}}
when if_exists ->
result_str = IO.ANSI.format([:green, "not exists", :reset])
Logger.info(
">>>> dropping secondary_index table: #{table_name_str}, index: #{index_name_str} result: #{result_str}"
)
:ok