Skip to content

Commit

Permalink
Merge 7fe327a into 4a205c3
Browse files Browse the repository at this point in the history
  • Loading branch information
hit-lacus committed Nov 30, 2018
2 parents 4a205c3 + 7fe327a commit af0fd8a
Show file tree
Hide file tree
Showing 199 changed files with 1,748 additions and 1,209 deletions.
38 changes: 22 additions & 16 deletions assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
Expand Up @@ -22,12 +22,14 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

import org.apache.commons.io.IOUtils;
Expand Down Expand Up @@ -76,7 +78,7 @@ public static void deployMetadata(String localMetaData) throws IOException {
CubeDescManager.getInstance(config()).updateCubeDesc(cube.getDescriptor());//enforce signature updating
}
}

public static void deployMetadata() throws IOException {
deployMetadata(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA);
}
Expand All @@ -92,7 +94,8 @@ public static void overrideJobJarLocations() {
private static String getPomVersion() {
try {
MavenXpp3Reader pomReader = new MavenXpp3Reader();
Model model = pomReader.read(new FileReader("../pom.xml"));
Model model = pomReader
.read(new InputStreamReader(new FileInputStream("../pom.xml"), StandardCharsets.UTF_8));
return model.getVersion();
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
Expand Down Expand Up @@ -138,9 +141,11 @@ public static void prepareTestDataForNormalCubes(String modelName) throws Except
deployTables(modelName);
}

public static void prepareTestDataForStreamingCube(long startTime, long endTime, int numberOfRecords, String cubeName, StreamDataLoader streamDataLoader) throws IOException {
public static void prepareTestDataForStreamingCube(long startTime, long endTime, int numberOfRecords,
String cubeName, StreamDataLoader streamDataLoader) throws IOException {
CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
List<String> data = StreamingTableDataGenerator.generate(numberOfRecords, startTime, endTime, cubeInstance.getRootFactTable(), cubeInstance.getProject());
List<String> data = StreamingTableDataGenerator.generate(numberOfRecords, startTime, endTime,
cubeInstance.getRootFactTable(), cubeInstance.getProject());
//load into kafka
streamDataLoader.loadIntoKafka(data);
logger.info("Write {} messages into {}", data.size(), streamDataLoader.toString());
Expand All @@ -151,7 +156,8 @@ public static void prepareTestDataForStreamingCube(long startTime, long endTime,
TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, null);
StringBuilder sb = new StringBuilder();
for (String json : data) {
List<String> rowColumns = timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).get(0).getData();
List<String> rowColumns = timedJsonStreamParser
.parse(ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8))).get(0).getData();
sb.append(StringUtils.join(rowColumns, ","));
sb.append(System.getProperty("line.separator"));
}
Expand Down Expand Up @@ -200,26 +206,26 @@ private static void deployTables(String modelName) throws Exception {

Set<TableRef> tables = model.getAllTables();
Set<String> TABLE_NAMES = new HashSet<String>();
for (TableRef tr:tables){
if (!tr.getTableDesc().isView()){
for (TableRef tr : tables) {
if (!tr.getTableDesc().isView()) {
String tableName = tr.getTableName();
String schema = tr.getTableDesc().getDatabase();
String identity = String.format("%s.%s", schema, tableName);
String identity = String.format(Locale.ROOT, "%s.%s", schema, tableName);
TABLE_NAMES.add(identity);
}
}
TABLE_NAMES.add(TABLE_SELLER_TYPE_DIM_TABLE); // the wrapper view VIEW_SELLER_TYPE_DIM need this table

// scp data files, use the data from hbase, instead of local files
File tempDir = Files.createTempDir();
String tempDirAbsPath = tempDir.getAbsolutePath();
for (String tablename : TABLE_NAMES) {
tablename = tablename.toUpperCase();
tablename = tablename.toUpperCase(Locale.ROOT);

File localBufferFile = new File(tempDirAbsPath + "/" + tablename + ".csv");
localBufferFile.createNewFile();

logger.info(String.format("get resource from hbase:/data/%s.csv", tablename));
logger.info(String.format(Locale.ROOT, "get resource from hbase:/data/%s.csv", tablename));
InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv").inputStream;
FileOutputStream localFileStream = new FileOutputStream(localBufferFile);
IOUtils.copy(hbaseDataStream, localFileStream);
Expand All @@ -233,21 +239,21 @@ private static void deployTables(String modelName) throws Exception {

ISampleDataDeployer sampleDataDeployer = SourceManager.getSource(model.getRootFactTable().getTableDesc())
.getSampleDataDeployer();

// create hive tables
sampleDataDeployer.createSampleDatabase("EDW");
for (String tablename : TABLE_NAMES) {
logger.info(String.format("get table desc %s", tablename));
logger.info(String.format(Locale.ROOT, "get table desc %s", tablename));
sampleDataDeployer.createSampleTable(metaMgr.getTableDesc(tablename, model.getProject()));
}

// load data to hive tables
// LOAD DATA LOCAL INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
for (String tablename : TABLE_NAMES) {
logger.info(String.format("load data into %s", tablename));
logger.info(String.format(Locale.ROOT, "load data into %s", tablename));
sampleDataDeployer.loadSampleData(tablename, tempDirAbsPath);
}

// create the view automatically here
sampleDataDeployer.createWrapperView(TABLE_SELLER_TYPE_DIM_TABLE, VIEW_SELLER_TYPE_DIM);
}
Expand Down
Expand Up @@ -20,6 +20,7 @@

import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Random;

import org.apache.kylin.common.KylinConfig;
Expand Down Expand Up @@ -66,7 +67,7 @@ public static List<String> generate(int recordCount, long startTime, long endTim
kvs.clear();
kvs.put("timestamp", String.valueOf(time));
for (ColumnDesc columnDesc : tableDesc.getColumns()) {
String lowerCaseColumnName = columnDesc.getName().toLowerCase();
String lowerCaseColumnName = columnDesc.getName().toLowerCase(Locale.ROOT);
DataType dataType = columnDesc.getType();
if (dataType.isDateTimeFamily()) {
//TimedJsonStreamParser will derived minute_start,hour_start,day_start from timestamp
Expand All @@ -78,7 +79,7 @@ public static List<String> generate(int recordCount, long startTime, long endTim
int v = r.nextInt(10000);
kvs.put(lowerCaseColumnName, String.valueOf(v));
} else if (dataType.isNumberFamily()) {
String v = String.format("%.4f", r.nextDouble() * 100);
String v = String.format(Locale.ROOT, "%.4f", r.nextDouble() * 100);
kvs.put(lowerCaseColumnName, v);
}
}
Expand Down
Expand Up @@ -73,13 +73,14 @@ private void init(InputStream is) {
for (Entry<Object, Object> kv : props.entrySet()) {
String key = (String) kv.getKey();
String value = (String) kv.getValue();

if (key.equals(value))
continue; // no change

if (value.contains(key))
throw new IllegalStateException("New key '" + value + "' contains old key '" + key + "' causes trouble to repeated find & replace");

throw new IllegalStateException("New key '" + value + "' contains old key '" + key
+ "' causes trouble to repeated find & replace");

if (value.endsWith("."))
old2newPrefix.put(key, value);
else
Expand Down Expand Up @@ -122,7 +123,7 @@ public Properties check(Properties props) {
return result;
}

public OrderedProperties check(OrderedProperties props){
public OrderedProperties check(OrderedProperties props) {
OrderedProperties result = new OrderedProperties();
for (Entry<String, String> kv : props.entrySet()) {
result.setProperty(check(kv.getKey()), kv.getValue());
Expand All @@ -147,7 +148,7 @@ private static void generateFindAndReplaceScript(String kylinRepoPath, String ou
// generate sed file
File sedFile = new File(outputDir, "upgrade-old-config.sed");
try {
out = new PrintWriter(sedFile);
out = new PrintWriter(sedFile, "UTF-8");
for (Entry<String, String> e : bcc.old2new.entrySet()) {
out.println("s/" + quote(e.getKey()) + "/" + e.getValue() + "/g");
}
Expand All @@ -161,7 +162,7 @@ private static void generateFindAndReplaceScript(String kylinRepoPath, String ou
// generate sh file
File shFile = new File(outputDir, "upgrade-old-config.sh");
try {
out = new PrintWriter(shFile);
out = new PrintWriter(shFile, "UTF-8");
out.println("#!/bin/bash");
Stack<File> stack = new Stack<>();
stack.push(repoDir);
Expand All @@ -180,7 +181,7 @@ private static void generateFindAndReplaceScript(String kylinRepoPath, String ou
} finally {
IOUtils.closeQuietly(out);
}

System.out.println("Files generated:");
System.out.println(shFile);
System.out.println(sedFile);
Expand Down Expand Up @@ -211,6 +212,7 @@ else if (name.equals("KylinConfigTest.java"))
else if (name.endsWith("-site.xml"))
return false;
else
return name.endsWith(".java") || name.endsWith(".js") || name.endsWith(".sh") || name.endsWith(".properties") || name.endsWith(".xml");
return name.endsWith(".java") || name.endsWith(".js") || name.endsWith(".sh")
|| name.endsWith(".properties") || name.endsWith(".xml");
}
}
Expand Up @@ -23,6 +23,7 @@
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
Expand Down Expand Up @@ -773,7 +774,7 @@ public Map<String, String> getHiveConfigOverride() {
}

public String getOverrideHiveTableLocation(String table) {
return getOptional("kylin.source.hive.table-location." + table.toUpperCase());
return getOptional("kylin.source.hive.table-location." + table.toUpperCase(Locale.ROOT));
}

public boolean isHiveKeepFlatTable() {
Expand Down Expand Up @@ -1173,7 +1174,6 @@ public Map<String, String> getSparkConfigOverrideWithSpecificName(String configN
return getPropertiesByPrefix("kylin.engine.spark-conf-" + configName + ".");
}


public double getDefaultHadoopJobReducerInputMB() {
return Double.parseDouble(getOptional("kylin.engine.mr.reduce-input-mb", "500"));
}
Expand Down Expand Up @@ -1708,7 +1708,7 @@ public boolean isKylinMetricsReporterForJobEnabled() {
}

public String getKylinMetricsPrefix() {
return getOptional("kylin.metrics.prefix", "KYLIN").toUpperCase();
return getOptional("kylin.metrics.prefix", "KYLIN").toUpperCase(Locale.ROOT);
}

public String getKylinMetricsActiveReservoirDefaultClass() {
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.kylin.common.lock;

import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;

public abstract class DistributedLockFactory {

Expand All @@ -35,9 +36,9 @@ public DistributedLock lockForCurrentProcess() {
private static String threadProcessAndHost() {
return Thread.currentThread().getId() + "-" + processAndHost();
}

private static String processAndHost() {
byte[] bytes = ManagementFactory.getRuntimeMXBean().getName().getBytes();
return new String(bytes);
byte[] bytes = ManagementFactory.getRuntimeMXBean().getName().getBytes(StandardCharsets.UTF_8);
return new String(bytes, StandardCharsets.UTF_8);
}
}
Expand Up @@ -18,21 +18,23 @@

package org.apache.kylin.common.metrics.common;

import java.util.Locale;

public final class MetricsNameBuilder {
public final static String METRICS = "metrics:";
public final static String PROJECT_TEMPLATE = METRICS + "project=%s";
public final static String CUBE_TEMPLATE = METRICS + "project=%s,cube=%s";

public static String buildMetricName(String prefix, String name) {
return String.format(prefix + ",name=%s", name);
return String.format(Locale.ROOT, prefix + ",name=%s", name);
}

public static String buildCubeMetricPrefix(String project) {
return String.format(PROJECT_TEMPLATE, project);
return String.format(Locale.ROOT, PROJECT_TEMPLATE, project);
}

public static String buildCubeMetricPrefix(String project, String cube) {
return String.format(CUBE_TEMPLATE, project, cube);
return String.format(Locale.ROOT, CUBE_TEMPLATE, project, cube);
}

}
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -450,7 +451,7 @@ private boolean initMetricsReporter() {
MetricsReporting reporter = null;
for (String metricsReportingName : metricsReporterNames) {
try {
reporter = MetricsReporting.valueOf(metricsReportingName.trim().toUpperCase());
reporter = MetricsReporting.valueOf(metricsReportingName.trim().toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
LOGGER.error("Invalid reporter name " + metricsReportingName, e);
throw e;
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -100,7 +101,8 @@ public void run() {
BufferedWriter bw = null;
try {
fs.delete(tmpPath, true);
bw = new BufferedWriter(new OutputStreamWriter(fs.create(tmpPath, true)));
bw = new BufferedWriter(
new OutputStreamWriter(fs.create(tmpPath, true), StandardCharsets.UTF_8));
bw.write(json);
fs.setPermission(tmpPath, FsPermission.createImmutable((short) 0644));
} catch (IOException e) {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
Expand Down Expand Up @@ -141,7 +142,7 @@ public static String cat(KylinConfig config, String path) throws IOException {
StringBuffer sb = new StringBuffer();
String line;
try {
br = new BufferedReader(new InputStreamReader(is));
br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
while ((line = br.readLine()) != null) {
System.out.println(line);
sb.append(line).append('\n');
Expand Down Expand Up @@ -202,8 +203,8 @@ public static void copy(KylinConfig srcConfig, KylinConfig dstConfig, boolean co
copy(srcConfig, dstConfig, "/", copyImmutableResource);
}

public static void copyR(ResourceStore src, ResourceStore dst, String path, TreeSet<String> pathsSkipChildrenCheck, boolean copyImmutableResource)
throws IOException {
public static void copyR(ResourceStore src, ResourceStore dst, String path, TreeSet<String> pathsSkipChildrenCheck,
boolean copyImmutableResource) throws IOException {

if (!copyImmutableResource && IMMUTABLE_PREFIX.contains(path)) {
return;
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Locale;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.kylin.common.KylinVersion;
Expand All @@ -45,7 +46,7 @@ abstract public class RootPersistentEntity implements AclEntity, Serializable {

static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss z";
static FastDateFormat format = FastDateFormat.getInstance(DATE_PATTERN);
static DateFormat df = new SimpleDateFormat(DATE_PATTERN);
static DateFormat df = new SimpleDateFormat(DATE_PATTERN, Locale.ROOT);

public static String formatTime(long millis) {
return format.format(millis);
Expand All @@ -58,7 +59,7 @@ public static String formatTime(long millis) {

@JsonProperty("last_modified")
protected long lastModified;

// if cached and shared, the object MUST NOT be modified (call setXXX() for example)
protected boolean isCachedAndShared = false;

Expand Down Expand Up @@ -101,7 +102,7 @@ public void setLastModified(long lastModified) {
public void updateRandomUuid() {
setUuid(RandomUtil.randomUUID().toString());
}

public boolean isCachedAndShared() {
return isCachedAndShared;
}
Expand Down

0 comments on commit af0fd8a

Please sign in to comment.