# Лекция 8 "Ускоряем Haskell"

__План:__
- DList
 - Зачем?
 - DList
- Strictness
  - foldr & foldl
  - foldl'
  - NFData
  - BangPatterns
  - Про строгие вычисления
  - Result
- Space leaks
- Deforestation
- Stream
- Mutable objects
- Дополнение про оценку времени

## DList

### Зачем?

В некоторых случаях конкатенация массивов занимает больше времени, чем необходимо:

```
(++) :: [a] -> [a] -> [a]
[]     ++ b = b
(x:xs) ++ b = x : (xs ++ b)

sum :: [Int] -> Int
sum (x:xs) = x + sum xs
sum []     = 0

a =  [1..n] ++ ([1..m]  ++ [1..k]) 
   = [1..n] ++ [1..m, 1..k]      -- m operations
   = [1..n, 1..m, 1..k]          -- n operations
                                 -- concatenated for m + n
b = ([1..n] ++ [1..m]) ++ [1..k] 
   = [1..n, 1..m] ++ [1..k]      -- n operations
   = [1..n, 1..m, 1..k]          -- n + m operations
                                 -- concatenated for 2n + m operations in total
```

Казалось бы, результат один, а время вычислений __разное__. Поэтому решили создать __особый массив__, который __не будет иметь таких side-effect-ов__.

### DList

Реализация:

```
-- import Data.DList
newtype DList a = DL { unDL :: [a] -> [a] }

fromList :: [a] -> DList a
fromList l = DL (l++)

toList :: DList a -> [a] 
toList (DL lf) = lf []

append :: DList a -> DList a -> DList a
(DL f) `append` (DL g) = DL $ \xs -> f (g xs)  -- append = mappend = <>

DL f <> (DL g <> DL h) ≡ DL f <> DL (\xs -> g (h xs)) 
                       ≡ DL f <> DL (\xs -> g' ++ (h' ++ xs))  -- t ≡ (\xs -> ...)
                       ≡ DL f <> DL t
                       ≡ DL $ \ys -> f (t ys) 
                       ≡ DL $ \ys -> f' ++ (t ys)
                       ≡ DL $ \ys -> f' ++ (g' ++ (h' ++ ys)) 

(DL f <> DL g) <> DL h ≡ DL (\xs -> f (g xs)) <> DL h 
                       ≡ DL (\xs -> f' ++ (g' ++ xs)) <> DL h  -- t ≡ (\xs -> ...)
                       ≡ DL t <> DL h
                       ≡ DL $ \ys -> t (h ys) 
                       ≡ DL $ \ys -> t (h' ++ ys)
                       ≡ DL $ \ys -> f' ++ (g' ++ (h' ++ ys)) 
```

Как результат, мы видим, что время вычислений, будет постоянным, как бы мы не расставляли порядок вычислений. 

* __Замечание:__ Если хотим эффективную реализацию списков, то можно использовать __`Seq`__ из пакета __`Data.Sequence`__. Внутри список реализован через __Декартово-дерево__.

## Strictness (Строгие вычисления)

Чтобы зафорсить вычисления до __Слабой Головной Нормальной Формы__ используют функцию __`seq`__ - она ворсит вычисления первого аргумента:

```
seq :: a -> b -> b  -- just a model, not a real implementation
_|_ `seq` _ = _|_ -- bottom ~ udefined
_   `seq` b = b
```

Примеры:

```
ghci> 0 `seq` 10
10

ghci> undefined `seq` 10
*** Exception: Prelude.undefined

ghci> Just undefined `seq` 10
10        -- (͡° ͜ʖ ͡°)

data    DataWrapper a    = DW a
newtype NewtypeWrapper a = NW a

ghci> DW undefined `seq` 42
42
ghci> NW undefined `seq` 42
*** Exception: Prelude.undefined
```

Из примеров видно как работает эта функция. 

### _foldr_ vs _foldl_

