In [1]:
import numpy as np
import pandas as pd

from typing import Dict, Any, Union
from typing_extensions import TypedDict

In [2]:
class BaseCorrelationAnalysis:
    """Common base for the correlation analyses; supplies the shared description."""

    def get_base_description(self):
        """Return the general description of correlation analysis.

        This is the top-level text from which the more specific analysis
        descriptions branch off.
        """
        return (
            "This analysis is used to analyze the correlation between two variables. "
        )


class NormalCorrelationAnalysis(BaseCorrelationAnalysis):
    """Correlation matrix of continuous variables, computed with ``DataFrame.corr``.

    ``execute_analysis()`` validates both the input data and the arguments and
    reports problems as ``{"error": <message>}`` instead of raising.
    """

    def __init__(
        self, input_data: Union[pd.DataFrame, np.ndarray], argument: Dict[str, Any]
    ):
        """Store the data and the keyword arguments forwarded to ``corr``.

        Args:
            input_data: Table of variables, one column per variable.
            argument: Options for the analysis, e.g. ``{"method": "pearson"}``.
                ``None`` is treated as "no options".
        """
        self.input_data = input_data
        self.argument = argument if argument is not None else {}

    def get_input_requirements(self) -> str:
        """Return a human-readable description of the expected input data."""
        return (
            "This analysis requires multiple variables as input. "
            "input data have to be a continuous data. like age, height, weight, etc."
        )

    def check_input_schema(self) -> Dict[str, Any]:
        """Validate ``input_data`` and convert a NumPy array to a DataFrame.

        Returns:
            An empty dict when the input is usable, otherwise
            ``{"error": <message>}``.
        """
        # Return on the first failure: the shape check below needs an object
        # that actually has ``ndim`` / ``shape``.
        if not isinstance(self.input_data, (pd.DataFrame, np.ndarray)):
            return {"error": "input data have to be a pandas DataFrame or numpy array"}

        # A 1-D array has no column axis, so ``shape[1]`` alone would raise.
        if self.input_data.ndim != 2 or self.input_data.shape[1] < 2:
            return {"error": "input data have to be a matrix with at least two columns"}

        if isinstance(self.input_data, np.ndarray):
            self.input_data = pd.DataFrame(self.input_data)

        return {}

    def get_argument_explanation(self) -> Dict[str, Any]:
        """Describe every argument this analysis accepts.

        TODO: a dedicated ``ArgumentInfo`` constructor still needs to be written;
        until then plain dicts are used.

        return format: {
            "argument_name": ArgumentInfo(
                requierment: bool,
                explanation: str,
                type: type,
                available_value: list[dict[str, Any]]
            )
        }
        """
        argument_explanation = {
            "method": {
                "requierment": False,
                "explanation": "correlation method",
                "type": str,
                "available_value": [
                    {
                        "pearson": "Measures the linear relationship between two continuous variables."
                    },
                    {
                        "kendall": "Measures the ordinal association between two variables based on rank concordance."
                    },
                    {
                        "spearman": "Measures the monotonic relationship between two ranked variables."
                    },
                ],
            }
        }
        return argument_explanation

    def get_available_argument(self) -> list[str]:
        """Return the names of the arguments this analysis understands."""
        return list(self.get_argument_explanation().keys())

    def check_update_argument_schema(self) -> Dict[str, Any]:
        """Drop unknown arguments, then validate the remaining ones.

        Side effect: ``self.argument`` is replaced by a dict that contains only
        the supported argument names.

        Returns:
            An empty dict when every argument is valid, otherwise
            ``{"error": <message>}`` for a missing required argument or a
            value outside ``available_value``.
        """
        argument_explanation = self.get_argument_explanation()

        # Keep only the arguments this analysis knows about.
        available_argument = self.get_available_argument()
        self.argument = {
            key: value
            for key, value in self.argument.items()
            if key in available_argument
        }

        # Every argument flagged as required must be present.
        missing_arguments = [
            name
            for name, info in argument_explanation.items()
            if info["requierment"] and name not in self.argument
        ]
        if missing_arguments:
            return {
                "error": "Missing requierment argument: " + ", ".join(missing_arguments)
            }

        # Each supplied value must be one of the documented choices. The
        # choices are flattened into a list so that membership is tested with
        # ``==`` and an unhashable value is rejected instead of raising.
        for key, value in self.argument.items():
            available_value = argument_explanation[key]["available_value"]
            allowed = [name for option in available_value for name in option]
            if value not in allowed:
                return {
                    "error": f"Invalid argument value: {value} for argument {key}. Available values: {available_value}"
                }

        return {}

    def execute_analysis(self) -> Dict[str, Any]:
        """Validate the input and arguments, then compute the correlation matrix.

        Returns:
            ``{"result": <DataFrame>, "output_discription": <str>}`` on
            success, or ``{"error": <message>}`` when validation fails.
        """
        # Validating here also converts an ndarray input, which has no ``corr``.
        error = self.check_input_schema()
        if error:
            return error

        error = self.check_update_argument_schema()
        if error:
            return error

        result = self.input_data.corr(**self.argument)

        return {
            "result": result,
            "output_discription": (
                "This is the correlation matrix between the variables. \n"
                "            That value is between -1 and 1. if the value is close to 1, it means that the two variables are positively correlated. \n"
                "            if the value is close to -1, it means that the two variables are negatively correlated. \n"
                "            if the value is close to 0, it means that the two variables are not correlated."
            ),
        }

