In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%cd /content/drive/My Drive/CSE 544/Project

/content/drive/My Drive/CSE 544/Project


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import poisson, geom, binom

b. Inference the equality of distributions between the two states (distribution of daily #cases and daily #deaths) for the last three months of 2021 (Oct, Nov, Dec) of your dataset using K-S test and Permutation test. For the K-S test, use both 1-sample and 2-sample tests. For the 1-sample test, try Poisson, Geometric, and Binomial. To obtain parameters of these distributions to check against in 1-sample KS, use MME on the Oct-Dec 2021 data of the first state in your dataset to obtain parameters of the distribution, and then check whether the Oct-Dec 2021 data for the second state in your dataset has the distribution with the obtained MME parameters. For the permutation test, use 1000 random permutations. Use a threshold of 0.05 for both K-S test and Permutation test.

Assume random variable X ~ Pois(λ), E[X] = λ and Var(X) = λ.

Assume random variable X ~ Geo(p), E[X] = 1/p and Var(X) = (1-p)/(p^2).

Assume random variable X ~ Bino(n, p), E[X] = np and Var(X) = npq where q = 1-p.


In [None]:
def MME_Pois(data):
  lambda_est = np.mean(data)
  return lambda_est

def MME_Geo(data):
  p_est = 1/np.mean(data)
  return p_est

def MME_Bino(data):
  mu_value = np.mean(data)
  var_value = np.mean(np.power(data, 2)) - np.power(mu_value, 2)
  p_est = 1 - var_value / mu_value
  n_est = mu_value / p_est
  return n_est, p_est

def KS_test1(data1, data2, distribution):
  ### remove negative samples
  data1 = data1[np.where(data1 >= 0)]
  data2 = data2[np.where(data2 >= 0)]
  if distribution == "Geometric":
    ### remove 0s
    data1 = data1[np.where(data1 > 0)]
    data2 = data2[np.where(data2 > 0)]

  d = 0
  data1 = np.sort(data1)
  data2_unique, data2_counts = np.unique(np.sort(data2), return_counts=True)

  prob_right = data2_counts / np.sum(data2_counts)
  prob_left = np.zeros(prob_right.shape)
  prob_left[1:] = prob_right[:-1]
  prob_left = np.cumsum(prob_left)
  prob_right = np.cumsum(prob_right)

  prob = np.zeros(data2_counts.shape)
  if distribution == "Poisson":
    parameter = MME_Pois(data1)
    prob = poisson.cdf(data2_unique, parameter)
  elif distribution == "Geometric":
    parameter = MME_Geo(data1)
    prob = geom.cdf(data2_unique, parameter)
  elif distribution == "Binomial":
    parameter = MME_Bino(data1)
    prob = binom.cdf(data2_unique, parameter[0], parameter[1])
  else:
    print("can not deal with this distribution!!!")

  right_max = np.max(np.abs(prob - prob_right))
  left_max = np.max(np.abs(prob - prob_left))
  d = max(right_max, left_max)
  return d

def KS_test2(data1, data2):
  ### remove negative samples
  data1 = data1[np.where(data1 >= 0)]
  data2 = data2[np.where(data2 >= 0)]

  d = 0
  data1_unique, data1_counts = np.unique(np.sort(data1), return_counts=True)
  data2_unique, data2_counts = np.unique(np.sort(data2), return_counts=True)
  prob_data1 = data1_counts / np.sum(data1_counts)
  prob_data1 = np.cumsum(prob_data1)

  prob_right = data2_counts / np.sum(data2_counts)
  prob_left = np.zeros(prob_right.shape)
  prob_left[1:] = prob_right[:-1]
  prob_left = np.cumsum(prob_left)
  prob_right = np.cumsum(prob_right)
  
  prob = np.zeros(data2_unique.shape)
  j = 0
  for i in range(data2_unique.shape[0]):
    if data2_unique[i] < data1_unique[0]:
      prob[i] = 0
      continue

    j = 0
    while j < data1_unique.shape[0] - 1:
      if data2_unique[i] == data1_unique[j]:
        break
      elif data2_unique[i] > data1_unique[j] and data2_unique[i] < data1_unique[j+1]:
        break
      else: j = j + 1

    index = min(j, data1_unique.shape[0] - 1)
    prob[i] = prob_data1[index]
  
  right_max = np.max(np.abs(prob - prob_right))
  left_max = np.max(np.abs(prob - prob_left))
  d = max(right_max, left_max)
  return d

In [None]:
begin_date = '2021-10-01'
end_date = '2021-12-31'

### read data 
def read_data_part_b(state_name, data_class):
  file_name = state_name + '_' + data_class + '.csv'
  data_pd = pd.read_csv(file_name)
  data_mask = (data_pd['submission_date'] >= begin_date) & (data_pd['submission_date'] <= end_date)
  data_pd = data_pd.loc[data_mask]
  data = data_pd[data_class].to_numpy()
  return data


In [None]:
threshold = 0.05
distributions = ["Poisson", "Geometric", "Binomial"]
data_classes = ["new_case", "pnew_case", "new_death", "pnew_death"]
### K-S test for 1-sample
for data_name in data_classes:
  AL_data = read_data_part_b("AL", data_name)
  AZ_data = read_data_part_b("AZ", data_name)
  print("K-S Test for", data_name, ":")
  for distribution in distributions: 
    print("try", distribution, ":")  
    d = KS_test1(AL_data, AZ_data, distribution)
    if d > threshold:
      print("d = ", d, "reject H0 (The distribution of", data_name, "in AL is the same as the distribution of", data_name, "in AZ)")
    else:
      print("d = ", d, "accept H0 (The distribution of", data_name, "in AL is the same as the distribution of", data_name, "in AZ)")
  print("\n")

K-S Test for new_case :
try Poisson :
d =  0.96 reject H0 (The distribution of new_case in AL is the same as the distribution of new_case in AZ)
try Geometric :
d =  0.857241666501106 reject H0 (The distribution of new_case in AL is the same as the distribution of new_case in AZ)
try Binomial :
d =  1.0 reject H0 (The distribution of new_case in AL is the same as the distribution of new_case in AZ)


K-S Test for pnew_case :
try Poisson :
d =  0.33262588485348554 reject H0 (The distribution of pnew_case in AL is the same as the distribution of pnew_case in AZ)
try Geometric :
d =  0.3853414198776546 reject H0 (The distribution of pnew_case in AL is the same as the distribution of pnew_case in AZ)
try Binomial :
d =  1.0 reject H0 (The distribution of pnew_case in AL is the same as the distribution of pnew_case in AZ)


K-S Test for new_death :
try Poisson :
d =  0.4374328757733563 reject H0 (The distribution of new_death in AL is the same as the distribution of new_death in AZ)
try Geo

In [None]:
### K-S test for 2-samples
threshold = 0.05
data_classes = ["new_case", "pnew_case", "new_death", "pnew_death"]
for data_name in data_classes:
  AL_data = read_data_part_b("AL", data_name)
  AZ_data = read_data_part_b("AZ", data_name)
  print("K-S Test for", data_name, ":")
  d = KS_test2(AL_data, AZ_data)
  if d > threshold:
    print("d = ", d, "reject H0 (The distribution of", data_name, "in AL is the same as the distribution of", data_name, "in AZ)")
  else:
    print("d = ", d, "accept H0 (The distribution of", data_name, "in AL is the same as the distribution of", data_name, "in AZ)")
  print("\n")

K-S Test for new_case :
d =  0.9344715447154478 reject H0 (The distribution of new_case in AL is the same as the distribution of new_case in AZ)


K-S Test for pnew_case :
d =  0.2579185520361992 reject H0 (The distribution of pnew_case in AL is the same as the distribution of pnew_case in AZ)


K-S Test for new_death :
d =  0.32342657342657344 reject H0 (The distribution of new_death in AL is the same as the distribution of new_death in AZ)


K-S Test for pnew_death :
d =  0.5978260869565217 reject H0 (The distribution of pnew_death in AL is the same as the distribution of pnew_death in AZ)




In [None]:
def Permutation_Test(data1, data2, permutation_number):
  ### remove negative samples
  data1 = data1[np.where(data1 >= 0)]
  data2 = data2[np.where(data2 >= 0)]
  T_obs = np.abs(np.mean(data1) - np.mean(data2))

  d = 0
  n1 = data1.shape[0]
  n2 = data2.shape[0]

  data = np.concatenate((data1, data2))

  results = []
  for i in range(permutation_number):
    array = np.random.permutation(data)
    data1_permutation = array[:n1]
    data2_permutation = array[n1:]
    results.append(np.mean(data1_permutation) - np.mean(data2_permutation))

  results = np.abs(results)
  d = np.sum(results > T_obs)/len(results)
  return d

In [None]:
### Permutation Test
permutation_number = 1000
threshold = 0.05
data_classes = ["new_case", "pnew_case", "new_death", "pnew_death"]
for data_name in data_classes:
  AL_data = read_data_part_b("AL", data_name)
  AZ_data = read_data_part_b("AZ", data_name)
  print("Permutation Test for", data_name, ":")
  d = Permutation_Test(AL_data, AZ_data, permutation_number)
  if d <= threshold:
    print("d = ", d, "reject H0 (The distribution of", data_name, "in AL is the same as the distribution of", data_name, "in AZ)")
  else:
    print("d = ", d, "accept H0 (The distribution of", data_name, "in AL is the same as the distribution of", data_name, "in AZ)")
  print("\n")

Permutation Test for new_case :
d =  0.0 reject H0 (The distribution of new_case in AL is the same as the distribution of new_case in AZ)


Permutation Test for pnew_case :
d =  0.693 accept H0 (The distribution of pnew_case in AL is the same as the distribution of pnew_case in AZ)


Permutation Test for new_death :
d =  0.007 reject H0 (The distribution of new_death in AL is the same as the distribution of new_death in AZ)


Permutation Test for pnew_death :
d =  0.0 reject H0 (The distribution of pnew_death in AL is the same as the distribution of pnew_death in AZ)


