Skip to content

Commit ceeb3ef

Browse files
andylau-55caszkguigithub-actions[bot]
authored
fix(all): version 0.8 (#572)
Co-authored-by: 田常@蚂蚁 <zhengke.gzk@antgroup.com> Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
1 parent da1fa61 commit ceeb3ef

File tree

225 files changed

+11353
-864
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

225 files changed

+11353
-864
lines changed

builder/model/src/main/java/com/antgroup/openspg/builder/model/record/SubGraphRecord.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import lombok.Data;
2323
import lombok.EqualsAndHashCode;
2424
import lombok.Getter;
25+
import lombok.Setter;
2526

2627
@Getter
28+
@Setter
2729
@AllArgsConstructor
2830
@EqualsAndHashCode(callSuper = false)
2931
public class SubGraphRecord extends BaseRecord {

builder/runner/local/src/main/java/com/antgroup/openspg/builder/runner/local/physical/sink/impl/Neo4jSinkWriter.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,14 @@
2929
import com.antgroup.openspg.cloudext.interfaces.graphstore.model.lpg.record.LPGPropertyRecord;
3030
import com.antgroup.openspg.cloudext.interfaces.graphstore.model.lpg.record.VertexRecord;
3131
import com.antgroup.openspg.cloudext.interfaces.graphstore.model.lpg.schema.EdgeTypeName;
32+
import com.antgroup.openspg.cloudext.interfaces.graphstore.util.TypeChecker;
3233
import com.antgroup.openspg.server.common.model.project.Project;
3334
import com.google.common.collect.Lists;
3435
import java.math.BigDecimal;
3536
import java.util.ArrayList;
3637
import java.util.List;
3738
import java.util.Map;
38-
import java.util.concurrent.ExecutionException;
39-
import java.util.concurrent.ExecutorService;
40-
import java.util.concurrent.Future;
41-
import java.util.concurrent.LinkedBlockingQueue;
42-
import java.util.concurrent.RejectedExecutionHandler;
43-
import java.util.concurrent.ThreadPoolExecutor;
44-
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.*;
4540
import java.util.function.Consumer;
4641
import lombok.extern.slf4j.Slf4j;
4742
import org.apache.commons.lang3.StringUtils;
@@ -188,7 +183,11 @@ private void writeNode(SubGraphRecord.Node node) {
188183
List<VertexRecord> vertexRecords = Lists.newArrayList();
189184
List<LPGPropertyRecord> properties = Lists.newArrayList();
190185
for (Map.Entry<String, Object> entry : node.getProperties().entrySet()) {
191-
properties.add(new LPGPropertyRecord(entry.getKey(), entry.getValue()));
186+
Object entryValue = entry.getValue();
187+
if (!TypeChecker.isArrayOrCollectionOfPrimitives(entryValue)) {
188+
entryValue = JSON.toJSONString(entryValue);
189+
}
190+
properties.add(new LPGPropertyRecord(entry.getKey(), entryValue));
192191
}
193192
VertexRecord vertexRecord = new VertexRecord(node.getId(), label, properties);
194193
vertexRecords.add(vertexRecord);
@@ -226,7 +225,11 @@ private void writeEdge(SubGraphRecord.Edge edge) {
226225
List<EdgeRecord> edgeRecords = Lists.newArrayList();
227226
List<LPGPropertyRecord> properties = Lists.newArrayList();
228227
for (Map.Entry<String, Object> entry : edge.getProperties().entrySet()) {
229-
properties.add(new LPGPropertyRecord(entry.getKey(), entry.getValue()));
228+
Object entryValue = entry.getValue();
229+
if (!TypeChecker.isArrayOrCollectionOfPrimitives(entryValue)) {
230+
entryValue = JSON.toJSONString(entryValue);
231+
}
232+
properties.add(new LPGPropertyRecord(entry.getKey(), entryValue));
230233
}
231234

232235
EdgeTypeName edgeTypeName =

cloudext/impl/graph-store/tugraph/src/main/java/com/antgroup/openspg/cloudext/impl/graphstore/tugraph/TuGraphStoreClient.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import com.antgroup.openspg.cloudext.interfaces.graphstore.model.lpg.schema.operation.SchemaAtomicOperationEnum;
4949
import com.antgroup.openspg.cloudext.interfaces.graphstore.util.TypeNameUtils;
5050
import com.antgroup.openspg.core.schema.model.type.BasicTypeEnum;
51-
import com.antgroup.openspg.server.api.facade.ApiConstants;
5251
import com.antgroup.tugraph.TuGraphDbRpcClient;
5352
import com.google.common.collect.Lists;
5453
import java.io.InputStream;
@@ -78,13 +77,16 @@ public class TuGraphStoreClient extends BaseLPGGraphStoreClient {
7877
@Getter private final LPGTypeNameConvertor typeNameConvertor;
7978
@Getter private final String connUrl;
8079

80+
private static final String ACCESS_ID = "accessId";
81+
private static final String ACCESS_KEY = "accessKey";
82+
private static final String TIMEOUT = "timeout";
83+
8184
public TuGraphStoreClient(String connUrl, LPGTypeNameConvertor typeNameConvertor) {
8285
UriComponents uriComponents = UriComponentsBuilder.fromUriString(connUrl).build();
8386
this.connUrl = connUrl;
8487
this.graphName = uriComponents.getQueryParams().getFirst(TuGraphConstants.GRAPH_NAME);
8588
this.timeout =
86-
Double.parseDouble(
87-
String.valueOf(uriComponents.getQueryParams().getFirst(ApiConstants.TIMEOUT)));
89+
Double.parseDouble(String.valueOf(uriComponents.getQueryParams().getFirst(TIMEOUT)));
8890
this.client = initTuGraphClient(uriComponents);
8991
this.internalIdGenerator = new NoChangedIdGenerator();
9092
this.typeNameConvertor = typeNameConvertor;
@@ -149,8 +151,8 @@ public void close() throws Exception {
149151

150152
private TuGraphDbRpcClient initTuGraphClient(UriComponents uriComponents) {
151153
String host = String.format("%s:%s", uriComponents.getHost(), uriComponents.getPort());
152-
String accessId = uriComponents.getQueryParams().getFirst(ApiConstants.ACCESS_ID);
153-
String accessKey = uriComponents.getQueryParams().getFirst(ApiConstants.ACCESS_KEY);
154+
String accessId = uriComponents.getQueryParams().getFirst(ACCESS_ID);
155+
String accessKey = uriComponents.getQueryParams().getFirst(ACCESS_KEY);
154156
TuGraphDbRpcClient client = null;
155157
try {
156158
client = new TuGraphDbRpcClient(host, accessId, accessKey);

cloudext/impl/object-storage/minio/src/main/java/com/antgroup/openspg/cloudext/impl/objectstorage/minio/MinioClient.java

Lines changed: 83 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737
import java.io.OutputStream;
3838
import java.net.URI;
3939
import java.nio.charset.StandardCharsets;
40+
import java.util.ArrayList;
4041
import java.util.Date;
42+
import java.util.List;
4143
import lombok.Getter;
4244
import lombok.extern.slf4j.Slf4j;
4345
import org.apache.commons.compress.utils.IOUtils;
@@ -72,6 +74,7 @@ private io.minio.MinioClient initMinioClient(UriComponents uriComponents) {
7274

7375
@Override
7476
public Boolean saveData(String bucketName, byte[] data, String fileKey) {
77+
Long start = System.currentTimeMillis();
7578
ByteArrayInputStream inputStream = null;
7679
try {
7780
inputStream = new ByteArrayInputStream(data);
@@ -87,19 +90,23 @@ public Boolean saveData(String bucketName, byte[] data, String fileKey) {
8790
log.error(e.getMessage(), e);
8891
throw new RuntimeException("minio saveData Exception:" + e.getMessage(), e);
8992
} finally {
93+
log.info("minio saveData cons:{} fileKey:{}", System.currentTimeMillis() - start, fileKey);
9094
IOUtils.closeQuietly(inputStream);
9195
}
9296
}
9397

9498
@Override
9599
public byte[] getData(String bucketName, String fileKey) {
100+
Long start = System.currentTimeMillis();
96101
try {
97102
GetObjectArgs getObjectArgs =
98103
GetObjectArgs.builder().bucket(bucketName).object(fileKey).build();
99104
return inputStreamToByteArray(minioClient.getObject(getObjectArgs));
100105
} catch (Exception e) {
101106
log.error(e.getMessage(), e);
102107
throw new RuntimeException("minio getData Exception:" + e.getMessage(), e);
108+
} finally {
109+
log.info("minio getData cons:{} fileKey:{}", System.currentTimeMillis() - start, fileKey);
103110
}
104111
}
105112

@@ -239,10 +246,14 @@ public String getUrlWithoutExpiration(String bucketName, String fileKey) {
239246
@Override
240247
public Boolean removeObject(String bucketName, String fileKey) {
241248
try {
242-
RemoveObjectArgs removeObjectArgs =
243-
RemoveObjectArgs.builder().bucket(bucketName).object(fileKey).build();
244-
minioClient.removeObject(removeObjectArgs);
245-
return true;
249+
if (isDirectory(bucketName, fileKey)) {
250+
return removeDirectory(bucketName, fileKey);
251+
} else {
252+
RemoveObjectArgs removeObjectArgs =
253+
RemoveObjectArgs.builder().bucket(bucketName).object(fileKey).build();
254+
minioClient.removeObject(removeObjectArgs);
255+
return true;
256+
}
246257
} catch (Exception e) {
247258
log.error(e.getMessage(), e);
248259
throw new RuntimeException("minio removeObject Exception:" + e.getMessage(), e);
@@ -252,26 +263,64 @@ public Boolean removeObject(String bucketName, String fileKey) {
252263
@Override
253264
public Boolean removeDirectory(String bucketName, String directoryPath) {
254265
try {
255-
Iterable<Result<Item>> objects =
266+
List<String> files = getAllFilesRecursively(bucketName, directoryPath);
267+
268+
for (String file : files) {
269+
log.info("minio Deleting: " + file);
270+
if (file.startsWith(directoryPath)) {
271+
removeObject(bucketName, file);
272+
}
273+
}
274+
return true;
275+
} catch (Exception e) {
276+
log.error(e.getMessage(), e);
277+
throw new RuntimeException("minio removeDirectory Exception:" + e.getMessage(), e);
278+
}
279+
}
280+
281+
@Override
282+
public Boolean isDirectory(String bucketName, String path) {
283+
try {
284+
Iterable<Result<Item>> items =
285+
minioClient.listObjects(
286+
ListObjectsArgs.builder().bucket(bucketName).prefix(path).recursive(false).build());
287+
for (Result<Item> result : items) {
288+
Item item = result.get();
289+
if (item.isDir() || !item.objectName().equals(path)) {
290+
return true;
291+
}
292+
}
293+
return false;
294+
} catch (Exception e) {
295+
log.error(e.getMessage(), e);
296+
throw new RuntimeException("minio isDirectory Exception:" + e.getMessage(), e);
297+
}
298+
}
299+
300+
@Override
301+
public List<String> getAllFilesRecursively(String bucketName, String directoryPath) {
302+
List<String> filePaths = new ArrayList<>();
303+
try {
304+
Iterable<Result<Item>> items =
256305
minioClient.listObjects(
257306
ListObjectsArgs.builder()
258307
.bucket(bucketName)
259308
.prefix(directoryPath)
260-
.recursive(true)
309+
.recursive(false)
261310
.build());
262-
263-
for (Result<Item> result : objects) {
311+
for (Result<Item> result : items) {
264312
Item item = result.get();
265-
log.info("minio Deleting: " + item.objectName());
266-
if (item.objectName().startsWith(directoryPath)) {
267-
removeObject(bucketName, item.objectName());
313+
if (item.isDir()) {
314+
filePaths.addAll(getAllFilesRecursively(bucketName, item.objectName()));
315+
} else {
316+
filePaths.add(item.objectName());
268317
}
269318
}
270-
return true;
271319
} catch (Exception e) {
272320
log.error(e.getMessage(), e);
273-
throw new RuntimeException("minio removeDirectory Exception:" + e.getMessage(), e);
321+
throw new RuntimeException("minio getAllFilesRecursively Exception: " + e.getMessage(), e);
274322
}
323+
return filePaths;
275324
}
276325

277326
@Override
@@ -287,6 +336,27 @@ public Long getContentLength(String bucketName, String objectName) {
287336
}
288337
}
289338

339+
@Override
340+
public long getStorageSize(String bucketName, String path) {
341+
try {
342+
Iterable<Result<Item>> items =
343+
minioClient.listObjects(
344+
ListObjectsArgs.builder().bucket(bucketName).prefix(path).recursive(true).build());
345+
346+
long totalSizeInBytes = 0;
347+
for (Result<Item> itemResult : items) {
348+
Item item = itemResult.get();
349+
if (!item.isDir()) {
350+
totalSizeInBytes += item.size();
351+
}
352+
}
353+
return totalSizeInBytes;
354+
} catch (Exception e) {
355+
log.error(e.getMessage(), e);
356+
throw new RuntimeException("minio getStorageSize Exception:" + e.getMessage(), e);
357+
}
358+
}
359+
290360
public void makeBucket(String bucketName) {
291361
try {
292362
boolean isExist =

cloudext/impl/object-storage/oss/src/main/java/com/antgroup/openspg/cloudext/impl/objectstorage/oss/OSSClient.java

Lines changed: 85 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import java.io.InputStream;
3434
import java.io.OutputStream;
3535
import java.nio.charset.StandardCharsets;
36+
import java.util.ArrayList;
3637
import java.util.Date;
38+
import java.util.List;
3739
import lombok.Getter;
3840
import lombok.extern.slf4j.Slf4j;
3941
import org.springframework.web.util.UriComponents;
@@ -42,6 +44,8 @@
4244
@Slf4j
4345
public class OSSClient implements ObjectStorageClient {
4446

47+
private static final String SLASH = "/";
48+
4549
private final OSS ossClient;
4650

4751
@Getter private final String connUrl;
@@ -209,8 +213,12 @@ public String getUrlWithoutExpiration(String bucketName, String fileKey) {
209213
@Override
210214
public Boolean removeObject(String bucketName, String fileKey) {
211215
try {
212-
ossClient.deleteObject(bucketName, fileKey);
213-
return true;
216+
if (isDirectory(bucketName, fileKey)) {
217+
return removeDirectory(bucketName, fileKey);
218+
} else {
219+
ossClient.deleteObject(bucketName, fileKey);
220+
return true;
221+
}
214222
} catch (Exception e) {
215223
log.error(e.getMessage(), e);
216224
throw new RuntimeException("OSS removeObject Exception:" + e.getMessage(), e);
@@ -220,25 +228,67 @@ public Boolean removeObject(String bucketName, String fileKey) {
220228
@Override
221229
public Boolean removeDirectory(String bucketName, String directoryPath) {
222230
try {
223-
ObjectListing objectListing;
231+
List<String> files = getAllFilesRecursively(bucketName, directoryPath);
232+
233+
for (String file : files) {
234+
log.info("OSS Deleting: " + file);
235+
if (file.startsWith(directoryPath)) {
236+
removeObject(bucketName, file);
237+
}
238+
}
239+
return true;
240+
} catch (Exception e) {
241+
log.error(e.getMessage(), e);
242+
throw new RuntimeException("OSS removeDirectory Exception: " + e.getMessage(), e);
243+
}
244+
}
224245

246+
@Override
247+
public Boolean isDirectory(String bucketName, String path) {
248+
try {
249+
ObjectListing objectListing = ossClient.listObjects(bucketName, path);
250+
if (!objectListing.getCommonPrefixes().isEmpty()
251+
|| !objectListing.getObjectSummaries().isEmpty()) {
252+
if (path.endsWith(SLASH)) {
253+
return true;
254+
}
255+
}
256+
for (OSSObjectSummary objectSummary : objectListing.getObjectSummaries()) {
257+
String objectName = objectSummary.getKey();
258+
if (objectName.endsWith(SLASH) || !objectName.equals(path)) {
259+
return true;
260+
}
261+
}
262+
return false;
263+
} catch (Exception e) {
264+
log.error(e.getMessage(), e);
265+
throw new RuntimeException("OSS isDirectory Exception: " + e.getMessage(), e);
266+
}
267+
}
268+
269+
@Override
270+
public List<String> getAllFilesRecursively(String bucketName, String directoryPath) {
271+
List<String> filePaths = new ArrayList<>();
272+
try {
273+
String nextMarker;
225274
do {
226-
objectListing = ossClient.listObjects(bucketName, directoryPath);
227-
for (OSSObjectSummary objectSummary : objectListing.getObjectSummaries()) {
228-
String objectName = objectSummary.getKey();
229-
log.info("OSS Deleting: " + objectName);
230-
if (objectName.startsWith(directoryPath)) {
231-
removeObject(bucketName, objectName);
275+
ObjectListing objectListing = ossClient.listObjects(bucketName, directoryPath);
276+
List<OSSObjectSummary> objectSummaries = objectListing.getObjectSummaries();
277+
for (OSSObjectSummary summary : objectSummaries) {
278+
if (summary.getKey().endsWith(SLASH)) {
279+
filePaths.addAll(getAllFilesRecursively(bucketName, summary.getKey()));
280+
} else {
281+
filePaths.add(summary.getKey());
232282
}
233283
}
234-
objectListing.getNextMarker();
235-
} while (objectListing.isTruncated());
284+
nextMarker = objectListing.getNextMarker();
285+
} while (nextMarker != null);
236286

237-
return true;
238287
} catch (Exception e) {
239-
log.error(e.getMessage(), e);
240-
throw new RuntimeException("OSS removeDirectory Exception:" + e.getMessage(), e);
288+
log.error("OSS getAllFilesRecursively Exception: " + e.getMessage(), e);
289+
throw new RuntimeException("OSS getAllFilesRecursively Exception: " + e.getMessage(), e);
241290
}
291+
return filePaths;
242292
}
243293

244294
@Override
@@ -251,4 +301,25 @@ public Long getContentLength(String bucketName, String objectName) {
251301
throw new RuntimeException("OSS getContentLength Exception:" + e.getMessage(), e);
252302
}
253303
}
304+
305+
@Override
306+
public long getStorageSize(String bucketName, String path) {
307+
long totalSizeInBytes = 0L;
308+
309+
try {
310+
String nextMarker;
311+
do {
312+
ObjectListing objectListing = ossClient.listObjects(bucketName, path);
313+
for (OSSObjectSummary objectSummary : objectListing.getObjectSummaries()) {
314+
totalSizeInBytes += objectSummary.getSize();
315+
}
316+
nextMarker = objectListing.getNextMarker();
317+
} while (nextMarker != null);
318+
return totalSizeInBytes;
319+
320+
} catch (Exception e) {
321+
log.error(e.getMessage(), e);
322+
throw new RuntimeException("OSS getStorageSize Exception:" + e.getMessage(), e);
323+
}
324+
}
254325
}

0 commit comments

Comments
 (0)