Skip to content

Commit 3c5a1ca

Browse files
Initial Commit
1 parent f7db9a5 commit 3c5a1ca

File tree

4 files changed

+86
-0
lines changed

4 files changed

+86
-0
lines changed

17-RankingDemo/build.sbt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
name := "RankingDemo"
2+
organization := "guru.learningjournal"
3+
version := "0.1"
4+
scalaVersion := "2.12.10"
5+
6+
autoScalaLibrary := false
7+
val sparkVersion = "3.0.0-preview2"
8+
9+
val sparkDependencies = Seq(
10+
"org.apache.spark" %% "spark-core" % sparkVersion,
11+
"org.apache.spark" %% "spark-sql" % sparkVersion
12+
)
13+
14+
libraryDependencies ++= sparkDependencies

17-RankingDemo/data/summary.parquet

2.43 KB
Binary file not shown.

17-RankingDemo/log4j.properties

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Set everything to be logged to the console
2+
log4j.rootCategory=WARN, console
3+
4+
# define console appender
5+
log4j.appender.console=org.apache.log4j.ConsoleAppender
6+
log4j.appender.console.target=System.out
7+
log4j.appender.console.layout=org.apache.log4j.PatternLayout
8+
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
9+
10+
#application log
11+
log4j.logger.guru.learningjournal.spark.examples=INFO, console, file
12+
log4j.additivity.guru.learningjournal.spark.examples=false
13+
14+
#define rolling file appender
15+
log4j.appender.file=org.apache.log4j.RollingFileAppender
16+
log4j.appender.file.File=${spark.yarn.app.container.log.dir}/${logfile.name}.log
17+
#define following in Java System
18+
# -Dlog4j.configuration=file:log4j.properties
19+
# -Dlogfile.name=hello-spark
20+
# -Dspark.yarn.app.container.log.dir=app-logs
21+
log4j.appender.file.ImmediateFlush=true
22+
log4j.appender.file.Append=false
23+
log4j.appender.file.MaxFileSize=500MB
24+
log4j.appender.file.MaxBackupIndex=2
25+
log4j.appender.file.layout=org.apache.log4j.PatternLayout
26+
log4j.appender.file.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
27+
28+
29+
# Recommendations from Spark template
30+
log4j.logger.org.apache.spark.repl.Main=WARN
31+
log4j.logger.org.spark_project.jetty=WARN
32+
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
33+
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
34+
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
35+
log4j.logger.org.apache.parquet=ERROR
36+
log4j.logger.parquet=ERROR
37+
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
38+
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
39+
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package guru.learningjournal.spark.examples
2+
3+
import org.apache.log4j.Logger
4+
import org.apache.spark.sql.SparkSession
5+
import org.apache.spark.sql.expressions.Window
6+
import org.apache.spark.sql.functions._
7+
8+
object RankingDemo extends Serializable {
9+
@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
10+
11+
def main(args: Array[String]): Unit = {
12+
13+
val spark = SparkSession.builder()
14+
.appName("Ranking Demo")
15+
.master("local[3]")
16+
.getOrCreate()
17+
18+
val summaryDF = spark.read.parquet("data/summary.parquet")
19+
20+
summaryDF.sort("Country", "WeekNumber").show()
21+
22+
val rankWindow = Window.partitionBy("Country")
23+
.orderBy(col("InvoiceValue").desc)
24+
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
25+
26+
summaryDF.withColumn("Rank", dense_rank().over(rankWindow))
27+
.where(col("Rank") ===1)
28+
.sort("Country", "WeekNumber")
29+
.show()
30+
31+
spark.stop()
32+
}
33+
}

0 commit comments

Comments
 (0)