In [3]:
# Small two-column frame of continuous values used to try out the analysis.
df = pd.DataFrame(
    {
        "dogs": [0.2, 0.0, 0.6, 0.2],
        "cats": [0.3, 0.6, 0.0, 0.1],
    }
)

In [4]:
# Build the analysis on the sample frame; "method" is forwarded to DataFrame.corr.
analysis = NormalCorrelationAnalysis(input_data=df, argument={"method": "pearson"})

In [5]:
# Validate the arguments and compute the correlation matrix; the returned dict
# carries the matrix under "result" and an explanation under "output_discription".
analysis.execute_analysis()

{'result':           dogs      cats
 dogs  1.000000 -0.851064
 cats -0.851064  1.000000,
 'output_discription': 'This is the correlation matrix between the variables. \n            That value is between -1 and 1. if the value is close to 1, it means that the two variables are positively correlated. \n            if the value is close to -1, it means that the two variables are negatively correlated. \n            if the value is close to 0, it means that the two variables are not correlated.'}

In [6]:
# First layer: the general description of correlation analysis from the base class.
analysis.get_base_description()

'This analysis is used to analyze the correlation between two variables. '

In [7]:
# Second layer: the input-data requirements specific to this analysis.
analysis.get_input_requirements()

'This analysis requires multiple variables as input. input data have to be a continuous data. like age, height, weight, etc.'

In [8]:
# List the accepted arguments together with their allowed values.
analysis.get_argument_explanation()

{'method': {'requierment': False,
  'explanation': 'correlation method',
  'type': str,
  'available_value': [{'pearson': 'Measures the linear relationship between two continuous variables.'},
   {'kendall': 'Measures the ordinal association between two variables based on rank concordance.'},
   {'spearman': 'Measures the monotonic relationship between two ranked variables.'}]}}

In [78]:
import scipy.stats as ss
from sklearn.preprocessing import LabelEncoder


