In [1]:
import os
import sys
import glob
import re
import ast
import warnings

import csv
import json
import pickle

import math
import random
import numpy as np
import scipy as sp
import datetime as dt
import pandas as pd
import swifter
from scipy.stats import gaussian_kde
from scipy.integrate import quad
from scipy.optimize import minimize
from sklearn.metrics import mean_squared_error

import portion as P
import itertools as it
import copy
from tqdm.notebook import tqdm
from collections import namedtuple
from pprint import pprint
from pytictoc import TicToc

import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns

# Configure display options
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 100)
%config InlineBackend.figure_format = 'retina'

# Set plot style
# plt.style.use('ggplot')

In [2]:
def estimate_conditional_parameters(data1, data2):
    data = {
        'first_distribution_samples': np.array(data1),  # 第一個分布的樣本數據
        'second_distribution_samples': np.array(data2)  # 第二個分布的樣本數據
    }
    
    # 定義對數似然函數
    def log_likelihood(params, data):
        p, success_prob_when_first_is_success, success_prob_when_first_is_failure = params
        first_samples = data['first_distribution_samples']
        second_samples = data['second_distribution_samples']
        
        log_likelihood_val = 0.0
        for first_sample, second_sample in zip(first_samples, second_samples):
            # 添加檢查以確保概率值在有效範圍內
            if not 0 <= success_prob_when_first_is_success <= 1:
                return float('inf')  # 返回無窮大值表示非法參數
            if not 0 <= success_prob_when_first_is_failure <= 1:
                return float('inf')  # 返回無窮大值表示非法參數
            # 計算對數似然函數值
            if first_sample == 1:
                log_likelihood_val += np.log(success_prob_when_first_is_success if second_sample == 1 else (1 - success_prob_when_first_is_success))
            else:
                log_likelihood_val += np.log(success_prob_when_first_is_failure if second_sample == 1 else (1 - success_prob_when_first_is_failure))
                
        return -log_likelihood_val  # 取負對數似然函數，因為我們要最大化對數似然函數

    # 使用最大似然估計估算參數
    initial_guess = [0.5, 0.5, 0.5]  # 初始猜測值
    result = minimize(log_likelihood, initial_guess, args=(data,), method='Nelder-Mead')

    # 輸出估計的參數值
    estimated_params = result.x
    return np.mean(data1), estimated_params[1], estimated_params[2]

def estimate_joint_parameters(data1, data2):
    p = np.mean(data1)
    q = np.mean(data2)
    data = [(element1, element2) for element1, element2 in zip(data1, data2)]
    a = np.mean([1 if pair == (1, 1) else 0 for pair in data])
    b = np.mean([1 if pair == (1, 0) else 0 for pair in data])
    c = np.mean([1 if pair == (0, 1) else 0 for pair in data])
    d = np.mean([1 if pair == (0, 0) else 0 for pair in data])
    return p, q, a, b, c, d

In [3]:
def joint_to_conditional(a, b, c, d):
    """
    a := P(X = 1, Y = 1)
    b := P(X = 1, Y = 0)
    c := P(X = 0, Y = 1)
    d := P(X = 0, Y = 0)
    alpha := P(Y = 1 | X = 1)
    beta  := P(Y = 1 | X = 0)
    gamma := P(X = 1 | Y = 1)
    delta := P(X = 1 | Y = 0)
    """
    p = min(max(1e-9, a + b), 1 - 1e-9)
    q = min(max(1e-9, a + c), 1 - 1e-9)
    alpha = a / p
    beta = c / (1 - p)
    gamma = a / q
    delta = b / (1 - q)
    return alpha, beta, gamma, delta

def contional_to_joint(p, q, alpha, beta, gamma, delta):
    """
    a := P(X = 1, Y = 1)
    b := P(X = 1, Y = 0)
    c := P(X = 0, Y = 1)
    d := P(X = 0, Y = 0)
    alpha := P(Y = 1 | X = 1)
    beta  := P(Y = 1 | X = 0)
    gamma := P(X = 1 | Y = 1)
    delta := P(X = 1 | Y = 0)
    """
    a = p * alpha
    b = (1 - q) * delta
    c = (1 - p) * beta
    d = 1 - a - b - c
    return a, b, c, d

