Fix process hanging bug. Add hive field ETL process.
SunZhaonan authored and jbai committed Mar 17, 2016
1 parent aff8f32 commit a0b7cb9
Showing 13 changed files with 496 additions and 164 deletions.
2 changes: 1 addition & 1 deletion backend-service/app/actors/CmdUtil.java
@@ -48,7 +48,7 @@ public static String generateCMD(EtlJobName etlJobName, int refId, long whEtlExe
}

String classPath = System.getProperty("java.class.path");
sb.append("-cp").append(" \"").append(classPath).append("\" ");
sb.append("-cp").append(" '").append(classPath).append("' ");
sb.append("metadata.etl.Launcher");

return sb.toString();
14 changes: 12 additions & 2 deletions backend-service/app/actors/EtlJobActor.java
@@ -15,6 +15,7 @@

import akka.actor.UntypedActor;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -52,14 +53,22 @@ public void onReceive(Object message)

process = Runtime.getRuntime().exec(cmd);


InputStream stdout = process.getInputStream();
InputStreamReader isr = new InputStreamReader(stdout);
BufferedReader br = new BufferedReader(isr);
String line = null;
while ( (line = br.readLine()) != null) {
Logger.info(line);
}

// wait until this process has finished.
int execResult = process.waitFor();

// if the process failed, log the error and throw exception
if (execResult > 0) {
BufferedReader br = new BufferedReader(new InputStreamReader(process.getErrorStream()));
br = new BufferedReader(new InputStreamReader(process.getErrorStream()));
String errString = "Error Details:\n";
String line;
while((line = br.readLine()) != null)
errString = errString.concat(line).concat("\n");
Logger.error("*** Process + " + getPid(process) + " failed, status: " + execResult);
@@ -78,6 +87,7 @@ public void onReceive(Object message)
ActorRegistry.treeBuilderActor.tell("flow", getSelf());
}
} catch (Throwable e) { // catch all throwable at the highest level.
e.printStackTrace();
Logger.error("ETL job {} got a problem", msg.toDebugString());
if (process.isAlive()) {
process.destroy();
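
Note: the stdout-draining loop added above is what resolves the hanging bug named in the commit message. A child process that writes more output than the OS pipe buffer holds (typically around 64 KB) blocks until the parent reads it, so process.waitFor() alone never returns. A minimal sketch of the same deadlock and its fix, written in Python only for brevity (it is not code from this repository, and it assumes a python executable on the PATH):

import subprocess

# Hypothetical child that writes ~1 MB to stdout -- far more than a typical pipe buffer.
child = subprocess.Popen(
    ["python", "-c", "import sys; sys.stdout.write('x' * 1000000)"],
    stdout=subprocess.PIPE)

# Calling child.wait() at this point can hang forever: the child stalls once the pipe
# buffer fills and never exits. Draining stdout first -- the analogue of the
# BufferedReader loop added to EtlJobActor -- removes the deadlock.
for line in child.stdout:
    pass  # EtlJobActor forwards each line to Logger.info instead of discarding it

child.wait()
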
@@ -167,10 +167,10 @@ private static void scanPathHelper(Path path, FileSystem scanFs)
} else if (scanFs.listStatus(n).length > 0 || scanFs.getContentSummary(n).getLength() > 0) {
scanPath(n);
} else {
System.err.println("* scanPath() size = 0: " + curPath);
logger.info("* scanPath() size = 0: " + curPath);
}
} catch (AccessControlException e) {
System.err.println("* scanPath(e) Permission denied. Cannot access: " + curPath +
logger.error("* scanPath(e) Permission denied. Cannot access: " + curPath +
" owner:" + fstat.getOwner() + " group: " + fstat.getGroup() + "with current user " +
UserGroupInformation.getCurrentUser());
// System.err.println(e);
@@ -277,7 +277,7 @@ private static void traceTableInfo(Path path, FileSystem tranceFs)
}
}
} catch (AccessControlException e) {
System.err.println("* TblInfo() Cannot access " + fstat.getPath().toUri().getPath());
logger.error("* TblInfo() Cannot access " + fstat.getPath().toUri().getPath());
return;
}

@@ -286,7 +286,7 @@ private static void traceTableInfo(Path path, FileSystem tranceFs)
if (datasetSchemaRecord != null) {
schemaFileWriter.append(datasetSchemaRecord);
} else {
System.err.println("* Cannot resolve the schema of " + fullPath);
logger.error("* Cannot resolve the schema of " + fullPath);
}

SampleDataRecord sampleDataRecord = fileAnalyzerFactory.getSampleData(fstat.getPath(), path.toUri().getPath());
2 changes: 2 additions & 0 deletions metadata-etl/src/main/java/metadata/etl/EtlJob.java
@@ -164,6 +164,8 @@ public abstract void load()

public void setup()
throws Exception {
// redirect error to out
System.setErr(System.out);

}

@@ -27,6 +27,9 @@ public class HiveViewDependency {
static LineageInfo lineageInfoTool = new LineageInfo();

public static String[] getViewDependency(String hiveQl) {
if (hiveQl == null)
return new String[]{};

try {
lineageInfoTool.getLineageInfo(hiveQl);
TreeSet<String> inputs = lineageInfoTool.getInputTableList();
144 changes: 144 additions & 0 deletions metadata-etl/src/main/resources/jython/AvroColumnParser.py
@@ -0,0 +1,144 @@
#!/usr/bin/env python
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#

import json

class AvroColumnParser:
"""
This class parses an Avro schema and collects the list of columns inside it.
Because Avro schemas are nested, the parsing is done recursively.
Currently used in HDFS avro file schema parsing and Hive avro schema parsing.
"""

def __init__(self, avro_schema, urn = None):
"""
:param avro_schema: parsed JSON of the Avro schema
:param urn: optional; if omitted, the schema's own 'uri' attribute is used instead
:return:
"""
self.sort_id = 0
if not urn:
self.urn = avro_schema['uri'] if 'uri' in avro_schema else None
else:
self.urn = urn
self.result = []
self.fields_json_to_csv(self.result, '', avro_schema['fields'])

def get_column_list_result(self):
"""
:return:
"""
return self.result

def fields_json_to_csv(self, output_list_, parent_field_path, field_list_):
"""
Recursive function, extract nested fields out of avro.
"""
parent_id = self.sort_id

for f in field_list_:
self.sort_id += 1

o_field_name = f['name']
o_field_data_type = ''
o_field_data_size = None
o_field_nullable = 'N'
o_field_default = ''
o_field_namespace = ''
o_field_doc = ''
effective_type_index_in_type = -1

if f.has_key('namespace'):
o_field_namespace = f['namespace']

if f.has_key('default') and type(f['default']) != None:
o_field_default = f['default']

if not f.has_key('type'):
o_field_data_type = None
elif type(f['type']) == list:
i = effective_type_index = -1
for data_type in f['type']:
i += 1 # current index
if type(data_type) is None or (data_type == 'null'):
o_field_nullable = 'Y'
elif type(data_type) == dict:
o_field_data_type = data_type['type']
effective_type_index_in_type = i

if data_type.has_key('namespace'):
o_field_namespace = data_type['namespace']
elif data_type.has_key('name'):
o_field_namespace = data_type['name']

if data_type.has_key('size'):
o_field_data_size = data_type['size']
else:
o_field_data_size = None

else:
o_field_data_type = data_type
effective_type_index_in_type = i
elif type(f['type']) == dict:
o_field_data_type = f['type']['type']
else:
o_field_data_type = f['type']
if f.has_key('attributes') and f['attributes'].has_key('nullable'):
o_field_nullable = 'Y' if f['attributes']['nullable'] else 'N'
if f.has_key('attributes') and f['attributes'].has_key('size'):
o_field_data_size = f['attributes']['size']

if f.has_key('doc'):
if len(f['doc']) == 0 and f.has_key('attributes'):
o_field_doc = json.dumps(f['attributes'])
else:
o_field_doc = f['doc']
elif f.has_key('comment'):
o_field_doc = f['comment']

output_list_.append(
[self.urn, self.sort_id, parent_id, parent_field_path, o_field_name, o_field_data_type, o_field_nullable,
o_field_default, o_field_data_size, o_field_namespace,
o_field_doc.replace("\n", ' ') if o_field_doc is not None else None])

# check if this field is a nested record
if type(f['type']) == dict and f['type'].has_key('fields'):
current_field_path = o_field_name if parent_field_path == '' else parent_field_path + '.' + o_field_name
self.fields_json_to_csv(output_list_, current_field_path, f['type']['fields'])
elif type(f['type']) == dict and f['type'].has_key('items') and type(f['type']['items']) == dict and f['type']['items'].has_key('fields'):
current_field_path = o_field_name if parent_field_path == '' else parent_field_path + '.' + o_field_name
self.fields_json_to_csv(output_list_, current_field_path, f['type']['items']['fields'])

if effective_type_index_in_type >= 0 and type(f['type'][effective_type_index_in_type]) == dict:
if f['type'][effective_type_index_in_type].has_key('items') and type(
f['type'][effective_type_index_in_type]['items']) == list:

for item in f['type'][effective_type_index_in_type]['items']:
if type(item) == dict and item.has_key('fields'):
current_field_path = o_field_name if parent_field_path == '' else parent_field_path + '.' + o_field_name
self.fields_json_to_csv(output_list_, current_field_path, item['fields'])
elif f['type'][effective_type_index_in_type].has_key('items') and type(f['type'][effective_type_index_in_type]['items'])== dict and f['type'][effective_type_index_in_type]['items'].has_key('fields'):
# type: [ null, { type: array, items: { name: xxx, type: record, fields: [] } } ]
current_field_path = o_field_name if parent_field_path == '' else parent_field_path + '.' + o_field_name
self.fields_json_to_csv(output_list_, current_field_path, f['type'][effective_type_index_in_type]['items']['fields'])
elif f['type'][effective_type_index_in_type].has_key('fields'):
# if f['type'][effective_type_index_in_type].has_key('namespace'):
# o_field_namespace = f['type'][effective_type_index_in_type]['namespace']
current_field_path = o_field_name if parent_field_path == '' else parent_field_path + '.' + o_field_name
self.fields_json_to_csv(output_list_, current_field_path, f['type'][effective_type_index_in_type]['fields'])

# End of function

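The new AvroColumnParser above is wired into HdfsTransform.py below; a minimal standalone sketch of driving it by hand (Python 2 / Jython syntax to match the script; the record schema and the hdfs:/// URI are invented for illustration and are not part of this commit):

import json
from AvroColumnParser import AvroColumnParser

avro_schema = json.loads("""
{
  "type": "record", "name": "member", "uri": "hdfs:///data/member",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "email", "type": ["null", "string"], "default": null},
    {"name": "address", "type": {"type": "record", "name": "address_t",
      "fields": [{"name": "city", "type": "string"}]}}
  ]
}
""")

# urn is omitted, so the parser falls back to the schema's 'uri' attribute
parser = AvroColumnParser(avro_schema)
for row in parser.get_column_list_result():
    # [urn, sort_id, parent_id, parent_path, name, type, nullable, default, size, namespace, doc]
    print row
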
103 changes: 3 additions & 100 deletions metadata-etl/src/main/resources/jython/HdfsTransform.py
@@ -19,6 +19,7 @@
from wherehows.common.writers import FileWriter
from wherehows.common.schemas import DatasetSchemaRecord, DatasetFieldRecord
from wherehows.common import Constant
from AvroColumnParser import AvroColumnParser


class HdfsTransform:
@@ -37,105 +38,6 @@ def transform(self, raw_metadata, metadata_output, field_metadata_output):
o_urn = ''
p = ''

def fields_json_to_csv(output_list_, parent_field_path, field_list_):
# string, list, int, optional int
self.sort_id
parent_field_path
parent_id = self.sort_id

for f in field_list_:
self.sort_id += 1

o_field_name = f['name']
o_field_data_type = ''
o_field_data_size = None
o_field_nullable = 'N'
o_field_default = ''
o_field_namespace = ''
o_field_doc = ''
effective_type_index_in_type = -1

if f.has_key('namespace'):
o_field_namespace = f['namespace']

if f.has_key('default') and type(f['default']) != None:
o_field_default = f['default']

if not f.has_key('type'):
o_field_data_type = None
elif type(f['type']) == list:
i = effective_type_index = -1
for data_type in f['type']:
i += 1 # current index
if type(data_type) is None or (data_type == 'null'):
o_field_nullable = 'Y'
elif type(data_type) == dict:
o_field_data_type = data_type['type']
effective_type_index_in_type = i

if data_type.has_key('namespace'):
o_field_namespace = data_type['namespace']
elif data_type.has_key('name'):
o_field_namespace = data_type['name']

if data_type.has_key('size'):
o_field_data_size = data_type['size']
else:
o_field_data_size = None

else:
o_field_data_type = data_type
effective_type_index_in_type = i
elif type(f['type']) == dict:
o_field_data_type = f['type']['type']
else:
o_field_data_type = f['type']
if f.has_key('attributes') and f['attributes'].has_key('nullable'):
o_field_nullable = 'Y' if f['attributes']['nullable'] else 'N'
if f.has_key('attributes') and f['attributes'].has_key('size'):
o_field_data_size = f['attributes']['size']

if f.has_key('doc'):
if len(f['doc']) == 0 and f.has_key('attributes'):
o_field_doc = json.dumps(f['attributes'])
else:
o_field_doc = f['doc']
elif f.has_key('comment'):
o_field_doc = f['comment']

output_list_.append(
[o_urn, self.sort_id, parent_id, parent_field_path, o_field_name, o_field_data_type, o_field_nullable,
o_field_default, o_field_data_size, o_field_namespace,
o_field_doc.replace("\n", ' ') if o_field_doc is not None else None])

# check if this field is a nested record
if type(f['type']) == dict and f['type'].has_key('fields'):
current_field_path = o_field_name if parent_field_path == '' else parent_field_path + '.' + o_field_name
fields_json_to_csv(output_list_, current_field_path, f['type']['fields'])
elif type(f['type']) == dict and f['type'].has_key('items') and type(f['type']['items']) == dict and f['type']['items'].has_key('fields'):
current_field_path = o_field_name if parent_field_path == '' else parent_field_path + '.' + o_field_name
fields_json_to_csv(output_list_, current_field_path, f['type']['items']['fields'])

if effective_type_index_in_type >= 0 and type(f['type'][effective_type_index_in_type]) == dict:
if f['type'][effective_type_index_in_type].has_key('items') and type(
f['type'][effective_type_index_in_type]['items']) == list:

for item in f['type'][effective_type_index_in_type]['items']:
if type(item) == dict and item.has_key('fields'):
current_field_path = o_field_name if parent_field_path == '' else parent_field_path + '.' + o_field_name
fields_json_to_csv(output_list_, current_field_path, item['fields'])
elif f['type'][effective_type_index_in_type].has_key('items') and f['type'][effective_type_index_in_type]['items'].has_key('fields'):
# type: [ null, { type: array, items: { name: xxx, type: record, fields: [] } } ]
current_field_path = o_field_name if parent_field_path == '' else parent_field_path + '.' + o_field_name
fields_json_to_csv(output_list_, current_field_path, f['type'][effective_type_index_in_type]['items']['fields'])
elif f['type'][effective_type_index_in_type].has_key('fields'):
# if f['type'][effective_type_index_in_type].has_key('namespace'):
# o_field_namespace = f['type'][effective_type_index_in_type]['namespace']
current_field_path = o_field_name if parent_field_path == '' else parent_field_path + '.' + o_field_name
fields_json_to_csv(output_list_, current_field_path, f['type'][effective_type_index_in_type]['fields'])

# End of function

for line in input_json_file:
try:
j = json.loads(line)
@@ -200,7 +102,8 @@ def fields_json_to_csv(output_list_, parent_field_path, field_list_):
f['attributes'] = json.loads(f['attributes_json'])
del f['attributes_json']

fields_json_to_csv(o_field_list_, '', j['fields'])
acp = AvroColumnParser(j, o_urn)
o_field_list_ += acp.get_column_list_result()

else:
o_fields = {"doc": None}
