# Path analysis for data discovery

In [None]:
import sys
from pathlib import Path
import pandas as pd
import json
from typing import Tuple, Type

# Put the parent of the current working directory on the import path so that
# the `setup` module imported below can be resolved (presumably it lives there).
sys.path.append(f"{Path.cwd().parent.absolute()}/")
from setup import setup

# Handle used by the cells below to run SQL through `db.execute(...)`.
db = setup()

## Setting parameters

In [None]:
# Object id: selects the raw table `D<OID>` and the matching metadata row.
OID = 139
# Extra columns to drop from the feature set (the target is always dropped).
skip_features = []
# Optional strptime format tried before the built-in list when detecting
# datetime columns; an empty string adds nothing.
datetime_format = ""
# TODO: a proper concept-hierarchy implementation is still needed.
# Ordered levels per column: each listed column is later converted to an
# ordered categorical restricted to exactly these levels.
concept_hierarchy = {"教育程度類別": ["其他", "高中高職", "專科", "大學", "碩士", "博士"], "產業別": ["其他", "百貨", "文教康樂", "行", "住", "衣", "食"]}
# Column used as the classification target.
target = "信用卡交易金額[新台幣]"

## Fetch data from database

In [None]:
from sqlalchemy import text

# Fetch every row of the raw table for this object. OID is interpolated into
# the table identifier itself (it is a constant defined in this notebook).
query = text(f"SELECT * FROM [RawDB].[dbo].[D{OID}]")
data = db.execute(query)

# Fetch the object's metadata row; here OID is passed as a bound parameter.
query = text("SELECT * FROM [DV].[dbo].[Object] where OID = :OID")
object_table = db.execute(query, OID=OID).fetchall()

df = pd.DataFrame(data.fetchall())
# Copy of the frame as fetched, taken before later cells rewrite columns in place.
copy_of_df = df.copy()

# The display name comes from the `CName` column of the first metadata row.
data_name = pd.DataFrame(object_table)["CName"][0]
print(f"Data name: {data_name}")

column_names = df.columns.tolist()

## Numerical column handler

In [None]:
# Maps a column name to a Series translating each bin label back to the
# interval it stands for. Filled here and by the datetime handler.
quantile_mapping = {}

# One entry per detected numerical column: [column name, minimum, maximum].
numerical_column_tuple: list[list] = []
for col in column_names:
    # Share of distinct values among the non-null rows; a very low share means
    # the column behaves like a category even when it is stored as integers.
    column_ratio = len(df[col].unique()) / df[col].count()
    is_categorical_column = df[col].dtype == "object" or df[col].dtype == "category" or column_ratio < 0.01
    is_numerical_column = df[col].dtype == "int64" and not is_categorical_column
    if is_numerical_column:
        numerical_column_tuple.append([col, df[col].min(), df[col].max()])

numerical_column_tuple


# Apply the configured concept hierarchies: each listed column becomes an
# ordered categorical limited to the configured levels.
for col in concept_hierarchy:
    df[col] = pd.Series(pd.Categorical(values=df[col], categories=concept_hierarchy[col], ordered=True))


# Names of the columns that were discretised into quantile bins.
quantile_numeric = []

# Three bins: the outer ~15.75 % on each side are "low"/"high", the rest "middle".
quantile_edges = [0, 0.1575, 0.8425, 1]
quantile_labels = ["low", "middle", "high"]
discrete_bin_num = len(quantile_labels)

for column_info in numerical_column_tuple:
    col = column_info[0]
    # Interval form of the bins, kept only to build the label -> interval mapping.
    quantile = pd.qcut(df[col], q=quantile_edges)
    quantile_numeric.append(col)
    df[col] = pd.qcut(df[col], q=quantile_edges, labels=quantile_labels)
    # Both `unique()` calls list values in order of first appearance, so the
    # i-th label lines up with the i-th interval.
    quantile_mapping[col] = pd.Series(data=quantile.unique().tolist(), index=df[col].unique().tolist())

