In [1]:
from multiprocessing import Pool, TimeoutError
import time
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.stats as sts
import math
import itertools
import pickle
import datetime
from numba import jit, cuda, float32, float64

In [2]:
def set_initial_guess(xdata, k):
    np.random.seed(1234)
# set pi mu sigma
    pis = np.random.random(k)
    pis *= 0.9
    pis += 0.1
    pis /= np.sum(pis)
# mu
    mus = np.zeros(k)
# sigma
    sigmas = np.random.random(k)
    sigmas *= 1.5
    sigmas += 0.25
    sigmas *= np.std(xdata)
    return pis, mus, sigmas

In [3]:
df = pd.read_csv('RI.IMOEX_180323_180424_5min.csv', sep=';')
# drop last from every day
df = df.groupby('<DATE>').apply(lambda x: x.iloc[:-2] if len(x) > 1 else x).reset_index(drop=True)

In [4]:
data = []
prevopen = 0
for index, row in df.iterrows():
    if prevopen != 0:
        adata = row[4] - prevopen
        data = np.append(data, adata)
    prevopen = row[4]

In [5]:
text_file = open("d://tmp//base_2.txt", "r")
lines = text_file.readlines()

In [6]:
a = []
for i in lines:
    a.append(float(i))

In [7]:
data = a

In [8]:
data = data[0:1040]

In [9]:
print(data)

