In [None]:
%pip install cron-descriptor
dbutils.library.restartPython()

In [None]:
%run ./cron_descriptor

In [0]:
from pyspark.sql import SparkSession as SparkSession, functions as F, types as T
import os
import pandas as pd
from typing import Optional

In [None]:
LINE_SEP = "  \n"
SLACK_URL = "https://www.youtube.com/"


In [None]:
def _clean_str(s:Optional[str]) -> str:
    if s is None:
        return ""
    return s.strip()


def _normalize_text(text:Optional[str]) -> Optional[str]:
    """ 文字列の整形(explanation、type_conversion、rule用)
    """
    return _clean_str(text).replace("\n", LINE_SEP)


def _normalize_url(prefix_text:str, link_text:str, url:Optional[str]) -> str:
    """ 文字列の整形(link、query、reference用)
    """
    url = _clean_str(url)
    if url != "":
        return f"{prefix_text}[{link_text}]({url})"
    return url


def _generate_description_schema(name_ja:Optional[str], explanation:Optional[str]) -> str:
    """ 下記の形式で説明文を生成する
    データソース名/システム名：{name_ja}
    Overview：このスキーマに含まれるデータは、主に上記のシステムから取り込まれています。
    データソース概要：{explanation}
    連絡先：[#sys-データ基盤の相談や情報共有](SLACK_URL)
    """
    explanation_text = _normalize_text(explanation)
    description = []
    parts = [
        f"データソース名/システム名：{_clean_str(name_ja)}",
        "Overview：このスキーマに含まれるデータは、主に上記のシステムから取り込まれています。",
        f"データソース概要：{explanation_text}" if explanation_text != "" else "",
        f"連絡先：[#sys-データ基盤の相談や情報共有]({SLACK_URL})"
    ]
    description.extend([s for s in parts if s != ""])
    return LINE_SEP.join(description)


def _generate_description_table(name_ja:Optional[str], explanation:Optional[str], type_conversion:Optional[str], rule:Optional[str], schedule:Optional[str], query:Optional[str], reference:Optional[str], link:Optional[str]=None) -> str:
    """ 下記の形式で説明文を生成する
    TableName：{name_ja}
    Overview：
    {explanation}
    {type_conversion}
    {rule}
    更新頻度：{schedule}
    元テーブル仕様書：[link]({link})
    作成クエリ：[query]({query})
    対応チケット：[reference]({reference})
    連絡先：[#sys-データ基盤の相談や情報共有](SLACK_URL)
    """
    description = []
    parts = [
        f"TableName：{_clean_str(name_ja)}",
        "Overview：",
        _normalize_text(explanation),
        _normalize_text(type_conversion),
        _normalize_text(rule),
        f"更新頻度：{schedule}" if schedule is not None else "",
        _normalize_url("元テーブル仕様書：", "link", link),
        _normalize_url("作成クエリ：", "query", query),
        _normalize_url("対応チケット：", "reference", reference),
        f"連絡先：[#sys-データ基盤の相談や情報共有]({SLACK_URL})",
    ]
    description.extend([s for s in parts if s != ""])
    return LINE_SEP.join(description)


def _generate_description_view(name_ja:Optional[str], schedule:Optional[str], reference:Optional[str]) -> str:
    """ 下記の形式で説明文を生成する
    TableName：{name_ja}
    Overview：元テーブル名から必要なカラムを取り出したビュー。詳細を確認したい場合は、元テーブルを参照してください。
    更新頻度：{schedule}
    対応チケット：[reference]({reference})
    連絡先：[#sys-データ基盤の相談や情報共有]({SLACK_URL})
    """
    description = []
    parts = [
        f"TableName：{_clean_str(name_ja)}",
        "Overview：元テーブル名から必要なカラムを取り出したビュー。詳細を確認したい場合は、元テーブルを参照してください。",
        f"更新頻度：{schedule}" if schedule is not None else "",
        _normalize_url("対応チケット：", "reference", reference),
        f"連絡先：[#sys-データ基盤の相談や情報共有]({SLACK_URL})",
    ]
    description.extend([s for s in parts if s != ""])
    return LINE_SEP.join(description)


