In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window
from sklearn.utils import shuffle
import seaborn as sns
import pandas as pd 

### 高阶操作
hive 中高阶操作 包括 explode pos_explode rank row_number lead lag  
以此为标准 分别对pyspark 和pandas 进行探索

In [2]:
tips_df = sns.load_dataset("tips")
spark = SparkSession.builder.appName('abc').getOrCreate()

In [3]:
tips_df["exp_row"] = tips_df.time.apply(lambda r: "#".join(set(list(r))))
tips_df["id"] = tips_df.index
tips_df.head(5)

Unnamed: 0,total_bill,tip,sex,smoker,day,time,size,exp_row,id
0,16.99,1.01,Female,No,Sun,Dinner,2,i#r#e#n#D,0
1,10.34,1.66,Male,No,Sun,Dinner,3,i#r#e#n#D,1
2,21.01,3.5,Male,No,Sun,Dinner,3,i#r#e#n#D,2
3,23.68,3.31,Male,No,Sun,Dinner,2,i#r#e#n#D,3
4,24.59,3.61,Female,No,Sun,Dinner,4,i#r#e#n#D,4


#### explode posexplode
- explode 
行转列 将一行的某个字段拆开 分别对应多行
```SQL
--explode
select
   total_bill,
   tip,
   sex,
   exploed_row
from tips_table lateral view explode(split(exp_row, '#')) t as exploed_row 
```
- posexplode 
除了类似于explode 拆开行之外 还返回 拆开后的每行在原来每行中的索引
```SQL
--explode
select
   total_bill,
   tip,
   sex,
   exploed_row
from tips_table lateral view posexplode(split(exp_row, '#')) t as exploed_row 
```



In [4]:
spark_df = spark.createDataFrame(tips_df)
spark_df.createOrReplaceTempView("tips_table")
spark_df.show(3)

+----------+----+------+------+---+------+----+---------+---+
|total_bill| tip|   sex|smoker|day|  time|size|  exp_row| id|
+----------+----+------+------+---+------+----+---------+---+
|     16.99|1.01|Female|    No|Sun|Dinner|   2|i#r#e#n#D|  0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|i#r#e#n#D|  1|
|     21.01| 3.5|  Male|    No|Sun|Dinner|   3|i#r#e#n#D|  2|
+----------+----+------+------+---+------+----+---------+---+
only showing top 3 rows



##### hive 的explode 的参数可以是 array 或者map array 返回一列 

In [5]:
sql = '''
select
   total_bill,
   tip,
   sex,
   exploed_row
from tips_table lateral view explode(split(exp_row, '#')) t as exploed_row 
'''
explode_hive_df = spark.sql(sql)
explode_hive_df.show(20)

+----------+----+------+-----------+
|total_bill| tip|   sex|exploed_row|
+----------+----+------+-----------+
|     16.99|1.01|Female|          i|
|     16.99|1.01|Female|          r|
|     16.99|1.01|Female|          e|
|     16.99|1.01|Female|          n|
|     16.99|1.01|Female|          D|
|     10.34|1.66|  Male|          i|
|     10.34|1.66|  Male|          r|
|     10.34|1.66|  Male|          e|
|     10.34|1.66|  Male|          n|
|     10.34|1.66|  Male|          D|
|     21.01| 3.5|  Male|          i|
|     21.01| 3.5|  Male|          r|
|     21.01| 3.5|  Male|          e|
|     21.01| 3.5|  Male|          n|
|     21.01| 3.5|  Male|          D|
|     23.68|3.31|  Male|          i|
|     23.68|3.31|  Male|          r|
|     23.68|3.31|  Male|          e|
|     23.68|3.31|  Male|          n|
|     23.68|3.31|  Male|          D|
+----------+----+------+-----------+
only showing top 20 rows



In [21]:
sql = '''
select
   total_bill,
   tip,
   sex,
   col_idx,
   exploed_row
from tips_table lateral view posexplode(split(exp_row, '#')) t as col_idx, exploed_row 
'''
posexplode_hive_df = spark.sql(sql)
posexplode_hive_df.show(20)

