In [60]:
import numpy as np
import scipy.stats as sts
import matplotlib.pyplot as plt

In [61]:
SETA = (np.random.random(10)*10).astype(int)
SETB = (np.random.random(10)*10).astype(int)

## $ 首先回忆一下概率计算单事件的概率为P(A),不发生的概率就是1-P(A) $
## $ 两个独立事件的交集是 P(A n B) = P(A) P(B) $
## $ 如果两个事件互斥，也就是同时只能发生一个，那事件不发生的概率就是 P(A) + P(B) $
## $如果事件不是相互排斥的，那么,两个事件同时不反升的概率就是 P(A U B) = P(A) + P(B) - P(A n B)
## 总结：概率的交集用乘法概率的并集用加法$


## 概率统计函数

## 事件概率加法

In [62]:
def P(SET,A):
    m = SET.shape[0]
    return np.dot((SET==A).astype(int),np.ones(m))/np.array([m])

In [63]:
P(SETA,3),P(np.array(['a','b','c']),'a')

(array([0.1]), array([0.33333333]))

## 相反概率

In [64]:
def PContrary(SET,A):
    return 1-P(SETA,A)

In [65]:
PContrary(SETA,3)

array([0.9])

## 联合事件概率计算

In [66]:
def PUnin(SETList,eventList):
    if len(SETList)==len(eventList):
        step1 = np.array([P(sets , even) for sets , even in zip(SETList,eventList)])
        return np.cumprod(step1)[-1:]

In [67]:
PUnin([SETA,SETB],[1,1])

array([0.01])

## 联合事件的反的概率

In [68]:
def PUninInverse(SETList,eventList):
    return 1-PUnin(SETList,eventList)

In [69]:
PUninInverse([SETA,SETB],[1,1])

array([0.99])

## 相互独立事件的联合事件发生概率

In [70]:
def PIndependentUnin(SETAList,eventList):
    if len(SETAList)==len(eventList):
        m = len(SETAList)
        step = np.array([P(sets,even) for sets , even in zip(SETAList,eventList)])
        return np.dot(step.T,np.ones((m)))
    else:
        return np.nan

In [71]:
PIndependentUnin([SETA,SETB],[1,1])

array([0.2])

## 相互独立事件的联合事件概率的反概率

In [72]:
def PIndependentUninsInverse(SETList,eventList):
    m = len(SETList)
    return m-PIndependentUnin(SETList,eventList)

In [73]:
PIndependentUninsInverse([SETA,SETB],[1,1])

array([1.8])

## 如果相互独立的事件不是互斥的，则联合概率的反概率为

In [74]:
def P2(SETList,eventList):
    return PIndependentUnin(SETList,eventList)-PUnin(SETList,eventList)

In [75]:
P2([SETA,SETB],[1,1])

array([0.19])

## 条件概率

In [76]:
def PAandB(SETA,SETB,A,B):
    step1 , step2 = (P(SETA,A),P(SETB,B))
    return PUnin([SETA,SETB],[A,A])/P(SETB,B)


In [77]:
PAandB(SETA,SETB,4,2)[0]

0.0

## 如果我们有俩个不相关的事件x和y，那么我们观察到的俩个事件同时发生时获得的信息应该等于观察到的事件各自发生时获得的信息之和
## $ h(x,y)=h(x)+h(y) $
## 由于x，y是俩个不相关的事件，那么满足
## $ p(x,y) = p(x)*p(y) $