## Datetime column handler

In [None]:
from datetime import datetime
from math import log
from pandas import DatetimeIndex


# strptime formats tried, in order, on integer columns that may encode dates.
datetime_format_list = ["%Y-%m-%d", "%Y-%m", "%Y%m%d", "%Y%m", "%Y"]
# One entry per detected datetime column: [column name, earliest, latest].
datetime_column_tuple: list[list] = []

# A user-supplied format takes precedence over the built-in ones.
if datetime_format:
    datetime_format_list.insert(0, datetime_format)


# to_datetime reference: https://pandas.pydata.org/docs/reference/api/pandas.to_datetime.html

for col in column_names:
    # Columns that already have a datetime dtype only need their range recorded.
    if df[col].dtype == "datetime64[ns]":
        datetime_column_tuple.append([col, df[col].min(), df[col].max()])
        continue

    # Only integer columns are considered as encoded dates.
    if df[col].dtype != "int64":
        continue

    column_as_text = df[col].astype("str")
    for test_format in datetime_format_list:
        parsed_datetime = pd.to_datetime(arg=column_as_text, format=test_format, errors="coerce")
        # With errors="coerce" a value that does not fit the format becomes NaT;
        # the format is accepted only when every row parsed.
        if parsed_datetime.isna().any():
            continue

        # Store exactly the series that was validated above.
        df[col] = parsed_datetime
        datetime_column_tuple.append([col, parsed_datetime.min(), parsed_datetime.max()])
        break

datetime_column_tuple

# Quantize datetime problem reference:
# https://stackoverflow.com/questions/43500894/pandas-pd-cut-binning-datetime-column-series
# Names of the datetime columns that get discretised further below.
datetime_columns = []

# ! testing ######################


def count_feature_entropy(feature_name: str, target_name: str, feature_values: list, target_values):
    """Score how mixed the target is inside each distinct feature value.

    For every non-null feature value, the target counts ``c`` of the matching
    rows are turned into ``-(c / N) * log(c / N, k)`` terms, where ``N`` is the
    total number of rows and ``k`` the number of distinct target values. The
    terms are summed per feature value and the plain mean of those sums is
    returned.

    NOTE(review): the terms are normalised by the total row count and the mean
    is unweighted, which is not the textbook conditional entropy. Confirm this
    is the intended score before changing it; the formula is kept as it was.

    Args:
        feature_name: Column label used for the feature inside the scratch frame.
        target_name: Column label used for the target inside the scratch frame.
        feature_values: Feature value per row (list or Series).
        target_values: Target value per row (list or Series).

    Returns:
        The mean score as a float. ``float("inf")`` when there is no non-null
        feature value, so a caller picking the minimum never selects a binning
        that produced nothing. ``0.0`` when the target has fewer than two
        distinct values, where a logarithm of base ``k`` is undefined.
    """
    part_df = pd.DataFrame(columns=[feature_name, target_name])
    part_df[feature_name] = feature_values
    part_df[target_name] = target_values

    class_count = len(part_df[target_name].unique())
    total_rows = len(part_df[target_name])

    # FIXME: null feature values are ignored rather than scored as their own group.
    distinct_values = part_df[feature_name].dropna().unique().tolist()
    if not distinct_values:
        return float("inf")
    if class_count < 2:
        return 0.0

    per_value_entropy = []
    for value in distinct_values:
        matching_targets = part_df.loc[part_df[feature_name] == value, target_name]
        counts = matching_targets.value_counts().values.tolist()
        per_value_entropy.append(
            sum(-(count / total_rows) * log(count / total_rows, class_count) for count in counts)
        )

    return sum(per_value_entropy) / len(per_value_entropy)