+----------+----+------+-------+-----------+
|total_bill| tip|   sex|col_idx|exploed_row|
+----------+----+------+-------+-----------+
|     16.99|1.01|Female|      0|          i|
|     16.99|1.01|Female|      1|          r|
|     16.99|1.01|Female|      2|          e|
|     16.99|1.01|Female|      3|          n|
|     16.99|1.01|Female|      4|          D|
|     10.34|1.66|  Male|      0|          i|
|     10.34|1.66|  Male|      1|          r|
|     10.34|1.66|  Male|      2|          e|
|     10.34|1.66|  Male|      3|          n|
|     10.34|1.66|  Male|      4|          D|
|     21.01| 3.5|  Male|      0|          i|
|     21.01| 3.5|  Male|      1|          r|
|     21.01| 3.5|  Male|      2|          e|
|     21.01| 3.5|  Male|      3|          n|
|     21.01| 3.5|  Male|      4|          D|
|     23.68|3.31|  Male|      0|          i|
|     23.68|3.31|  Male|      1|          r|
|     23.68|3.31|  Male|      2|          e|
|     23.68|3.31|  Male|      3|          n|
|     23.6

##### spark 的explode 的参数可以是 array 或者map array 返回一列 
注意的是 spark方式 是描述一列为explode 和 hive的描述表为lateral view的方式有点不一样

In [6]:
explode_spark_df = spark_df.select("total_bill", "tip", "sex", F.explode(F.split(F.col("exp_row"), "#")).alias("exploed_row"))
explode_spark_df.show(20)

+----------+----+------+-----------+
|total_bill| tip|   sex|exploed_row|
+----------+----+------+-----------+
|     16.99|1.01|Female|          i|
|     16.99|1.01|Female|          r|
|     16.99|1.01|Female|          e|
|     16.99|1.01|Female|          n|
|     16.99|1.01|Female|          D|
|     10.34|1.66|  Male|          i|
|     10.34|1.66|  Male|          r|
|     10.34|1.66|  Male|          e|
|     10.34|1.66|  Male|          n|
|     10.34|1.66|  Male|          D|
|     21.01| 3.5|  Male|          i|
|     21.01| 3.5|  Male|          r|
|     21.01| 3.5|  Male|          e|
|     21.01| 3.5|  Male|          n|
|     21.01| 3.5|  Male|          D|
|     23.68|3.31|  Male|          i|
|     23.68|3.31|  Male|          r|
|     23.68|3.31|  Male|          e|
|     23.68|3.31|  Male|          n|
|     23.68|3.31|  Male|          D|
+----------+----+------+-----------+
only showing top 20 rows



In [28]:
# !!!alias 可以有多个参数 
posexplode_spark_df = spark_df.select("total_bill", 
                                      "tip", 
                                      "sex", 
                                      F.posexplode(F.split(F.col("exp_row"), "#")).alias("col_idx", "exploed_row")
                                     )
posexplode_spark_df.show(20)

+----------+----+------+-------+-----------+
|total_bill| tip|   sex|col_idx|exploed_row|
+----------+----+------+-------+-----------+
|     16.99|1.01|Female|      0|          i|
|     16.99|1.01|Female|      1|          r|
|     16.99|1.01|Female|      2|          e|
|     16.99|1.01|Female|      3|          n|
|     16.99|1.01|Female|      4|          D|
|     10.34|1.66|  Male|      0|          i|
|     10.34|1.66|  Male|      1|          r|
|     10.34|1.66|  Male|      2|          e|
|     10.34|1.66|  Male|      3|          n|
|     10.34|1.66|  Male|      4|          D|
|     21.01| 3.5|  Male|      0|          i|
|     21.01| 3.5|  Male|      1|          r|
|     21.01| 3.5|  Male|      2|          e|
|     21.01| 3.5|  Male|      3|          n|
|     21.01| 3.5|  Male|      4|          D|
|     23.68|3.31|  Male|      0|          i|
|     23.68|3.31|  Male|      1|          r|
|     23.68|3.31|  Male|      2|          e|
|     23.68|3.31|  Male|      3|          n|
|     23.6

#### pandas 不支持explode   
需要自己造轮子

#### rank dense_rank row_number 
1. row_number() 是没有重复值的排序(即使两天记录相等也是不重复的)，可以利用它来实现分页
1. dense_rank() 是连续排序，两个第二名仍然跟着第三名
1. rank()       是跳跃拍学，两个第二名下来就是第四名

也就是说 rank 是有重复值的 但是row_number 没有 

hive example 
```SQL
select
    id, 
    row_number() over (partition by sex order by size desc) as rn,
    dense_rank() over (partition by sex order by size desc) as drk,
    rank() over (partition by sex order by size desc) as rk 
from tips_table
```

- hive 方式的开窗函数 


