## Unity Catalogの準備

In [0]:
CATALOG = "handson"
SCHEMA = "survey_analysis"
TABLE = "gold_survey_responses_final"
VOLUME = "raw_data"

In [0]:
print("Unity Catalogオブジェクトを作成しています...")

# Catalog（カタログ）の作成
spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG}")
print(f"  ✓ カタログ '{CATALOG}' を確認/作成しました")

# Catalogを使用対象として選択
spark.sql(f"USE CATALOG {CATALOG}")
print(f"  ✓ カタログ '{CATALOG}' を選択しました")

# Schema（スキーマ）の作成
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}")
print(f"  ✓ スキーマ '{SCHEMA}' を確認/作成しました")

# Schemaを使用対象として選択
spark.sql(f"USE {SCHEMA}")
print(f"  ✓ スキーマ '{SCHEMA}' を選択しました")

# Volume（ボリューム）の作成
spark.sql(f"""
    CREATE VOLUME IF NOT EXISTS {VOLUME} 
    COMMENT 'アンケート生データの保存場所'
""")
print(f"  ✓ ボリューム '{VOLUME}' を確認/作成しました")

print("\n✓ Unity Catalogの準備が完了しました")

## テーブルデータファイルをUnity Catalog Volumeへコピー

In [0]:
import shutil

src = "../data/delta_export_data.csv"
dst = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}/delta_export_data.csv"
shutil.copy(src, dst)

## テーブルを作成

In [0]:
# (1) テーブルを作成

new_table_name = f"{CATALOG}.{SCHEMA}.{TABLE}"

ddl_statement = f"""
CREATE TABLE {new_table_name} (
  id BIGINT COMMENT '回答一意識別子 (primary key)；feedback テーブルとの参照キー',
  submitted_at TIMESTAMP COMMENT '回答提出日時（タイムスタンプ）',
  age_group STRING COMMENT '年代グループ（例: u19, 20s, 30s, 40s, 50s, 60plus）',
  gender STRING COMMENT '性別カテゴリ：male, female, other, no_answer',
  occupation STRING COMMENT '職業カテゴリ（例: company_employee, student, homemaker, other）',
  region STRING COMMENT '地域カテゴリ（hokkaido_to_hoku, kanto, chubu, kinki, chugoku_shikoku, kyushu_okinawa, overseas, no_answer）',
  drives_weekly BOOLEAN COMMENT '週1 回以上運転するかどうか（真偽値）',
  drinking_freq STRING COMMENT '普段の飲酒頻度：daily, weekly, monthly, never',
  fav_alcohols_arr ARRAY<STRING> COMMENT '通常好む酒類（文字列、複数可）（ARRAY<STRING>）',
  nonalc_freq STRING COMMENT 'ノンアル飲用頻度：first_time, sometimes, weekly',
  health_reasons_arr ARRAY<STRING> COMMENT 'health_reasons を配列化したもの（ARRAY<STRING>）',
  purchase_intent STRING COMMENT '購入意向カテゴリ：def_buy, maybe, unsure, no',
  price_band STRING COMMENT '許容価格帯カテゴリ（例: under199, 200_249, 250_299, 300_349, 350plus）',
  satisfaction_int INT COMMENT '満足度を整数型に変換したもの（1～5 の INT）',
  free_comment STRING COMMENT '自由記述による感想文',
  positive_point STRING COMMENT 'AI 抽出によるポジティブ文句群（ARRAY<STRING>）',
  negative_feedback STRING COMMENT 'AI 抽出によるネガティブ・改善要望文句群（ARRAY<STRING>）',
  scene STRING COMMENT '飲用シーン語句（ARRAY<STRING>）',
  positive_point_category STRING COMMENT '
positive_pointの分類カテゴリ：以下 18 種類のいずれか。  
1. Appearance：外観・見た目（色・透明度・泡質）  
2. Aroma：香り（ホップ香・モルト香・果実香・オフ香）  
3. Flavor：味覚（甘味・苦味・酸味・コク・バランス）  
4. MouthfeelCarbonation：口当たり・炭酸感（泡・ガス強度・舌触り）  
5. DrinkabilityAftertaste：飲みやすさ・後味（ゴクゴク・キレ・残留感）  
6. ABV：アルコール度数への言及（例: “0.00%”）  
7. CaloriesCarbs：カロリー・糖質関係の言及  
8. AdditivesAllergen：添加物・香料・アレルゲンの言及  
9. Ingredients：原材料・素材・産地・自然由来  
10. PackagingDesign：包装・ラベル・外装デザイン  
11. PackServingFormat：容量・パッケージ形態（缶・瓶・パック数）  
12. ShelfLifeStorage：保存性・賞味期限・開封後品質  
13. PriceValue：価格・コスパ・値ごろ感  
14. Availability：流通性・店舗・EC 販売可能性  
15. PromotionVisibility：販促・広告・POP・訴求力  
16. BrandImageStory：ブランドイメージ・理念・物語性  
17. SocialAcceptability：運転中・妊娠中などでも飲める許容感  
18. SustainabilityEthics：環境配慮・サステナビリティ・倫理性',
  positive_point_confidence STRING COMMENT 'positive_point_category に対するConfidence Score。分類信頼度（0.0～1.0）',
  negative_feedback_category STRING COMMENT '
negative_feedbackの分類カテゴリ：以下 18 種類のいずれか。  
1. Appearance：外観・見た目（色・透明度・泡質）  
2. Aroma：香り（ホップ香・モルト香・果実香・オフ香）  
3. Flavor：味覚（甘味・苦味・酸味・コク・バランス）  
4. MouthfeelCarbonation：口当たり・炭酸感（泡・ガス強度・舌触り）  
5. DrinkabilityAftertaste：飲みやすさ・後味（ゴクゴク・キレ・残留感）  
6. ABV：アルコール度数への言及（例: “0.00%”）  
7. CaloriesCarbs：カロリー・糖質関係の言及  
8. AdditivesAllergen：添加物・香料・アレルゲンの言及  
9. Ingredients：原材料・素材・産地・自然由来  
10. PackagingDesign：包装・ラベル・外装デザイン  
11. PackServingFormat：容量・パッケージ形態（缶・瓶・パック数）  
12. ShelfLifeStorage：保存性・賞味期限・開封後品質  
13. PriceValue：価格・コスパ・値ごろ感  
14. Availability：流通性・店舗・EC 販売可能性  
15. PromotionVisibility：販促・広告・POP・訴求力  
16. BrandImageStory：ブランドイメージ・理念・物語性  
17. SocialAcceptability：運転中・妊娠中などでも飲める許容感  
18. SustainabilityEthics：環境配慮・サステナビリティ・倫理性',
  negative_feedback_confidence STRING COMMENT 'negative_feedback_category に対するConfidence Score。分類信頼度（0.0～1.0）')
USING delta
COMMENT '
アンケート回答データ。加えて、さまざまなデータ加工を行い、いくつかの分析用のデータを追加したもの。'
TBLPROPERTIES (
  'delta.enableDeletionVectors' = 'true',
  'delta.feature.appendOnly' = 'supported',
  'delta.feature.deletionVectors' = 'supported',
  'delta.feature.invariants' = 'supported',
  'delta.minReaderVersion' = '3',
  'delta.minWriterVersion' = '7')
"""