def fill_label(freq: str, datetime_manifest: Type[DatetimeIndex]):
    labels: list[str] = []

    match freq:
        case "year":
            labels = [str(year) for year in list(datetime_manifest.year)]
            if len(labels) >= 0:
                labels.pop()
        case "quarter":
            for i in range(1, len(datetime_manifest)):
                lower_bound = datetime_manifest[i - 1].month
                higher_bound = datetime_manifest[i].month

                match [lower_bound, higher_bound]:
                    case [1, 4]:
                        labels.append("Q1")
                    case [4, 7]:
                        labels.append("Q2")
                    case [7, 10]:
                        labels.append("Q3")
                    case [10, 1]:
                        labels.append("Q4")
                    case _:
                        label_mapping = pd.Series(data=["Q1", "Q2", "Q3", "Q4"], index=[1, 4, 7, 10])
                        labels.append(label_mapping[lower_bound])
        case "month":
            labels = [str(month) for month in list(datetime_manifest.month)]
            if len(labels) >= 0:
                labels.pop()
        case "week":
            labels = [
                "{}".format(
                    datetime.strftime(datetime_manifest[i], format="%U"),
                )
                for i in range(0, len(datetime_manifest))
            ]
            if len(labels) >= 0:
                labels.pop()
        case "day":
            labels = ["{}".format(datetime_manifest[i].day_of_year) for i in range(0, len(datetime_manifest))]
            if len(labels) >= 0:
                labels.pop()
        case _:
            pass

    return labels


# Candidate bin granularities (coarsest first) and the pandas offset alias
# used to generate the edges for each of them.
datetime_freq_manifest = ["year", "quarter", "month", "week", "day"]
freq_manifest = ["YS", "QS", "MS", "W", "D"]
freq_mapping = pd.Series(data=freq_manifest, index=datetime_freq_manifest)

for column_info in datetime_column_tuple:
    # date_range reference: https://pandas.pydata.org/docs/reference/api/pandas.date_range.html
    # Frequency reference: https://pandas.pydata.org/docs/user_guide/timeseries.html#timeseries-offset-aliases

    col, range_start, range_end = column_info
    datetime_columns.append(col)

    # * Compare which datetime frequency is the best
    datetime_freq_entropy = []

    for datetime_freq in datetime_freq_manifest:
        datetime_manifest: pd.DatetimeIndex = pd.date_range(
            start=range_start, end=range_end, freq=freq_mapping[datetime_freq]
        )
        # Add the column maximum as the closing edge of the final bin.
        datetime_manifest = datetime_manifest.union([range_end])
        labels = fill_label(datetime_freq, datetime_manifest)
        # ordered=False is what allows labels to repeat across bins.
        discrete_datetime_series = pd.Series(
            pd.cut(df[col], bins=datetime_manifest, labels=labels, include_lowest=True, ordered=False)
        )

        feature_entropy = count_feature_entropy(
            feature_name=col, target_name=target, feature_values=discrete_datetime_series, target_values=df[target]
        )
        datetime_freq_entropy.append(feature_entropy)

    # The granularity with the lowest score wins; ties go to the coarser one.
    min_value = min(datetime_freq_entropy)
    best_index = datetime_freq_entropy.index(min_value)
    best_freq = datetime_freq_manifest[best_index]

    print(datetime_freq_manifest)
    print(datetime_freq_entropy)
    print(best_freq)

    datetime_manifest: pd.DatetimeIndex = pd.date_range(
        start=range_start, end=range_end, freq=freq_mapping[best_freq]
    )
    datetime_manifest = datetime_manifest.union([range_end])
    # Labels must be built for the winning granularity, not for whichever
    # granularity the comparison loop happened to visit last.
    labels = fill_label(best_freq, datetime_manifest)

    # Keep the raw timestamp -> interval relation before the column is
    # overwritten with its bin labels.
    quantile_interval = pd.cut(df[col], bins=datetime_manifest, include_lowest=True)
    quantile_mapping[col] = pd.Series(data=quantile_interval.tolist(), index=df[col].tolist())
    df[col] = pd.Series(pd.cut(df[col], bins=datetime_manifest, labels=labels, include_lowest=True, ordered=False))

## Target and features handler