__`foldr`__ и __`foldl`__ весьма медленные и приводят к __space-leak-ам__. Поэтому при вычислении суммы от _10^8_ элементов, программа крашится:

```
module Main where

main :: IO ()
main = print $ foldr (+) 0 [1..10^8]
-- упадет по памяти и времени --
```

Другие примеры:

```
foldr :: (a -> b -> b) -> b -> [a] -> b
foldr _ z []     = z
foldr f z (x:xs) = x `f` foldr f z xs 

foldl :: (b -> a -> b) -> b -> [a] -> b
foldl _ z []     = z
foldl f z (x:xs) = foldl f (z `f` x) xs

sum [1, 2, 3] ≡ foldr (+) 0 [1, 2, 3]
              ≡ 1 + foldr (+) 0 [2, 3]
              ≡ 1 + (2 + foldr (+) 0 [3])
              ≡ 1 + (2 + (3 + foldr (+) 0 []))
              ≡ 1 + (2 + (3 + 0))
              ≡ 1 + (2 + 3)
              ≡ 1 + 5
              ≡ 6

sum [1, 2, 3] ≡ foldl (+) 0 [1, 2, 3]
              ≡ foldl (+) (0 + 1) [2, 3]
              ≡ foldl (+) ((0 + 1) + 2) [3]
              ≡ foldl (+) (((0 + 1) + 2) + 3) []
              ≡ ((0 + 1) + 2) + 3
              ≡ (1 + 2) + 3
              ≡ 3 + 3
              ≡ 6

> foldr (&&) False (repeat False) -- сразу вернёт False
> foldl (&&) False (repeat False) -- упадёт по памяти и времени
```

### _foldl'_

Функция __`foldl'`__ работает на основе функции __`seq`__:

```
foldl' :: (a -> b -> a) -> a -> [b] -> a
foldl' f a []     = a
foldl' f a (x:xs) = let a' = f a x
                    in seq a' (foldl' f a' xs)
```

Пример:

```
sum [1, 2, 3] ≡ foldl' (+) 0 [1, 2, 3]
              ≡ foldl' (+) 1 [2, 3]
              ≡ foldl' (+) 3 [3]
              ≡ foldl' (+) 6 []
              ≡ 6
```

Но есть ситуации когда __WHNF__ не является панацеей:

```
f (acc, len) x = (acc + x, len + 1)

foldl' f (0, 0) [1, 2, 3]
 = foldl' f (0 + 1, 0 + 1) [2, 3]
 = foldl' f ((0 + 1) + 2, (0 + 1) + 1) [3]
 = foldl' f (((0 + 1) + 2) + 3, ((0 + 1) + 1) + 1) []
 = (((0 + 1) + 2) + 3, ((0 + 1) + 1) + 1)
```

Поэтому народ написал функцию __`deepseq`__, которая доводит все до __NF__:

```
ghci> import Control.DeepSeq
ghci> [1,2,undefined] `seq` 3
3
ghci> [1,2,undefined] `deepseq` 3
*** Exception: Prelude.undefined

-- ??
ghci> repeat False `seq`     15 -- return False
ghci> repeat False `deepseq` 15 -- упадет по памяти и времени
```

### NFData

Для вычислений в __NF__ создали специальную структуру:

```
class NFData a where  -- Normal Form Data
    rnf :: a -> ()
    rnf a = a `seq` ()

instance NFData a => NFData (Maybe a) where
    rnf Nothing  = ()
    rnf (Just x) = rnf x

instance NFData a => NFData [a] where
    rnf []     = ()
    rnf (x:xs) = rnf x `seq` rnf xs

deepseq :: NFData a => a -> b -> b
a `deepseq` b = rnf a `seq` b
```

### __`-XBangPatterns`__

Для упрощения жизни строгие вычисления были добавлены в расширение __`-XBangPatterns`__. С его помощью можно легко в коде указывать места, где мы хотим зафорсить вычисления:

```
{-# LANGUAGE BangPatterns #-}

sum :: Num a => [a] -> a
sum = go 0
  where
    go !acc (x:xs) = go (acc + x) xs
    go  acc []     = acc

-- Это всего лишь синтаксический сахар для вот такой записи: --
sum :: Num a => [a] -> a
sum = go 0
  where
    go acc _ | acc `seq` False = undefined  -- that's why we need `seq`
    go acc (x:xs)              = go (acc + x) xs
    go acc []                  = acc
```

Ещё примеры:

```
g (!x, y) = [x,y]

let (!x,[y]) = e in b

stackOps :: State Stack Int  -- type Stack = [Int]
stackOps = do 
    !x <- pop
    push 42
    return x

($!) :: (a -> b) -> a -> b                 -- strict function application
f $! x = let !vx = x in f vx

($!!) :: (NFData a) => (a -> b) -> a -> b  -- deep strict function application
f $!! x = x `deepseq` f x

randomSum :: Int -> IO Int
randomSum n = do 
    randList <- replicateM n $ randomRIO (0, 10)
    return $! sum randList
```

Также можно проводить и обратные манипуляции, т. е. указать, что вычисления оставить ленивыми:

```
f :: (a, b) -> Int
f (a, b) = const 1 a   -- pair pattern is too strict

g :: (a, b) -> Int
g ~(a, b) = const 1 a  -- irrefutable pattern ≡ lazy pattern

ghci> f undefined
*** Exception: Prelude.undefined

ghci> g undefined
1

lazyHead :: [a] -> a
lazyHead ~[]    = undefined
lazyHead ~(x:_) = x

f1 :: Either e Int -> Int
f1 ~(Right 1) = 42

ghci > f (Left "kek")
42
ghci > f (error "mda")
42
```

### Ещё немного про строгие вычисления

Пусть у нас есть некоторая структура:

```
data Config = Config
    { users ::  Int
    , extra ::  Maybe Settings
    } deriving (Show)
```

Мы хотим задать поля как __strict__. Почему бы и нет :)

```
data Config = Config  -- with strict fields
    { users :: !Int
    , extra :: !(Maybe Settings)  
    } deriving (Show)
```

Но можно ещё лучше - подключим расширешие задающее все поля структур как __strict__:

```
{-# LANGUAGE StrictData #-}

data Config = Config
    { users :: Int
    , extra :: Maybe Settings  
    } deriving (Show)
```

Можно и все сделать строким, но тогда исчезнет вся фишка _Haskell_-я:

```
{-# LANGUAGE Strict #-}  -- but nobody uses this
```

### Вывод

__Зачем же использовать строгие вычисления ?__

1. When your program runs slow or crashes with StackOverflow: strict evaluation could help.
2. Arithmetic operations (+, *, square, etc.)
3. Reduce grow of calling functions (or in recursive call case):
```
f x = g $! (h x)
f x = g x -- no strict application, all is ok
f !x = g (h x) -- f is strict by argument, no need to call $!
```
not necessary rule but it could be helpful
4. Fields of data types (to avoid space leaks)

## Немного про __Space leaks__

```
-- Беды с утечками --
bad :: [Double] -> (Double, Int)
bad xs = (Prelude.sum xs, Prelude.length xs)  -- unavoidable space leak here

import Data.List (foldl')

bad2 :: Num a => [a] -> (a, Int)
bad2 xs = foldl' step (0, 0) xs
  where
    step (x, y) n = (x + n, y + 1)
```