In [7]:
# hive 方式的开窗函数 
sql = '''
select
    id, 
    row_number() over (partition by sex order by size desc) as rn,
    dense_rank() over (partition by sex order by size desc) as drk,
    rank() over (partition by sex order by size desc) as rk 
from tips_table
order by id
'''
rank_hive_df = spark.sql(sql)
rank_hive_df.show(10)

+---+---+---+---+
| id| rn|drk| rk|
+---+---+---+---+
|  0| 27|  5| 27|
|  1| 35|  4| 35|
|  2| 36|  4| 35|
|  3| 59|  5| 59|
|  4|  4|  3|  4|
|  5|  7|  3|  7|
|  6| 60|  5| 59|
|  7|  8|  3|  7|
|  8| 61|  5| 59|
|  9| 62|  5| 59|
+---+---+---+---+
only showing top 10 rows



- pyspark 使用window 对象来描述 类似于hive中的开窗 计算顺序的函数 需要 使用 


In [8]:
# pyspark 使用window 对象来描述 类似于hive中的开窗 计算顺序的函数 需要 使用 

window_1 = Window.partitionBy("sex").orderBy(F.col("size").desc())
rank_spark_df = spark_df.select(F.col("id"),
                                F.row_number().over(window_1).alias("rn"),
                                F.dense_rank().over(window_1).alias("drk"), 
                                F.row_number().over(window_1).alias("rk")).orderBy(F.col("id"))
rank_spark_df.show(10)

+---+---+---+---+
| id| rn|drk| rk|
+---+---+---+---+
|  0| 27|  5| 27|
|  1| 35|  4| 35|
|  2| 36|  4| 36|
|  3| 59|  5| 59|
|  4|  4|  3|  4|
|  5|  7|  3|  7|
|  6| 60|  5| 60|
|  7|  8|  3|  8|
|  8| 61|  5| 61|
|  9| 62|  5| 62|
+---+---+---+---+
only showing top 10 rows



- pandas 的 rank 可以在DataFrame 上和 groupedDataFrame上直接使用 
- 使用 不同的method参数 来实验 row_number dense_rank 等方法
1. min 是hive rank() 中该数值的最小值
1. max 是hive rank() 中该数值的最大值
1. average 是hive rank() 中该数值的平均值
1. denses  是hive 的dense_rank()
1. first   是hive 的row_number()

In [9]:
rank_pandas_df = tips_df.copy()[["sex", "id", "size"]]
rank_pandas_df.head(2)

Unnamed: 0,sex,id,size
0,Female,0,2
1,Male,1,3


- 可以看出rank调用返回一个包含dataFrame中数值列的rank的一个新的DataFrame   
- 下面以一个简单的DataFrame说明rank返回的结果

In [15]:
grouped_df = rank_pandas_df.groupby(by="sex").rank(method="min", ascending=False)
# rank(method="dense", ascending=False)
grouped_df.head(10)

Unnamed: 0,id,size
0,87.0,27.0
1,157.0,35.0
2,156.0,35.0
3,155.0,59.0
4,86.0,4.0
5,154.0,7.0
6,153.0,59.0
7,152.0,7.0
8,151.0,59.0
9,150.0,59.0


In [11]:
# row_number
grouped_df = rank_pandas_df.groupby(by="sex").rank(method="first", ascending=False)
grouped_df.head(10)

Unnamed: 0,id,size
0,87.0,27.0
1,157.0,35.0
2,156.0,36.0
3,155.0,59.0
4,86.0,4.0
5,154.0,7.0
6,153.0,60.0
7,152.0,8.0
8,151.0,61.0
9,150.0,62.0


In [14]:
df = pd.DataFrame({"A":[1, 1, 1, 2, 2, 2, 3, 3], "B":[1, 1, 1, 2, 2, 2, 3, 3]})

In [68]:
df 

Unnamed: 0,A,B
0,1,1
1,1,1
2,1,1
3,2,2
4,2,2
5,2,2
6,3,3
7,3,3


In [67]:
# max 是hive rank() 中该数值的最大值
df.rank(method="max")

Unnamed: 0,A,B
0,3.0,3.0
1,3.0,3.0
2,3.0,3.0
3,6.0,6.0
4,6.0,6.0
5,6.0,6.0
6,8.0,8.0
7,8.0,8.0


In [69]:
# min 是hive rank() 中该数值的最小值
df.rank(method="min")

Unnamed: 0,A,B
0,1.0,1.0
1,1.0,1.0
2,1.0,1.0
3,4.0,4.0
4,4.0,4.0
5,4.0,4.0
6,7.0,7.0
7,7.0,7.0


