# Constant population

In [None]:
import numpy as np
import pandas as pd

# Power calculation function
def calculate_power(t1_index=0,
                   t2_labels=range(0, 34, 3),
                   data_dir="Crash300"):

    def load_data(model):
        return {
            'fst': np.load(f"{data_dir}/{model}_fst.npy"),
            'dxy': np.load(f"{data_dir}/{model}_dxy.npy"),
            'gr': np.load(f"{data_dir}/{model}_gr.npy")
        }
    
    constant = load_data("constant")
    decline = load_data("decline")
    
    results = []
    
    # Analyze for each t2 value
    for t2_idx, t2 in enumerate(t2_labels):
        data = {
            'fst': {
                'constant': constant['fst'][:, t1_index, t2_idx],
                'decline': decline['fst'][:, t1_index, t2_idx]
            },
            'dxy': {
                'constant': constant['dxy'][:, t1_index, t2_idx],
                'decline': decline['dxy'][:, t1_index, t2_idx]
            },
            'gr': {
                'constant': constant['gr'][:, t1_index, t2_idx],
                'decline': decline['gr'][:, t1_index, t2_idx]
            }
        }
        
        # Calculate power for each statistic
        for stat in ['fst', 'dxy', 'gr']:
            method = 'above' if stat == 'fst' else 'below'
            
            threshold = np.percentile(
                data[stat]['constant'],
                q=95 if method == 'above' else 5
            )
            
            if method == 'above':
                power = np.mean(data[stat]['decline'] > threshold)
            else:
                power = np.mean(data[stat]['decline'] < threshold)
            
            results.append({
                't1': 39,
                't2': t2,
                'statistic': stat,
                'method': method,
                'threshold': threshold,
                'power': power,
                'n_constant': len(data[stat]['constant']),
                'n_decline': len(data[stat]['decline']),
                'dataset': data_dir
            })
    
    return pd.DataFrame(results)


In [None]:
power_results_300 = calculate_power(data_dir="../data/SingleSite/Constant300")
power_results_3000 = calculate_power(data_dir="../data/SingleSite/Constant3000")

In [None]:
##### Plotting with rpy2 and ggplot2 #####

import rpy2.robjects as ro
from rpy2.robjects import pandas2ri
from rpy2.robjects.packages import importr
from rpy2.robjects.conversion import localconverter

ggplot2 = importr('ggplot2')
dplyr = importr('dplyr')
grid = importr('grid')

# Prepare data for R
with localconverter(ro.default_converter + pandas2ri.converter):
    power_results_300_clean = power_results_300.rename(columns={
        't2': 'timepoint',
        'statistic': 'variable',
        'dataset': 'population'
    })
    power_results_3000_clean = power_results_3000.rename(columns={
        't2': 'timepoint', 
        'statistic': 'variable',
        'dataset': 'population'
    })
    
    power_results_300_r = ro.conversion.py2rpy(power_results_300_clean)
    power_results_3000_r = ro.conversion.py2rpy(power_results_3000_clean)

# Assign to R global environment
ro.globalenv['power_results_300'] = power_results_300_r
ro.globalenv['power_results_3000'] = power_results_3000_r


# Plotting in R
ro.r('''
# Combine datasets
all_power_results <- rbind(
  transform(power_results_300, population = "99%", line_group = paste(variable, "99%", sep="_")),
  transform(power_results_3000, population = "90%", line_group = paste(variable, "90%", sep="_"))
)

all_power_results$population <- factor(
  all_power_results$population, 
  levels = c("99%", "90%")
)


variable_colors <- c(
  "fst" = "#ad5636",
  "dxy" = "#2b55c1",
  "gr" = "#3b9391"
)


p2 <- ggplot() +
  geom_line(
    data = all_power_results,
    aes(x = timepoint, y = power, 
        color = variable, 
        linetype = population,
        group = line_group),
    linewidth = 1
  ) +
     
  geom_point(
    data = all_power_results,
    aes(x = timepoint, y = power, 
        color = variable, 
        shape = population),
    size = 3
  ) +
     
  geom_hline(yintercept = 0.8, linetype = "dotdash", color = "red", linewidth = 0.8) +
  annotate("text", x = 30, y = 0.85, label = "P = 0.8", color = "red", size = 5) +
  
  scale_color_manual(
  name = "Statistics",
  values = variable_colors,
  labels = c("fst" = expression(F[ST]), "dxy" = expression(D[XY]), "gr" = "GR"),
  guide = guide_legend(
    override.aes = list(shape = NA)
  )
) +
  scale_linetype_manual(
    name = "Crash Extent",
    values = c("99%" = "solid", "90%" = "dashed")
  ) +
  scale_shape_manual(
    name = "Crash Extent",
    values = c("99%" = 16, "90%" = 17)
  ) +
  
  scale_x_reverse(
    name = "Generations ago (0 = present)",
    breaks = seq(0, 36, by = 6),
    limits = c(36, 0)
  ) +
  scale_y_continuous(
    name = "Power",
    limits = c(0, 1.05),
    expand = c(0, 0)
  ) +
  labs(
    title = "(B)",
  ) +
  
  theme_bw(base_size = 16) +
  theme(
    legend.position = "right",
    legend.box = "vertical",
    panel.grid.minor = element_blank(),
    panel.grid.major = element_blank(),
    legend.spacing.y = unit(0.3, "cm"),
    legend.title = element_text(face = "bold"),
    legend.key.width = unit(1.8, "cm"),
    legend.key.height = unit(0.8, "cm")
  )

ggsave("../outputs/combined_plots/multi_constant.pdf", p2, width = 9.15, height = 6, dpi = 300)
''')