In [None]:
# Feature matrix: every column except the target and the explicitly skipped ones.
X: pd.DataFrame
try:
    X = df.drop([target] + skip_features, axis=1)
except KeyError as err:
    # Stop here with the real cause. Continuing would only fail on the next
    # line with an unrelated NameError, because X was never assigned.
    raise KeyError(f"Target or skip-feature column does not exist in the data frame: {err}") from err

feature_names = X.columns.tolist()

# The target is treated as a set of discrete classes, so it is cast to string.
y = df[target].astype("string")
class_names = y.unique().tolist()

## Encode category column

In [None]:
import category_encoders as ce

# Ordinal-encode every object/category column of X; the remaining columns are
# not listed in `cols`.
category_frame = X.select_dtypes(include=["object", "category"])
encoder = ce.OrdinalEncoder(cols=category_frame.columns)
X = pd.DataFrame(encoder.fit_transform(X))

# Per-column value -> code mapping, read back later to decode tree thresholds.
category_column_mapping = encoder.mapping
category_column_mapping

## Fitting decision tree model

In [None]:
from sklearn.tree import DecisionTreeClassifier

row_counts = len(X.index)
max_depth = 30

is_big_data = row_counts > 10000

# Large data sets:
#   min_samples_leaf = 100 (sensible range 100 - 1000): leaves keep enough
#   samples for a meaningful analysis without over-fragmenting the data.
#   min_samples_split = 10 (sensible range 10 - 50): an internal node needs
#   enough samples before it may be split.
# Small data sets:
#   min_samples_leaf = 1 (1 or 2): every leaf still holds at least some samples.
#   min_samples_split = 2 (sensible range 2 - 5): nodes holding only a few
#   samples can still be split.
min_samples_leaf, min_samples_split = (100, 10) if is_big_data else (1, 2)

clf = DecisionTreeClassifier(
    criterion="entropy",
    random_state=42,
    max_depth=max_depth,
    min_samples_leaf=min_samples_leaf,
    min_samples_split=min_samples_split,
)

clf.fit(X, y)

### Feature importance

In [None]:
# Pair every feature with its importance and list the most important first.
feature_importance = clf.feature_importances_
feature_importance_pairs = sorted(
    zip(feature_names, feature_importance),
    key=lambda pair: pair[1],
    reverse=True,
)
feature_importance_pairs

### Cross validation and model accuracy

In [None]:
from sklearn.model_selection import cross_val_score

# cv: number of folds for k-fold cross validation (the library default is 5-fold)
cross_validation_score = cross_val_score(clf, X, y, cv=5)
# The two printed labels read "cross-validation scores" and "mean score".
print("交叉驗證分數:", cross_validation_score)
print("平均分數:", cross_validation_score.mean())

## Resolve decision tree path data as json format

In [None]:
from dataclasses import dataclass


# Type definition of decision tree path data


@dataclass
class DecisionTreeNode:
    """One node of the exported decision tree."""

    # Node identifier (taken from the graph export's `_gvid` field).
    id: int
    # The node's label text, one entry per label line.
    labels: list[str]


@dataclass
class DecisionTreeEdge:
    """One parent -> child connection of the exported decision tree."""

    # Edge identifier (taken from the graph export's `_gvid` field).
    id: int
    # Edge label text from the export (its `headlabel` field).
    label: str
    # NOTE: in this notebook `head` holds the node the edge leaves (the parent)
    # and `tail` the node it enters (the child); the path resolver follows
    # edges from `head` to `tail`.
    head: int
    tail: int


@dataclass
class DecisionTreeGraph:
    """Nodes and edges of the exported decision tree."""

    # NOTE(review): the path resolver unpacks each entry with `**`, so in
    # practice both containers hold plain dicts with the dataclass' field
    # names rather than dataclass instances.
    # Nodes, positioned so that the list index equals the node id.
    nodes: list[DecisionTreeNode]
    edges: dict[str, DecisionTreeEdge]  # node1_node2 as key value


