Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-14830 Introduce index internal API #518

Merged
merged 54 commits into from Feb 7, 2022
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
5145e74
wip
Dec 15, 2021
729d33c
wip
Dec 15, 2021
5eecb41
Merge branch '!main' into ignite-14830
Dec 15, 2021
8206f92
wip
Dec 16, 2021
2996275
wip
Dec 17, 2021
291510d
wip
Dec 17, 2021
867c6f4
wip
Dec 17, 2021
32277c1
wip
Dec 20, 2021
fdee376
wip
Dec 21, 2021
2518dad
wip
Dec 22, 2021
fac862b
wip
Dec 22, 2021
d8eb2b1
Merge branch '!main' into ignite-14830
Dec 22, 2021
e22fa5b
wip
Dec 22, 2021
ecae759
wip
Dec 22, 2021
ea5ee74
wip
Dec 22, 2021
0328525
fix pom
Dec 22, 2021
3333143
fix license
Dec 23, 2021
7e33d28
fix api
Jan 28, 2022
1cd39de
move serializer into engine
Jan 28, 2022
9940e31
Merge branch 'ignite-14925-sorted-indexes' into ignite-14830
Jan 28, 2022
d87fb20
manual merge
Jan 28, 2022
4dd7748
manual merge #2
Jan 28, 2022
e4a0307
Merge branch 'ignite-14925-sorted-indexes' into ignite-14830
Jan 28, 2022
e652d84
wip
Jan 28, 2022
8e3292a
fix after merge
Jan 31, 2022
f6bbf43
remove unused
Jan 31, 2022
98b0148
fix test
Jan 31, 2022
91db2b1
fix index conditions
Jan 31, 2022
7f8d66b
remove unused
Jan 31, 2022
1c85d0d
remove idx condition
Jan 31, 2022
bbcd84c
Merge branch 'ignite-14925-sorted-indexes' into ignite-14830
Feb 1, 2022
15f59cc
fix review comments
Feb 1, 2022
c77534f
fix review
Feb 1, 2022
97a6d96
minor
Feb 1, 2022
fdd929c
minor
Feb 1, 2022
5be55e8
minor
Feb 1, 2022
3f87cbc
fix review comments
Feb 2, 2022
5709cf0
Add PK columns to index descriptor interface.
Feb 2, 2022
d29b357
unmute testCreateDropIndex
zstan Feb 3, 2022
e273ddd
Merge branch 'ignite-14830' of github.com:gridgain/apache-ignite-3 in…
zstan Feb 3, 2022
e7fb655
Fix idx manager
Feb 3, 2022
3504b20
remove debug
Feb 3, 2022
7f889fc
fix idx descriptor API
Feb 3, 2022
7b3a715
fix idx scan
Feb 3, 2022
0b1cd41
Update modules/index-api/src/main/java/org/apache/ignite/internal/idx…
Feb 3, 2022
d408fda
Update modules/index-api/src/main/java/org/apache/ignite/internal/idx…
Feb 3, 2022
b388cad
fix idx scan
Feb 3, 2022
afec867
Merge branch 'ignite-14925-sorted-indexes' into ignite-14830
Feb 3, 2022
cfca71a
manual merge
Feb 3, 2022
287ec46
fix manual merge
Feb 3, 2022
a06eda8
fix style
Feb 3, 2022
94ae9cd
minors
Feb 4, 2022
137dfa3
minors
Feb 4, 2022
66d7d2b
minors
Feb 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.manager;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.jetbrains.annotations.Nullable;

/**
* Interface which can produce its events.
*/
public abstract class AbstractProducer<T extends Event, P extends EventParameters> implements Producer<T, P> {
/** All listeners. */
private ConcurrentHashMap<T, ConcurrentLinkedQueue<EventListener<P>>> listeners = new ConcurrentHashMap<>();

/** {@inheritDoc} */
@Override
public void listen(T evt, EventListener<P> closure) {
listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).offer(closure);
}

/** {@inheritDoc} */
@Override
public void removeListener(T evt, EventListener<P> closure) {
removeListener(evt, closure, null);
}

/** {@inheritDoc} */
@Override
public void removeListener(T evt, EventListener<P> closure, @Nullable IgniteInternalCheckedException cause) {
if (listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).remove(closure)) {
closure.remove(cause == null ? new ListenerRemovedException() : cause.getCause() == null ? cause : cause.getCause());
}
}

