PHOENIX-4009 Run UPDATE STATISTICS command by using MR integration on… #419
Conversation
First of all, apologies for the long PR. (Most of it is refactoring, but it's still hard to review.) Here's the high-level idea.
Finally, this is the v1 version for some initial feedback. Please comment wherever it's not clear. Coming up:
@twdsilva @ChinmaySKulkarni @joshelser @chrajeshbabu @dbwong @BinShi-SecularBird Please review.
@karanmehta93 can you provide URLs to some design docs/basic stats docs? It will help someone not so familiar with stats understand the overall idea much better.
Design doc: https://salesforce.quip.com/fjMhA2bbBkK4 (also linked on the Jira)
@gjacoby126 @vincentpoon If you guys have some time.
Overall looks good to me.
@@ -154,7 +154,8 @@
 public enum SchemaType {
     TABLE,
-    QUERY;
+    QUERY,
+    UPDATE_STATS
I am not very familiar with the TABLE/QUERY enum; what's the difference?
I am also not quite sure about the exact use case. The SchemaType.TABLE enum value is only used by the phoenix-hive module. The other option is SchemaType.QUERY, which is the default option to run regular queries.
I checked the code. QUERY is generally used for Phoenix map reduce jobs. I doubt that we should reuse SchemaType and add UPDATE_STATS here, because now every place that checks whether the schema type is QUERY might also need to check whether it applies to UPDATE_STATS, since UPDATE_STATS is also a select statement and an MR job. I would prefer to add a new configuration property to indicate whether this is UPDATE STATS, instead of adding a new UPDATE_STATS enum value to SchemaType.
I feel that adding a new SchemaType is a cleaner way to handle this. I agree that everywhere we check for the SchemaType, we need to do so for this one as well, but we don't have it in many places, and it gives the user insight into the type of MR job it is. If we add aggregation support in the future, this can also be extended similarly. The name SchemaType is not really an appropriate name here; however, I didn't change it since it is a public enum. It should be something like PhoenixMRJobType.
Adding a config property would not eliminate the checks that we have throughout the code, and I feel it would make things harder to extend in the future if new functionality is added. @BinShi-SecularBird Thoughts?
My point is that the meanings of QUERY and UPDATE_STATS overlap: QUERY indicates Phoenix MR jobs, and UPDATE_STATS indicates Phoenix MR jobs plus update statistics, which will bring unclean code into the code base. Thoughts?
If you check the usages of SchemaType in the whole code base, you'll find SchemaType is mainly used by Phoenix-Pig and has a specific meaning there. TABLE and QUERY map to the two PhoenixHBaseLoader schemes "hbase://table/" and "hbase://query", and they also map to the corresponding resource schemas in Phoenix-Pig. That's why it seems to be a bad idea to define a new schema type and pollute its usage in Phoenix-Pig. I prefer to add a new configuration or a new parameter in PhoenixMapReduceUtil.setInput to indicate that this is an UPDATE_STATISTICS MR job.
@BinShi-SecularBird Although it is primarily used by the phoenix-pig module, phoenix-core also uses it to determine the query it needs to run for the MR job. I want to extend the meaning of the SchemaType enum to cover this as well, since it can be extended again if we have new types/use cases in the future. Adding a configuration each time doesn't seem good to me.
This is my last concern about your change. I object to reusing SchemaType for indicating an UPDATE_STATISTICS MR job, because we need to keep the logical independence of data structures and data types. I don't want to block your progress on your work; if you insist on making this change, you can ignore my comment and proceed with the commit.
As discussed offline, we will add a new enum named PhoenixMRJobType to indicate that it is a stats collection job.
@BinShi-SecularBird Updated the patch with the MRJobType enum.
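The agreed direction (a separate enum rather than overloading SchemaType) can be sketched roughly as follows. All names below are illustrative stand-ins, not the actual Phoenix classes:

```java
// Hypothetical sketch of keeping MR job intent out of SchemaType.
// The enum and method names here are assumptions for illustration only.
public class MRJobTypeSketch {
    enum SchemaType { TABLE, QUERY }       // unchanged: remains a loader/query concern
    enum MRJobType { QUERY, UPDATE_STATS } // new: distinguishes the kind of MR job

    static boolean isStatsCollectionJob(SchemaType schemaType, MRJobType jobType) {
        // SchemaType stays QUERY for all MR select paths; the new enum
        // tells a stats-collection job apart from a regular query job.
        return schemaType == SchemaType.QUERY && jobType == MRJobType.UPDATE_STATS;
    }

    public static void main(String[] args) {
        System.out.println(isStatsCollectionJob(SchemaType.QUERY, MRJobType.UPDATE_STATS)); // prints true
        System.out.println(isStatsCollectionJob(SchemaType.QUERY, MRJobType.QUERY));        // prints false
    }
}
```

This keeps the existing SchemaType checks untouched while giving job-type checks their own dimension.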
@@ -23,6 +23,7 @@
 import java.util.Map;

+import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
nit: revert these changes
@karanmehta93 I'll try and take a look tonight.
import java.util.Collection;
import java.util.Map;

import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_TASK_HANDLING_INTERVAL_MS;
unused import, delete it.
new Object[][] { { false, null, false, false }, { false, null, true, false } });
new Object[][] {
    // Collect stats on snapshots using UpdateStatisticsTool
    { false, true },
Why is the second column "collectStatsOnSnapshot" in { false, true } set to true? In my understanding, this is the DisableStats test, so it should be set to false, shouldn't it?
The "disabled" here refers to namespaces, not stats, hence it is that way.
I see.
@@ -20,23 +20,29 @@
 import java.util.Arrays;
 import java.util.Collection;

-import org.apache.phoenix.schema.stats.StatsCollectorIT;
+import org.apache.phoenix.schema.stats.BaseStatsCollectorIT;
 import org.apache.phoenix.util.TestUtil;
"import org.apache.phoenix.util.TestUtil;" is unused import. Delete it.
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.query.QueryServices;
remove unused import "import org.apache.phoenix.query.QueryServices;" at line 46
@Override
public int run(String[] args) throws Exception {
remove the empty line at line 85
        "Phoenix Table Name");
private static final Option SNAPSHOT_NAME_OPTION = new Option("s", "snapshot", true,
        "HBase Snapshot Name");
private static final Option RESTORE_DIR_OPTION = new Option("d", "restore-dir", true,
Do we provide a default restore directory or root directory?
I am not sure if there is an easy way of doing that. Snapshots cannot be restored to hbase.rootDir, which is the only configuration parameter I know of for getting the HDFS path. I will dig in a bit to see if a default option is possible here, since that would ease its operationalization.
Regarding the restore directory, is it a full path or a path relative to the root path? Can it be a relative path, to ease operationalization?
So we cannot use any path relative to hbase.rootDir according to the code. However, after digging in, I figured out that we can use the fs.defaultFS config to find the HDFS root directory and append /tmp to it, thus ensuring that we always restore into that directory. This will help ease operationalization. Does this seem fine?
Relevant code from the RestoreSnapshotHelper class:
} else if (restoreDir.toUri().getPath().startsWith(rootDir.toUri().getPath())) {
throw new IllegalArgumentException("Restore directory cannot be a sub directory of HBase root directory. RootDir: " + rootDir + ", restoreDir: " + restoreDir);
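The approach described above (deriving a default restore directory from fs.defaultFS and appending /tmp, while keeping it outside the HBase root) could be sketched like this. The helper below is hypothetical and uses plain strings rather than the Hadoop Path API:

```java
// Hypothetical sketch: build a default snapshot-restore directory from the
// fs.defaultFS value and reject directories under the HBase root, mirroring
// the RestoreSnapshotHelper check quoted from the HBase source.
public class RestoreDirSketch {
    static String defaultRestoreDir(String fsDefaultFs) {
        // e.g. "hdfs://namenode:8020" -> "hdfs://namenode:8020/tmp"
        return fsDefaultFs.replaceAll("/+$", "") + "/tmp";
    }

    static void validateRestoreDir(String restoreDir, String hbaseRootDir) {
        // The restore directory must not live under the HBase root directory.
        if (restoreDir.startsWith(hbaseRootDir)) {
            throw new IllegalArgumentException(
                "Restore directory cannot be a sub directory of HBase root directory");
        }
    }

    public static void main(String[] args) {
        String restoreDir = defaultRestoreDir("hdfs://namenode:8020");
        validateRestoreDir(restoreDir, "hdfs://namenode:8020/hbase");
        System.out.println(restoreDir); // prints hdfs://namenode:8020/tmp
    }
}
```

The real implementation would resolve both paths through the Hadoop FileSystem API; this only illustrates the directory-layout constraint.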
I updated the patch to reflect this new change.
@@ -108,6 +109,7 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
 final PhoenixInputSplit pSplit = (PhoenixInputSplit)split;
 final List<Scan> scans = pSplit.getScans();
 try {
+    LOG.info("Generating iterators for " + scans.size() + " scans in keyrange: " + pSplit.getKeyRange());
Shall we remove this log or change it to debug log? The same question for the other two info logs you added below.
The record reader is initialized once per mapper, and the number of map tasks is limited by the number of regions. Also, these logs will be distributed across several machines, not one. It would be good to have some insight here into the iterators each reader is trying to generate.
OK, that's fine then.
Preconditions.checkNotNull(selectStatement);

final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
// Optimize the query plan so that we potentially use secondary indexes
final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
final Scan scan = queryPlan.getContext().getScan();

if (schemaType == SchemaType.UPDATE_STATS) {
    StatisticsUtil.setScanAttributes(scan, null);
Could you check whether you need to set "scan.setAttribute(RUN_UPDATE_STATS_ASYNC_ATTRIB, FALSE_BYTES);" here?
The scan.setAttribute(RUN_UPDATE_STATS_ASYNC_ATTRIB, FALSE_BYTES); attribute is applicable to the case when stats are being collected via the UPDATE STATISTICS SQL command. If false, the client has to wait till it runs on all the machines. If true, the client is given an ACK back instantly and the RS continues in the background. Hence it is not applicable here.
        long clientTimeStamp, byte[] family, byte[] gp_width_bytes,
        byte[] gp_per_region_bytes) {
    super(conf, region, tableName,
        clientTimeStamp, family, gp_width_bytes, gp_per_region_bytes);
Do the above two lines (59, 60) need to be two lines?
I agree that it doesn't look clean; do you have any suggestions?
Ideally, I want the instantiation of this class to also happen through the StatisticsCollectorFactory instead of the direct constructor call in the SnapshotScanner. Any ideas on that one?
/**
 * Determine the GPW for statistics collection for the table.
 * The order of priority is as follows
Could you note in the comment that the order of priority is listed from high to low?
if (guidepostWidth >= 0) {
    this.guidePostDepth = guidepostWidth;
    LOG.info("Guide post depth determined from SYSTEM.CATALOG: " + guidePostDepth);
} else {
    // Last use global config value
The comment "// Last use global config value" is no longer necessary, as you have already documented the order of priority on the function.
    throw e;
}
for (ImmutableBytesPtr fam : fams) {
    if (delete) {
From line 228 to line 237, you removed the conditional debug logs and added info logs; I am not sure this is better or necessary.
Maybe it is better to still use a debug log instead of an info log to record the mutation size.
All the log lines were changed to INFO since they will be produced only once per mapper (and a mapper would correspond to a region). These lines would really help us in debugging if something goes wrong. We can also export these as metrics if required.
}

@Override
protected long getGuidePostDepthFromSystemCatalog() throws IOException, SQLException {
MapperStatisticsCollector.getGuidePostDepthFromSystemCatalog() and RegionServerStatisticsCollector.getGuidePostDepthFromSystemCatalog() still have a lot of duplicate code (they are almost the same). Can we avoid the duplication in these two functions?
Yes, I agree with you on that point, but it was left as-is deliberately. These portions of code are a bit complicated, and I was not sure I was covering every case properly. What do you suggest? Shall we refactor it in a future PR?
I checked MapperStatisticsCollector.getGuidePostDepthFromSystemCatalog() and RegionServerStatisticsCollector.getGuidePostDepthFromSystemCatalog(); it looks like they are exactly the same. Can we just move this function to the base class?
They look almost the same; however, there is a subtle difference in how we get the Table object to access the SYSTEM.CATALOG table. This difference arises because a mapper establishes a connection to the HBase cluster differently than a region server does.
I totally agree with you that more refactoring is better; it's just that this is a single unit of code that I didn't want to partition, hence I kept it that way. I understand that it is possible to standardize access to the Table API and change only that part for the different implementations.
We have a similar issue for the StatisticsWriter class as well, where we have two separate static methods that instantiate newWriter. We could combine them into one and improve there as well; it's just something I haven't done by choice, as I felt we could carry those out in a new Jira.
Shall we get the table object in the caller of getGuidePostDepthFromSystemCatalog() and pass it to the function? This is a small refactor; can we do it in this change?
I just think we can avoid a large amount of duplicated production code with a small effort, so we should do it in this change.
Alright, that makes sense. Thanks @BinShi-SecularBird , I have updated the patch accordingly.
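The refactor agreed on here can be sketched roughly as follows. The interface and class names are stand-ins for illustration, not the actual Phoenix types:

```java
// Hypothetical sketch: the shared guide-post lookup lives in the base class
// and takes the SYSTEM.CATALOG handle as a parameter, so each subclass only
// differs in how it obtains that handle.
abstract class StatsCollectorSketch {
    // Stand-in for org.apache.hadoop.hbase.client.Table.
    interface CatalogTable { long readGuidePostWidth(); }

    // Mapper vs. region server each supply their own connection path.
    protected abstract CatalogTable getHTableForSystemCatalog();

    // Defined once here instead of duplicated in two subclasses.
    protected long getGuidePostDepthFromSystemCatalog(CatalogTable table) {
        return table.readGuidePostWidth();
    }

    long resolveGuidePostDepth() {
        return getGuidePostDepthFromSystemCatalog(getHTableForSystemCatalog());
    }
}
```

A mapper-side subclass would build its CatalogTable from the job configuration, while the region-server subclass would use the coprocessor environment; the shared logic never needs to know the difference.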
I didn't see my concern from PHOENIX-4009 addressed in this change: "if the time since the last update is less than a certain threshold, the job doesn't need to update the regions' stats again. Suppose one MR job failed but some regions still got updated; the rerun job only needs to update the regions that the first job didn't update." I still think it should be addressed.
The first aspect should be fairly simple to add. We can add it as part of this Jira or PHOENIX-5091. The second aspect, re-running only the necessary tasks, is a bit complicated. The mappers will retry a particular region when they fail (up to the limit of max attempts). However, I don't really feel the need for that optimization as of now. Let me know what your thoughts are.
private static final Option TABLE_NAME_OPTION = new Option("t", "table", true,
        "Phoenix Table Name");
private static final Option SNAPSHOT_NAME_OPTION = new Option("s", "snapshot", true,
For a given table, how do we know which snapshot to take and what the snapshot's name is? If we always use the latest snapshot or always take a new snapshot, can we avoid this SNAPSHOT_NAME_OPTION and thus reduce operational cost?
Will be addressed soon as part of PHOENIX-5091.
The PR will be put up once this gets in.
I didn't mean "the second aspect of re-running only the necessary tasks". What I mean is: assume the first MR job scheduled 100 mappers, 90 of them succeeded and 10 failed, so the first job successfully updated 90 regions but the whole job failed. The second (retry) job, assuming it succeeds, still schedules 100 mappers, but only 10 mappers should actually update the stats on the 10 regions that failed in the first job; the other 90 mappers should skip the stats update and succeed.
This is a common case: we always have some jobs that finish most of the work (update > 90% of regions) but then fail for some reason; the retry job should do minimal work instead of updating the stats of the whole table again.
This should not be a common case. The common case should be that a few mappers fail for some reason, and we have retries for that; the job should NOT fail as a whole. Even the MR framework doesn't persist data between jobs, and doing so can get really tricky depending on the use case. A better way is to make the job idempotent so that a re-run doesn't affect it. It is also hard in this case because we don't know what changed between retries; if the snapshot name changed, that can potentially affect region boundaries. We would need another level of orchestration that persists stats MR job information in some table and looks it up before running the current job. These cases would be difficult to handle, and I would prefer that we avoid that complexity here.
There are always some jobs (maybe a few in a day) failing because a few mappers in a job fail continuously and even retries can't get over the issue, which causes the whole job to fail. This is the case I'm talking about, and it happens more frequently when something bad is happening in the cluster. In this case, I want the retry job to skip the regions whose stats have already been updated and only do minimal work, so it doesn't worsen the bad situation in the cluster and we can easily catch up to avoid missing SLAs. At the current phase, I'm OK to proceed without this skip check.
In general, looks good to me. I would still prefer a larger refactoring of the statistics collector and its subclasses. Also, for some of these new classes, can we please create unit tests?
} else {
    long guidepostWidth = getGuidePostDepthFromSystemCatalog(getHTableForSystemCatalog());
    if (guidepostWidth >= 0) {
nit: > 0, as 0 is logically a bad width
Having it at 0 disables stats collection here. So if stats were already present, this task will attempt to delete all of them from the SYSTEM.STATS table. Not accepting 0 as a legal value would mean falling back to the default value (which could potentially cause an issue).
When a table is created, the GUIDE_POSTS_WIDTH column in SYSTEM.CATALOG is null, so it always falls back to the default global value (and sometimes generates 1 GPW in the table). Setting it to 0 explicitly disables it, hence it is handled here.
I will add a comment here at the top.
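The semantics described above (a per-table SYSTEM.CATALOG value wins when present, where 0 means stats are explicitly disabled and null means unset, otherwise the global default applies) might look like the sketch below. The method name and the use of a boxed Long for the nullable column are assumptions, not the actual Phoenix code:

```java
// Hypothetical sketch of resolving the guide post width.
public class GuidePostWidthSketch {
    static long resolveGuidePostWidth(Long catalogWidth, long globalDefault) {
        // null: GUIDE_POSTS_WIDTH was never set for this table -> use global default
        // 0:    stats collection explicitly disabled for this table
        if (catalogWidth != null && catalogWidth >= 0) {
            return catalogWidth;
        }
        return globalDefault;
    }

    public static void main(String[] args) {
        System.out.println(resolveGuidePostWidth(null, 300L)); // prints 300
        System.out.println(resolveGuidePostWidth(0L, 300L));   // prints 0
    }
}
```

The key point is that 0 must flow through as a legal value rather than being treated like "unset", otherwise the explicit disable would silently fall back to the global width.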
 * Implementation for DefaultStatisticsCollector when running inside Hadoop MR job mapper
 * Triggered via UpdateStatisticsTool class
 */
public class MapperStatisticsCollector extends DefaultStatisticsCollector {
As discussed offline, I still prefer an approach of injecting a stats writer, if possible, over this inheritance pattern.
@Override
public void write(PreparedStatement preparedStatement) throws SQLException {
nit: add a comment noting that this intentionally does nothing.
@@ -62,7 +62,7 @@ public static ResourceSchema getResourceSchema(final Configuration configuration
 Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration");
 final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(configuration);
 columns = function.apply(sqlQuery);
-} else {
+} else if (SchemaType.TABLE.equals(schemaType)) {
nit: Earlier you used == for enum comparison, is there a standard for Phoenix?
Yes, I agree that we should standardize it. There is no pre-defined standard here, and both work in theory. Any particular preference for one over the other? Also, can you point me to the other location (if you remember)?
I prefer == as you won't accidentally order the equality wrong and risk a NPE.
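A tiny illustration of the point, using a hypothetical enum:

```java
// Why == is the safer default for enum comparison: it is null-safe and
// type-checked at compile time, while equals() throws if the receiver is null.
public class EnumCompareSketch {
    enum SchemaType { TABLE, QUERY }

    public static void main(String[] args) {
        SchemaType fromConfig = null; // e.g. an unset configuration value

        System.out.println(fromConfig == SchemaType.TABLE); // prints false, no NPE

        // fromConfig.equals(SchemaType.TABLE) would throw NullPointerException;
        // SchemaType.TABLE.equals(fromConfig) is safe but easy to order wrong.
    }
}
```

Since enum constants are singletons, == and a correctly-ordered equals() always agree; the difference is only in null handling and compile-time type checking.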
I understand your concern, and I agree that it can happen often. As you already pointed out, the simplest way to combat that is to retry the whole job (or retry at certain intervals) and hope that it eventually succeeds. If not, we can raise appropriate alerts using the monitoring infrastructure.
I understand the idea. Determining which regions' data is missing from the SYSTEM.STATS table is not possible (as part of this code), since the snapshot might have changed between the two jobs.
@karanmehta93, the change looks good to me. Signed off.
…Writer can be injected
Looks like an improvement; it got rid of some of the new classes, which is a great simplification.
Thanks @BinShi-SecularBird @dbwong and @twdsilva for the review. I will push it to this branch.
… snapshots