Skip to content

Commit

Permalink
KYLIN-1660 Streaming/kafka config not match with table name
Browse files Browse the repository at this point in the history
  • Loading branch information
janzhongi committed May 9, 2016
1 parent 2fc706c commit 3542dd2
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
Expand Down Expand Up @@ -109,7 +110,7 @@ public StreamingRequest saveStreamingConfig(@RequestBody StreamingRequest stream
tableDesc.setUuid(UUID.randomUUID().toString());
MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
metaMgr.saveSourceTable(tableDesc);
cubeMgmtService.syncTableToProject(new String[]{tableDesc.getName()}, project);
cubeMgmtService.syncTableToProject(new String[]{tableDesc.getIdentity()}, project);
} catch (IOException e) {
throw new BadRequestException("Failed to add streaming table.");
}
Expand Down Expand Up @@ -231,6 +232,11 @@ private TableDesc deserializeTableDesc(StreamingRequest streamingRequest) {
logger.error("Failed to deal with the request.", e);
throw new InternalErrorException("Failed to deal with the request:" + e.getMessage(), e);
}

String [] dbTable = HadoopUtil.parseHiveTableName(desc.getName());
desc.setName(dbTable[1]);
desc.setDatabase(dbTable[0]);
desc.getIdentity();
return desc;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public List<TableDesc> getHiveTables(@RequestParam(value = "ext", required = fal
* @return Table metadata array
* @throws IOException
*/
@RequestMapping(value = "/{tableName}", method = { RequestMethod.GET })
@RequestMapping(value = "/{tableName}/load", method = { RequestMethod.GET })
@ResponseBody
public TableDesc getHiveTable(@PathVariable String tableName) {
return cubeMgmtService.getMetadataManager().getTableDesc(tableName);
Expand Down
49 changes: 2 additions & 47 deletions webapp/app/js/controllers/cubeEdit.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@


KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $location, $templateCache, $interpolate, MessageService, TableService, CubeDescService, CubeService, loadingRequest, SweetAlert, $log, cubeConfig, CubeDescModel, MetaModel, TableModel, ModelDescService, modelsManager, cubesManager, ProjectModel, StreamingModel, StreamingService) {
var STREAMING_SUFFIX = "_streaming";
$scope.cubeConfig = cubeConfig;

$scope.metaModel = {};
$scope.modelsManager = modelsManager;

Expand Down Expand Up @@ -168,9 +166,6 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio
// ~ init
if ($scope.isEdit = !!$routeParams.cubeName) {

$scope.streamingMeta = StreamingModel.createStreamingConfig();
$scope.kafkaMeta = StreamingModel.createKafkaConfig();

CubeDescService.query({cube_name: $routeParams.cubeName}, function (detail) {
if (detail.length > 0) {
$scope.cubeMetaFrame = detail[0];
Expand All @@ -180,19 +175,6 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio
if (!modelsManager.getModels().length) {
ModelDescService.query({model_name: $scope.cubeMetaFrame.model_name}, function (_model) {
$scope.metaModel.model = _model;

StreamingService.getConfig({table:$scope.metaModel.model.fact_table}, function (kfkConfigs) {
if(!!kfkConfigs[0]){
$scope.cubeState.isStreaming = true;
}
else{
return;
}
$scope.streamingMeta = kfkConfigs[0];
StreamingService.getKfkConfig({kafkaConfigName:$scope.streamingMeta.name}, function (streamings) {
$scope.kafkaMeta = streamings[0];
})
})
});
}

Expand All @@ -213,9 +195,6 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio
//$scope.cubeMetaFrame.model_name = modelName;
$scope.state.cubeSchema = angular.toJson($scope.cubeMetaFrame, true);

$scope.streamingMeta = StreamingModel.createStreamingConfig();
$scope.kafkaMeta = StreamingModel.createKafkaConfig();

}


Expand Down Expand Up @@ -244,15 +223,6 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio

$scope.state.cubeSchema = angular.toJson($scope.cubeMetaFrame, true);

//streaming meta
if($scope.cubeState.isStreaming == true){
$scope.streamingMeta.cubeName = $scope.cubeMetaFrame.name;
$scope.streamingMeta.name = $scope.cubeMetaFrame.name+STREAMING_SUFFIX;
$scope.kafkaMeta.name = $scope.cubeMetaFrame.name+STREAMING_SUFFIX;
}



};

$scope.cubeResultTmpl = function (notification) {
Expand All @@ -270,15 +240,6 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio
return;
}

if (!$scope.cubeState.isStreaming) {
$scope.state.streamingCube = false;
} else {
$scope.state.streamingCube = true;
$scope.state.streamingMeta = angular.toJson($scope.streamingMeta, true);
$scope.state.kafkaMeta = angular.toJson($scope.kafkaMeta, true);
}


SweetAlert.swal({
title: '',
text: 'Are you sure to save the cube ?',
Expand All @@ -295,10 +256,7 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio
CubeService.update({}, {
cubeDescData: $scope.state.cubeSchema,
cubeName: $routeParams.cubeName,
project: $scope.state.project,
streamingCube: $scope.state.streamingCube,
streamingData: $scope.state.streamingMeta,
kafkaData: $scope.state.kafkaMeta
project: $scope.state.project
}, function (request) {
if (request.successful) {
$scope.state.cubeSchema = request.cubeDescData;
Expand Down Expand Up @@ -337,10 +295,7 @@ KylinApp.controller('CubeEditCtrl', function ($scope, $q, $routeParams, $locatio
} else {
CubeService.save({}, {
cubeDescData: $scope.state.cubeSchema,
project: $scope.state.project,
streamingCube: $scope.state.streamingCube,
streamingData: $scope.state.streamingMeta,
kafkaData: $scope.state.kafkaMeta
project: $scope.state.project
}, function (request) {
if (request.successful) {
$scope.state.cubeSchema = request.cubeDescData;
Expand Down
11 changes: 0 additions & 11 deletions webapp/app/js/controllers/cubeSchema.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
if (UserService.hasRole("ROLE_ADMIN")) {
$scope.wizardSteps.push({title: 'Advanced Setting', src: 'partials/cubeDesigner/advanced_settings.html', isComplete: false,form:'cube_setting_form'});
}
//$scope.wizardSteps.push({title: 'Streaming', src: 'partials/cubeDesigner/streamingConfig.html', isComplete: false,form:'cube_streaming_form'});
$scope.wizardSteps.push({title: 'Configuration Overwrites ', src: 'partials/cubeDesigner/cubeOverwriteProp.html', isComplete: false,form:'cube_overwrite_prop_form'});
$scope.wizardSteps.push({title: 'Overview', src: 'partials/cubeDesigner/overview.html', isComplete: false,form:null});

Expand Down Expand Up @@ -147,16 +146,6 @@ KylinApp.controller('CubeSchemaCtrl', function ($scope, QueryService, UserServic
}
$scope.metaModel.model=modelsManager.getModel($scope.cubeMetaFrame.model_name);

StreamingService.getConfig({cubeName:$scope.cubeMetaFrame.name}, function (kfkConfigs) {
if(!!kfkConfigs[0]&&kfkConfigs[0].cubeName == $scope.cubeMetaFrame.name){
$scope.cubeState.isStreaming = true;
$scope.streamingMeta = kfkConfigs[0];
StreamingService.getKfkConfig({kafkaConfigName:$scope.streamingMeta.name}, function (streamings) {
$scope.kafkaMeta = streamings[0];
})
}
})

}
});

Expand Down
50 changes: 39 additions & 11 deletions webapp/app/js/controllers/cubes.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@

'use strict';

KylinApp
.controller('CubesCtrl', function ($scope, $q, $routeParams, $location, $modal, MessageService, CubeDescService, CubeService, JobService, UserService, ProjectService, SweetAlert, loadingRequest, $log, cubeConfig, ProjectModel, ModelService, MetaModel, CubeList,modelsManager,cubesManager,StreamingList,kylinConfig) {
KylinApp.controller('CubesCtrl', function ($scope, $q, $routeParams, $location, $modal, MessageService, CubeDescService, CubeService, JobService, UserService, ProjectService, SweetAlert, loadingRequest, $log, cubeConfig, ProjectModel, ModelService, MetaModel, CubeList,modelsManager,TableService) {

$scope.cubeConfig = cubeConfig;
$scope.cubeList = CubeList;

$scope.modelsManager = modelsManager;
//$scope.cubesManager = cubesManager;

$scope.listParams = {
cubeName: $routeParams.cubeName,
projectName: $routeParams.projectName
Expand Down Expand Up @@ -82,13 +79,44 @@ KylinApp
$scope.loading = true;

return CubeList.list(queryParam).then(function (resp) {
angular.forEach($scope.cubeList.cubes,function(item,index){
var result = StreamingList.checkCubeExist(item.name);
if(result.exist == true){
item.streaming = result.streaming;
var kfkConfig = StreamingList.getKafkaConfig(result.streaming.name);
item.kfkConfig = kfkConfig;
}
angular.forEach($scope.cubeList.cubes,function(cube,index){
cube.streaming = false;
CubeDescService.query({cube_name: cube.name}, {}, function (detail) {
if (detail.length > 0 && detail[0].hasOwnProperty("name")) {
cube.detail = detail[0];
ModelService.list({projectName:$scope.projectModel.selectedProject,modelName:cube.detail.model_name}, function (_models) {
if(_models && _models.length){
for(var i=0;i<=_models.length;i++){
if(_models[i].name == cube.detail.model_name){
cube.model = _models[i];
var factTable = cube.model.fact_table;
TableService.get({tableName:factTable},function(table){
if(table && table.source_type == 1){
cube.streaming = true;
}
})
break;
}
}
}

})
//cube.model = modelsManager.getModel(cube.detail.model_name);

defer.resolve(cube.detail);

} else {
SweetAlert.swal('Oops...', "No cube detail info loaded.", 'error');
}
}, function (e) {
if (e.data && e.data.exception) {
var message = e.data.exception;
var msg = !!(message) ? message : 'Failed to take action.';
SweetAlert.swal('Oops...', msg, 'error');
} else {
SweetAlert.swal('Oops...', "Failed to take action.", 'error');
}
});
})

$scope.loading = false;
Expand Down
2 changes: 1 addition & 1 deletion webapp/app/js/services/tables.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
KylinApp.factory('TableService', ['$resource', function ($resource, config) {
return $resource(Config.service.url + 'tables/:tableName/:action/:database', {}, {
list: {method: 'GET', params: {}, cache: true, isArray: true},
get: {method: 'GET', params: {}, isArray: false},
get: {method: 'GET', params: {action:'load'}, isArray: false},
getExd: {method: 'GET', params: {action: 'exd-map'}, isArray: false},
reload: {method: 'PUT', params: {action: 'reload'}, isArray: false},
loadHiveTable: {method: 'POST', params: {}, isArray: false},
Expand Down
8 changes: 3 additions & 5 deletions webapp/app/partials/cubes/cubes.html
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
</th>
<th>Actions</th>
<th ng-if="userService.hasRole('ROLE_ADMIN')">Admins</th>
<th></th>
<th>Streaming</th>
</tr>
</thead>
<!--Body-->
Expand Down Expand Up @@ -109,11 +109,9 @@
</ul>
</div>
</td>
<td ng-if="cube.streaming">
<label class="badge label-info" style="cursor:pointer;">STREAMING</label>
<td>
<label class="badge" ng-class="{'label-info':cube.streaming==true}" style="cursor:pointer;">{{cube.streaming}}</label>
</td>
<td ng-if="!cube.streaming">
</td>
</tr>
<tr ng-show="cube.showDetail">
<td colspan="9" style="padding: 10px 30px 10px 30px;">
Expand Down
3 changes: 2 additions & 1 deletion webapp/app/partials/tables/loadStreamingTable.html
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<div class="col-xs-6" ng-show="table.schemaChecked">
<ol class="text-info" style="margin-bottom: 30px;">
<li>Choose 'timestamp' type column for streaming table.</li>
<li>By default, system will choose 'Default' as database, you can specify database like this 'database.table'</li>
<li>derived time dimensions are calculated from timestamp field to help analysis against different time granularities.</li>
</ol>
<form class="form-horizontal" name="form.setStreamingSchema" novalidate>
Expand All @@ -59,7 +60,7 @@

<div class="col-xs-8"
ng-class="{'has-error':form.setStreamingSchema.streamingObject.$invalid && (form.setStreamingSchema.streamingObject.$dirty||form.setStreamingSchema.$submitted)}">
<input type="text" name="streamingObject" required="" ng-model="table.name" class="form-control"/>
<input type="text" name="streamingObject" placeholder="database.table" required="" ng-model="table.name" class="form-control"/>
<small class="help-block"
ng-show="form.setStreamingSchema.streamingObject.$error.required&&(form.setStreamingSchema.streamingObject.$dirty||form.setStreamingSchema.$submitted)">
Table name is required.
Expand Down

0 comments on commit 3542dd2

Please sign in to comment.