def calculate_rho_conditional(p, q, alpha):
    """
    p := P(X = 1); P(X = 0) = 1 - p
    q := P(Y = 1); P(Y = 0) = 1 - q
    alpha := P(Y = 1 | X = 1); P(Y = 0 | X = 1) = 1 - alpha
    rho: correlation coefficient
    """
    sigma = max(math.sqrt(p * q * (1 - p) * (1 - q)), 1e-9)  # sigma_x * sigma_y
    rho = (p * alpha - p * q) / sigma
    return rho

def calculate_rho_joint(p, q, a):
    """
    p := P(X = 1); P(X = 0) = 1 - p
    q := P(Y = 1); P(Y = 0) = 1 - q
    a := P(X = 1, Y = 1)
    rho: correlation coefficient
    """
    sigma = max(math.sqrt(p * q * (1 - p) * (1 - q)), 1e-9)  # sigma_x * sigma_y
    rho = (a - p * q) / sigma
    return rho

def calculate_conditional_probabilities(p, q, rho):
    """
    p := P(X = 1); P(X = 0) = 1 - p
    q := P(Y = 1); P(Y = 0) = 1 - q
    rho: correlation coefficient
    alpha := P(Y = 1 | X = 1)
    beta  := P(Y = 1 | X = 0)
    gamma := P(X = 1 | Y = 1)
    delta := P(X = 1 | Y = 0)
    """
    sigma = max(math.sqrt(p * q * (1 - p) * (1 - q)), 1e-9)  # sigma_x * sigma_y
    p = min(max(1e-9, p), 1 - 1e-9)
    q = min(max(1e-9, q), 1 - 1e-9)
    alpha = (p * q + rho * sigma) / p
    beta = (q * (1 - p) - rho * sigma) / (1 - p)
    gamma = (p * q + rho * sigma) / q
    delta = (p * (1 - q) - rho * sigma) / (1 - q)
    return alpha, beta, gamma, delta

def calculate_joint_probabilities(p, q, rho):
    """
    p := P(X = 1); P(X = 0) = 1 - p
    q := P(Y = 1); P(Y = 0) = 1 - q
    rho: correlation coefficient
    a := P(X = 1, Y = 1)
    b := P(X = 1, Y = 0)
    c := P(X = 0, Y = 1)
    d := P(X = 0, Y = 0)
    """
    sigma = max(math.sqrt(p * q * (1 - p) * (1 - q)), 1e-9)  # sigma_x * sigma_y
    a = p * q + rho * sigma
    b = p * (1 - q) - rho * sigma
    c = q * (1 - p) - rho * sigma
    d = (1 - p) * (1 - q) + rho * sigma
    return a, b, c, d

In [4]:
def generate_random_boolean(probability_true):
    return 1 if random.random() < probability_true else 0

def generate_data_single(p, size=10000):
    data = np.array([generate_random_boolean(p) for _ in range(size)])
    return data

def generate_data_conditional(p, q, rho, size=10000):
    alpha, beta, gamma, delta = calculate_conditional_probabilities(p, q, rho)
    
    # a, b, c, d = contional_to_joint(p, q, alpha, beta, gamma, delta)
    # print("p, q, rho:", p, q, rho)
    # print("alpha, beta, gamma, delta:", alpha, beta, gamma, delta)
    # print("a, b, c, d:", a, b, c, d)
    # print(sum([a, b, c, d]))
    
    data1 = np.array([generate_random_boolean(p) for _ in range(size)])
    data2 = np.array([generate_random_boolean(alpha) if element == 1 else generate_random_boolean(beta) for element in data1])
    return data1, data2

def generate_data_joint(p, q, rho, size=10000):
    a, b, c, d = calculate_joint_probabilities(p, q, rho)
    events = [1, 2, 3, 4]
    probabilities = [a, b, c, d]
    
    # alpha, beta, gamma, delta = joint_to_conditional(a, b, c, d)
    # print("p, q, rho:", p, q, rho)
    # print("a, b, c, d:", a, b, c, d)
    # print("alpha, beta, gamma, delta:", alpha, beta, gamma, delta)
    # print(sum([a, b, c, d]))
    
    premiere_data = np.random.choice(events, size=size, p=probabilities)
    data = [(1 if element in [1, 2] else 0, 1 if element in [1, 3] else 0) for element in premiere_data]
    data1 = np.array([pair[0] for pair in data])
    data2 = np.array([pair[1] for pair in data])
    return data1, data2