@dataclass
class DecisionTreePath:
    """One root-to-leaf route through the decision tree."""

    # Node ids in visiting order, root first and leaf last.
    path: list[int]
    # Label lines for every node on the path, keyed by node id.
    nodeLabel: dict[int, list[str]]

In [None]:
from sklearn import tree
from graphviz import Source

out_file_path = f"{Path.cwd().absolute()}/temp/temp.dot"
# The export below writes straight to this path, so its directory must exist.
Path(out_file_path).parent.mkdir(parents=True, exist_ok=True)

# * Scikit-learn decision tree:
# Using optimized version of the CART algorithm
# Not support categorical variable for now, that is, categorical variable need to encode

# * Entropy range:
# From 0 to 1 for binary classification (target has only two classes, true or false)
# From 0 to log base 2 k where k is the number of classes

# Use the classifier's own class order so names line up with its value counts.
class_names = clf.classes_.tolist()

# With `out_file` set, the DOT text goes to that file and nothing useful is
# returned, so the result is read back from disk below.
tree.export_graphviz(
    clf,
    out_file=out_file_path,
    feature_names=feature_names,
    class_names=class_names,
    max_depth=max_depth,
    label="all",
    rounded=True,
    filled=True,
)

with open(out_file_path, "r", encoding="utf-8") as f:
    dot_file = f.read()

# Use graphviz lib to convert dot format to json format
source = Source(dot_file)
json_graph = source.pipe(format="json").decode("utf-8")
dict_graph: dict = json.loads(json_graph)

# Filter needed part
# Each node keeps its id and its label split into the individual label lines.
nodes = [
    {"id": o.get("_gvid"), "labels": o.get("label").split("\\n")}
    for o in dict_graph.get("objects", [])
]

# Keyed "<parent>_<child>". The stored "head" is the node the edge leaves and
# "tail" the node it enters, which is how the path resolver reads them.
# A tree that never splits has no edges at all, hence the empty default.
edges = {
    str(o.get("tail")) + "_" + str(o.get("head")): {
        "id": o.get("_gvid"),
        "label": o.get("headlabel"),
        "head": o.get("tail"),
        "tail": o.get("head"),
    }
    for o in dict_graph.get("edges", [])
}

## Store useful information

In [None]:
from math import nan

# Summary of the data set handed to the path analyzer: target, class names,
# feature names and, per feature, its type, values and code -> value mapping.
data_information: dict[str, str | list | dict] = {}
data_information["target_name"] = target
data_information["target_values"] = class_names
data_information["feature_names"] = feature_names

# Numeric
feature_values: dict[str, dict[str, str | list]] = {}

for n in numerical_column_tuple:
    feature_values[n[0]] = {"type": "numeric", "value": [n[1], n[2]]}

# Datetime
datetime_output_format = "%Y-%m-%d %X"
for column_info in datetime_column_tuple:
    feature_values[column_info[0]] = {
        "type": "datetime",
        "value": [
            column_info[1].strftime(datetime_output_format),
            column_info[2].strftime(datetime_output_format),
        ],
    }

# Category
# Every encoded column replaces any entry written above, because binned
# numeric and datetime columns were encoded as categories as well.
for c in category_column_mapping:
    column = c["col"]
    encoded_values = c["mapping"].index.to_list()
    # Drop the last mapping entry (presumably the encoder's placeholder for
    # missing values - TODO confirm against the encoder's mapping layout).
    encoded_values.pop()
    feature_values[column] = {
        "type": "datetime" if column in datetime_columns else "category",
        "value": encoded_values,
        # Inverted so that a code looks up its original value.
        "mapping": pd.Series({code: value for value, code in c["mapping"].items()}),
    }

# Features not covered by any of the groups above, kept in feature order so
# the result is the same on every run.
unstored_features = [name for name in feature_names if name not in feature_values]

