Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-22172 master #3726

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
efe1734
IGNITE-18991 Move stable/planned/pending assignments from table to
alievmirza Mar 18, 2024
70a4580
Fixed test after merge.
vldpyatkov Apr 8, 2024
7f88979
WIP
vldpyatkov Apr 8, 2024
6012c4d
WIP
vldpyatkov Apr 9, 2024
3c072ae
WIP
vldpyatkov Apr 9, 2024
85c758b
Resolved merged conflict.
vldpyatkov Apr 9, 2024
75752f2
Code style.
vldpyatkov Apr 9, 2024
7baea19
PMD
vldpyatkov Apr 9, 2024
e46518e
WIP
vldpyatkov Apr 9, 2024
b56904f
WIP
vldpyatkov Apr 9, 2024
267b167
Fixed test
vldpyatkov Apr 9, 2024
fd91aab
WIP
vldpyatkov Apr 9, 2024
b3f4f87
Fixed NPE
vldpyatkov Apr 9, 2024
b6d763f
WIP
vldpyatkov Apr 9, 2024
a9dc903
Removed debug log.
vldpyatkov Apr 10, 2024
f997ff4
Chnages after review form M.Efremov and M.Aliev
JAkutenshi Apr 12, 2024
945d5dd
Fixed compilation error.
vldpyatkov Apr 13, 2024
635ae63
Fixed after review for M.Aliev.
vldpyatkov Apr 16, 2024
e0d5ca9
Added busy lock checking in the index builder.
vldpyatkov Apr 16, 2024
6d6f075
IGNITE-18991 fix index NPE
alievmirza Apr 16, 2024
040da60
Revert "IGNITE-18991 fix index NPE"
vldpyatkov Apr 18, 2024
d2061b7
Merge branch 'main' into ignite-18991-WIP
vldpyatkov Apr 18, 2024
1aa29c3
Fixed merge artifacts.
vldpyatkov Apr 18, 2024
7e0474c
Fixed compilation.
vldpyatkov Apr 18, 2024
6a2557e
Unused import is removed
vldpyatkov Apr 18, 2024
77a8b61
Fixed update subgroups.
vldpyatkov Apr 23, 2024
1e5442a
Fixed NPE
vldpyatkov Apr 23, 2024
ddd04dd
Merge remote-tracking branch 'apache-ignite-3/main' into ignite-collo…
alievmirza Apr 29, 2024
aaee70b
IGNITE-21911 Change API usage of Placement driver in Index module fro…
alievmirza Apr 29, 2024
88e80dc
IGNITE-21912 Change API usage of Placement driver for InternalTableIm…
alievmirza May 6, 2024
e5d2dbe
IGNITE-22172 Make ignite-collocation-feature branch stable
vldpyatkov May 6, 2024
0f3ac44
Merge branch 'main' into ignite-22172
vldpyatkov May 8, 2024
cd6a707
WIP
vldpyatkov May 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public abstract class AbstractCreateIndexCommand extends AbstractIndexCommand {
this.columns = copyOrNull(columns);
}

protected abstract CatalogIndexDescriptor createDescriptor(int indexId, int tableId, int creationCatalogVersion);
protected abstract CatalogIndexDescriptor createDescriptor(int indexId, int tableId, int zoneId, int creationCatalogVersion);

@Override
public List<UpdateEntry> get(Catalog catalog) {
Expand All @@ -84,7 +84,9 @@ public List<UpdateEntry> get(Catalog catalog) {
}

return List.of(
new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), catalog.version() + 1), schemaName),
new NewIndexEntry(
createDescriptor(catalog.objectIdGenState(), table.id(), table.zoneId(), catalog.version() + 1), schemaName
),
new ObjectIdGenUpdateEntry(1)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ private CreateHashIndexCommand(String schemaName, String indexName, String table
}

