# 混洗（Shuffling）机制

## 数值稳定算法 ！！！
- Prob1: 在似然函数中，需要计算$\log \rho, \log (1-\rho)$
  - 因此当$\rho$过于接近0或1时，数值计算不稳定
- Prob2: 另外，$\rho(x) = 1/ (1-e^{-x})$，此时当$x$较小，$e^{-x}$趋向于正无穷，$\rho$的计算也会变得不稳定

**Prob1 解决方法**

sigmoid函数有如下等价形式
$$\text{sigmoid}(x)=\frac{1}{1+e^{-x}}=\frac{e^x}{1+e^{x}}$$

因此对于正数，可以用左式，对于负数用右式计算，这样便避免了inf的出现

在具体的实现中，既可以通过直接调用现成的函数`scipy.special.expit()`

也可以借用语句`np.where()`

```python
x = np.array()
a = np.array()
b = np.array()
np.where(x>0, a,b) #如果判定为True则在True的位置把对应位置的a元素取得，若判定为False则把b对应位置的元素取得
```

**Prob2 解决方法**
- 在前述中处理的办法为增加一个较小数避免直接取0/1
- 更优的一个处理在于首先计算出其解析值：
$$\log\rho(x)= x-\log(1+e^x)$$
$$\log(1-\rho(x))=-\log(1+e^x)$$
- 记 $s(x) = \log (1+e^x)$ (SoftPlus 函数)
  - $x<0$ 计算无异常
  - $x\ge0, \log(1+e^x)=\log(e^x(e^{-x}+1))=x+\log(1+e^{-x})$
- *思考如何向量化实现该内容*
- 事实上，在计算logistics的似然函数时，就应该直接将$\log\rho$展开，以$s(x)$的更具体的函数形式进行表达
- 因此避免在具体实现中显式出现sigmoid

## RDD缓存机制

- 注意lec9-logistic-regression中有一个语句`dat.cache()`
- 对于迭代式算法，若需要对RDD进行反复迭代读取，就可以事先对RDD进行缓存
- 缓存可以避免中间的一些重复性读取操作
- 相当于将计算后的RDD取值直接记录在内存中，当后续需要时可以直接进行读取
- 相当于用空间换时间，对于每一台运算的机器的内存资源的最大化
- 具体实现：`rdd.cache()`
- 在初始化的时候也需要对默认设置进行声明

## 混洗 Shuffling
- `repartition()` 重新划分分区后可能会打乱数据顺序
- 因此需要注意算法是否会依赖于数据顺序（行打乱是否有影响）
  - 不依赖顺序的算法：排列不变性，通常是一些汇总计算例如计算方差、回归系数、似然函数等
  - 依赖排序等算法：通常是与观测相关的计算，例如观测的预测值等
- 增加分区数很有可能会触发混洗
- 若为了减小分区，可以用`coalesce()`替代`repartition()`避免出现shuffle
- 若在增加分区时不想影响其顺序，可以用`rdd.zipWithIndex()`增加一个顺序信息（id索引）
  - 利用这个函数也可以直接对rdd内容进行行操作等
  - 在进行`repartition()`后尽管顺序依然打乱，但是由于保留了`index()`因此可以通过后续的排序等操作重新整理
- 总而言之，增加分区会触发混洗，导致一定的混乱；但另一方面由于并行计算的需求，有时可能还是需要进行repartition。因此这里需要进行tradeoff

In [2]:
import findspark
findspark.init("/Users/xinby/Library/Spark")

from pyspark.sql import SparkSession
# 本地模式
spark = SparkSession.builder.\
    master("local[*]").\
    appName("PySpark RDD").\
    getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")
print(spark)
print(sc)

<pyspark.sql.session.SparkSession object at 0x7ff0c8f00d60>
<SparkContext master=local[*] appName=PySpark RDD>


创建一个简单的 RDD：

In [3]:
import string

rdd = sc.parallelize(string.ascii_uppercase, numSlices=5)
print(rdd.collect())

['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z']


In [4]:
rdd.getNumPartitions()

5

将每个分区的字母合并成一个字符串，从而可以知道分区是如何划分的：

In [5]:
def concat_letters(iter):
    yield "".join(iter)

rdd.mapPartitions(concat_letters).collect()

                                                                                

['ABCDE', 'FGHIJ', 'KLMNO', 'PQRST', 'UVWXYZ']

将 RDD 重新分区，可以看出来字母的顺序产生了变化：

In [6]:
rdd2 = rdd.repartition(6)
print(rdd2.collect())

['A', 'B', 'C', 'D', 'E', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'K', 'L', 'M', 'N', 'O', 'F', 'G', 'H', 'I', 'J']


In [7]:
rdd2.mapPartitions(concat_letters).collect()