/**
* Notifies every listener that subscribed before.
*
* @param evt Event type.
* @param params Event parameters.
* @param err Exception when it was happened, or {@code null} otherwise.
*/
protected void fireEvent(T evt, P params, Throwable err) {
ConcurrentLinkedQueue<EventListener<P>> queue = listeners.get(evt);

if (queue == null) {
return;
}

EventListener<P> closure;

Iterator<EventListener<P>> iter = queue.iterator();

while (iter.hasNext()) {
closure = iter.next();

if (closure.notify(params, err)) {
iter.remove();
}
}
}
}
Expand Up @@ -20,7 +20,7 @@
/**
* The event cas which is produced by event producer component.
*
* @see Producer#fireEvent(Event, EventParameters, Throwable)
* @see Producer#listen(Event, EventListener)
*/
public interface Event {
}
Expand Up @@ -23,7 +23,7 @@
/**
* The listener handles events from a producer.
*
* @see Producer#listen(Event, EventListener)
* @see AbstractProducer#listen(Event, EventListener)
*/
public interface EventListener<P extends EventParameters> {
/**
Expand Down
Expand Up @@ -20,7 +20,7 @@
/**
* Event parameters. This type passed to the event listener.
*
* @see Producer#fireEvent(Event, EventParameters, Throwable)
* @see AbstractProducer#fireEvent(Event, EventParameters, Throwable)
*/
public interface EventParameters {
}
Expand Up @@ -17,39 +17,29 @@

package org.apache.ignite.internal.manager;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.jetbrains.annotations.Nullable;

/**
* Interface which can produce its events.
*/
public abstract class Producer<T extends Event, P extends EventParameters> {
/** All listeners. */
private ConcurrentHashMap<T, ConcurrentLinkedQueue<EventListener<P>>> listeners = new ConcurrentHashMap<>();

public interface Producer<T extends Event, P extends EventParameters> {
/**
* Registers an event listener. When the event predicate returns true it would never invoke after, otherwise this predicate would
* receive an event again.
*
* @param evt Event.
* @param closure Closure.
*/
public void listen(T evt, EventListener<P> closure) {
listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).offer(closure);
}
void listen(T evt, EventListener<P> closure);

/**
* Removes a listener associated with the event.
*
* @param evt Event.
* @param closure Closure.
*/
public void removeListener(T evt, EventListener<P> closure) {
removeListener(evt, closure, null);
}
void removeListener(T evt, EventListener<P> closure);

/**
* Removes a listener associated with the event.
Expand All @@ -58,36 +48,5 @@ public void removeListener(T evt, EventListener<P> closure) {
* @param closure Closure.
* @param cause The exception that was a cause which a listener is removed.
*/
public void removeListener(T evt, EventListener<P> closure, @Nullable IgniteInternalCheckedException cause) {
if (listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).remove(closure)) {
closure.remove(cause == null ? new ListenerRemovedException() : cause.getCause() == null ? cause : cause.getCause());
}
}

/**
* Notifies every listener that subscribed before.
*
* @param evt Event type.
* @param params Event parameters.
* @param err Exception when it was happened, or {@code null} otherwise.
*/
protected void fireEvent(T evt, P params, Throwable err) {
ConcurrentLinkedQueue<EventListener<P>> queue = listeners.get(evt);

if (queue == null) {
return;
}

EventListener<P> closure;

Iterator<EventListener<P>> iter = queue.iterator();

while (iter.hasNext()) {
closure = iter.next();

if (closure.notify(params, err)) {
iter.remove();
}
}
}
void removeListener(T evt, EventListener<P> closure, @Nullable IgniteInternalCheckedException cause);
}
91 changes: 91 additions & 0 deletions modules/index-api/pom.xml
@@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>

<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-parent</artifactId>
<version>1</version>
<relativePath>../../parent/pom.xml</relativePath>
</parent>

<artifactId>ignite-index-api</artifactId>
<version>3.0.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-schema</artifactId>
</dependency>

<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-api</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-configuration-annotation-processor</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-configuration-annotation-processor</artifactId>
<version>${project.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.idx;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.ignite.configuration.schemas.table.TableIndexChange;
import org.apache.ignite.internal.idx.event.IndexEvent;
import org.apache.ignite.internal.idx.event.IndexEventParameters;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.IndexNotFoundException;
import org.apache.ignite.lang.NodeStoppingException;

/**
* Internal index manager facade provides low-level methods for indexes operations.
*/
public interface IndexManager extends Producer<IndexEvent, IndexEventParameters> {
/**
* Creates a new index with the specified name.
*
* @param idxCanonicalName Index canonical name.
* @param tblCanonicalName Table canonical name.
* @param idxChange Index configuration.
* @return Index.
* @throws IndexAlreadyExistsException if the index exists.
*/
InternalSortedIndex createIndex(
AMashenkov marked this conversation as resolved.
Show resolved Hide resolved
String idxCanonicalName,
String tblCanonicalName,
Consumer<TableIndexChange> idxChange
);

/**
* Create index asynchronously.
*
* @param idxCanonicalName Index canonical name.
* @param tblCanonicalName Table canonical name.
* @param idxChange Index configuration.
* @return Index future, that may be completed exceptionally with {@link IndexAlreadyExistsException} if the index exists.
*/
CompletableFuture<InternalSortedIndex> createIndexAsync(
String idxCanonicalName,
String tblCanonicalName,
Consumer<TableIndexChange> idxChange
);

/**
* Drop index.
*
* @param idxCanonicalName Index canonical name.
* @throws IndexAlreadyExistsException if the index doesn't exist.
*/
void dropIndex(String idxCanonicalName);

/**
* Drop index asynchronously.
*
* @param idxCanonicalName Index canonical name.
* @return Index future, that may be completed exceptionally with {@link IndexNotFoundException} if the index doesn't exist.
*/
CompletableFuture<Void> dropIndexAsync(String idxCanonicalName);

/**
* Gets indexes of the table.
*
* @param tblId Table identifier to lookup indexes.
* @return Indexes of the table.
* @throws NodeStoppingException If an implementation stopped before the method was invoked.
*/
List<InternalSortedIndex> indexes(UUID tblId);

/**
* Gets index of the table.
*
* @return Index of the table.
* @throws NodeStoppingException If an implementation stopped before the method was invoked.
*/
InternalSortedIndex index(String idxCanonicalName);

/**
* Gets index of the table asynchronously.
*
* @return Index future.
* @throws NodeStoppingException If an implementation stopped before the method was invoked.
*/
CompletableFuture<InternalSortedIndex> indexAsync(String idxCanonicalName);
}