# SparkSQL Practice

### 准备数据

In [1]:
import getpass
import os

from pyspark import StorageLevel
from pyspark.sql import SparkSession, DataFrame as SparkDataFrame

In [2]:
# Initialise the Spark session.
# JVM-level settings such as spark.driver.memory must be given to the builder
# *before* getOrCreate(): once the driver JVM has started, spark_session.conf.set()
# cannot change them any more (the original post-creation conf.set calls had no effect).
# NOTE: getOrCreate() returns an already-running session if one exists, in which case
# these settings are not applied either -- restart the kernel to change them.
# In standalone/local mode, don't set the master thread count too high, otherwise
# the process may fail because of Java GC pressure.
spark_session = (
    SparkSession.builder
    .master("local[4]")
    .appName("test_log_processing")
    .config("spark.driver.memory", "2g")
    # NOTE(review): in local mode executors run inside the driver JVM, so this
    # setting likely has no effect here -- confirm before relying on it.
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)
print(spark_session.version)  # show the Spark version

2.4.4


In [3]:
# Read data from MySQL over JDBC.
# The database already stores the table schema, so no schema needs to be specified.
url = "jdbc:mysql://localhost:3306"  # JDBC URL
table = "test.fakelog"  # table name
# Connection properties (user name and password).
# Credentials come from the environment (or an interactive prompt) instead of being
# hard-coded, so they don't leak when the notebook is shared or committed.
# NOTE(review): the password previously committed in this cell should be treated
# as compromised and rotated.
properties = {
    "user": os.environ.get("MYSQL_USER", "root"),
    "password": os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: "),
}

In [4]:
# 读取数据
df = spark_session.read.jdbc(url, table, properties=properties)
df.coalesce(50).persist(StorageLevel.MEMORY_AND_DISK)  
# df的分区出现了一些问题, 下面代码可以正常输出前10行, 目前正在进一步了解分区与缓存的原理
%time print("记录数: %d" % df.count())  # 查看数据集的总记录数

记录数: 2336563
Wall time: 4.34 s


In [5]:
%time df.show()  # show() is an action; if `df` is not actually cached it re-runs the plan, presumably re-reading the table over JDBC, which would explain the slowness

+--------------------+-------+-------+---------+--------------------+-----------+-------------------+
|               LOGID|ACQ_NUM|DEQ_NUM|ACQ_SHARE|           ITEM_CODE|STATUS_FLAG|            LOGTIME|
+--------------------+-------+-------+---------+--------------------+-----------+-------------------+
|000005acf636cea98...|     19|      2|     0.82|5e0bqjonw8kmzya67...|          B|2019-05-18 08:02:51|
|000011f2d737b56f6...|     16|     19|     0.41|  9o3ibs17824xkgazye|          B|2019-07-02 08:01:03|
|000014a54e90d3af7...|      5|     16|     0.34|              7j0m8b|          B|2019-06-07 01:32:13|
|00001b5238f8475b5...|     38|     18|     0.85|  8xb4zd6aok31ntw72p|          B|2019-08-03 14:39:00|
|00001ead605d17b2a...|      8|     19|     0.67|          ijbra9o3tv|          A|2019-10-21 02:22:45|
|000020f4a2388cb0b...|     34|      2|     0.97|        tl036rev7y42|          A|2019-07-08 11:16:53|
|000021aa3f46fc60e...|     41|      8|     0.17| bhr5ocy807d41x3siqt|          B|2

### 筛选与字段运算

In [5]:
# Bounds of the time window used to filter records by LOGTIME
# (timestamp strings compared against the LOGTIME column).
t1, t2 = "2019-05-06 12:32:56", "2019-06-06 12:32:56"

In [6]:
# 方式1: 调DataFrame的方法
df_time_filtered = df.filter((df.LOGTIME >= t1) & (df.LOGTIME < t2))
df_time_filtered.coalesce(50).persist(StorageLevel.MEMORY_AND_DISK)  # 分区数=50 work
print(df_time_filtered.count())
%time df_time_filtered.show(10)

211410
+--------------------+-------+-------+---------+--------------------+-----------+-------------------+
|               LOGID|ACQ_NUM|DEQ_NUM|ACQ_SHARE|           ITEM_CODE|STATUS_FLAG|            LOGTIME|
+--------------------+-------+-------+---------+--------------------+-----------+-------------------+
|000005acf636cea98...|     19|      2|     0.82|5e0bqjonw8kmzya67...|          B|2019-05-18 08:02:51|
|00002963bfc216bfa...|     30|      1|     0.05|uo9dx5rhbp26kqwyc...|          B|2019-05-13 09:53:12|
|0000bc5270286f7a7...|     31|     15|     0.12|           hug4nvkwc|          B|2019-06-02 04:59:34|
|000111c6c2c36620c...|     34|     19|     0.44|1g3vh6r90d7fsc54u...|          A|2019-05-22 06:08:32|
|0001459cc41968df0...|      4|     19|     0.88|jlo86izb9mf753gux...|          A|2019-05-20 20:15:38|
|00014b18aab2b376e...|     17|     11|     0.77|tz1sm07xa3gpcl6k4...|          B|2019-05-25 03:34:51|
|0001a5d00032b7ea6...|     41|     11|     0.69|        g2um56hx9a8c|      