class PointBiserialCorrelationAnalysis:
    """Point-biserial correlation between binary and continuous columns.

    ``check_input_schema()`` validates the data and detects (and, where
    needed, encodes) the binary columns; ``execute_analysis()`` then correlates
    every binary column with every remaining numeric column.
    """

    def __init__(self, input_data: Union[pd.DataFrame, np.ndarray]):
        """Store the data to analyse.

        Args:
            input_data: Table of variables, one column per variable.
        """
        self.input_data = input_data
        # Names of the two-valued columns found by check_input_schema().
        self.binary_columns = []

    def get_input_requirements(self) -> str:
        """Return a human-readable description of the expected input data."""
        return (
            "\n"
            "        This analysis requires multiple variables as input. \n"
            "        input data have to include at least one continuous variable and one categorical variable.\n"
            "        The categorical variable have to be binary data.\n"
            "        "
        )

    def check_input_schema(self) -> Dict[str, Any]:
        """Validate the input and detect its binary columns.

        A column counts as binary when it has exactly two distinct non-missing
        values. Non-numeric binary columns (strings, categoricals, booleans)
        are encoded as 0/1 in sorted order of their values.

        Side effects: ``self.input_data`` is replaced by a private DataFrame
        copy (the object passed in by the caller is never modified) and
        ``self.binary_columns`` is rebuilt from scratch.

        Returns:
            An empty dict when at least one binary column exists, otherwise
            ``{"error": <message>}``.
        """
        if not isinstance(self.input_data, (pd.DataFrame, np.ndarray)):
            return {"error": "input data have to be a pandas DataFrame or numpy array"}

        # A 1-D array has no column axis, so ``shape[1]`` alone would raise.
        if self.input_data.ndim != 2 or self.input_data.shape[1] < 2:
            return {"error": "input data have to be a matrix with at least two columns"}

        # Copy so that encoding below cannot leak into the caller's data.
        frame = pd.DataFrame(self.input_data).copy()

        # Rebuilt on every call so repeated checks do not accumulate duplicates.
        binary_columns = []
        for column in frame.columns:
            series = frame[column]
            # Constant or multi-valued columns are simply not binary; they must
            # not cancel binary columns that were already found.
            if series.nunique() != 2:
                continue

            needs_encoding = pd.api.types.is_bool_dtype(
                series
            ) or not pd.api.types.is_numeric_dtype(series)
            if needs_encoding:
                codes, _ = pd.factorize(series, sort=True)
                encoded = pd.Series(codes, index=series.index)
                # factorize marks missing values with -1; keep them missing.
                frame[column] = encoded.where(encoded >= 0)
            binary_columns.append(column)

        self.input_data = frame
        self.binary_columns = binary_columns

        if not binary_columns:
            return {"error": "At least one binary column is required."}
        return {}

    def execute_analysis(self) -> Dict[str, Any]:
        """Correlate each binary column with each continuous column.

        If no binary columns are known yet, the input is validated first and
        any validation error is returned unchanged.

        Returns:
            ``{"result": <dict>, "result_discription": <str>}`` where the
            result maps ``"<binary> and <continuous> r"`` / ``"... p"`` to the
            coefficient and its p-value, or ``{"error": <message>}``.
        """
        if not self.binary_columns:
            error = self.check_input_schema()
            if error:
                return error

        binary_columns = self.binary_columns
        # Numeric columns that are not themselves binary; correlating a binary
        # column with itself or another binary column is not point-biserial.
        continuous_columns = [
            column
            for column in self.input_data.select_dtypes(include="number").columns
            if column not in binary_columns
        ]
        result = {}

        for binary_column in binary_columns:
            for continuous_column in continuous_columns:
                r, p = ss.pointbiserialr(
                    self.input_data[binary_column], self.input_data[continuous_column]
                )
                result[f"{binary_column} and {continuous_column} r"] = r
                result[f"{binary_column} and {continuous_column} p"] = p

        return {
            "result": result,
            "result_discription": (
                "\n"
                "            p: Indicates whether the relationship between two variables is statistically significant.\n"
                "            r: Indicates the strength and direction of the linear relationship between two variables.\n"
                "            \n"
                "            If the p-value is less than 0.05 and the r-value is large: There is a strong correlation between the variables, and the relationship is statistically significant.\n"
                "            If the p-value is greater than 0.05 and the r-value is small: There is little or no correlation, and the relationship is not statistically significant.\n"
                "            "
            ),
        }

In [86]:
# Frame without any two-valued column, used to trigger the
# "At least one binary column is required." validation error.
df = pd.DataFrame(
    {
        "dogs": [0.2, 0.0, 0.6, 0.2],
        "rabbits": [10, 20, 30, 40],
    }
)
# df = pd.DataFrame(
#     [(0.2, 1, 10), (0.0, 0, 20), (0.6, 1, 30), (0.2, 1, 40)],
#     columns=["dogs", "cats", "rabbits"],
# )