In [None]:
try:
    # 引数の受け取り
    dbutils.widgets.text("SOURCE_TABLE", "")
    source_table = dbutils.widgets.get("SOURCE_TABLE")
    
    stage = os.environ.get("STAGE") 
    where_clause = "WHERE catalog RLIKE '_dev$'" if (stage != "prod") else ""

    # SQL文の生成
    sql_text = f"""
    SELECT * 
    FROM {source_table} 
    {where_clause}
    """
    print(sql_text)
    spark = SparkSession.builder.getOrCreate()
    df_target = spark.sql(sql_text)

    display(df_target) 
except Exception as e:
    raise Exception(f"`{source_table}`テーブルの取得に失敗しました。SQL: \"{sql_text}\", Error: {e}")

In [None]:
if df_target.count() == 0:
    print("Descriptionの更新対象がなかったため、処理をスキップして終了しました。")
else:
    error_records = []
    df_target_sorted = df_target.orderBy(F.col("catalog"), F.col("schema"), F.col("table"))
    for row in df_target_sorted.toLocalIterator():
        # カラムの取得
        catalog         = row['catalog']
        schema          = row['schema']
        table           = row['table']
        name_ja         = row['name_ja']
        table_category  = row['table_category']
        cron_schedule   = row['cron_schedule']
        explanation     = row['explanation']
        type_conversion = row['type_conversion']
        rule            = row['rule']
        link            = row['link']
        query           = row['query']
        reference       = row['reference']

        # cron式の自然言語変換
        update_schedule_expr = None
        try:
            cron_schedule = _clean_str(cron_schedule)
            if cron_schedule != "":
                update_schedule_expr = cron_to_japanese(cron_schedule)
        except Exception as e:
            error_records.append({
                "catalog": catalog,
                "schema": schema,
                "table": table,
                "table_category": table_category,
                "error_category": "cron_schedule_error",
                "cron_schedule": cron_schedule,
                "sql": None,
                "error_message": str(e)
            })
            continue

        # Descriptionの生成
        description = ""
        try:
            if table_category == 0:
                # description = _generate_description_schema(name_ja, explanation)
                raise ValueError(f"スキーマはジョブでの更新対象外です。手動で更新してください。: table_category={table_category}")
            elif table_category == 1:
                if "staging" in catalog:
                    description = _generate_description_table(name_ja, explanation, type_conversion, rule, update_schedule_expr, query, reference, link)
                elif "conformed" in catalog or "published" in catalog:
                    description = _generate_description_table(name_ja, explanation, type_conversion, rule, update_schedule_expr, query, reference)
                else:
                    raise ValueError(f"カタログがジョブの更新対象外です。このジョブは(staging, conformed, published)を含むカタログのみを対象にしています。: catalog=\"{catalog}\"")
            elif table_category == 2:
                description = _generate_description_view(name_ja, update_schedule_expr, reference)
            elif table_category == 3:
                raise ValueError(f"ビューテーブルはジョブでの更新対象外です。手動で更新してください。: table_category={table_category}")
            else:
                raise ValueError(f"table_categoryカラムの値が不適切です。table_categoryは(1, 2)のいずれかにしてください。: table_category={table_category}")
        except Exception as e:
            error_records.append({
                "catalog": catalog,
                "schema": schema,
                "table": table,
                "table_category": table_category,
                "error_category": "generate_description_error",
                "cron_schedule": cron_schedule,
                "sql": None,
                "error_message": str(e)
            })
            continue
        # print(description) # デバッグ用

        # SQLの実行
        sql = ""
        try:
            if table_category == 1: # Tableの場合
                sql += f"COMMENT ON TABLE {catalog}.{schema}.{table} IS '{description}'"
            elif table_category == 2: # Materialized Viewの場合
                sql += f"COMMENT ON VIEW {catalog}.{schema}.{table} IS '{description}'"
            else:
                raise ValueError(f"table_categoryカラムの値が不適切です。table_categoryは(1, 2)のいずれかにしてください。: table_category={table_category}")
            spark.sql(sql)
        except Exception as e:
            error_records.append({
                "catalog": catalog,
                "schema": schema,
                "table": table,
                "table_category": table_category,
                "error_category": "update_description_error",
                "cron_schedule": cron_schedule,
                "sql": sql,
                "error_message": str(e)
            })

    # エラーがある場合、エラーを表示
    if len(error_records) > 0:
        df_error = pd.DataFrame(error_records).astype("string")
        display(df_error)
        raise Exception("Descriptionの更新に失敗したレコードがあります。")
    else:
        print("Descriptionの更新がすべて成功しました。")