In [78]:
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@File    :   MathZXB.py
@Time    :   2020/04/27 09:52:06
@Author  :   DataMagician 
@Version :   1.0
@License :   DataMagician
@Desc    :   None
'''

# here put the import lib
import numpy as np
import math
from scipy import stats as sst
import os
from math import log
from scipy.optimize import curve_fit

def longerror(func):
    def In(*vars):
        try :
            return func(*vars),
        except Exception as e :
            __import__('cgitb').enable ( format = 'text' )
        return func (*vars)
    return In

def check(func):
    def In(*data):
        try:
            return func(*data)
        except Exception :
            return func(*data),
    return In


class stats:

    def __init__(self):
        pass


    @longerror
    @check
    def dim2_to_dim1(self,data)->iter:
        '''
        二维降一维
        :param data:
        :return:
        '''
        return [j for i in range(len(data)) for j in data[i]]

    def spread(self,arg):
        '''
        广播函数
        :param arg:
        :return:
        '''
        resultsp = []
        for i in arg:
            if isinstance(i, (list,tuple)):
                resultsp.extend(i)
            else:
                resultsp.append(i)
        return resultsp

    @longerror
    @check
    def deep_flatten(self,ls)->list:
        '''
        深度平展
        :param lst:
        :return:
        '''
        result = []
        result.extend(self.spread(list(map(lambda x: self.deep_flatten(x) if type(x) in (list,tuple) else x, ls))))
        return result


    @longerror
    @check
    def mean(self,vector):
        n = vector.shape[0]
        return np.dot(vector,np.ones(n))/n
    
    @longerror
    @check
    def average(self,vector,weights=None):
        '''加权平均'''
        n = vector.shape[0]
        if weights:
            weights_arr = np.array(weights)
            n2 = weights_arr.shape
            if n2 == n:
                return np.dot(vector,vector[::-1])/weights_arr.sum()
            else:
                return 'ERROR : the weights length does not meet the requirements'
        else:
            return np.dot(vector,vector[::-1])/np.dot(vector[::-1],np.ones(n))
    

    
    @longerror
    @check
    def quadraticsum(self,vector):
        return np.dot(vector,vector.T)

    @longerror
    @check
    def samplevar(self,vector):
        m = vector.shape
        cur = (vector-self.avg(vector))
        return np.dot(cur,cur.T)/(m[0]-1)

    @longerror
    @check
    def populationvar(self,vector):
        m = vector.shape
        cur = (vector-self.avg(vector))
        return np.dot(cur,cur.T)/m[0]


    @longerror
    @check
    def samplestd(self,vector):
        return np.sqrt(self.samplevar(vector))

    @longerror
    @check
    def populationstd(self,vector):
        return np.sqrt(self.populationvar(vector))

    @longerror
    @check
    def cov(self,vector1,vector2):
        m = vector1.shape[0]
        return np.dot((vector1-self.avg(vector1)),(vector2-self.avg(vector2)))/(m-1)

    @longerror
    @check
    def coe(self,vector):
        return self.populationstd(vector)/self.avg(vector)

    @longerror
    @check
    def zscore(self,vector):
        return (vector-self.avg(vector))/self.populationstd(vector)

    @longerror
    @check
    def pearson(self,vector1,vector2):
        n = vector1.shape[0]
        sum_arr1 , sum_arr2 = vector1.sum() , vector2.sum()
        sum_pow_arr1,sum_pow_arr2 = np.dot(vector1,vector1) , np.dot(vector2,vector2)
        p_sum_arr = np.dot(vector1,vector2)
        cov = p_sum_arr-(sum_arr1*sum_arr2/n)
        std = np.sqrt((sum_pow_arr1 - (sum_arr1** 2) / n) * (sum_pow_arr2 - (sum_arr2** 2) / n))
        return cov/std

    @longerror
    @check
    def MSE(self,yhat,y):
        '''
        残差平方和
        :param yhat:
        :param y:
        :return:
        '''
        return np.dot(yhat-self.avg(y),yhat-self.avg(y))

    @longerror
    @check
    def EquationRegression(self,X,Y,predict):
        '''
        回归方程
        :param X:
        :param Y:
        :param predict:
        :return:
        '''
        try:
            xm,xn = X.shape
            ym,yn = Y.shape
        except Exception:
            xm,xn = X.shape
            Y = Y[:,None]
        finally:
            newX = np.c_[np.ones(xm),X]
            fit = np.dot(np.dot(np.linalg.inv(np.dot(newX.T,newX)),newX.T),Y)
            predictX = np.dot(np.r_[np.ones(1),np.array(predict)],fit)
            return fit,predictX

    @longerror
    @check
    def pearsonMove(self,vector1,vector2,alpha):
        '''
        滑动皮尔逊系数范围
        :param vector1:
        :param vector2:
        :param alpha:
        :return:
        '''
        n1,n2 = vector1.shape[0] , vector2.shape[0]
        return np.array([self.pearson(vector1[:i+alpha],vector2[:i+alpha]) for i in range(n1//alpha)])

    @longerror
    @check
    def personConstand(self,vector1,vector2,alpha):
        '''
        定长滑动皮尔逊系数
        :param vector1:
        :param vector2:
        :param alpha:
        :return:
        '''
        n1,n2 = vector1.shape[0] , vector2.shape[0]
        return np.array([self.pearson(vector1[:i+alpha],vector2[:i+alpha]) for i in range(0,n1//alpha)])

    @longerror
    @check
    def personMat(self,vector):
        '''
        皮尔逊矩阵
        :param vector:
        :return:
        '''
        mean = np.mean(vector,axis=0)
        std = np.std(vector,axis=0)
        zscore = (vector-mean)/std
        return  np.corrcoef(zscore)

    @longerror
    @check
    def P(self,SET,symbols,A):
        m = SET.shape[0]
        return np.dot(eval('SET'+symbols+'A').astype(int),np.ones(m))/m

    @longerror
    @check
    def PInverse(self,SET,symbols,A):
        '''
        反概率
        :param SET:
        :param A:
        :return:
        '''
        return (1-self.P(SET,symbols,A)[0])

    @longerror
    @check
    def PUnin(self,SETList,symblos,eventList):
        '''
        有BUG，要该，数据格式为numpy
        联合事件概率计算
        :param SETList:
        :param eventList:
        :return:
        '''
        if len(SETList)==len(eventList):
            step1 = np.array([self.P(sets,symblos,even) for sets , even in zip(SETList,eventList)])
            return np.cumprod(step1)[-1:]
        else:
            return np.nan

    @longerror
    @check
    def PUninInverse(self,SETList,symbols,eventList):
        '''
        联合事件的反的概率
        :param SETList:
        :param symbols:
        :param eventList:
        :return:
        '''
        return 1-self.PUnin(SETList,symbols,eventList)[0]

    @longerror
    @check
    def PIndependentUnin(self,SETAList,symbols,eventList):
        '''
        相互独立且互斥事件的联合事件发生概率
        :param SETAList:
        :param symbols:
        :param eventList:
        :return:
        '''
        if len(SETAList)==len(eventList):
            m = len(SETAList)
            step = np.array([self.P(sets,symbols,even) for sets , even in zip(SETAList,eventList)])
            return np.dot(step.T,np.ones((m)))
        else:
            return np.nan

    @longerror
    @check
    def PIndependentUninsInverse(self,SETList,symbols,eventList):
        '''
        相互独立且互斥事件的联合事件概率的反概率
        :param SETList:
        :param symbols:
        :param eventList:
        :return:
        '''
        m = len(SETList)
        return m-self.PIndependentUnin(SETList,symbols,eventList)

    @longerror
    @check
    def PIndependentUninsSameTime(self,SETList,symbols,eventList):
        '''
        如果相互独立的事件不是互斥的，则联合概率的反概率为
        :param SETList:
        :param symbols:
        :param eventList:
        :return:
        '''
        return self.PIndependentUnin(SETList,symbols,eventList)-self.PUnin(SETList,symbols,eventList)

    @longerror
    @check
    def PAandB(self,SETA,SETB,A,B,symbols):
        '''
        条件概率
        :param SETA:
        :param SETB:
        :param A:
        :param B:
        :return:
        '''
        return self.PUnin([SETA,SETB],symbols,[A,B])[0]/self.P(SETB,symbols,A)[0]



    @longerror
    @check
    def sigmoid(self,z):
        '''
        逻辑曲线
        :param z:
        :return:
        '''
        return 1/(1+np.exp(-z))

    @longerror
    @check
    def frequency(self,data)->list:
        '''
        基础频率分析
        :param data:
        :return:
        '''
        dropdata = self.deep_flatten(data)
        setdata = tuple(set(dropdata))
        vector = np.array([setdata.index(i) for i in dropdata])
        logistic = set(self.sigmoid(vector))
        name =  np.array(['name','p','sigmoid','count'])
        unin = np.array([(i,dropdata.count(i)/len(data),l,dropdata.count(i)) for i,l in zip(setdata,logistic)])
        return {'unin':[name,unin],'element':setdata,'count':len(dropdata)}

    @longerror
    @check
    def compare(self,Asrc,Bsrc)->list:
        '''
        双向量比较
        :param Asrc:
        :param Bsrc:
        :return:
        '''
        Asrc,Bsrc = self.deep_flatten(Asrc),self.deep_flatten(Bsrc)
        drop = lambda data:[j for i in range(len(data)) for j in data[i]]
        A,B = drop(Asrc),drop(Bsrc)
        Am ,Bm = len(A),len(B)
        thesame = [[(A[An],An,Bn)for Bn in range(Bm) if  A[An]==B[Bn]] for An in range(Am)]
        different = [[(A[An],An,Bn)for Bn in range(Bm) if  A[An]!=B[Bn]] for An in range(Am)]
        return {'different':drop(sorted(different)),'thesame':drop(sorted(thesame))}

    def entropyPx(self,X,symbol,x,In='bit'):
        n = X.shape[0]
        px = np.dot(np.array(eval('X'+symbol+'x')).astype(int)/n,np.ones(n))
        if px == 0 :
            return px*0
        if px == 1:
            return 0
        else:
            if In == 'bit':
                return -px * math.log(px,2)
            elif In == 'nat':
                return -px * math.log(px,math.e)

  
    def entropyPX(self,X,symbol='==',In='bit'):
        n = X.shape[0]
        PXi= np.array([self.entropyPx(X,symbol,x,In) for x in X])
        PX = np.dot(PXi,np.ones(n).T)
        return {'entropy':PX,'probability':{x:p for x,p in zip(X,PXi)}}


    @longerror
    @check
    def cal_entropy(self,data)->list:
        '''
        信息熵计算
        :param data:
        :return:
        '''
        ltries = len(data)
        labelcounts = {}
        for feat in data:
            label = feat[-1]
            if label not in labelcounts.keys():
                labelcounts[label] = 0
            labelcounts[label] += 1
        entropy = 0.0
        for key in labelcounts.keys():
            p_i = float(labelcounts[key]/ltries)
            entropy -= p_i * log(p_i,2)
        return entropy

    def bayesTestPAcount(self,SETA,SETB,symbolsA,symbolsB,A,B):
        PB = self.P(SETB,symbolsB,B)[0]
        if PB > 0:
            PA = self.P(SETA,symbolsA,A)[0]
            PAB = PB*PA
            return np.array([(PAB*PA)/PB])
        else:
            return np.nan
    
    def BayesianInference(self,prior,PA,PB):
        inverseOfPrior = 1-prior
        bayesFactor = PA/PB
        return (prior/inverseOfPrior)*bayesFactor,bayesFactor

    def L2(self,theta):
        try:
            theta.shape[-1]
            return np.dot((theta*theta).T,np.ones(theta.shape[0]))
        except Exception:
            return np.dot((theta*theta),np.ones(theta.shape[0]))



    def logistic_model_fitting(self,data):
        '''
        a为速度
        b数量的极大值
        c预期最终总数
        x是作图的x轴值
        y是作图的y轴值
        logisticNumber 是拟合的logistic曲线
        fit[1]是协方差矩阵
        endNumber是修正后的最终值
        endTime最终周期预测长度
        '''
        data = [i for i in data ]
        from scipy.optimize import curve_fit,fsolve
        len_ = len(data)
        x = list(range(len_))
        y = list(data)
        funtion = lambda a,b,c,x : c/(1+np.exp(-(x-b)/a))
        fit = curve_fit(logistic_model,x,y,p0=[2,100,200000])
        a,b,c= tuple(fit[0])
        logisticNumber = funtion(a,b,c,x)
        endNumber = c-fit[1][-1]
        endTime = int(fsolve(lambda x : logistic_model(x,a,b,c) - int(c),b))-x[-1]
        return (a,b,c,x,y,fit[1],logisticNumber,endNumber,endTime)
    
    def logistic_model_plot(self,data):
        a,b,c,x,y,fit1,logistic_number,logisticNumber,endNumber,endTime = self.logistic_model1(data)
        plt.figure(figsize=(10,8))
        plt.scatter(x,y,c='g')
        plt.plot(logistic_number,c='r')
        plt.scatter(endTime,endNumber,'b')
        plt.show()
        plt.close()

    


    


    def __del__(self):
        print("{}{}".format(stats.__name__,"is over"))


### 计算公式 :
    P 是概率
    检验的公式为:
    在B条件成立下，事件A的概率也成立,此时，事件A的概率 =（B的概率*A的概率）/ B的概率
## $P(A|B)=P(A|B)*P(A)/P(B)$

In [79]:
s = stats()  

In [80]:
s.average(SETA,[1,2,2,1,1,1]),s.mean(SETA)

(('ERROR : the weights length does not meet the requirements',), (4.5,))

In [81]:
s.bayesTestPAcount(SETA,SETB,'>','<', 2,1)

array([0.64])

In [82]:
s.entropyPX(SETA)

{'entropy': 4.108241808752196,
 'probability': {7: 0.33219280948873625,
  5: 0.5287712379549449,
  0: 0.33219280948873625,
  1: 0.33219280948873625,
  3: 0.33219280948873625,
  8: 0.33219280948873625,
  6: 0.33219280948873625}}

### >例2 概率更新:
### 判断一个人是不是星球大战的粉丝
### 过程1：
    初始信息:
    人群中星球大战的粉丝=60%
    非星球大战的粉丝=40%
    判断1，某人看过星球大战概率是60%
### 如果判断1成立:
    更新信息1:
    最新看过星球大战的人里粉丝概率是99%
    不是粉丝而因为其他原因去看的人概率是5%
### 判断2，某事是星球大战粉丝的概率为:
    -> 99% / 5% = 198% <- (这里的198%为叶贝斯因子，代表通过学习获得的概率更新)
### 总结:
    此时的后验概率：
    (60%/40%) * (99%/5%) =>  1.5 * 1.98 => 2.97

In [83]:
def BayesianInference(prior,PA,PB):
    ''''''
    inverseOfPrior = 1-prior
    bayesFactor = PA/PB
    return (prior/inverseOfPrior)*bayesFactor,bayesFactor

In [84]:
BayesianInference(0.6,0.99,0.05)

(29.699999999999992, 19.799999999999997)

In [85]:

def add(**data):
    print(data)
add(data={'a':1,'b':4,'c':5})

{'data': {'a': 1, 'b': 4, 'c': 5}}


In [86]:
def add(*data):
    sum = 0
    for i in data:
        sum = sum + i
        print(sum)
    return sum
add(1,2,3,4)

1
3
6
10


10

## 拟合多项式结果

In [87]:
from scipy.optimize import curve_fit

def testfunc(x, a):
    return min(np.sin(x)*a*np.random.randn(100))

a,x = curve_fit(testfunc,np.arange(100),np.arange(100)[::-1])
a,x

(array([0.99999816]), array([[2.2095364e-12]]))

In [88]:
from scipy.stats import stats

In [89]:
def logistic_model(data):
    '''
    a为感染速度
    b为感染发生最多的一天
    c是在感染结束时记录的感染者总数
    '''
    from scipy.optimize import curve_fit
    len_ = data
    x = list(range(len_))
    y = list(data)
    funtion = lambda a,b,c,x : c/(1+np.exp(-(x-b)/a))
    fit = curve_fit(logistic_model,x,y,p0=[2,100,20000])
    a,b,c= tuple(fit[0])
    return fit


# L1正则化

In [90]:
def L1(theta):
    try:
        theta.shape[-1]
        return np.dot((theta*theta).T,np.ones(theta.shape[0]))
    except Exception:
        return np.dot((theta*theta),np.ones(theta.shape[0]))

In [91]:
theta = np.array([2,2,1,2])

In [92]:
L1(theta)

13.0

## 霍夫曼编码

In [93]:
def huffmanCoding(data):
    try:
        n = data.shape[0]
        frequency = [( i ,np.dot((data==i),np.ones(n))/n ) for i in np.unique(data)]
        List = []
        ListL = []
        ListR = []
        n = -1
        for i in frequency:
            newNode = min(frequency)
            if n == 0:
                List.append(newNode)
            else:
                ListL.append([frequency[-1]])
                frequency.pop
                ListR.append([frequency[-1]])
                frequency.pop
                List.append(ListL.ListR)
        return List            
    except Exception as Error:
        return Error

In [94]:
huffmanCoding(np.array([1,2,3,12,1]))

AttributeError("'list' object has no attribute 'ListR'")

## 加权平均值

In [95]:
def average(vector,weights=None):
    n = vector.shape[0]
    if weights:
        return np.dot(vector,vector[::-1])/np.array(weights).sum()
    else:
        return np.dot(vector,vector[::-1])/np.dot(vector[::-1],np.ones(n))
    

In [96]:
average(theta,weights=[6])

2.0