# df = pd.DataFrame(
#     [(0.2, 1.0, 10), (0.0, 0.0, 20), (0.6, 1.0, 30), (0.2, 1.0, 40)],
#     columns=["dogs", "cats", "rabbits"],
# )
# df = pd.DataFrame(
#     [(0.2, "1.", 10), (0.0, "0.", 20), (0.6, "1.", 30), (0.2, "1.", 40)],
#     columns=["dogs", "cats", "rabbits"],
# )

In [88]:
analysis = PointBiserialCorrelationAnalysis(input_data=df)
result = analysis.check_input_schema()
# Failure is signalled through an "error" key. A successful check yields no
# such key (the result may even be None), so indexing result["error"] directly
# would raise on the success path; look the key up defensively instead.
if result and result.get("error") is not None:
    print(result["error"])
else:
    result = analysis.execute_analysis()
    print(result)

At least one binary column is required.


In [77]:
# Inspect the frame held by the analysis object; check_input_schema() may have
# label-encoded its binary columns.
analysis.input_data

Unnamed: 0,dogs,cats,rabbits
0,0.2,1,10
1,0.0,0,20
2,0.6,1,30
3,0.2,1,40


In [71]:
# Show the dtype of each column in the frame held by the analysis object.
analysis.input_data.dtypes

dogs       float64
cats          bool
rabbits      int64
dtype: object

In [89]:
import numpy as np
import pandas as pd
import scipy.stats as ss

# https://www.kaggle.com/datasets/spscientist/students-performance-in-exams
parental_level_of_education = [
    "some college",
    "bachelor's degree",
    "some high school",
    "some college",
    "some college",
    "some college",
    "high school",
    "high school",
    "associate's degree",
    "some college",
    "some college",
    "bachelor's degree",
    "high school",
    "bachelor's degree",
    "associate's degree",
    "high school",
    "some college",
    "some college",
    "associate's degree",
    "some high school",
    "bachelor's degree",
    "some college",
    "high school",
    "high school",
    "associate's degree",
    "some college",
    "some high school",
    "some college",
    "some college",
    "master's degree",
    "associate's degree",
    "some high school",
    "some college",
    "bachelor's degree",
    "high school",
    "high school",
    "high school",
    "some high school",
    "associate's degree",
    "some high school",
    "associate's degree",
    "some college",
    "associate's degree",
    "some college",
    "bachelor's degree",
    "associate's degree",
    "associate's degree",
    "high school",
    "associate's degree",
    "associate's degree",
    "some college",
    "associate's degree",
    "master's degree",
    "master's degree",
    "some high school",
    "high school",
    "bachelor's degree",
    "associate's degree",
    "high school",
    "some college",
    "some college",
    "associate's degree",
    "some high school",
    "some high school",
    "bachelor's degree",
    "high school",
    "high school",
    "high school",
    "some college",
    "some high school",
    "some high school",
    "associate's degree",
    "associate's degree",
    "high school",
    "associate's degree",
    "bachelor's degree",
    "high school",
    "some college",
    "some college",
    "associate's degree",
    "some college",
    "master's degree",
    "associate's degree",
    "high school",
    "bachelor's degree",
    "some high school",
    "some college",
    "some high school",
    "associate's degree",
    "associate's degree",
    "some high school",
    "high school",
    "some high school",
    "bachelor's degree",
    "high school",
    "master's degree",
    "high school",
    "associate's degree",
    "some high school",
    "some college",
]
course = [
    "completed",
    "completed",
    "completed",
    "none",
    "none",
    "none",
    "completed",
    "none",
    "completed",
    "completed",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "completed",
    "none",
    "none",
    "none",
    "completed",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "completed",
    "none",
    "completed",
    "none",
    "completed",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "completed",
    "completed",
    "none",
    "completed",
    "none",
    "completed",
    "none",
    "none",
    "completed",
    "completed",
    "none",
    "completed",
    "completed",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "none",
    "completed",
    "completed",
    "none",
    "none",
    "none",
    "completed",
    "none",
    "none",
    "none",
    "completed",
    "completed",
    "completed",
    "none",
    "none",
    "none",
    "completed",
    "none",
    "completed",
    "completed",
    "none",
    "completed",
    "none",
    "completed",
    "none",
    "none",
    "none",
    "none",
    "none",
    "completed",
    "none",
]
gender = [
    "male",
    "male",
    "male",
    "male",
    "male",
    "female",
    "male",
    "male",
    "female",
    "male",
    "female",
    "male",
    "male",
    "female",
    "female",
    "female",
    "female",
    "female",
    "male",
    "female",
    "female",
    "female",
    "male",
    "female",
    "female",
    "male",
    "male",
    "female",
    "female",
    "female",
    "female",
    "female",
    "female",
    "male",
    "male",
    "female",
    "female",
    "male",
    "male",
    "female",
    "male",
    "female",
    "female",
    "female",
    "female",
    "female",
    "female",
    "female",
    "male",
    "male",
    "female",
    "female",
    "male",
    "female",
    "male",
    "female",
    "male",
    "female",
    "female",
    "female",
    "female",
    "male",
    "female",
    "female",
    "female",
    "male",
    "male",
    "male",
    "female",
    "male",
    "male",
    "female",
    "male",
    "male",
    "male",
    "female",
    "female",
    "female",
    "female",
    "male",
    "male",
    "female",
    "female",
    "female",
    "male",
    "male",
    "female",
    "male",
    "female",
    "female",
    "female",
    "male",
    "female",
    "female",
    "female",
    "female",
    "male",
    "male",
    "female",
    "female",
]
# Combine the three parallel lists into one frame, one row per student.
students = pd.DataFrame(
    dict(
        course=course,
        parental_level_of_education=parental_level_of_education,
        gender=gender,
    )
)