In [5]:
def rho_restriction(p, q):
    sigma = max(math.sqrt(p * q * (1 - p) * (1 - q)), 1e-9)  # sigma_x * sigma_y
    R1 = P.closed(-1, 1)  # -1 <= rho <= 1
    R2 = P.closed(-(p * q) / sigma, (1 - p * q) / sigma)  # 0 <= P(X=1, Y=1) <= 1
    R3 = P.closed((p * (1 - q) - 1) / sigma, p * (1 - q) / sigma)  # 0 <= P(X=1, Y=0) <= 1
    R4 = P.closed((q * (1 - p) - 1) / sigma, q * (1 - p) / sigma)  # 0 <= P(X=0, Y=1) <= 1
    R5 = P.closed(-((1 - p) * (1 - q)) / sigma, (1 - (1 - p) * (1 - q)) / sigma)  # 0 <= P(X=0, Y=0) <= 1
    R = R1 & R2 & R3 & R4 & R5
    return R

In [6]:
p = 0.7
q = 0.9
n = 20
rho_bound = rho_restriction(p, q)
print("rho restriction:", rho_bound)
lower_bd = rho_bound.lower
upper_bd = rho_bound.upper
step = (upper_bd - lower_bd) / n

table = pd.DataFrame(columns="p, q, rho, alpha, beta, gamma, delta, a, b, c, d, sum".split(", "))
rho = lower_bd
for i in range(n + 1):
    alpha, beta, gamma, delta = calculate_conditional_probabilities(p, q, rho)
    a, b, c, d = calculate_joint_probabilities(p, q, rho)
    probability = [p, q, rho, alpha, beta, gamma, delta, a, b, c, d, sum([a, b, c, d])]
    for i, item in enumerate(probability):
        probability[i] = round(item, 3)
    table.loc[len(table)] = probability    
    rho += step

display(table)

rho restriction: [-0.2182178902359924,0.5091750772173154]


Unnamed: 0,p,q,rho,alpha,beta,gamma,delta,a,b,c,d,sum
0,0.7,0.9,-0.218,0.857,1.0,0.667,1.0,0.6,0.1,0.3,-0.0,1.0
1,0.7,0.9,-0.182,0.864,0.983,0.672,0.95,0.605,0.095,0.295,0.005,1.0
2,0.7,0.9,-0.145,0.871,0.967,0.678,0.9,0.61,0.09,0.29,0.01,1.0
3,0.7,0.9,-0.109,0.879,0.95,0.683,0.85,0.615,0.085,0.285,0.015,1.0
4,0.7,0.9,-0.073,0.886,0.933,0.689,0.8,0.62,0.08,0.28,0.02,1.0
5,0.7,0.9,-0.036,0.893,0.917,0.694,0.75,0.625,0.075,0.275,0.025,1.0
6,0.7,0.9,-0.0,0.9,0.9,0.7,0.7,0.63,0.07,0.27,0.03,1.0
7,0.7,0.9,0.036,0.907,0.883,0.706,0.65,0.635,0.065,0.265,0.035,1.0
8,0.7,0.9,0.073,0.914,0.867,0.711,0.6,0.64,0.06,0.26,0.04,1.0
9,0.7,0.9,0.109,0.921,0.85,0.717,0.55,0.645,0.055,0.255,0.045,1.0


# Validate formulas

In [7]:
# 測試函數
# data1 = np.random.randint(2, size=10000)
# data2 = np.random.randint(2, size=10000)

# 隨機生成序列1
data1 = np.random.randint(2, size=10000)  # 生成長度為 10 的 {0, 1} 序列
# 創建具有相關性的序列2
correlation = 0.3  # 相關性的強度
# 對序列1進行微調以生成序列2
data2 = np.array([1 if np.random.rand() < correlation * element else 0 for element in data1])

print("Random sequence 1:", data1)
print("Random sequence 2:", data2)

rho = np.corrcoef(data1[:], data2[:])[0, 1]
print("rho:", rho)

Random sequence 1: [0 0 0 ... 1 0 1]
Random sequence 2: [0 0 0 ... 0 0 0]
rho: 0.41954535327628617


In [8]:
p, alpha, beta = estimate_conditional_parameters(data1[:], data2[:])
q, gamma, delta = estimate_conditional_parameters(data2[:], data1[:])
rho1 = calculate_rho_conditional(p, q, alpha)
rho2 = calculate_rho_conditional(q, p, gamma)

print("Use simple average to estimate p, q")
print("Use maximum likelihood to estimate alpha, beta, gamma, delta")
print("----------------------------------------------------------")
print("Estimated parameters:")
print(f"p := P(X=1): {p}")
print(f"q := P(Y=1): {q}")
print(f"alpha := P(Y=1 | X=1) {alpha}")
print(f"beta  := P(Y=1 | X=0) {beta}")
print(f"gamma := P(X=1 | Y=1) {gamma}")
print(f"delta := P(X=1 | Y=0) {delta}")
print("rho:", rho, rho1, rho2)
print("----------------------------------------------------------")

