diff --git a/cdap-master/src/main/java/co/cask/cdap/data/tools/DatasetSpecificationUpgrader.java b/cdap-master/src/main/java/co/cask/cdap/data/tools/DatasetSpecificationUpgrader.java deleted file mode 100644 index 606762682bce..000000000000 --- a/cdap-master/src/main/java/co/cask/cdap/data/tools/DatasetSpecificationUpgrader.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright © 2015-2016 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package co.cask.cdap.data.tools; - -import co.cask.cdap.api.common.Bytes; -import co.cask.cdap.api.dataset.DatasetSpecification; -import co.cask.cdap.api.dataset.table.Table; -import co.cask.cdap.data2.datafabric.dataset.DatasetMetaTableUtil; -import co.cask.cdap.data2.util.TableId; -import co.cask.cdap.data2.util.hbase.HBaseTableUtil; -import co.cask.cdap.data2.util.hbase.ScanBuilder; -import co.cask.cdap.proto.id.NamespaceId; -import com.google.common.annotations.VisibleForTesting; -import com.google.gson.Gson; -import com.google.inject.Inject; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import 
java.util.Map; -import java.util.NavigableMap; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nullable; - -/** - * Upgrading from CDAP version < 3.3 to CDAP version 3.3. - * This requires updating the TTL property for DatasetSpecification in the DatasetInstanceMDS table. - */ -public class DatasetSpecificationUpgrader { - private static final Logger LOG = LoggerFactory.getLogger(DatasetSpecificationUpgrader.class); - private static final Gson GSON = new Gson(); - private static final String TTL_UPDATED = "table.ttl.migrated.to.seconds"; - - private final HBaseTableUtil tableUtil; - private final Configuration conf; - - @Inject - public DatasetSpecificationUpgrader(HBaseTableUtil tableUtil, Configuration conf) { - this.tableUtil = tableUtil; - this.conf = conf; - } - - /** - * Updates the TTL in the {@link co.cask.cdap.data2.datafabric.dataset.service.mds.DatasetInstanceMDS} - * table for CDAP versions prior to 3.3. - *

- * The TTL for {@link DatasetSpecification} was stored in milliseconds. - * Since the spec (as of CDAP version 3.3) is in seconds, the instance MDS entries must be updated. - * This is to be called only if the current CDAP version is < 3.3. - *

- * @throws Exception - */ - public void upgrade() throws Exception { - TableId datasetSpecId = tableUtil.createHTableId(NamespaceId.SYSTEM, DatasetMetaTableUtil.INSTANCE_TABLE_NAME); - try (HBaseAdmin hBaseAdmin = new HBaseAdmin(conf)) { - if (!tableUtil.tableExists(hBaseAdmin, datasetSpecId)) { - LOG.error("Dataset instance table does not exist: {}. Should not happen", datasetSpecId); - return; - } - } - - HTable specTable = tableUtil.createHTable(conf, datasetSpecId); - - try { - ScanBuilder scanBuilder = tableUtil.buildScan(); - scanBuilder.setTimeRange(0, HConstants.LATEST_TIMESTAMP); - scanBuilder.setMaxVersions(); - try (ResultScanner resultScanner = specTable.getScanner(scanBuilder.build())) { - Result result; - while ((result = resultScanner.next()) != null) { - Put put = new Put(result.getRow()); - for (Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> familyMap : - result.getMap().entrySet()) { - for (Map.Entry<byte[], NavigableMap<Long, byte[]>> columnMap : familyMap.getValue().entrySet()) { - for (Map.Entry<Long, byte[]> columnEntry : columnMap.getValue().entrySet()) { - Long timeStamp = columnEntry.getKey(); - byte[] colVal = columnEntry.getValue(); - // a deleted dataset can still show up here since BufferingTable doesn't actually delete, but - // writes a null value. The fact that we need to know that implementation detail here is bad. - // If we could use Table here instead of HTable, this would be hidden from us. - if (colVal == null || colVal.length == 0) { - continue; - } - String specEntry = Bytes.toString(colVal); - DatasetSpecification specification = GSON.fromJson(specEntry, DatasetSpecification.class); - DatasetSpecification updatedSpec = updateTTLInSpecification(specification, null); - colVal = Bytes.toBytes(GSON.toJson(updatedSpec)); - put.add(familyMap.getKey(), columnMap.getKey(), timeStamp, colVal); - } - } - } - // might not need to put anything if all columns were skipped because they are delete markers. 
- if (put.size() > 0) { - specTable.put(put); - } - } - } - } finally { - specTable.flushCommits(); - specTable.close(); - } - } - - private Map<String, String> updatedProperties(Map<String, String> properties) { - if (properties.containsKey(Table.PROPERTY_TTL) && !properties.containsKey(TTL_UPDATED)) { - SortedMap<String, String> updatedProperties = new TreeMap<>(properties); - long updatedValue = TimeUnit.MILLISECONDS.toSeconds(Long.valueOf(updatedProperties.get(Table.PROPERTY_TTL))); - updatedProperties.put(Table.PROPERTY_TTL, String.valueOf(updatedValue)); - updatedProperties.put(TTL_UPDATED, "true"); - return updatedProperties; - } - return properties; - } - - @VisibleForTesting - DatasetSpecification updateTTLInSpecification(DatasetSpecification specification, @Nullable String parentName) { - Map<String, String> properties = updatedProperties(specification.getProperties()); - List<DatasetSpecification> updatedSpecs = new ArrayList<>(); - for (DatasetSpecification datasetSpecification : specification.getSpecifications().values()) { - updatedSpecs.add(updateTTLInSpecification(datasetSpecification, specification.getName())); - } - - String specName = specification.getName(); - if (parentName != null && specification.getName().startsWith(parentName)) { - specName = specification.getName().substring(parentName.length() + 1); - } - return DatasetSpecification.builder(specName, - specification.getType()).properties(properties).datasets(updatedSpecs).build(); - } -} diff --git a/cdap-master/src/main/java/co/cask/cdap/data/tools/UpgradeTool.java b/cdap-master/src/main/java/co/cask/cdap/data/tools/UpgradeTool.java index 7c260a194fa6..e98847ec37c4 100644 --- a/cdap-master/src/main/java/co/cask/cdap/data/tools/UpgradeTool.java +++ b/cdap-master/src/main/java/co/cask/cdap/data/tools/UpgradeTool.java @@ -122,7 +122,6 @@ public class UpgradeTool { private final StreamStateStoreUpgrader streamStateStoreUpgrader; private final DatasetUpgrader dsUpgrade; private final QueueAdmin queueAdmin; - private final DatasetSpecificationUpgrader dsSpecUpgrader; private 
final ExistingEntitySystemMetadataWriter existingEntitySystemMetadataWriter; private final UpgradeDatasetServiceManager upgradeDatasetServiceManager; private final NamespaceStore nsStore; @@ -177,7 +176,6 @@ private String getDescription() { this.dsFramework = injector.getInstance(DatasetFramework.class); this.streamStateStoreUpgrader = injector.getInstance(StreamStateStoreUpgrader.class); this.dsUpgrade = injector.getInstance(DatasetUpgrader.class); - this.dsSpecUpgrader = injector.getInstance(DatasetSpecificationUpgrader.class); this.queueAdmin = injector.getInstance(QueueAdmin.class); this.nsStore = injector.getInstance(NamespaceStore.class); this.authorizationService = injector.getInstance(AuthorizationEnforcementService.class); @@ -419,9 +417,6 @@ private void performUpgrade() throws Exception { LOG.info("Upgrading AppMetadatastore..."); store.upgradeAppVersion(); - LOG.info("Upgrading Dataset Specification..."); - dsSpecUpgrader.upgrade(); - LOG.info("Upgrading stream state store table..."); streamStateStoreUpgrader.upgrade(); diff --git a/cdap-master/src/test/java/co/cask/cdap/data/tools/DatasetSpecificationUpgradeTest.java b/cdap-master/src/test/java/co/cask/cdap/data/tools/DatasetSpecificationUpgradeTest.java deleted file mode 100644 index 7cfa56ac5645..000000000000 --- a/cdap-master/src/test/java/co/cask/cdap/data/tools/DatasetSpecificationUpgradeTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright © 2015 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package co.cask.cdap.data.tools; - -import co.cask.cdap.api.dataset.DatasetSpecification; -import co.cask.cdap.api.dataset.table.Table; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test {@link co.cask.cdap.data.tools.DatasetSpecificationUpgrader} - */ -public class DatasetSpecificationUpgradeTest { - private static final String TTL_UPDATED = "table.ttl.migrated.to.seconds"; - - @Test - public void testDatasetSpecificationUpgrade() throws Exception { - // we can pass null for these parameters as those are not used - DatasetSpecificationUpgrader datasetSpecificationUpgrader = new DatasetSpecificationUpgrader(null, null); - - DatasetSpecification specification = DatasetSpecification.builder("dataset1", "Table"). - properties(ImmutableMap.of(Table.PROPERTY_TTL, "3600000")). - datasets(ImmutableList.of()).build(); - - // TTL should be converted to seconds - DatasetSpecification expected = DatasetSpecification.builder("dataset1", "Table"). - properties(ImmutableMap.of(Table.PROPERTY_TTL, "3600", TTL_UPDATED, "true")). - datasets(ImmutableList.of()).build(); - - Assert.assertEquals(expected, datasetSpecificationUpgrader.updateTTLInSpecification(specification, null)); - // calling it again to test idempotence - Assert.assertEquals(expected, datasetSpecificationUpgrader.updateTTLInSpecification(specification, null)); - - DatasetSpecification nestedSpec = DatasetSpecification.builder("dataset2", "Table"). - properties(ImmutableMap.of(Table.PROPERTY_TTL, "7200000")).datasets(specification).build(); - - // TTL should be converted to seconds for both parent and nested specs - DatasetSpecification expected2 = DatasetSpecification.builder("dataset2", "Table"). - properties(ImmutableMap.of(Table.PROPERTY_TTL, "7200", TTL_UPDATED, "true")). 
- datasets(DatasetSpecification.builder("dataset1", "Table"). - properties(ImmutableMap.of(Table.PROPERTY_TTL, "3600", TTL_UPDATED, "true")). - datasets(ImmutableList.of()). - build()). - build(); - Assert.assertEquals(expected2, datasetSpecificationUpgrader.updateTTLInSpecification(nestedSpec, null)); - - // calling it again to test idempotence - Assert.assertEquals(expected2, datasetSpecificationUpgrader.updateTTLInSpecification(nestedSpec, null)); - - specification = DatasetSpecification.builder("dataset3", "Table").build(); - Assert.assertEquals(specification, datasetSpecificationUpgrader.updateTTLInSpecification(specification, null)); - - // calling it again to test idempotence - Assert.assertEquals(specification, datasetSpecificationUpgrader.updateTTLInSpecification(specification, null)); - } -}