spark.sql(f"DROP TABLE IF EXISTS {new_table_name}")
spark.sql(ddl_statement)

## テーブルへデータとメタデータをインポート

In [0]:
import pandas as pd
from pyspark.sql import functions as F

# ---------- (2) データをインポート（型を揃えてから append） ----------
csv_path = f"/Volumes/{CATALOG}/{SCHEMA}/{VOLUME}/delta_export_data.csv"

# 2-1) CSVはまず文字列としてSparkで読み込み（後で明示キャスト）
raw = (
    spark.read
         .option("header", "true")
         .option("multiLine", "true")   # 改行を含む列を安全に読み込み
         .option("quote", '"')
         .option("escape", '"')
         .csv(csv_path)                 # 既定では全列がSTRINGで入る
)

# 2-2) 文字列で読んだ配列っぽい列を array<string> へ正規化
def parse_str_array(colname: str):
    c = F.col(colname)
    # nullは空配列へ
    c_norm = F.when(c.isNull(), F.lit(""))
    # 角括弧とクォート、カンマを除去 → 空白で分割（"['beer' 'wine']" / "beer, wine" どちらも対応）
    c_norm = F.regexp_replace(F.coalesce(c, F.lit("")), r"[\\[\\]]", "")
    c_norm = F.regexp_replace(c_norm, r"[\"']", "")
    c_norm = F.regexp_replace(c_norm, r",", " ")
    c_norm = F.trim(c_norm)
    arr = F.when(F.length(c_norm) == 0, F.array().cast("array<string>")) \
            .otherwise(F.split(c_norm, r"\\s+"))
    return arr