Use simple average to estimate p, q
Use maximum likelihood to estimate alpha, beta, gamma, delta
----------------------------------------------------------
Estimated parameters:
p := P(X=1): 0.4965
q := P(Y=1): 0.1479
alpha := P(Y=1 | X=1) 0.29787533671463823
beta  := P(Y=1 | X=0) 1.0154079569508447e-09
gamma := P(X=1 | Y=1) 0.9999999999840055
delta := P(X=1 | Y=0) 0.4091064642091209
rho: 0.41954535327628617 0.4195177733908629 0.4195453532629588
----------------------------------------------------------


In [9]:
p, q, a, b, c, d = estimate_joint_parameters(data1[:], data2[:])
rho1 = calculate_rho_joint(p, q, a)

print("Use simple average to estimate p, q, a, b, c, d")
print("----------------------------------------------------------")
print("Estimated parameters:")
print(f"p := P(X=1): {p}")
print(f"q := P(Y=1): {q}")
print(f"a := P(X=1, Y=1) {a}")
print(f"b := P(X=1, Y=0) {b}")
print(f"c := P(X=0, Y=1) {c}")
print(f"d := P(X=0, Y=0) {d}")
print("rho:", rho, rho1)
print("----------------------------------------------------------")

Use simple average to estimate p, q, a, b, c, d
----------------------------------------------------------
Estimated parameters:
p := P(X=1): 0.4965
q := P(Y=1): 0.1479
a := P(X=1, Y=1) 0.1479
b := P(X=1, Y=0) 0.3486
c := P(X=0, Y=1) 0.0
d := P(X=0, Y=0) 0.5035
rho: 0.41954535327628617 0.4195453532762863
----------------------------------------------------------


# Generate data: conditional

In [10]:
p, q = 0.9, 0.3
rho_bound = rho_restriction(p, q)
print(rho_bound)
rho = -0.4

[-0.5091750772173154,0.21821789023599233]


In [11]:
data1, data2 = generate_data_conditional(p, q, rho)

print("----------------------------------------------------------")
print("Random sequence 1:", data1)
print("Random sequence 2:", data2)
print("p, q, rho:", p, q, rho)

rho = np.corrcoef(data1[:], data2[:])[0, 1]
print("----------------------------------------------------------")
print("rho:", rho)

----------------------------------------------------------
Random sequence 1: [1 1 1 ... 1 0 1]
Random sequence 2: [0 0 0 ... 0 1 0]
p, q, rho: 0.9 0.3 -0.4
----------------------------------------------------------
rho: -0.38419176718188447


In [12]:
p, alpha, beta = estimate_conditional_parameters(data1[:], data2[:])
q, gamma, delta = estimate_conditional_parameters(data2[:], data1[:])
rho1 = calculate_rho_conditional(p, q, alpha)
rho2 = calculate_rho_conditional(q, p, gamma)

print("Use simple average to estimate p, q")
print("Use maximum likelihood to estimate alpha, beta, gamma, delta")
print("----------------------------------------------------------")
print("Estimated parameters:")
print(f"p := P(X=1): {p}")
print(f"q := P(Y=1): {q}")
print(f"alpha := P(Y=1 | X=1) {alpha}")
print(f"beta  := P(Y=1 | X=0) {beta}")
print(f"gamma := P(X=1 | Y=1) {gamma}")
print(f"delta := P(X=1 | Y=0) {delta}")
print("rho:", rho, rho1, rho2)
print("----------------------------------------------------------")

Use simple average to estimate p, q
Use maximum likelihood to estimate alpha, beta, gamma, delta
----------------------------------------------------------
Estimated parameters:
p := P(X=1): 0.9011
q := P(Y=1): 0.2978
alpha := P(Y=1 | X=1) 0.23959412521752743
beta  := P(Y=1 | X=0) 0.8281150439128597
gamma := P(X=1 | Y=1) 0.7250157463309137
delta := P(X=1 | Y=0) 0.9757960582442076
rho: -0.38419176718188447 -0.3842044674591928 -0.38412079092516327
----------------------------------------------------------


In [13]:
p, q, a, b, c, d = estimate_joint_parameters(data1[:], data2[:])
rho1 = calculate_rho_joint(p, q, a)