[-8.23, 0.33, -0.76, 4.03, -1.35, -2.78, -2.93, 2.59, 0.65, 9.46, 0.38, 2.37, 1.64, -4.04, 1.36, 2.85, -2.91, -0.08, 1.58, 0.58, -2.25, -3.1, -0.67, -0.75, 2.31, -3.66, -1.98, 3.24, -2.12, -0.64, 0.87, -0.88, -2.23, 2.2, -1.53, 0.14, -1.86, 2.35, 2.2, 0.46, -0.2, 0.7, 1.64, 2.4, -0.13, 1.14, 1.1, -0.08, 0.88, 1.03, 1.14, -0.09, -0.25, -0.27, -1.4, -0.82, 0.16, 0.9, 1.01, 0.23, 5.69, -0.91, -1.6, 0.31, -0.2, -0.76, 0.96, 3.03, -0.86, -0.5, -1.4, 1.23, 0.96, 0.3, -0.83, 1.46, -0.26, 1.71, 0.4, -4.08, -0.11, -0.27, 1.23, -0.26, 1.64, -2.48, -1.67, -0.89, -1.67, 2.9, 3.77, -2.22, 1.22, -0.22, 1.28, 0.27, -1.66, -1.11, 0.29, -1.78, 0.3, -2.63, 1.8, 3.32, -4.42, -3.63, -3.18, -1.03, -0.58, 0.15, 0.47, 2.45, 1.01, -2.04, 0.26, -2.12, -0.45, -0.97, 1.12, -0.1, 0.4, -0.95, -0.19, 2.57, -1.14, -2.56, -0.44, -0.86, 2.35, -0.29, 1.94, 0.38, -2.16, 0.95, 1.56, 1.27, -2.34, 1.22, 0.85, 1.57, 1.61, -0.15, 0.15, -0.05, -0.68, -1.28, 1.63, 1.73, -1.81, 0.27, 1.18, -1.58, -0.05, -0.4, -0.31, -1.09, 1.55

In [10]:
k = 10

In [11]:
pis = [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]
mus = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
sigmas = [0.5, 2.0, 2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.8, 3.0]

In [12]:
tol = 0.01
max_iterations = 2
n = len(data)
# ем алг
ll_old = 0
ll_new = 0
for i in range(max_iterations):
    print('{3} [{0}:{1}]EM iteration {2}'.format(iter, os.getpid(), i, str(datetime.datetime.now())))
    pis, mus, sigmas = set_initial_guess(data, k)
# E step
    ws = np.zeros((k, n))
    for j in range(len(mus)):
        for i in range(n):
            ws[j, i] = pis[j] * sts.norm(mus[j], sigmas[j]).pdf(data[i])
    ws /= ws.sum(0)

# M-step
    pis = np.zeros(k)
    for j in range(len(mus)):
        for i in range(n):
            pis[j] += ws[j, i]
    pis /= n

    mus = np.zeros(k)
    for j in range(k):
        for i in range(n):
            mus[j] += ws[j, i] * data[i]
        mus[j] /= ws[j, :].sum()

    sigmas = np.zeros(k)
    for j in range(k):
        for i in range(n):
            sigmas[j] += ws[j, i] * ((data[i] - mus[j]) ** 2)
        sigmas[j] /= ws[j, :].sum()
        sigmas[j] = math.sqrt(sigmas[j])

    print('pis {0}'.format(pis))
    print( 'mus {0}'.format(mus))
    print( 'sigmas {0}'.format(sigmas))

    #
    # update complete log likelihoood
    ll_new = 0.0
    for i in range(n):
        s = 0
        for j in range(k):
            s += pis[j] * sts.norm(mus[j], sigmas[j]).pdf(data[i])
        ll_new += np.log(s)

    if np.abs(ll_new - ll_old) < tol:
        break
    # print 'll = {0}\n'.format(ll_new)
    ll_old = ll_new

2019-05-03 19:30:34.988358 [<built-in function iter>:12844]EM iteration 0
pis [0.04628683 0.10520102 0.0729331  0.11785439 0.13554533 0.05361471
 0.05555699 0.15007071 0.13786118 0.12507573]
mus [ 0.07363021  0.04728053  0.00891124  0.00376498  0.07205053  0.03402416
  0.04682327  0.01507309 -0.00673703 -0.08702928]
sigmas [1.23747533 1.50756625 1.88585143 1.96415632 1.26125159 1.62213229
 1.51149328 0.50244646 2.1658563  2.87240044]
2019-05-03 19:30:51.917360 [<built-in function iter>:12844]EM iteration 1
pis [0.04628683 0.10520102 0.0729331  0.11785439 0.13554533 0.05361471
 0.05555699 0.15007071 0.13786118 0.12507573]
mus [ 0.07363021  0.04728053  0.00891124  0.00376498  0.07205053  0.03402416
  0.04682327  0.01507309 -0.00673703 -0.08702928]
sigmas [1.23747533 1.50756625 1.88585143 1.96415632 1.26125159 1.62213229
 1.51149328 0.50244646 2.1658563  2.87240044]


In [18]:
ws[1]

array([0.00136423, 0.08941365, 0.11118894, ..., 0.08903831, 0.08731396,
       0.02198288])

In [21]:
math.sqrt(2*math.pi)

2.5066282746310002

In [None]:
def worker_task(iter, zdata, k, pis, mus, sigmas):
    print("{2} start worker {0}:{1}\n".format(iter, os.getpid(), str(datetime.datetime.now())))
    pis, mus, sigmas = set_initial_guess(zdata, k)
    ll, pi, mu, sigma = em_alg_decomposition(iter, zdata, k, pis, mus, sigmas)
    print("{2} end wkr {0}:{1}\n".format(iter, os.getpid(), str(datetime.datetime.now())))
    return [ll, pi, mu, sigma]


def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return worker_task(*a_b)

@jit
def preparedatas():

    return datas


# main
if __name__ == '__main__':
    slswindowlen = 7
    pool = Pool(processes=8)  # start 8 worker processes

# init


# prepare data
    datas = preparedatas()

# start
    k = 10

# set initial guess
    pis, mus, sigmas = set_initial_guess(datas[0], k)
    result = pool.map(func_star, zip(range(len(datas)), datas, itertools.repeat(k), itertools.repeat(pis), itertools.repeat(mus), itertools.repeat(sigmas)))

    print(result)
    serialized = pickle.dump(result, open("Output_180323_180424_5min.txt", "wb"))