In [17]:
# average 是hive rank() 中该数值的平均值
df.rank(method="average")

Unnamed: 0,A,B
0,2.0,2.0
1,2.0,2.0
2,2.0,2.0
3,5.0,5.0
4,5.0,5.0
5,5.0,5.0
6,7.5,7.5
7,7.5,7.5


In [70]:
# denses 是hive 的dense_rank()
df.rank(method="dense")

Unnamed: 0,A,B
0,1.0,1.0
1,1.0,1.0
2,1.0,1.0
3,2.0,2.0
4,2.0,2.0
5,2.0,2.0
6,3.0,3.0
7,3.0,3.0


In [18]:
# 1. average 是hive row_number()
df.rank(method="first")

Unnamed: 0,A,B
0,1.0,1.0
1,2.0,2.0
2,3.0,3.0
3,4.0,4.0
4,5.0,5.0
5,6.0,6.0
6,7.0,7.0
7,8.0,8.0


#### lag  lead  first_value  last_value
- 一些其他的常用开窗函数
1. lag(col,n,DEFAULT)  取分组内排序后，从当前行起 往上第n行 col列的值
1. lead(col,n,DEFAULT) 取分组内排序后，从当前行起 往下第n行 col列的值
1. first_value(col) 取分组内排序后，截止到当前行 第一个值
1. last_value(col) 取分组内排序后，截止到当前行 最后一个值
** 注意这几个函数一定要指定 order by 子句 不然顺序无法保证**

In [30]:
# hive 方式的开窗函数 
# spark_df = spark.createDataFrame(tips_df)
# spark_df.createOrReplaceTempView("tips_table")
# spark_df.show(3)
sql = '''
select
    id, 
    lag(tip, 2, 3333.0) over (partition by sex order by size desc) as lag_val,
    lead(tip, 2, 3333.0) over (partition by sex order by size desc) as lead_val,
    first_value(tip) over (partition by sex order by size desc) as first_val,
    last_value(tip) over (partition by sex order by size desc) as last_val
from tips_table
order by id
'''
lag_hive_df = spark.sql(sql)
lag_hive_df.show(10)

+---+-------+--------+---------+--------+
| id|lag_val|lead_val|first_val|last_val|
+---+-------+--------+---------+--------+
|  0|    3.0|    2.75|      4.2|     3.0|
|  1|    3.0|    3.71|      6.7|    5.92|
|  2|    2.0|    3.35|      6.7|    5.92|
|  3|    3.0|    1.96|      6.7|    1.75|
|  4|    5.0|    2.45|      4.2|    3.09|
|  5|    2.0|     3.0|      6.7|     2.0|
|  6|   5.92|    3.23|      6.7|    1.75|
|  7|    3.0|    7.58|      6.7|     2.0|
|  8|   3.31|    1.71|      6.7|    1.75|
|  9|    2.0|    1.57|      6.7|    1.75|
+---+-------+--------+---------+--------+
only showing top 10 rows



- spark中的函数名称需要注意一下 不是first_value是first
- 其他的操作参考 rank等开窗函数

In [33]:
window_2 = Window.partitionBy("sex").orderBy(F.col("size").desc())
rank_spark_df = spark_df.select(F.col("id"),
                                F.lag("tip", 2, 3333.0).over(window_2).alias("lag_val"),
                                F.lead("tip", 2, 3333.0).over(window_2).alias("lead_val"),
                                F.first("tip").over(window_2).alias("first_val"), 
                                F.last("tip").over(window_2).alias("last_val")).orderBy(F.col("id"))
rank_spark_df.show(10)

+---+-------+--------+---------+--------+
| id|lag_val|lead_val|first_val|last_val|
+---+-------+--------+---------+--------+
|  0|    3.0|    2.75|      4.2|     3.0|
|  1|    3.0|    3.71|      6.7|    5.92|
|  2|    2.0|    3.35|      6.7|    5.92|
|  3|    3.0|    1.96|      6.7|    1.75|
|  4|    5.0|    2.45|      4.2|    3.09|
|  5|    2.0|     3.0|      6.7|     2.0|
|  6|   5.92|    3.23|      6.7|    1.75|
|  7|    3.0|    7.58|      6.7|     2.0|
|  8|   3.31|    1.71|      6.7|    1.75|
|  9|    2.0|    1.57|      6.7|    1.75|
+---+-------+--------+---------+--------+
only showing top 10 rows

