
Commit 15eb175

Initial Commit
1 parent 6ebb1b3 commit 15eb175

File tree

7 files changed: +1698 -1 lines changed

.gitignore

Lines changed: 2 additions & 1 deletion

@@ -10,4 +10,5 @@
 # Leave target directory
 */target/*
 */metastore_db*
-*/spark-warehouse*
+*/spark-warehouse*
+*/output*

21-ComplexTypesDemo/build.sbt

Lines changed: 13 additions & 0 deletions

@@ -0,0 +1,13 @@
+name := "ComplexTypesDemo"
+organization := "guru.learningjournal"
+version := "0.1"
+scalaVersion := "2.12.10"
+autoScalaLibrary := false
+val sparkVersion = "3.0.0"
+
+val sparkDependencies = Seq(
+  "org.apache.spark" %% "spark-core" % sparkVersion,
+  "org.apache.spark" %% "spark-sql" % sparkVersion
+)
+
+libraryDependencies ++= sparkDependencies
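
The two Spark artifacts above end up on the compile classpath and inside any packaged artifact. For cluster deployment it is common to scope them as provided so that spark-submit supplies them at runtime; a minimal sketch of that variant (the provided scope is an assumption, not part of this commit):

// Hypothetical build.sbt variant for cluster deployment: the cluster already
// ships the Spark jars, so they are excluded from the application package.
val sparkDependencies = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
)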

21-ComplexTypesDemo/data/Invoice-set1.json

Lines changed: 500 additions & 0 deletions
Large diffs are not rendered by default.

21-ComplexTypesDemo/data/Invoice-set2.json

Lines changed: 500 additions & 0 deletions
Large diffs are not rendered by default.

21-ComplexTypesDemo/data/Invoice-set3.json

Lines changed: 590 additions & 0 deletions
Large diffs are not rendered by default.
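
All three invoice files are collapsed by the diff viewer. Judging by the schema declared in ComplexTypesDemo.scala below, each line of these files must be a single self-contained invoice object, since Spark's default JSON source reads one object per line. A hypothetical record, with every value invented purely for illustration and wrapped here for readability only:

{"InvoiceNumber": "102", "CreatedTime": 1595688900348, "StoreID": "STR7188",
 "PosID": "POS956", "CashierID": "OAS134", "CustomerType": "PRIME",
 "CustomerCardNo": "4629185211", "TotalAmount": 3288.0, "NumberOfItems": 2,
 "PaymentMethod": "CARD", "CGST": 82.2, "SGST": 82.2, "CESS": 4.11,
 "DeliveryType": "HOME-DELIVERY",
 "DeliveryAddress": {"AddressLine": "A-1, Main Street", "City": "Bengaluru",
  "State": "KA", "PinCode": "560001", "ContactNumber": "9999999999"},
 "InvoiceLineItems": [{"ItemCode": "458", "ItemDescription": "Wine glass",
  "ItemPrice": 1644.0, "ItemQty": 2, "TotalValue": 3288.0}]}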

21-ComplexTypesDemo/log4j.properties

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+# Set everything to be logged to the console
+log4j.rootCategory=WARN, console
+
+# define console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.out
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+
+#application log
+log4j.logger.guru.learningjournal.spark.examples=INFO, console
+log4j.additivity.guru.learningjournal.spark.examples=false
+
+#define following in Java System
+# -Dlog4j.configuration=file:log4j.properties
+
+# Recommendations from Spark template
+log4j.logger.org.apache.spark.repl.Main=WARN
+log4j.logger.org.spark_project.jetty=WARN
+log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
+log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
+log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.parquet=ERROR
+log4j.logger.parquet=ERROR
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
+log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
+
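
As the comment inside the file notes, log4j only picks this configuration up when the JVM is started with -Dlog4j.configuration. When running through sbt, one way to forward it is a forked JVM; a minimal sketch (fork and javaOptions are standard sbt keys, but these two lines are not part of this commit):

// Hypothetical build.sbt additions: fork the run and point log4j at the file.
fork := true
javaOptions += "-Dlog4j.configuration=file:log4j.properties"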
21-ComplexTypesDemo/src/main/scala/guru/learningjournal/spark/examples/ComplexTypesDemo.scala

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+package guru.learningjournal.spark.examples
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.functions.expr
+import org.apache.spark.sql.types.{ArrayType, DoubleType, IntegerType, LongType, StringType, StructField, StructType}
+
+object ComplexTypesDemo extends Serializable {
+  @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
+
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession.builder()
+      .master("local[3]")
+      .appName("Complex Types Demo")
+      .getOrCreate()
+
+    val schema = StructType(List(
+      StructField("InvoiceNumber", StringType),
+      StructField("CreatedTime", LongType),
+      StructField("StoreID", StringType),
+      StructField("PosID", StringType),
+      StructField("CashierID", StringType),
+      StructField("CustomerType", StringType),
+      StructField("CustomerCardNo", StringType),
+      StructField("TotalAmount", DoubleType),
+      StructField("NumberOfItems", IntegerType),
+      StructField("PaymentMethod", StringType),
+      StructField("CGST", DoubleType),
+      StructField("SGST", DoubleType),
+      StructField("CESS", DoubleType),
+      StructField("DeliveryType", StringType),
+      StructField("DeliveryAddress", StructType(List(
+        StructField("AddressLine", StringType),
+        StructField("City", StringType),
+        StructField("State", StringType),
+        StructField("PinCode", StringType),
+        StructField("ContactNumber", StringType)
+      ))),
+      StructField("InvoiceLineItems", ArrayType(StructType(List(
+        StructField("ItemCode", StringType),
+        StructField("ItemDescription", StringType),
+        StructField("ItemPrice", DoubleType),
+        StructField("ItemQty", IntegerType),
+        StructField("TotalValue", DoubleType)
+      ))))
+    ))
+
+    val df1 = spark.read.schema(schema).json("data/")
+
+    val df2 = df1.selectExpr("InvoiceNumber", "CreatedTime", "StoreID", "PosID",
+      "CustomerType", "PaymentMethod", "DeliveryType",
+      "DeliveryAddress.City", "DeliveryAddress.State", "DeliveryAddress.PinCode",
+      "explode(InvoiceLineItems) as LineItem")
+
+    val df4 = df2.withColumn("ItemCode", expr("LineItem.ItemCode"))
+      .withColumn("ItemDescription", expr("LineItem.ItemDescription"))
+      .withColumn("ItemPrice", expr("LineItem.ItemPrice"))
+      .withColumn("ItemQty", expr("LineItem.ItemQty"))
+      .withColumn("TotalValue", expr("LineItem.TotalValue"))
+      .drop("LineItem")
+
+    df4.write.mode(SaveMode.Overwrite).parquet("output/")
+    logger.info("Finished writing data.")
+  }
+
+}
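
Since the job writes Parquet with an explicit overwrite, a quick sanity check is to read the output directory back and confirm the nested struct and array columns are gone; a minimal sketch, not part of the commit, assuming it runs inside the same main before the application exits:

// Hypothetical verification step: read the flattened output back.
val verifyDF = spark.read.parquet("output/")
verifyDF.printSchema()              // expect only flat, scalar columns
verifyDF.show(5, truncate = false)  // one row per exploded invoice line item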