0
'../combined_plots/multi_constant.pdf'


# seasonal

In [None]:
import numpy as np
import pandas as pd

# Power calculation function
def calculate_power(t1_index=0, t2_labels=range(0, 34, 3), data_dir="Dry300/DryTime_6"):

    def load_data(model):
        return {
            'fst': np.load(f"{data_dir}/{model}_fst.npy"),
            'dxy': np.load(f"{data_dir}/{model}_dxy.npy"),
            'gr': np.load(f"{data_dir}/{model}_gr.npy")
        }
    
    seasonal = load_data("seasonal")
    decline = load_data("decline")
    
    results = []
    
    for t2_idx, t2 in enumerate(t2_labels):
        data = {
            'fst': {
                'seasonal': seasonal['fst'][:, t1_index, t2_idx],
                'decline': decline['fst'][:, t1_index, t2_idx]
            },
            'dxy': {
                'seasonal': seasonal['dxy'][:, t1_index, t2_idx],
                'decline': decline['dxy'][:, t1_index, t2_idx]
            },
            'gr': {
                'seasonal': seasonal['gr'][:, t1_index, t2_idx],
                'decline': decline['gr'][:, t1_index, t2_idx]
            }
        }
        
        for stat in ['fst', 'dxy', 'gr']:
            method = 'above' if stat == 'fst' else 'below'
            threshold = np.percentile(
                data[stat]['seasonal'],
                q=95 if method == 'above' else 5
            )
            
            power = np.mean(data[stat]['decline'] > threshold) if method == 'above' else np.mean(data[stat]['decline'] < threshold)
            
            results.append({
                't1': 39, 't2': t2, 'statistic': stat,
                'method': method, 'threshold': threshold,
                'power': power,
                'n_seasonal': len(data[stat]['seasonal']),
                'n_decline': len(data[stat]['decline']),
                'dataset': "300" if "300" in data_dir else "3000"
            })
    
    return pd.DataFrame(results)

Plot for 6 generations wet/dry period

In [None]:
power_results_300 = calculate_power(data_dir="../data/SingleSite/Seasonal300/DryTime_6")
power_results_3000 = calculate_power(data_dir="../data/SingleSite/Seasonal3000/DryTime_6")

In [None]:
import rpy2.robjects as ro
from rpy2.robjects import pandas2ri
from rpy2.robjects.packages import importr
from rpy2.robjects.conversion import localconverter

ggplot2 = importr('ggplot2')
dplyr = importr('dplyr')

with localconverter(ro.default_converter + pandas2ri.converter):
    power_results_300_clean = power_results_300.rename(columns={
        't2': 'timepoint',
        'statistic': 'variable',
        'dataset': 'population'
    })
    power_results_3000_clean = power_results_3000.rename(columns={
        't2': 'timepoint', 
        'statistic': 'variable',
        'dataset': 'population'
    })
    
    power_results_300_r = ro.conversion.py2rpy(power_results_300_clean)
    power_results_3000_r = ro.conversion.py2rpy(power_results_3000_clean)

ro.globalenv['power_results_300'] = power_results_300_r
ro.globalenv['power_results_3000'] = power_results_3000_r


