-
Notifications
You must be signed in to change notification settings - Fork 9
/
App.scala
73 lines (66 loc) · 2.45 KB
/
App.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import java.util.Properties
import com.typesafe.config.ConfigFactory
import entity.TransData
import org.apache.spark.sql.{SaveMode, SparkSession}
/**
 * Spark batch application for Shenzhen metro transport-card data.
 *
 * Loads card-transaction CSV files into a typed Dataset, prints basic
 * summary statistics, then aggregates ride counts per station by day and
 * by day+hour, appending the results to the `day_card_data` and
 * `day_hour_card_data` JDBC tables.
 *
 * 2019/07/06
 *
 * @author zhanghao
 */
object App {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sz-metro-transport-card-data-app")
      .master("local[*]").getOrCreate()
    // Input directory: first CLI argument if given, otherwise the bundled
    // "card-data" resource folder on the classpath.
    val dataPath: String =
      if (args.length == 1) {
        args(0)
      } else {
        val defaultPath = this.getClass.getClassLoader.getResource("card-data").getPath
        println(defaultPath)
        defaultPath
      }
    // Reduce log noise to WARN.
    spark.sparkContext.setLogLevel("WARN")
    // Implicits for Dataset encoders and the $"..." column syntax.
    import spark.implicits._
    // Load every CSV in the folder (header row present) as a typed Dataset.
    val data = spark.read.option("header", value = true).csv(s"$dataPath/*.csv").as[TransData]
    // Cache: the dataset is scanned multiple times below.
    data.cache()
    // Database connection settings, read from the Typesafe config
    // (application.conf on the classpath).
    val conf = ConfigFactory.load
    val jdbcURL = conf.getString("jdbcUrl")
    val connectionProperties = new Properties()
    connectionProperties.setProperty("user", conf.getString("username"))
    connectionProperties.setProperty("password", conf.getString("password"))
    // Preview the first 5 rows.
    data.show(5)
    // Total record count.
    println(s"total_data_amount: ${data.count()}")
    import org.apache.spark.sql.functions._
    // Date range covered by the data plus the total amount.
    data.agg(min("deal_date").as("min"),
      max("deal_date").as("max"), count("*").as("total_amount")).show()
    // Keep only records that carry a station.
    val filterData = data.filter("station is not null")
    filterData.cache()
    // Per-day counts grouped by company, station and deal type.
    // NOTE: pattern uses "yyyy" (calendar year); the original "YYYY" is the
    // week-based year, which mislabels dates around year boundaries
    // (e.g. Dec 29-31 can be stamped with the following year).
    val stationDayData = filterData.groupBy($"company_name", $"station", $"deal_type",
      date_format($"deal_date", "yyyy-MM-dd").as("deal_date"))
      .count()
      .orderBy(desc("count"))
    // Append the daily aggregates to the database.
    stationDayData.write.mode(SaveMode.Append).jdbc(jdbcURL, "day_card_data", connectionProperties)
    // Per-day-and-hour counts with the same grouping keys.
    val stationDayHourData = filterData.groupBy($"company_name", $"station", $"deal_type",
      date_format($"deal_date", "yyyy-MM-dd").as("deal_date"),
      date_format($"deal_date", "HH").as("deal_hour"))
      .count()
      .orderBy(desc("count"))
    // Append the hourly aggregates to the database.
    stationDayHourData.write.mode(SaveMode.Append).jdbc(jdbcURL, "day_hour_card_data", connectionProperties)
    // Release Spark resources.
    spark.close()
  }
}