# https://stackoverflow.com/questions/46498455/categorical-features-correlation/46498792#46498792
def cramers_v(confusion_matrix):
    """Return the bias-corrected Cramér's V for a contingency table.

    Measures the association between two categorical variables, using the
    bias correction from Bergsma, Wicher (2013), Journal of the Korean
    Statistical Society 42: 323-328.

    Args:
        confusion_matrix: 2-D table of observed counts, e.g. a ``pd.crosstab``
            result, its ``.values``, or a nested list.

    Returns:
        The corrected statistic as a NumPy float. ``0.0`` is returned for
        degenerate tables (a single row or column, or too few observations
        for the correction), where the formula would otherwise divide by zero.

    Note:
        ``scipy.stats.chi2_contingency`` is called with its defaults, so
        Yates' continuity correction is applied to 2x2 tables.
    """
    # Accept anything array-like; DataFrame.sum() would return per-column sums.
    observed = np.asarray(confusion_matrix)
    r, k = observed.shape
    n = observed.sum()
    # One category (or fewer than two observations) carries no association
    # information and would make the corrections below divide by zero.
    if r < 2 or k < 2 or n < 2:
        return np.float64(0.0)

    chi2 = ss.chi2_contingency(observed)[0]
    phi2 = chi2 / n
    phi2corr = max(0, phi2 - ((k - 1) * (r - 1)) / (n - 1))
    rcorr = r - ((r - 1) ** 2) / (n - 1)
    kcorr = k - ((k - 1) ** 2) / (n - 1)
    denominator = min((kcorr - 1), (rcorr - 1))
    # The corrected dimension collapses to zero when a dimension is as large
    # as the sample size; report "no association" instead of NaN.
    if denominator <= 0:
        return np.float64(0.0)
    return np.sqrt(phi2corr / denominator)


# Parental education vs. course status. A notebook echoes only the last
# expression of a cell, so this first statistic is computed but not displayed.
confusion_matrix = pd.crosstab(
    index=students["parental_level_of_education"], columns=students["course"]
)
cramers_v(confusion_matrix.to_numpy())

# Parental education vs. gender; this is the value the cell displays.
confusion_matrix = pd.crosstab(
    index=students["parental_level_of_education"], columns=students["gender"]
)
cramers_v(confusion_matrix.to_numpy())

np.float64(0.04603264414335656)

In [91]:
# Keep the statistic for the most recent crosstab so it can be inspected below.
a = cramers_v(confusion_matrix.values)

In [96]:
# Echo the stored value; it is displayed as a NumPy scalar (np.float64).
a

np.float64(0.04603264414335656)

In [95]:
# Convert the NumPy scalar (np.float64, not an array) into a built-in Python float.

float(a)

0.04603264414335656