# 2-3) 既存テーブルのスキーマに明示キャスト
df = (
    raw
    .withColumn("id", F.col("id").cast("bigint"))
    .withColumn("submitted_at", F.to_timestamp("submitted_at", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("age_group", F.col("age_group").cast("string"))
    .withColumn("gender", F.col("gender").cast("string"))
    .withColumn("occupation", F.col("occupation").cast("string"))
    .withColumn("region", F.col("region").cast("string"))
    .withColumn("drives_weekly",
                F.lower(F.col("drives_weekly")).isin("true","t","1","yes","y"))
    .withColumn("drinking_freq", F.col("drinking_freq").cast("string"))
    .withColumn("fav_alcohols_arr", parse_str_array("fav_alcohols_arr").cast("array<string>"))
    .withColumn("nonalc_freq", F.col("nonalc_freq").cast("string"))
    .withColumn("health_reasons_arr", parse_str_array("health_reasons_arr").cast("array<string>"))
    .withColumn("purchase_intent", F.col("purchase_intent").cast("string"))
    .withColumn("price_band", F.col("price_band").cast("string"))
    .withColumn("satisfaction_int", F.col("satisfaction_int").cast("int"))
    .withColumn("free_comment", F.col("free_comment").cast("string"))
    .withColumn("positive_point", F.col("positive_point").cast("string"))
    .withColumn("negative_feedback", F.col("negative_feedback").cast("string"))
    .withColumn("scene", F.col("scene").cast("string"))
    .withColumn("positive_point_category", F.col("positive_point_category").cast("string"))
    # DDL は STRING なので数値(1.0)が来ても STRING に統一
    .withColumn("positive_point_confidence", F.col("positive_point_confidence").cast("string"))
    .withColumn("negative_feedback_category", F.col("negative_feedback_category").cast("string"))
    .withColumn("negative_feedback_confidence", F.col("negative_feedback_confidence").cast("string"))
)

cols = [
    "id","submitted_at","age_group","gender","occupation","region","drives_weekly",
    "drinking_freq","fav_alcohols_arr","nonalc_freq","health_reasons_arr",
    "purchase_intent","price_band","satisfaction_int","free_comment",
    "positive_point","negative_feedback","scene","positive_point_category",
    "positive_point_confidence","negative_feedback_category","negative_feedback_confidence"
]
df.select(*cols).write.format("delta").mode("append").saveAsTable(new_table_name)

# ---------- (3) カラムコメントを復元（COMMENT ON COLUMN を使用） ----------
columns_df = pd.read_csv("../data/delta_export_column_comments.csv")
for row in columns_df.itertuples():
    if pd.notna(row.comment) and row.comment:
        col = row.column_name
        comment_escaped = str(row.comment).replace("'", "''")
        spark.sql(f"COMMENT ON COLUMN {new_table_name}.{col} IS '{comment_escaped}'")

# ---------- (4) テーブルコメントを復元（COMMENT ON TABLE を使用） ----------
with open("../data/delta_export_table_comment.txt", "r") as f:
    table_comment = f.read().strip()
if table_comment:
    table_comment_escaped = table_comment.replace("'", "''")
    spark.sql(f"COMMENT ON TABLE {new_table_name} IS '{table_comment_escaped}'")

print(f"テーブル {new_table_name} のインポートが完了しました。")

---

## テーブルの内容を確認

In [0]:
table_df = spark.table(new_table_name)
display(table_df)

## データ加工のポイント：LLMを活用した非構造データの構造化

In [0]:
%sql
SELECT
  id,
  free_comment,
  ai_query(
    'databricks-gpt-oss-120b',
    CONCAT('''以下の日本語の感想文から情報抽出してください。

【出力仕様】
- positive_point: 
  感想の中のポジティブな意見のうち、最も強調されているものを短文で一つ抽出してください（例: 香りがよい、後味がすっきり）。
  同様の内容は一つの文章にまとめ、サマライズして、簡潔にしてください。日本語で出力してください。句読点は不要です。
  ただし、シーンに関するポジティブな言及はscenesに含め、positive_pointには含めないでください。
  Positiveなコメントが見つからない場合は空文字を入れてください。
- negative_feedback: ネガティブな意見、または、要望や提案のうち、最も強調されているものを短文で一つ抽出してください。
  同様の内容は一つの文章にまとめ、サマライズして、簡潔にしてください。
  原文の言葉表現がカジュアルな場合は、同じ意味のビジネスライクな表現に書き直してください。
  日本語で出力してください。句読点は不要です。
  Negativeなコメントが見つからない場合は空文字を入れてください。
- scene: 飲用シーン（例: 夕食、風呂上がり、運転前、スポーツ後 など）のうち、最も強調されているものを単語で一つ出力してください。
  同様の内容は一つのワードにまとめ、サマライズして、簡潔にしてください。日本語で出力してください。句読点は不要です。

【出力形式】
構造化: positive_point, negative_feedback, scene

【対象の感想文】
''', COALESCE(free_comment, '')),
    responseFormat => '{
        "type": "json_schema",
        "json_schema": {
          "name": "extracted_data",
          "schema": {
            "type": "object",
            "properties": {
              "positive_point": {"type": "string"},
              "negative_feedback": {"type": "string"},
              "scene": {"type": "string"}
            }
        },
        "strict": true
      }
    }'
  ) AS extracted_data
FROM handson.survey_analysis.gold_survey_responses_final
LIMIT 5

---