-
Notifications
You must be signed in to change notification settings - Fork 703
/
TableStatusReadCommittedScope.java
119 lines (102 loc) · 4.5 KB
/
TableStatusReadCommittedScope.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.readcommitter;
import java.io.IOException;
import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.mutate.UpdateVO;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.hadoop.conf.Configuration;
/**
 * {@link ReadCommittedScope} for a managed carbon table.
 *
 * <p>The segment list is obtained by reading the table status file found under the table path
 * of the supplied {@link AbsoluteTableIdentifier}, unless it was handed in at construction time.
 */
@InterfaceAudience.Internal
@InterfaceStability.Stable
public class TableStatusReadCommittedScope implements ReadCommittedScope {

  private static final long serialVersionUID = 2324397174595872738L;

  /** Segment details held by this scope; filled at construction or on first request. */
  private LoadMetadataDetails[] loadMetadataDetails;

  /** Identifier of the table; its table path is the base of every path built here. */
  private AbsoluteTableIdentifier identifier;

  /** Declared transient, so it is not part of the serialized form of this scope. */
  private transient Configuration configuration;

  /**
   * Creates a scope and immediately loads the segment details from the table status file.
   *
   * @param identifier identifier of the table
   * @param configuration configuration kept by this scope
   * @throws IOException if reading the table status file fails
   */
  public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
      Configuration configuration) throws IOException {
    this.identifier = identifier;
    this.configuration = configuration;
    takeCarbonIndexFileSnapShot();
  }

  /**
   * Creates a scope around segment details that the caller already has; nothing is read here.
   *
   * @param identifier identifier of the table
   * @param loadMetadataDetails segment details to use as they are
   * @param configuration configuration kept by this scope
   */
  public TableStatusReadCommittedScope(AbsoluteTableIdentifier identifier,
      LoadMetadataDetails[] loadMetadataDetails, Configuration configuration) {
    this.loadMetadataDetails = loadMetadataDetails;
    this.identifier = identifier;
    this.configuration = configuration;
  }

  /**
   * Returns the segment details of this scope, loading them first when none are held yet.
   *
   * @return the segment details held by this scope
   * @throws IOException if the table status file could not be read; the original failure is
   *     attached as the cause
   */
  @Override public LoadMetadataDetails[] getSegmentList() throws IOException {
    if (loadMetadataDetails != null) {
      return loadMetadataDetails;
    }
    try {
      takeCarbonIndexFileSnapShot();
    } catch (IOException e) {
      throw new IOException("Problem encountered while reading the Table Status file.", e);
    }
    return loadMetadataDetails;
  }

  /**
   * Looks up the index files of the given segment.
   *
   * <p>When the segment carries a segment file name the lookup goes through a
   * {@link SegmentFileStore}; otherwise the segment path is derived from the table path and the
   * segment number and queried through a {@link SegmentIndexFileStore}.
   *
   * @param segment segment whose index files are requested
   * @return the index or merge index files reported by the store that was consulted
   * @throws IOException if the underlying store fails to read
   */
  @Override public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
    if (segment.getSegmentFileName() != null) {
      SegmentFileStore segmentFileStore =
          new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
      return segmentFileStore.getIndexOrMergeFiles();
    }
    // No segment file name available: query the segment path directly.
    String segmentPath =
        CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
    return new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(segmentPath);
  }

  /**
   * Builds the refresh info for a segment from the given update details.
   *
   * @param segment segment the info is requested for; not consulted by this implementation
   * @param updateVo update details, may be {@code null}
   * @return refresh info carrying the created-or-updated timestamp of {@code updateVo}, or a
   *     timestamp of zero when {@code updateVo} is {@code null}; the second constructor
   *     argument is zero in both cases
   * @throws IOException declared by the signature; nothing in this body raises it
   */
  public SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment, UpdateVO updateVo)
      throws IOException {
    if (updateVo == null) {
      return new SegmentRefreshInfo(0L, 0);
    }
    return new SegmentRefreshInfo(updateVo.getCreatedOrUpdatedTimeStamp(), 0);
  }

  /**
   * Replaces the held segment details with what the table status file currently contains.
   *
   * @throws IOException if reading the table status file fails
   */
  @Override public void takeCarbonIndexFileSnapShot() throws IOException {
    // Only the segment information is refreshed here.
    // File information is fetched on the fly according to the fetched segment info.
    this.loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
        CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
  }

  /**
   * @return the configuration currently held by this scope
   */
  @Override public Configuration getConfiguration() {
    return configuration;
  }

  /**
   * @param configuration configuration to hold from now on
   */
  @Override public void setConfiguration(Configuration configuration) {
    this.configuration = configuration;
  }

  /**
   * @return the table path of the identifier this scope was created with
   */
  @Override public String getFilePath() {
    return identifier.getTablePath();
  }
}