In [1]:
# spark 环境初始化与包导入
import os
import sys
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql import functions as F

sys.path.append("/home/worker/libs/advai_data_common-0.4.6-py3.6.egg")
sys.path.append("/home/worker/libs/data-ocean")
from asiacredit.conf.request import reqConf
from asiacredit.util.pyspark import decrypt_data
from asiacredit.util.pyspark.decrypt_data import DecryptData
from asiacredit.util.requeset.http_request import Request


def create_spark(driver_memory='4G', executor_memory='16G', py_files=None, jars=None, app_name=u'奔跑吧兄die ~~ '):
    """Build (or reuse) a Hive-enabled SparkSession for this notebook.

    Args:
        driver_memory: Value written to the ``SPARK_DRIVER_MEMORY`` environment
            variable and to ``spark.driver.memory``.
        executor_memory: Value written to the ``SPARK_EXECUTOR_MEMORY``
            environment variable and to ``spark.executor.memory``.
        py_files: Optional iterable of paths; each one is registered with
            ``SparkContext.addPyFile`` after the session exists.
        jars: Optional iterable of jar paths, comma-joined into ``spark.jars``.
        app_name: Application name handed to the session builder.

    Returns:
        The ``SparkSession`` returned by ``getOrCreate()``.
    """
    os.environ['SPARK_DRIVER_MEMORY'] = driver_memory
    # Fixed: the variable name previously contained a stray space
    # ('SPARK_EXECUTO R_MEMORY'), so the intended variable was never set.
    os.environ['SPARK_EXECUTOR_MEMORY'] = executor_memory

    config = SparkConf()
    config.set('spark.dynamicAllocation.maxExecutors', '8')
    config.set('spark.driver.memory', driver_memory)
    config.set('spark.executor.memory', executor_memory)
    config.set('spark.executor.cores', '8')
    # Fixed: the key was misspelled ('spark.yarn.executora.memoryOverhead'), which
    # Spark silently ignores. 'spark.executor.memoryOverhead' is the current name
    # of this setting (it replaced 'spark.yarn.executor.memoryOverhead').
    config.set('spark.executor.memoryOverhead', '4G')
    config.set('spark.sql.shuffle.partitions', '500')
    config.set('spark.default.parallelism', '500')
    config.set('spark.port.maxRetries', '1000')
    config.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    # Local mode with 4 worker threads.
    # NOTE(review): the executor / dynamic-allocation settings above presumably
    # only matter when the master is switched to a cluster manager - confirm.
    config.set('spark.master', 'local[4]')

    if jars:
        config.set('spark.jars', ','.join(jars))

    spark = SparkSession.builder.appName(app_name).config(conf=config).enableHiveSupport().getOrCreate()

    if py_files:
        for f in py_files:
            spark.sparkContext.addPyFile(f)

    return spark

# Create the session with all default settings; the cells below use it via `spark`.
spark = create_spark()


In [2]:
# Sample data: one (id, name, score, subject) record per student and subject.
score_records = [
    (1, u'张三', 88, u'数学'),
    (2, u'李雷', 67, u'数学'),
    (3, u'宫九', 77, u'数学'),
    (4, u'王五', 65, u'数学'),
    (1, u'张三', 77, u'英语'),
    (2, u'李雷', 90, u'英语'),
    (3, u'宫九', 24, u'英语'),
    (4, u'王五', 90, u'英语'),
    (1, u'张三', 33, u'语文'),
    (2, u'李雷', 87, u'语文'),
    (3, u'宫九', 92, u'语文'),
    (4, u'王五', 87, u'语文'),
]

# Expand each record into the dict form passed to createDataFrame.
df = spark.createDataFrame([
    {'id': student_id, u'姓名': name, u'分数': score, u'科目': subject}
    for student_id, name, score, subject in score_records
])



## 行转列

In [3]:
# Register the DataFrame as temp view `df` so the SQL cells can query it, then display it.
df.createOrReplaceTempView('df')
df.show()

+---+----+----+----+
| id|分数|姓名|科目|
+---+----+----+----+
|  1|  88|张三|数学|
|  2|  67|李雷|数学|
|  3|  77|宫九|数学|
|  4|  65|王五|数学|
|  1|  77|张三|英语|
|  2|  90|李雷|英语|
|  3|  24|宫九|英语|
|  4|  90|王五|英语|
|  1|  33|张三|语文|
|  2|  87|李雷|语文|
|  3|  92|宫九|语文|
|  4|  87|王五|语文|
+---+----+----+----+



In [4]:
# Rows to columns: each listed subject becomes its own column holding the summed score.
pivot_query = """
SELECT
    * 
FROM df
     PIVOT
     (
         SUM(`分数`) 
         FOR `科目` in ('数学','英语','语文')
     )
ORDER BY id      
"""
df_pivot = spark.sql(pivot_query)
# Cached because the unpivot cells below query this result again.
df_pivot = df_pivot.cache()
df_pivot.createOrReplaceTempView('df_pivot')
df_pivot.show()

+---+----+----+----+----+
| id|姓名|数学|英语|语文|
+---+----+----+----+----+
|  1|张三|  88|  77|  33|
|  2|李雷|  67|  90|  87|
|  3|宫九|  77|  24|  92|
|  4|王五|  65|  90|  87|
+---+----+----+----+----+



## 列转行



In [5]:
# Columns back to rows: stack(3, ...) emits three (科目, 分数) rows for every row of df_pivot.
# The result is displayed once and never referenced again, so it is not cached:
# caching an anonymous DataFrame only pins storage memory that can never be unpersisted.
spark.sql("""
SELECT
    id
    ,`姓名`
    ,stack(3, '数学', `数学`, '英语', `英语`, '语文', `语文`) as (`科目`, `分数`)
FROM df_pivot
""").show()

+---+----+----+----+
| id|姓名|科目|分数|
+---+----+----+----+
|  1|张三|数学|  88|
|  1|张三|英语|  77|
|  1|张三|语文|  33|
|  2|李雷|数学|  67|
|  2|李雷|英语|  90|
|  2|李雷|语文|  87|
|  3|宫九|数学|  77|
|  3|宫九|英语|  24|
|  3|宫九|语文|  92|
|  4|王五|数学|  65|
|  4|王五|英语|  90|
|  4|王五|语文|  87|
+---+----+----+----+



In [7]:
# Unpivot only part of the data: stack(1, ...) emits a single (科目, 分数) row per input row,
# here for the 数学 column only.
# The result is displayed once and never referenced again, so it is not cached:
# caching an anonymous DataFrame only pins storage memory that can never be unpersisted.
spark.sql("""
SELECT
    id
    ,`姓名`
    ,stack(1, '数学', `数学`) as (`科目`, `分数`)
FROM df_pivot
""").show()

+---+----+----+----+
| id|姓名|科目|分数|
+---+----+----+----+
|  1|张三|数学|  88|
|  2|李雷|数学|  67|
|  3|宫九|数学|  77|
|  4|王五|数学|  65|
+---+----+----+----+



## 参考

- https://databricks.com/blog/2018/11/01/sql-pivot-converting-rows-to-columns.html
- https://www.cnblogs.com/hhelibeb/p/10310369.html

TODO:

- PIVOT 的高级用法，以及它与 GROUP BY 的区别