print("Use simple average to estimate p, q, a, b, c, d")
print("----------------------------------------------------------")
print("Estimated parameters:")
print(f"p := P(X=1): {p}")
print(f"q := P(Y=1): {q}")
print(f"a := P(X=1, Y=1) {a}")
print(f"b := P(X=1, Y=0) {b}")
print(f"c := P(X=0, Y=1) {c}")
print(f"d := P(X=0, Y=0) {d}")
print("rho:", rho, rho1)
print("----------------------------------------------------------")

Use simple average to estimate p, q, a, b, c, d
----------------------------------------------------------
Estimated parameters:
p := P(X=1): 0.9011
q := P(Y=1): 0.2978
a := P(X=1, Y=1) 0.2159
b := P(X=1, Y=0) 0.6852
c := P(X=0, Y=1) 0.0819
d := P(X=0, Y=0) 0.017
rho: -0.38419176718188447 -0.38419176718188436
----------------------------------------------------------


# Generate data: joint

In [14]:
p, q = 0.9, 0.3
rho_bound = rho_restriction(p, q)
print(rho_bound)
rho = -0.4

[-0.5091750772173154,0.21821789023599233]


In [15]:
data1, data2 = generate_data_joint(p, q, rho)

print("Random sequence 1:", data1)
print("Random sequence 2:", data2)
print("p, q, rho:", p, q, rho)

rho = np.corrcoef(data1[:], data2[:])[0, 1]
print("----------------------------------------------------------")
print("rho:", rho)

Random sequence 1: [1 1 0 ... 0 0 1]
Random sequence 2: [0 0 0 ... 1 1 0]
p, q, rho: 0.9 0.3 -0.4
----------------------------------------------------------
rho: -0.39573468888946894


In [16]:
p, alpha, beta = estimate_conditional_parameters(data1[:], data2[:])
q, gamma, delta = estimate_conditional_parameters(data2[:], data1[:])
rho1 = calculate_rho_conditional(p, q, alpha)
rho2 = calculate_rho_conditional(q, p, gamma)

print("Use simple average to estimate p, q")
print("Use maximum likelihood to estimate alpha, beta, gamma, delta")
print("----------------------------------------------------------")
print("Estimated parameters:")
print(f"p := P(X=1): {p}")
print(f"q := P(Y=1): {q}")
print(f"alpha := P(Y=1 | X=1) {alpha}")
print(f"beta  := P(Y=1 | X=0) {beta}")
print(f"gamma := P(X=1 | Y=1) {gamma}")
print(f"delta := P(X=1 | Y=0) {delta}")
print("rho:", rho, rho1, rho2)
print("----------------------------------------------------------")

Use simple average to estimate p, q
Use maximum likelihood to estimate alpha, beta, gamma, delta
----------------------------------------------------------
Estimated parameters:
p := P(X=1): 0.9044
q := P(Y=1): 0.3034
alpha := P(Y=1 | X=1) 0.24425064495242355
beta  := P(Y=1 | X=0) 0.8629634292647778
gamma := P(X=1 | Y=1) 0.72814431112238
delta := P(X=1 | Y=0) 0.9811792266453301
rho: -0.39573468888946894 -0.39573259318461107 -0.3955942527592385
----------------------------------------------------------


In [17]:
p, q, a, b, c, d = estimate_joint_parameters(data1[:], data2[:])
rho1 = calculate_rho_joint(p, q, a)

print("Use simple average to estimate p, q, a, b, c, d")
print("----------------------------------------------------------")
print("Estimated parameters:")
print(f"p := P(X=1): {p}")
print(f"q := P(Y=1): {q}")
print(f"a := P(X=1, Y=1) {a}")
print(f"b := P(X=1, Y=0) {b}")
print(f"c := P(X=0, Y=1) {c}")
print(f"d := P(X=0, Y=0) {d}")
print("rho:", rho, rho1)
print("----------------------------------------------------------")

Use simple average to estimate p, q, a, b, c, d
----------------------------------------------------------
Estimated parameters:
p := P(X=1): 0.9044
q := P(Y=1): 0.3034
a := P(X=1, Y=1) 0.2209
b := P(X=1, Y=0) 0.6835
c := P(X=0, Y=1) 0.0825
d := P(X=0, Y=0) 0.0131
rho: -0.39573468888946894 -0.395734688889469
----------------------------------------------------------


In [20]:
if not os.path.isdir(os.path.dirname('../test/dir1/dir2/file')):
    os.makedirs(os.path.dirname('../test/dir1/dir2/file'))