['', 'ABCDE', 'PQRSTUVWXYZ', '', 'KLMNO', 'FGHIJ']

使用 `toDebugString()` 查看 RDD 包含的操作：

In [8]:
print(rdd.toDebugString().decode("UTF-8"))
print("\n\n")
print(rdd2.toDebugString().decode("UTF-8"))

(5) ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274 []



(6) MapPartitionsRDD[6] at coalesce at DirectMethodHandleAccessor.java:104 []
 |  CoalescedRDD[5] at coalesce at DirectMethodHandleAccessor.java:104 []
 |  ShuffledRDD[4] at coalesce at DirectMethodHandleAccessor.java:104 []
 +-(5) MapPartitionsRDD[3] at coalesce at DirectMethodHandleAccessor.java:104 []
    |  PythonRDD[2] at RDD at PythonRDD.scala:53 []
    |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274 []


可以看出来 `rdd2` 包含了混洗（shuffling）操作，这也是数据顺序打乱的原因。

如果需要减小分区数目，可以使用 `coalesce()` 函数，避免混洗操作：

In [9]:
rdd3 = rdd.coalesce(numPartitions=2, shuffle=False)
print(rdd3.collect())
rdd3.mapPartitions(concat_letters).collect()

['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z']


['ABCDEFGHIJ', 'KLMNOPQRSTUVWXYZ']

In [10]:
print(rdd3.toDebugString().decode("UTF-8"))

(2) CoalescedRDD[8] at coalesce at DirectMethodHandleAccessor.java:104 []
 |  ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274 []


如果确实需要增加分区数目，同时希望保持数据顺序，可以在原始 RDD 中增加索引信息：

In [11]:
print(rdd.zipWithIndex().collect())

[('A', 0), ('B', 1), ('C', 2), ('D', 3), ('E', 4), ('F', 5), ('G', 6), ('H', 7), ('I', 8), ('J', 9), ('K', 10), ('L', 11), ('M', 12), ('N', 13), ('O', 14), ('P', 15), ('Q', 16), ('R', 17), ('S', 18), ('T', 19), ('U', 20), ('V', 21), ('W', 22), ('X', 23), ('Y', 24), ('Z', 25)]


In [12]:
rdd4 = rdd.zipWithIndex().repartition(5)
print(rdd4.collect())

[('A', 0), ('B', 1), ('C', 2), ('D', 3), ('E', 4), ('K', 10), ('L', 11), ('M', 12), ('N', 13), ('O', 14), ('P', 15), ('Q', 16), ('R', 17), ('S', 18), ('T', 19), ('F', 5), ('G', 6), ('H', 7), ('I', 8), ('J', 9), ('U', 20), ('V', 21), ('W', 22), ('X', 23), ('Y', 24), ('Z', 25)]


然后定义一个分区映射函数，在合并数据行时获取分区中第一条数据的索引：

In [13]:
def concat_letters_with_order(iter):
    letters_and_indices = list(iter)
    letters = map(lambda x: x[0], letters_and_indices)
    indices = map(lambda x: x[1], letters_and_indices)
    if len(letters_and_indices) < 1:
        yield ()
    else:
        first_ind = next(indices)
        combined_letters = "".join(letters)
        yield combined_letters, first_ind

rdd5 = rdd4.mapPartitions(concat_letters_with_order)
print(rdd5.collect())

[(), ('ABCDE', 0), ('KLMNOPQRST', 10), ('FGHIJ', 5), ('UVWXYZ', 20)]


然后过滤空分区，并按生成的索引对 RDD 进行排序：

In [14]:
rdd5.filter(lambda x: len(x) > 1).collect()

[('ABCDE', 0), ('KLMNOPQRST', 10), ('FGHIJ', 5), ('UVWXYZ', 20)]

In [15]:
rdd5.filter(lambda x: len(x) > 1).sortBy(lambda x: x[1], ascending=True).collect()

[('ABCDE', 0), ('FGHIJ', 5), ('KLMNOPQRST', 10), ('UVWXYZ', 20)]

23/04/27 22:12:02 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:117)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$driverEndpoint(BlockManagerMasterEndpoint.scala:116)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$lzycompute$1(BlockManagerMasterEndpoint.scala:593)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.isExecutorAlive$1(BlockManagerMasterEndpoint.scala:592)
	at org.apache.spar

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 53192)
Traceback (most recent call last):
  File "/Users/xinby/opt/anaconda3/lib/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Users/xinby/opt/anaconda3/lib/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/Users/xinby/opt/anaconda3/lib/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Users/xinby/opt/anaconda3/lib/python3.9/socketserver.py", line 747, in __init__
    self.handle()
  File "/Users/xinby/Library/Spark/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/Users/xinby/Library/Spark/python/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/Users/xinby/Library/Spark/python/pyspark/accumulators.py", line 257