@Override
protected CatalogIndexDescriptor createDescriptor(int indexId, int tableId, int creationCatalogVersion) {
protected CatalogIndexDescriptor createDescriptor(int indexId, int tableId, int zoneId, int creationCatalogVersion) {
return new CatalogHashIndexDescriptor(
indexId, indexName, tableId, unique, creationCatalogVersion, columns
indexId, indexName, tableId, unique, creationCatalogVersion, zoneId, columns
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ private CreateSortedIndexCommand(String schemaName, String indexName, String tab
}

@Override
protected CatalogIndexDescriptor createDescriptor(int indexId, int tableId, int creationCatalogVersion) {
protected CatalogIndexDescriptor createDescriptor(int indexId, int tableId, int zoneId, int creationCatalogVersion) {
var indexColumnDescriptors = new ArrayList<CatalogIndexColumnDescriptor>(columns.size());

for (int i = 0; i < columns.size(); i++) {
Expand All @@ -71,7 +71,7 @@ protected CatalogIndexDescriptor createDescriptor(int indexId, int tableId, int
}

return new CatalogSortedIndexDescriptor(
indexId, indexName, tableId, unique, creationCatalogVersion, indexColumnDescriptors
indexId, indexName, tableId, unique, creationCatalogVersion, zoneId, indexColumnDescriptors
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public List<UpdateEntry> get(Catalog catalog) {
ensureNoTableIndexOrSysViewExistsWithGivenName(schema, indexName);
int txWaitCatalogVersion = catalog.version() + 1;

CatalogIndexDescriptor pkIndex = createIndexDescriptor(txWaitCatalogVersion, indexName, pkIndexId, tableId);
CatalogIndexDescriptor pkIndex = createIndexDescriptor(txWaitCatalogVersion, indexName, pkIndexId, tableId, zone.id());

return List.of(
new NewTableEntry(table, schemaName),
Expand Down Expand Up @@ -200,7 +200,13 @@ private void validate() {
}
}

private CatalogIndexDescriptor createIndexDescriptor(int txWaitCatalogVersion, String indexName, int pkIndexId, int tableId) {
private CatalogIndexDescriptor createIndexDescriptor(
int txWaitCatalogVersion,
String indexName,
int pkIndexId,
int tableId,
int zoneId
) {
CatalogIndexDescriptor pkIndex;

if (primaryKey instanceof TableSortedPrimaryKey) {
Expand All @@ -221,6 +227,7 @@ private CatalogIndexDescriptor createIndexDescriptor(int txWaitCatalogVersion, S
true,
AVAILABLE,
txWaitCatalogVersion,
zoneId,
indexColumns
);
} else if (primaryKey instanceof TableHashPrimaryKey) {
Expand All @@ -232,6 +239,7 @@ private CatalogIndexDescriptor createIndexDescriptor(int txWaitCatalogVersion, S
true,
AVAILABLE,
txWaitCatalogVersion,
zoneId,
hashPrimaryKey.columns()
);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,20 @@ public class CatalogHashIndexDescriptor extends CatalogIndexDescriptor {
* @param unique Unique flag.
* @param txWaitCatalogVersion Catalog version used in special index status updates to wait for RW transactions, started before
* this version, to finish.
* @param zoneId Zone id where table for the index is presented.
* @param columns A list of indexed columns. Must not contains duplicates.
* @throws IllegalArgumentException If columns list contains duplicates.
*/
public CatalogHashIndexDescriptor(int id, String name, int tableId, boolean unique, int txWaitCatalogVersion, List<String> columns) {
this(id, name, tableId, unique, CatalogIndexStatus.REGISTERED, txWaitCatalogVersion, columns, INITIAL_CAUSALITY_TOKEN);
public CatalogHashIndexDescriptor(
int id,
String name,
int tableId,
boolean unique,
int txWaitCatalogVersion,
int zoneId,
List<String> columns
) {
this(id, name, tableId, unique, CatalogIndexStatus.REGISTERED, txWaitCatalogVersion, zoneId, columns, INITIAL_CAUSALITY_TOKEN);
}

/**
Expand All @@ -72,9 +81,10 @@ public CatalogHashIndexDescriptor(
boolean unique,
CatalogIndexStatus status,
int txWaitCatalogVersion,
int zoneId,
List<String> columns
) {
this(id, name, tableId, unique, status, txWaitCatalogVersion, columns, INITIAL_CAUSALITY_TOKEN);
this(id, name, tableId, unique, status, txWaitCatalogVersion, zoneId, columns, INITIAL_CAUSALITY_TOKEN);
}

/**
Expand All @@ -98,10 +108,11 @@ private CatalogHashIndexDescriptor(
boolean unique,
CatalogIndexStatus status,
int txWaitCatalogVersion,
int zoneId,
List<String> columns,
long causalityToken
) {
super(CatalogIndexDescriptorType.HASH, id, name, tableId, unique, status, txWaitCatalogVersion, causalityToken);
super(CatalogIndexDescriptorType.HASH, id, name, tableId, unique, status, txWaitCatalogVersion, zoneId, causalityToken);

this.columns = List.copyOf(Objects.requireNonNull(columns, "columns"));
}
Expand All @@ -126,9 +137,10 @@ public CatalogHashIndexDescriptor readFrom(IgniteDataInput input) throws IOExcep
boolean unique = input.readBoolean();
CatalogIndexStatus status = CatalogIndexStatus.forId(input.readByte());
int txWaitCatalogVersion = input.readInt();
int zoneId = input.readInt();
List<String> columns = readStringCollection(input, ArrayList::new);

return new CatalogHashIndexDescriptor(id, name, tableId, unique, status, txWaitCatalogVersion, columns, updateToken);
return new CatalogHashIndexDescriptor(id, name, tableId, unique, status, txWaitCatalogVersion, zoneId, columns, updateToken);
}

@Override
Expand All @@ -140,6 +152,7 @@ public void writeTo(CatalogHashIndexDescriptor descriptor, IgniteDataOutput outp
output.writeBoolean(descriptor.unique());
output.writeByte(descriptor.status().id());
output.writeInt(descriptor.txWaitCatalogVersion());
output.writeInt(descriptor.zoneId());
writeStringCollection(descriptor.columns(), output);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,18 @@ public abstract class CatalogIndexDescriptor extends CatalogObjectDescriptor {
/** Index descriptor type. */
private final CatalogIndexDescriptorType indexType;

/** Zone id where table for the index is presented. */
private final int zoneId;

CatalogIndexDescriptor(CatalogIndexDescriptorType indexType, int id, String name, int tableId, boolean unique,
CatalogIndexStatus status, int txWaitCatalogVersion, long causalityToken) {
CatalogIndexStatus status, int txWaitCatalogVersion, int zoneId, long causalityToken) {
super(id, Type.INDEX, name, causalityToken);
this.indexType = indexType;
this.tableId = tableId;
this.unique = unique;
this.status = Objects.requireNonNull(status, "status");
this.txWaitCatalogVersion = txWaitCatalogVersion;
this.zoneId = zoneId;
}

/** Gets table ID. */
Expand All @@ -72,6 +76,11 @@ public int txWaitCatalogVersion() {
return txWaitCatalogVersion;
}

/** Return zone id where table for the index is presented. */
public int zoneId() {
return zoneId;
}

/** Returns catalog index descriptor type. */
public CatalogIndexDescriptorType indexType() {
return indexType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class CatalogSortedIndexDescriptor extends CatalogIndexDescriptor {
* @param unique Unique flag.
* @param txWaitCatalogVersion Catalog version used in special index status updates to wait for RW transactions, started before
* this version, to finish.
* @param zoneId Zone id where table for the index is presented.
* @param columns A list of columns descriptors.
* @throws IllegalArgumentException If columns list contains duplicates or columns size doesn't match the collations size.
*/
Expand All @@ -55,9 +56,10 @@ public CatalogSortedIndexDescriptor(
int tableId,
boolean unique,
int txWaitCatalogVersion,
int zoneId,
List<CatalogIndexColumnDescriptor> columns
) {
this(id, name, tableId, unique, REGISTERED, txWaitCatalogVersion, columns);
this(id, name, tableId, unique, REGISTERED, txWaitCatalogVersion, zoneId, columns);
}

/**
Expand All @@ -80,9 +82,10 @@ public CatalogSortedIndexDescriptor(
boolean unique,
CatalogIndexStatus status,
int txWaitCatalogVersion,
int zoneId,
List<CatalogIndexColumnDescriptor> columns
) {
this(id, name, tableId, unique, status, txWaitCatalogVersion, columns, INITIAL_CAUSALITY_TOKEN);
this(id, name, tableId, unique, status, txWaitCatalogVersion, zoneId, columns, INITIAL_CAUSALITY_TOKEN);
}

/**
Expand All @@ -106,10 +109,11 @@ private CatalogSortedIndexDescriptor(
boolean unique,
CatalogIndexStatus status,
int txWaitCatalogVersion,
int zoneId,
List<CatalogIndexColumnDescriptor> columns,
long causalityToken
) {
super(CatalogIndexDescriptorType.SORTED, id, name, tableId, unique, status, txWaitCatalogVersion, causalityToken);
super(CatalogIndexDescriptorType.SORTED, id, name, tableId, unique, status, txWaitCatalogVersion, zoneId, causalityToken);

this.columns = Objects.requireNonNull(columns, "columns");
}
Expand All @@ -134,9 +138,10 @@ public CatalogSortedIndexDescriptor readFrom(IgniteDataInput input) throws IOExc
boolean unique = input.readBoolean();
CatalogIndexStatus status = CatalogIndexStatus.forId(input.readByte());
int txWaitCatalogVersion = input.readInt();
int zoneId = input.readInt();
List<CatalogIndexColumnDescriptor> columns = readList(CatalogIndexColumnDescriptor.SERIALIZER, input);

return new CatalogSortedIndexDescriptor(id, name, tableId, unique, status, txWaitCatalogVersion, columns, updateToken);
return new CatalogSortedIndexDescriptor(id, name, tableId, unique, status, txWaitCatalogVersion, zoneId, columns, updateToken);
}

@Override
Expand All @@ -148,6 +153,7 @@ public void writeTo(CatalogSortedIndexDescriptor descriptor, IgniteDataOutput ou
output.writeBoolean(descriptor.unique());
output.writeByte(descriptor.status().id());
output.writeInt(descriptor.txWaitCatalogVersion());
output.writeInt(descriptor.zoneId());
writeList(descriptor.columns(), CatalogIndexColumnDescriptor.SERIALIZER, output);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ private static CatalogIndexDescriptor updateHashIndexStatus(
index.unique(),
newStatus,
txWaitCatalogVersion,
index.zoneId(),
index.columns()
);
}
Expand All @@ -125,6 +126,7 @@ private static CatalogIndexDescriptor updateSortedIndexStatus(
index.unique(),
newStatus,
txWaitCatalogVersion,
index.zoneId(),
index.columns()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private CatalogIndexDescriptor changeHashIndexName(CatalogHashIndexDescriptor in
index.unique(),
index.status(),
index.txWaitCatalogVersion(),
index.zoneId(),
index.columns()
);
}
Expand All @@ -113,6 +114,7 @@ private CatalogIndexDescriptor changeSortedIndexName(CatalogSortedIndexDescripto
index.unique(),
index.status(),
index.txWaitCatalogVersion(),
index.zoneId(),
index.columns()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ void exceptionIsThrownIfIndexHasInvalidPreviousStatus(CatalogIndexStatus invalid
false,
invalidPreviousIndexStatus,
version,
0,
List.of(columnName)
)
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ void testReplaceIndex() {
fooIndex.unique(),
fooIndex.status(),
fooIndex.txWaitCatalogVersion(),
fooIndex.zoneId(),
fooIndex.columns()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
class CatalogHashIndexDescriptorTest {
@Test
void toStringContainsTypeAndFields() {
var descriptor = new CatalogHashIndexDescriptor(1, "index1", 2, false, 3, List.of("col"));
var descriptor = new CatalogHashIndexDescriptor(1, "index1", 2, false, 3, 0, List.of("col"));

String toString = descriptor.toString();

Expand All @@ -36,5 +36,6 @@ void toStringContainsTypeAndFields() {
assertThat(toString, containsString("name=index1"));
assertThat(toString, containsString("tableId=2"));
assertThat(toString, containsString("status=REGISTERED"));
assertThat(toString, containsString("zoneId=0"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
class CatalogSortedIndexDescriptorTest {
@Test
void toStringContainsTypeAndFields() {
var descriptor = new CatalogSortedIndexDescriptor(1, "index1", 2, false, 3, List.of());
var descriptor = new CatalogSortedIndexDescriptor(1, "index1", 2, false, 3, 0, List.of());

String toString = descriptor.toString();

Expand All @@ -36,5 +36,6 @@ void toStringContainsTypeAndFields() {
assertThat(toString, containsString("name=index1"));
assertThat(toString, containsString("tableId=2"));
assertThat(toString, containsString("status=REGISTERED"));
assertThat(toString, containsString("zoneId=0"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -452,12 +452,12 @@ private static CatalogSortedIndexDescriptor newSortedIndexDescriptor(String name
CatalogIndexColumnDescriptor idxCol4 = new CatalogIndexColumnDescriptor("C4", CatalogColumnCollation.ASC_NULLS_LAST);

return new CatalogSortedIndexDescriptor(
1, name, 12, false, CatalogIndexStatus.AVAILABLE, 1, List.of(idxCol1, idxCol2, idxCol3, idxCol4));
1, name, 12, false, CatalogIndexStatus.AVAILABLE, 1, 0, List.of(idxCol1, idxCol2, idxCol3, idxCol4));
}

private static CatalogHashIndexDescriptor newHashIndexDescriptor(String name) {
return new CatalogHashIndexDescriptor(
1, name, 12, true, CatalogIndexStatus.REGISTERED, 1, List.of("C1", "C2"));
1, name, 12, true, CatalogIndexStatus.REGISTERED, 1, 0, List.of("C1", "C2"));
}

private static CatalogTableDescriptor newTableDescriptor(String name, List<CatalogTableColumnDescriptor> columns) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.table.LongPriorityQueue;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import org.apache.ignite.internal.util.ExceptionUtils;
Expand Down Expand Up @@ -291,11 +292,13 @@ void stop() {

private void onPrimaryReplicaChanged(PrimaryReplicaEventParameters primaryReplicaEvent) {
inBusyLock(busyLock, () -> {
if (!(primaryReplicaEvent.groupId() instanceof TablePartitionId)) {
if (!(primaryReplicaEvent.groupId() instanceof ZonePartitionId)) {
return;
}

TablePartitionId tablePartitionId = (TablePartitionId) primaryReplicaEvent.groupId();
ZonePartitionId zonePartitionId = (ZonePartitionId) primaryReplicaEvent.groupId();

TablePartitionId tablePartitionId = new TablePartitionId(zonePartitionId.tableId(), zonePartitionId.partitionId());

updatePrimaryReplica(tablePartitionId, primaryReplicaEvent.startTime(), primaryReplicaEvent.leaseholder());
});
Expand Down