for f in unstored_features:
    split_value = df[f].unique().astype("str").tolist()
    df[f] = df[f].astype("str")

    # Codes start at 1 and follow the order in which the values first appear.
    mapping_pairs = {code: value for code, value in enumerate(split_value, start=1)}
    mapping_pairs[-2] = nan
    mapping = pd.Series(mapping_pairs)
    feature_values[f] = {
        "type": "category",
        "value": split_value,
        "mapping": mapping,
    }

data_information["feature_values"] = feature_values
data_information

## Decision tree path resolver

In [None]:
from math import log


def DecisionTreePathResolver(
    graph: DecisionTreeGraph,
    root_id: int = 0,
    entropy_threshold: float | None = None,
):
    """Collect every root-to-leaf path whose leaf entropy is low enough.

    Args:
        graph: Tree to walk. ``graph.nodes[i]`` and every value of
            ``graph.edges`` are unpacked with ``**`` into ``DecisionTreeNode``
            and ``DecisionTreeEdge``; an edge runs from ``head`` to ``tail``.
        root_id: Id of the node the walk starts from.
        entropy_threshold: Leaves whose entropy is above this value are
            discarded. When omitted, ``log2(len(feature_names)) / 2`` is used,
            based on the module-level ``feature_names``.
            NOTE(review): entropy is bounded by the number of classes, not the
            number of features, so the default may be worth revisiting.

    Returns:
        A list of ``DecisionTreePath``. Each carries the visited node ids and
        the label lines of every node on the path. For a node that was left
        through its right branch, the comparison in the first label line is
        rewritten to ``>``.
    """
    paths: list[DecisionTreePath] = []

    if not graph:
        return paths

    if entropy_threshold is None:
        entropy_threshold = log(len(feature_names), 2) / 2

    # Index the children of each node once instead of scanning every edge at
    # every visited node. Child order follows the order of ``graph.edges``.
    children: dict[int, list[int]] = {}
    for raw_edge in graph.edges.values():
        edge = DecisionTreeEdge(**raw_edge)
        children.setdefault(edge.head, []).append(edge.tail)

    # DFS: Depth-First Search
    def SearchPathByDFS(current_id: int, path: list[int]) -> None:
        path.append(current_id)
        child_ids = children.get(current_id, [])

        # A node without outgoing edges is a leaf: the path is complete.
        if not child_ids:
            leaf = DecisionTreeNode(**graph.nodes[current_id])

            # ! Paths ending in a high-entropy leaf are left out. The leaf's
            # ! first label line reads "entropy = <value>".
            entropy = float(leaf.labels[0].split(" ")[2])

            if entropy <= entropy_threshold:
                node_labels: dict[int, list[str]] = {current_id: leaf.labels}

                for node_id, next_id in zip(path, path[1:]):
                    labels = [*DecisionTreeNode(**graph.nodes[node_id]).labels]

                    # The left child's id is always the parent's id + 1.
                    # * left edge  => condition "<=" is true
                    # * right edge => condition is false, i.e. ">"
                    if next_id != node_id + 1:
                        condition = labels[0].split(" ")
                        condition[1] = ">"
                        labels[0] = " ".join(condition)

                    node_labels[node_id] = labels

                paths.append(DecisionTreePath([*path], node_labels))
        else:
            # Visit every child of the current node recursively.
            for child_id in child_ids:
                SearchPathByDFS(child_id, path)

        # Backtrack: drop the current node before returning to the parent.
        path.pop()

    SearchPathByDFS(root_id, [])

    return paths

In [None]:
# Wrap the extracted nodes/edges and collect the accepted root-to-leaf paths,
# starting the walk at node 0.
decision_tree_graph = DecisionTreeGraph(nodes, edges)
paths = DecisionTreePathResolver(decision_tree_graph, 0)
print("Path counts = {}".format(len(paths)))

## Decision tree path analyzer

In [None]:
from math import ceil, floor