ro.r('''
all_power_results <- rbind(
  transform(power_results_300, 
            population = "99%",
            line_group = paste(variable, "99%", sep="_")),
  transform(power_results_3000, 
            population = "90%",
            line_group = paste(variable, "90%", sep="_"))
)

all_power_results$population <- factor(
  all_power_results$population, 
  levels = c("99%", "90%")
)


background_zones <- data.frame(
  xstart = seq(0, 30, by = 6),
  xend = seq(6, 36, by = 6),
  fill_col = rep(c("#9BC3EC", "#EC9595"), length.out = 6)
)

variable_colors <- c(
  "fst" = "#ad5636",
  "dxy" = "#2b55c1",
  "gr" = "#3b9391"
)

p1 <- ggplot() +
  geom_rect(
    data = background_zones,
    aes(xmin = xstart, xmax = xend, ymin = 0, ymax = 1.05, fill = fill_col),
    alpha = 0.3
  ) +
  
  geom_line(
    data = all_power_results,
    aes(x = timepoint, y = power, 
        color = variable, 
        linetype = population,
        group = line_group),
    linewidth = 1
  ) +
  
  geom_point(
    data = all_power_results,
    aes(x = timepoint, y = power, 
        color = variable, 
        shape = population),
    size = 3
  ) +
  
  geom_hline(yintercept = 0.8, linetype = "dotdash", color = "red", linewidth = 0.8) +
  annotate("text", x = 30, y = 0.85, label = "P = 0.8", color = "red", size = 5) +
  
  scale_fill_manual(
    name = "Season",
    values = c("#9BC3EC" = "#9BC3EC", "#EC9595" = "#EC9595"),
    labels = c("Wet", "Dry"),
    guide = guide_legend(order = 3)
  ) +
  scale_color_manual(
    name = "Statistics",
    values = variable_colors,
    labels = c("fst" = expression(F[ST]), "dxy" = expression(D[XY]), "gr" = "GR"),
    guide = guide_legend(
      order = 1,
      override.aes = list(linetype = "solid", shape = NA)  # 仅显示线
    )
  ) +
  scale_linetype_manual(
    name = "Crash Extent",
    values = c("99%" = "solid", "90%" = "dashed"),
    guide = guide_legend(
      order = 2,
      override.aes = list(color = "black")  # 统一颜色
    )
  ) +
  scale_shape_manual(
    name = "Crash Extent",
    values = c("99%" = 16, "90%" = 17),
    guide = guide_legend(
      order = 2,
      override.aes = list(color = "black")  # 统一颜色
    )
  ) +
  
  scale_x_reverse(
    name = "Generations ago (0 = present)",
    breaks = seq(0, 36, by = 6),
    limits = c(36, 0)
  ) +
  scale_y_continuous(
    name = "Power",
    limits = c(0, 1.05),
    expand = c(0, 0)
  ) +
  labs(
    title = "(C)",
  ) +

  theme_bw(base_size = 16) +
  theme(
    legend.position = "right",
    legend.box = "vertical",
    panel.grid.minor = element_blank(),
    panel.grid.major = element_blank(),
    legend.spacing.y = unit(0.3, "cm"),
    legend.title = element_text(face = "bold"),
    legend.key.width = unit(1.8, "cm"),
    legend.key.height = unit(0.8, "cm")
  )

ggsave("../combined_plots/season_multi1.pdf", p1, width = 10, height = 6, dpi = 300)
''')

0
'../combined_plots/season_multi1.pdf'


Plot for 3 generations wet/dry period

In [None]:
power_results_300 = calculate_power(data_dir="../data/SingleSite/Seasonal300/DryTime_3")
power_results_3000 = calculate_power(data_dir="../data/SingleSite/Seasonal3000/DryTime_3")

In [None]:
import rpy2.robjects as ro
from rpy2.robjects import pandas2ri
from rpy2.robjects.packages import importr
from rpy2.robjects.conversion import localconverter

ggplot2 = importr('ggplot2')
dplyr = importr('dplyr')

with localconverter(ro.default_converter + pandas2ri.converter):
    power_results_300_clean = power_results_300.rename(columns={
        't2': 'timepoint',
        'statistic': 'variable',
        'dataset': 'population'
    })
    power_results_3000_clean = power_results_3000.rename(columns={
        't2': 'timepoint', 
        'statistic': 'variable',
        'dataset': 'population'
    })
    
    power_results_300_r = ro.conversion.py2rpy(power_results_300_clean)
    power_results_3000_r = ro.conversion.py2rpy(power_results_3000_clean)