Как избежать ?  
Используй библиотеку [Gabriel foldl](https://www.stackage.org/package/foldl)

```
import qualified Control.Foldl as L

good :: [Double] -> (Double, Int)
good xs = L.fold ((,) <$> L.sum <*> L.length) xs

ghci> let average = (/) <$> L.sum <*> L.length
ghci> L.fold average [1..10]
5.5
```

## Deforestation

__Deforestation__ - это ещё один способ оптимизации кода. Его основная идея: избавиться от лишнего копирования данных и ускорение обращения к ним.

```
func = foldr (+) 0 . map (\x -> x * 10)  -- input
```

Лишнее копирование массива. Что ж, давайте начнем избавление от него:

```
-- step 0: rewrite composition to application
func l = foldr (+) 0 (map (\x -> x * 10) l)

-- step 1: inline foldr body
func l = case (map (\x -> x * 10) l) of [] -> 0
                                        (x:xs) -> x + (foldr (+) 0 xs)  

-- step 2: inline map body and unfold lambda
func l = case (case l of [] -> []
                         (y:ys) -> y * 10 : map (*10) ys) of
              [] -> 0
              (x:xs) -> x + (foldr (+) 0 xs)  

-- step 3: applying case-of-case transformation
func l = case l of [] -> (case [] of [] -> 0
                                     (x:xs) -> x + (foldr (+) 0 xs))
                   (y:ys) -> (case (y * 10 : map (*10) ys) of
                                   [] -> 0
                                   (x:xs) -> x + (foldr (+) 0 xs)) 

-- step 4: unfold inner cases by constructors
func l = case l of [] -> 0
                   (y:ys) -> 10 * y + (foldr (+) 0 (map (*10) ys))

-- step 5: replace last call with recursive call
func l = case l of [] -> 0
                   (y:ys) -> 10 * y + func ys
```

В итоге мы не копируем массив лишний раз, а меняем элементы по одному и сразу засовываем в аккумулятор.

Таким образом мы __уменьшили__ количество потребляймой __памяти и времени__. 

## Streams

Мы знаем потоки в других ЯП, давайте рассмотрим их и в __Haskell__:

```
data Step s a = Done 
              | Skip    s 
              | Yield a s

data Stream a = forall s . Stream (s -> Step s a) s
                                     │            │
                                     │            └── stream itself
                                     │
                                     └─── stream extractor
-- вспомогательные функции --
stream :: forall a . [a] -> Stream a
stream xs = Stream next xs 
  where
    next :: [a] -> Step [a] a
    next []     = Done
    next (x:xs) = Yield x xs

unstream :: forall a . Stream a -> [a]
unstream (Stream next s0) = go s0 
  where
    go s = case next s of 
             Done       -> []
             Skip s'    -> go s'
             Yield a s' -> a : go s'
-- функции для работы с потоками --
mapS :: forall a b . (a -> b) -> Stream a -> Stream b
mapS f (Stream next s) = Stream next' s 
  where
    next' xs = case next xs of 
                    Done       -> Done
                    Skip s'    -> Skip s'
                    Yield a s' -> Yield (f a) s'

filterS :: forall a . (a -> Bool) -> Stream a -> Stream a
filterS p (Stream next s) = Stream next' s 
  where
    next' xs = case next xs of 
                    Done       -> Done
                    Skip s'    -> Skip s'
                    Yield a s' -> if p a then Yield a s' else Skip s'
```

Пример работы с потоком:

```
map :: (a -> b) -> [a] -> [b]
map f = unstream . mapS f . stream

filter :: (a -> Bool) -> [a] -> [a]
filter p = unstream . filterS p . stream

foo ≡            map  show          .            filter  even
    ≡            map  show          . unstream . filterS even . stream
    ≡ unstream . mapS show . stream . unstream . filterS even . stream
                               │          │
                               └────┬─────┘
                                    │
                            nuked by rewrite rule
```

Как можно заметить, есть __`stream . unstream`__, которые немного бесполезны, давайте скажем об этом компилятору явно, используя так называемый __rewrite rule__:

```
{-# RULES "stream/unstream" 
    forall (s :: Stream a) . stream (unstream s) = s 
  #-}
```

Вот так теперь выглядит последовательность оптимизаций:

__Sequences:__ Stream fusion ≫= Rewrite Rules ≫= Deforestation.

## Mutable Objects

В хаскеле можно использовать изменяемые переменные __вне `IO`__ с помощью __`ST`__ монады:

```
-- import Control.Monad.ST
data ST s a  -- The strict state-transformer monad

runState :: State s a -> s -> (a, s)  -- use evalState with state to get result
runST    :: (forall s. ST s a) -> a   -- forall trick

data STRef s a  -- a mutable variable

newSTRef    :: a -> ST s (STRef s a) 
readSTRef   :: STRef s a -> ST s a
writeSTRef  :: STRef s a -> a -> ST s ()
modifySTRef :: STRef s a -> (a -> a) -> ST s () 
```

По сути это __`IO`__  монады без возможности взаимодействия с внешним миром. 

Пример работы:

```
import Control.Monad.ST
import Data.STRef
import Data.Foldable

sumST :: Num a => [a] -> a
sumST xs = runST $ do
    n <- newSTRef 0
    for_ xs $ \x ->
        modifySTRef n (+x)
    readSTRef n
```

Так же существуют __изменяемые массивы__:

```
class Monad m => MArray a e m where  -- type class for all arrays

-- import Data.Array.ST
data STArray s i e :: * -> * -> * -> *  -- Mutable, boxed, non-strict arrays
-- s: the state variable argument for the ST type
-- i: the index type of the array (should be an instance of Ix), usually Int
-- e: the element type of the array.

data STUArray s i e  -- A mutable array with unboxed elements 
                     -- (Int, Double, Bool, etc.)

newArray   :: Ix i => (i, i) -> e -> m (a i e)
readArray  :: (MArray a e m, Ix i) => a i e -> i -> m e
writeArray :: (MArray a e m, Ix i) => a i e -> i -> e -> m ()
```

Ну и чего уж там, есть и __динамические массивы__ (Vector):

```
data Vector    a  -- immutable vectors
data MVector s a  -- mutable vectors

-- immutable vectors
(!) :: Vector a -> Int -> a  -- O(1) indexing
fromList :: [a] -> Vector a
map, filter, etc. 

-- mutable vectors
read   :: (PrimMonad m, MVector v a) => v (PrimState m) a -> Int -> m a
write  :: (PrimMonad m, MVector v a) => v (PrimState m) a -> Int -> a -> m ()
grow   :: PrimMonad m => MVector (PrimState m) a -> Int -> m (MVector (PrimState m) a)
freeze :: PrimMonad m => MVector (PrimState m) a -> m (Vector a)
-- holy types O_O
```

Сортировка вставкой на основе изменяемых дин. массивов:

```
{-# LANGUAGE FlexibleContexts #-}
module VecInsertionSort where

import           Control.Monad.ST
import           Data.Foldable (forM_)
import           Control.Monad (unless)
import qualified Data.Vector.Unboxed         as V
import qualified Data.Vector.Unboxed.Mutable as M

sort :: [Int] -> [Int]  -- sort on mutable arrays
sort list = runST $ do
    let listSize = length list
    vec <- V.thaw $ V.fromList list :: ST s (M.MVector s Int)

    forM_ [1..listSize - 1] $ \i -> do
        let jScan j
                | j >= 0 = do
                    cur  <- M.read vec j
                    next <- M.read vec (j + 1)
                    unless (cur <= next) $ do M.swap vec j (j + 1)
                                              jScan (j - 1)
                | otherwise = return ()
        jScan (i - 1)

    resVec <- V.freeze vec
    return $ V.toList resVec
```

Выглядит как императивный код :)

## Допонительно*

__Как измерить время исполнения той или иной функции ?__

Запустите код с флагом __`-s`__:

```
ghci> :set +s
```

Ну или для более комплексного анализа используйте библиотеку [Criterion](https://hackage.haskell.org/package/criterion).