-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.scala
102 lines (86 loc) · 4.44 KB
/
run.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Path of the source JSON file to flatten (placeholder value; Russian for "path_to_file.json" — set before running).
val inputFileName : String = "путь_к_файлу.json";
// Destination path for the flattened Parquet output (placeholder value — set before running).
val outputFileName : String = "путь_к_выходному_файлу.parquet";
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.databind.node.ArrayNode
import com.fasterxml.jackson.databind.node.ValueNode
import scala.collection.JavaConverters._;
import scala.collection.JavaConversions._;
import scala.Tuple3;
import org.apache.spark.sql.functions._;
import org.apache.spark.sql.expressions.UserDefinedFunction;
object JsonFlattener {

  /**
   * Flattens a JSON document into rows of (path, key, value).
   *
   * Returns a java.util.List so the result stays directly consumable by the
   * callers that expect the Java collection type (interface unchanged).
   *
   * Special rows:
   *  - blank/absent input ("", null, "null", "none", "nan") -> ("/", "/", "null")
   *  - unparseable input -> ("", "parsing_error_invalid_json", <message>)
   */
  def flattenJson(inputJson: String): java.util.List[Tuple3[String, String, String]] = {
    val outArray = new java.util.ArrayList[Tuple3[String, String, String]]
    val isBlank = inputJson == null || inputJson.isEmpty ||
      inputJson.equalsIgnoreCase("null") ||
      inputJson.equalsIgnoreCase("none") ||
      inputJson.equalsIgnoreCase("nan")
    if (isBlank) {
      outArray.add(new Tuple3[String, String, String]("/", "/", "null"))
    } else {
      try {
        val jsonNode = new ObjectMapper().readTree(inputJson)
        addKeys(outArray, jsonNode, "", "")
      } catch {
        // NonFatal instead of Exception-by-type: fatal errors (OOM, interrupts)
        // propagate rather than being silently recorded as a data row.
        case scala.util.control.NonFatal(e) =>
          outArray.add(new Tuple3[String, String, String]("", "parsing_error_invalid_json", e.getMessage))
      }
    }
    outArray
  }

  /** Dispatches a node to the array/object/value handler. */
  private def addKeys(outArray: java.util.List[Tuple3[String, String, String]], jsonNode: JsonNode, path: String, key: String): Unit = {
    if (jsonNode.isArray) parseArray(outArray, jsonNode.asInstanceOf[ArrayNode], path, key)
    else if (jsonNode.isObject) parseObject(outArray, jsonNode.asInstanceOf[ObjectNode], path, key)
    else parseValue(outArray, jsonNode.asInstanceOf[ValueNode], path, key)
  }

  /** Joins `path` and `key`; array indices ("[i]") attach without a "/" separator. */
  private def buildPath(path: String, key: String): String =
    if (path.isEmpty) key
    else if (key.nonEmpty && key.charAt(0) == '[') path + key
    else path + "/" + key

  /** Recurses into an object's fields; an empty object is emitted as the literal "{}". */
  private def parseObject(outArray: java.util.List[Tuple3[String, String, String]], objectNode: ObjectNode, path: String, key: String): Unit = {
    val newPath = buildPath(path, key)
    if (objectNode.size == 0) outArray.add(new Tuple3[String, String, String](newPath, key, "{}"))
    val fields = objectNode.fields
    while (fields.hasNext) {
      val entry = fields.next()
      addKeys(outArray, entry.getValue, newPath, entry.getKey)
    }
  }

  /** Recurses into array elements as "[i]" keys; an empty array is emitted as the literal "[]". */
  private def parseArray(outArray: java.util.List[Tuple3[String, String, String]], arrayNode: ArrayNode, path: String, key: String): Unit = {
    val newPath = buildPath(path, key)
    if (arrayNode.size == 0) {
      outArray.add(new Tuple3[String, String, String](newPath, key, "[]"))
    } else {
      for (i <- 0 until arrayNode.size) {
        addKeys(outArray, arrayNode.get(i), newPath, "[" + i + "]")
      }
    }
  }

  /** Renders a scalar node as text; blank/whitespace-only text becomes the literal "null". */
  private def parseValue(outArray: java.util.List[Tuple3[String, String, String]], valueNode: ValueNode, path: String, key: String): Unit = {
    val newPath = buildPath(path, key)
    val rendered =
      if (valueNode.isFloatingPointNumber) String.valueOf(valueNode.asDouble)
      else if (valueNode.isBoolean) String.valueOf(valueNode.asBoolean)
      else if (valueNode.isIntegralNumber) String.valueOf(valueNode.asLong)
      else {
        val text = valueNode.asText
        if (text == null || text.trim.isEmpty) "null" else text.trim
      }
    outArray.add(new Tuple3[String, String, String](newPath, key, rendered))
  }
}
// Spark UDF wrapping JsonFlattener.flattenJson: converts the returned Java list
// into a Scala List of (path, key, value) triples so Spark maps it to an
// array<struct<_1,_2,_3>> column.
val jsonToKeyValueUdf: UserDefinedFunction = udf { (inputJson: String) =>
  JsonFlattener.flattenJson(inputJson).asScala.toList
}
// Read the entire input file as one JSON document: "wholetext" yields a single
// row whose "value" column holds the full file contents, renamed to "json".
val jsonDf = spark.read
  .option("wholetext", true)
  .text(inputFileName)
  .selectExpr("value as json")

// Flatten the document and explode into one row per (path, key, value) leaf.
val jsonParsed = jsonDf
  .withColumn("resultStruct", explode(jsonToKeyValueUdf(col("json"))))
  .drop("json")
  .selectExpr("resultStruct._1 as path", "resultStruct._2 as key", "resultStruct._3 as value")

// Cache because the frame is consumed twice: once by the Parquet write, once by show.
jsonParsed.cache()
jsonParsed.write.mode("overwrite").parquet(outputFileName)
jsonParsed.show(10, false)