def DecisionTreePathAnalyzer(paths: list[DecisionTreePath], target_values: list[str], feature_names: list[str]):
    path_analysis_result: dict = {}
    for split_value in target_values:
        path_analysis_result[split_value] = []

    for path in paths:
        path_analysis_result_part = {}

        for feature_name in feature_names:
            path_analysis_result_part[feature_name] = data_information["feature_values"][feature_name]["value"].copy()

        for node_id in path.path:
            labels = path.nodeLabel[node_id][0].split(" ")
            feature_name = labels[0]
            split_symbol = labels[1]
            split_value = float(labels[2])

            if node_id == path.path[len(path.path) - 1]:
                class_name = path.nodeLabel[node_id][3].split(" ")[2]

                sample_value = " ".join(path.nodeLabel[node_id][2].split(" ")[2:]).split(", ")
                sample_value[0] = sample_value[0][1:]
                sample_value[len(sample_value) - 1] = sample_value[len(sample_value) - 1][0:-1]

                path_analysis_result_part["entropy"] = float(split_value)
                path_analysis_result_part["samples"] = list(map(lambda value: int(value), sample_value))
                path_analysis_result_part["labels"] = target_values
                path_analysis_result_part["class"] = class_name
                path_analysis_result_part["target"] = target

                path_analysis_result[class_name].append(path_analysis_result_part)
                break

            feature_type = data_information["feature_values"][labels[0]]["type"]
            split_situation = [split_symbol, feature_type]
            mapping: pd.Series = data_information["feature_values"][feature_name]["mapping"]

            match split_situation:
                case ["<=", "category"]:
                    index = path_analysis_result_part[feature_name].index(mapping[floor(split_value)])
                    path_analysis_result_part[feature_name] = path_analysis_result_part[feature_name][0 : index + 1]
                case ["<=", "datetime"]:
                    index = path_analysis_result_part[feature_name].index(mapping[floor(split_value)])
                    path_analysis_result_part[feature_name] = path_analysis_result_part[feature_name][0 : index + 1]
                case [">", "category"]:
                    index = path_analysis_result_part[feature_name].index(mapping[ceil(split_value)])
                    path_analysis_result_part[feature_name] = path_analysis_result_part[feature_name][index:]
                case [">", "datetime"]:
                    index = path_analysis_result_part[feature_name].index(mapping[ceil(split_value)])
                    path_analysis_result_part[feature_name] = path_analysis_result_part[feature_name][index:]
                case _:
                    print("no match case")

    return path_analysis_result

In [None]:
# Group the resolved paths by the class their leaf predicts.
path_analysis_result = DecisionTreePathAnalyzer(paths=paths, target_values=class_names, feature_names=feature_names)
path_analysis_result

## Convert path analysis result to json string

In [None]:
# Round-trip through JSON to check that the result is serialisable.
json_str = json.dumps(path_analysis_result)
json_object = json.loads(json_str)

# Print how many paths were kept for each class.
for k in json_object:
    print(k, len(json_object[k]))

## Final result

In [None]:
df

In [None]:
data_information

In [None]:
quantile_mapping

## Tests

In [None]:
import random

# Spot check: pick one random path of one random class and list the rows of
# `df` that satisfy every feature restriction of that path.
target_class = random.choice(class_names)
# NOTE: from here on `paths` holds the analysed records of the chosen class,
# no longer the resolver output.
paths = path_analysis_result[target_class]
print(f"{target_class} path count: {len(paths)}")
target_path = random.choice(paths)

part_df = df.copy()

# Narrow the rows feature by feature: keep only rows whose value is among the
# values the path still allows for that feature.
for feature in feature_names:
    value_df = pd.DataFrame(columns=df.columns)
    for value in target_path[feature]:
        value_df = pd.concat([value_df, part_df.loc[(part_df[feature] == value)]], ignore_index=False)
    part_df = value_df

print(f"Target column of analysis: {target_path['target']}")
print(f"Most class of this path: {target_path['class']}")
# Reports the range of the first detected datetime column only.
print(f"Time range: {datetime_column_tuple[0][1]} to {datetime_column_tuple[0][2]}")
print(part_df[target].value_counts())

part_df