In [6]:
# 方式2: 注册该DataFrame为临时SQL表, 写sql实现
df.createOrReplaceTempView("df")
df_time_filtered = spark_session.sql("""
    SELECT * FROM df WHERE LOGTIME BETWEEN '%s' AND '%s' ORDER BY LOGTIME ASC
""" % (t1, t2))
# 这种方式下Spark会先做一个执行计划, 然后调用Catalyst Optimizer, 比较稳定, 一般不会出GC的问题
df_time_filtered.coalesce(25).persist(StorageLevel.MEMORY_AND_DISK)
%time print(df_time_filtered.count())

211410
Wall time: 24.9 s


In [None]:
df_time_filtered.show(10)  # NOTE(review): reported as never finishing; if df_time_filtered is not actually cached, show() re-runs the JDBC read plus the global ORDER BY sort -- TODO investigate

In [7]:
### 字段运算
# 尽量使用方式2(写SQL实现运算和操作)
df_time_filtered.createOrReplaceTempView("df_time_filtered")
df_smry = spark_session.sql("""
    SELECT 
        LOGID,
        (ACQ_NUM + DEQ_NUM) AS TOTAL_NUM,  -- 简单加法
        ACQ_NUM / ACQ_SHARE AS S_TOTAL,   -- 简单乘法
        ITEM_CODE,
        STATUS_FLAG,
        DATE(LOGTIME) AS LOGDATE  -- datetime转为日期
    FROM df_time_filtered
""")
df_smry.coalesce(25).persist(StorageLevel.MEMORY_AND_DISK)
%time df_smry.count()
%time df_smry.show(10)

Wall time: 9.98 s
+--------------------+---------+----------+--------------------+-----------+----------+
|               LOGID|TOTAL_NUM|   S_TOTAL|           ITEM_CODE|STATUS_FLAG|   LOGDATE|
+--------------------+---------+----------+--------------------+-----------+----------+
|000005acf636cea98...|       21| 23.170732|5e0bqjonw8kmzya67...|          B|2019-05-18|
|00002963bfc216bfa...|       31|600.000000|uo9dx5rhbp26kqwyc...|          B|2019-05-13|
|0000bc5270286f7a7...|       46|258.333333|           hug4nvkwc|          B|2019-06-02|
|000111c6c2c36620c...|       53| 77.272727|1g3vh6r90d7fsc54u...|          A|2019-05-22|
|0001459cc41968df0...|       23|  4.545455|jlo86izb9mf753gux...|          A|2019-05-20|
|00014b18aab2b376e...|       28| 22.077922|tz1sm07xa3gpcl6k4...|          B|2019-05-25|
|0001a5d00032b7ea6...|       52| 59.420290|        g2um56hx9a8c|          A|2019-05-24|
|0001b5eca5e2e80a5...|       19| 12.765957|    4av905nilj3mtz7w|          B|2019-05-23|
|0001e2ec36e06

### 聚合操作

In [8]:
# 依旧使用方式2实现
# 按日期, STATUS_FLAG聚合; 计算衍生字段TOTAL_NUM和S_TOTAL的每日每类和与均值, 以及样本数
df_smry.createOrReplaceTempView("df_smry")
df_time_filtered.unpersist()  # 不用的DataFrame要及时unpersist
df_groupby_date_status_flag = spark_session.sql("""
    SELECT
        LOGDATE,
        STATUS_FLAG,
        SUM(TOTAL_NUM) AS SUM_TOTAL_NUM,
        AVG(TOTAL_NUM) AS AVG_TOTAL_NUM,
        SUM(S_TOTAL) AS SUM_S_TOTAL,
        AVG(S_TOTAL) AS AVG_S_TOTAL
    FROM df_smry
    GROUP BY LOGDATE, STATUS_FLAG
""")
df_groupby_date_status_flag.coalesce(25).persist(StorageLevel.MEMORY_AND_DISK)
%time df_groupby_date_status_flag.count()
%time df_groupby_date_status_flag.show(10)

Wall time: 15.3 s
+----------+-----------+-------------+------------------+-------------+--------------+
|   LOGDATE|STATUS_FLAG|SUM_TOTAL_NUM|     AVG_TOTAL_NUM|  SUM_S_TOTAL|   AVG_S_TOTAL|
+----------+-----------+-------------+------------------+-------------+--------------+
|2019-05-06|          A|        58107| 32.15661317100166|220778.488848|122.6547160267|
|2019-05-25|          B|       120847|32.148709763234905|393668.652214|105.0904036877|
|2019-05-23|          B|       118127|31.857335490830636|438589.807326|118.7624715207|
|2019-06-01|          A|       124614| 32.19168173598553|407792.589233|105.9752051021|
|2019-05-13|          A|       120121|31.694195250659632|386811.079153|102.6841197645|
|2019-05-20|          B|       121877|32.089784096893105|466899.724610|123.5838339359|
|2019-05-10|          B|       122268| 32.03248624574273|427691.300044|112.9068901911|
|2019-06-06|          B|        65058|31.844346549192363|252385.952369|124.3280553542|
|2019-05-27|          B| 

In [12]:
# 聚合完成的结果写回数据库
res_table = "test.grouped_result"
%time df_groupby_date_status_flag.write.jdbc(url, res_table, properties=properties)
# 这里的性能也有待提升

In [None]:
spark_session.stop()  # Stop the underlying SparkContext and release its resources; run this last