ro.globalenv['power_results_300'] = power_results_300_r
ro.globalenv['power_results_3000'] = power_results_3000_r


ro.r('''
all_power_results <- rbind(
  transform(power_results_300, 
            population = "99%",
            line_group = paste(variable, "99%", sep="_")),
  transform(power_results_3000, 
            population = "90%",
            line_group = paste(variable, "90%", sep="_"))
)

all_power_results$population <- factor(
  all_power_results$population,
  levels = c("99%", "90%")
)


background_zones <- data.frame(
  xstart = seq(0, 33, by = 3),
  xend = seq(3, 36, by = 3),
  fill_col = rep(c("#9BC3EC", "#EC9595"), length.out = 6)
)

variable_colors <- c(
  "fst" = "#ad5636",
  "dxy" = "#2b55c1",
  "gr" = "#3b9391"
)

p2 <- ggplot() +
  # 1. 背景色
  geom_rect(
    data = background_zones,
    aes(xmin = xstart, xmax = xend, ymin = 0, ymax = 1.05, fill = fill_col),
    alpha = 0.3
  ) +
  
  geom_line(
    data = all_power_results,
    aes(x = timepoint, y = power, 
        color = variable, 
        linetype = population,
        group = line_group),
    linewidth = 1
  ) +
  
  geom_point(
    data = all_power_results,
    aes(x = timepoint, y = power, 
        color = variable, 
        shape = population),
    size = 3
  ) +
  
  geom_hline(yintercept = 0.8, linetype = "dotdash", color = "red", linewidth = 0.8) +
  annotate("text", x = 30, y = 0.85, label = "P = 0.8", color = "red", size = 5) +
  
  scale_fill_manual(
    name = "Season",
    values = c("#9BC3EC" = "#9BC3EC", "#EC9595" = "#EC9595"),
    labels = c("Wet", "Dry"),
    guide = guide_legend(order = 3)
  ) +
  scale_color_manual(
    name = "Statistics",
    values = variable_colors,
    labels = c("fst" = expression(F[ST]), "dxy" = expression(D[XY]), "gr" = "GR"),
    guide = guide_legend(
      order = 1,
      override.aes = list(linetype = "solid", shape = NA)  # 仅显示线
    )
  ) +
  scale_linetype_manual(
    name = "Crash Extent",
    values = c("99%" = "solid", "90%" = "dashed"),
    guide = guide_legend(
      order = 2,
      override.aes = list(color = "black")  # 统一颜色
    )
  ) +
  scale_shape_manual(
    name = "Crash Extent",
    values = c("99%" = 16, "90%" = 17),
    guide = guide_legend(
      order = 2,
      override.aes = list(color = "black")  # 统一颜色
    )
  ) +
  
  scale_x_reverse(
    name = "Generations ago (0 = present)",
    breaks = seq(0, 36, by = 6),
    limits = c(36, 0)
  ) +
  scale_y_continuous(
    name = "Power",
    limits = c(0, 1.05),
    expand = c(0, 0)
  ) +
  labs(
    title = "(D)",
  ) +

  theme_bw(base_size = 16) +
  theme(
    legend.position = "right",
    legend.box = "vertical",
    panel.grid.minor = element_blank(),
    panel.grid.major = element_blank(),
    legend.spacing.y = unit(0.3, "cm"),
    legend.title = element_text(face = "bold"),
    legend.key.width = unit(1.8, "cm"),
    legend.key.height = unit(0.8, "cm")
  )

ggsave("../combined_plots/season_multi2.pdf", p2, width = 10, height = 6, dpi = 300)
''')

0
'../combined_plots/season_multi2.pdf'


In [None]:
ro.r('''
library(gridExtra)
library(ggpubr)

combined_plot <- ggarrange(
    p1, p2,
    ncol = 2, nrow = 1, 
    widths = c(8, 8),
    heights = c(1, 1),
    common.legend = TRUE, 
    legend = "right"
)
options(repr.plot.width = 12, repr.plot.height = 6)

ggsave(combined_plot, filename = "../combined_plots/multi_seasonal.pdf", width = 13.15, height = 6)
''')

R[write to console]: 
Attaching package: ‘gridExtra’


R[write to console]: The following object is masked from ‘package:dplyr’:

    combine




0
'../combined_plots/season_multi.pdf'
