In [None]:
# 必要なパッケージをインストール
!pip install boto3 pyarrow pandas awswrangler pyiceberg

In [None]:
import os

# AWS認証情報の設定
os.environ['AWS_ACCESS_KEY_ID'] = 'XXXXXXXXXXXXXXX'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'XXXXXXXXXXXXXXX'
os.environ['AWS_DEFAULT_REGION'] = 'XXXXXXXXXXXXXXX'

import boto3

# Glue クライアントの作成
glue_client = boto3.client('glue')

# フィードバックデータ用のデータベースを作成
try:
    response = glue_client.get_database(Name='customer_feedback_db')
    print("データベース 'customer_feedback_db' は既に存在します")
except glue_client.exceptions.EntityNotFoundException:
    glue_client.create_database(
        DatabaseInput={
            'Name': 'customer_feedback_db',
            'Description': 'Customer Feedback Database for RAG system'
        }
    )
    print("データベース 'customer_feedback_db' を作成しました")
except Exception as e:
    print(f"データベース操作中にエラーが発生しました: {e}")

In [None]:
# 既存のセッションを停止
if 'spark' in locals():
    spark.stop()

from pyspark.sql import SparkSession

# AWS Glue 5.0に適したIceberg設定
spark = SparkSession.builder \
    .appName("CustomerFeedbackIceberg") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://xxxxxxxxxxxxxxxxx/iceberg/") \
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \
    .config("spark.sql.catalog.glue_catalog.lock-impl", "org.apache.iceberg.aws.dynamodb.DynamoDbLockManager") \
    .config("spark.sql.catalog.glue_catalog.lock.table", "iceberg_lock") \
    .config("spark.sql.defaultCatalog", "glue_catalog") \
    .getOrCreate()

print("Sparkセッションを構成しました")

In [None]:
# feedback_reports テーブルを作成
try:
    spark.sql("""
    CREATE TABLE glue_catalog.customer_feedback_db.feedback_reports (
        report_id STRING,
        report_date DATE,
        period_start DATE,
        period_end DATE,
        total_feedback_count INT,
        average_satisfaction FLOAT,
        nps_score INT,
        department STRING,
        pdf_path STRING
    ) USING iceberg
    """)
    print("feedback_reports テーブルを作成しました")
except Exception as e:
    print(f"feedback_reports テーブル作成中にエラーが発生しました: {e}")

# customer_feedback テーブルを作成（パーティション付き）
try:
    spark.sql("""
    CREATE TABLE glue_catalog.customer_feedback_db.customer_feedback (
        feedback_id STRING,
        report_id STRING,
        customer_id STRING,
        customer_name STRING,
        feedback_date DATE,
        rating INT,
        feedback_text STRING,
        product_id STRING,
        channel STRING,
        is_featured_in_report BOOLEAN
    ) USING iceberg
    PARTITIONED BY (days(feedback_date))
    """)
    print("customer_feedback テーブルを作成しました")
except Exception as e:
    print(f"customer_feedback テーブル作成中にエラーが発生しました: {e}")

# improvement_points テーブルを作成
try:
    spark.sql("""
    CREATE TABLE glue_catalog.customer_feedback_db.improvement_points (
        point_id STRING,
        report_id STRING,
        category STRING,
        description STRING,
        mention_count INT,
        priority INT,
        status STRING
    ) USING iceberg
    """)
    print("improvement_points テーブルを作成しました")
except Exception as e:
    print(f"improvement_points テーブル作成中にエラーが発生しました: {e}")

# action_items テーブルを作成
try:
    spark.sql("""
    CREATE TABLE glue_catalog.customer_feedback_db.action_items (
        action_id STRING,
        report_id STRING,
        description STRING,
        related_point_id STRING,
        due_date DATE,
        owner STRING,
        status STRING
    ) USING iceberg
    """)
    print("action_items テーブルを作成しました")
except Exception as e:
    print(f"action_items テーブル作成中にエラーが発生しました: {e}")

# product_metrics テーブルを作成
try:
    spark.sql("""
    CREATE TABLE glue_catalog.customer_feedback_db.product_metrics (
        metric_id STRING,
        report_id STRING,
        product_id STRING,
        metric_type STRING,
        score FLOAT,
        previous_score FLOAT,
        trend FLOAT
    ) USING iceberg
    """)
    print("product_metrics テーブルを作成しました")
except Exception as e:
    print(f"product_metrics テーブル作成中にエラーが発生しました: {e}")

# feedback_categories テーブルを作成
try:
    spark.sql("""
    CREATE TABLE glue_catalog.customer_feedback_db.feedback_categories (
        category_id STRING,
        report_id STRING,
        category_name STRING,
        feedback_count INT,
        average_rating FLOAT,
        positive_percentage FLOAT,
        negative_percentage FLOAT
    ) USING iceberg
    """)
    print("feedback_categories テーブルを作成しました")
except Exception as e:
    print(f"feedback_categories テーブル作成中にエラーが発生しました: {e}")

In [None]:
# feedback_reports テーブルにデータを挿入
spark.sql("""
INSERT INTO glue_catalog.customer_feedback_db.feedback_reports VALUES
('REP202504', CAST('2025-04-30' AS DATE), CAST('2025-04-01' AS DATE), CAST('2025-04-30' AS DATE), 247, 4.2, 68, '顧客サポート部門', 's3://xxxxxxxxxxxxxxxxx/reports/feedback_report_202504.pdf'),
('REP202503', CAST('2025-03-31' AS DATE), CAST('2025-03-01' AS DATE), CAST('2025-03-31' AS DATE), 215, 3.9, 62, '顧客サポート部門', 's3://xxxxxxxxxxxxxxxxx/reports/feedback_report_202503.pdf'),
('REP202502', CAST('2025-02-28' AS DATE), CAST('2025-02-01' AS DATE), CAST('2025-02-28' AS DATE), 198, 4.0, 65, '顧客サポート部門', 's3://xxxxxxxxxxxxxxxxx/reports/feedback_report_202502.pdf')
""")
print("feedback_reports テーブルにデータを挿入しました")

# customer_feedback テーブルにデータを挿入
spark.sql("""
INSERT INTO glue_catalog.customer_feedback_db.customer_feedback VALUES
('FB001', 'REP202504', 'CUST001', '山田太郎', CAST('2025-04-15' AS DATE), 5, '新しい分析機能がとても使いやすく、業務効率が大幅に向上しました。特にデータ可視化のための直感的なインターフェースが気に入っています。', 'PROD-A', 'email', true),
('FB002', 'REP202504', 'CUST002', '鈴木花子', CAST('2025-04-08' AS DATE), 4, 'カスタマーサポートの対応が素晴らしかったです。技術的な問題が発生した際も、担当者が丁寧に説明してくれて安心しました。', 'PROD-B', 'support_call', true),
('FB003', 'REP202504', 'CUST003', '田中健太', CAST('2025-04-22' AS DATE), 3, '製品自体は良いのですが、マニュアルが不足していると感じました。特に新機能の使い方についてもっと詳しい説明があるとより活用できると思います。', 'PROD-A', 'app', true),
('FB004', 'REP202504', 'CUST004', '佐藤美咲', CAST('2025-04-05' AS DATE), 4, 'モバイルアプリのレスポンスが少し遅いと感じることがありますが、機能自体は大変満足しています。', 'PROD-C', 'app', false),
('FB005', 'REP202504', 'CUST005', '伊藤雄太', CAST('2025-04-18' AS DATE), 2, 'データエクスポート機能が使いづらいです。もっと直感的な操作ができると良いと思います。', 'PROD-B', 'email', false),
('FB006', 'REP202503', 'CUST006', '渡辺智子', CAST('2025-03-15' AS DATE), 5, '先日リリースされたアップデートで使い勝手が格段に向上しました。ありがとうございます。', 'PROD-A', 'app', true),
('FB007', 'REP202503', 'CUST007', '小林誠', CAST('2025-03-22' AS DATE), 3, 'インターフェースの一貫性がないように感じます。画面によって操作方法が異なるのは混乱する原因です。', 'PROD-C', 'email', true)
""")
print("customer_feedback テーブルにデータを挿入しました")

# improvement_points テーブルにデータを挿入
spark.sql("""
INSERT INTO glue_catalog.customer_feedback_db.improvement_points VALUES
('IP001', 'REP202504', 'パフォーマンス', 'モバイルアプリのパフォーマンス最適化', 42, 1, 'identified'),
('IP002', 'REP202504', 'ドキュメント', 'オンラインマニュアルの充実', 37, 2, 'in_progress'),
('IP003', 'REP202504', '機能', 'データエクスポート機能の拡張', 29, 3, 'identified'),
('IP004', 'REP202504', 'UI/UX', 'ユーザーインターフェースの一貫性向上', 24, 4, 'planned'),
('IP005', 'REP202504', '言語サポート', '多言語対応の強化', 18, 5, 'identified'),
('IP006', 'REP202503', 'パフォーマンス', 'サーバーレスポンス時間の改善', 31, 1, 'resolved'),
('IP007', 'REP202503', '機能', 'バッチ処理オプションの追加', 25, 2, 'in_progress')
""")
print("improvement_points テーブルにデータを挿入しました")

# action_items テーブルにデータを挿入
spark.sql("""
INSERT INTO glue_catalog.customer_feedback_db.action_items VALUES
('ACT001', 'REP202504', 'モバイルアプリの最適化プロジェクトを第2四半期に開始', 'IP001', CAST('2025-06-30' AS DATE), '開発部門', 'planned'),
('ACT002', 'REP202504', 'オンラインヘルプの内容見直しと拡充', 'IP002', CAST('2025-05-31' AS DATE), 'ドキュメント部門', 'in_progress'),
('ACT003', 'REP202504', 'ユーザー向けウェビナーの開催', 'IP002', CAST('2025-05-10' AS DATE), 'マーケティング部門', 'planned'),
('ACT004', 'REP202504', '主要機能のチュートリアルビデオの作成', 'IP002', CAST('2025-06-30' AS DATE), 'ドキュメント部門', 'planned'),
('ACT005', 'REP202504', 'データエクスポート機能のUI改善', 'IP003', CAST('2025-07-15' AS DATE), '開発部門', 'planned'),
('ACT006', 'REP202503', 'サーバー増強によるレスポンス改善', 'IP006', CAST('2025-03-25' AS DATE), 'インフラ部門', 'completed'),
('ACT007', 'REP202503', 'バッチ処理API開発', 'IP007', CAST('2025-04-30' AS DATE), '開発部門', 'in_progress')
""")
print("action_items テーブルにデータを挿入しました")

# product_metrics テーブルにデータを挿入
spark.sql("""
INSERT INTO glue_catalog.customer_feedback_db.product_metrics VALUES
('MET001', 'REP202504', 'PROD-A', 'satisfaction', 4.3, 4.1, 0.2),
('MET002', 'REP202504', 'PROD-A', 'usability', 4.1, 3.8, 0.3),
('MET003', 'REP202504', 'PROD-A', 'reliability', 4.5, 4.4, 0.1),
('MET004', 'REP202504', 'PROD-B', 'satisfaction', 3.9, 3.7, 0.2),
('MET005', 'REP202504', 'PROD-B', 'usability', 3.6, 3.5, 0.1),
('MET006', 'REP202504', 'PROD-C', 'satisfaction', 4.0, 3.9, 0.1),
('MET007', 'REP202503', 'PROD-A', 'satisfaction', 4.1, 3.9, 0.2),
('MET008', 'REP202503', 'PROD-B', 'satisfaction', 3.7, 3.6, 0.1)
""")
print("product_metrics テーブルにデータを挿入しました")

# feedback_categories テーブルにデータを挿入
spark.sql("""
INSERT INTO glue_catalog.customer_feedback_db.feedback_categories VALUES
('CAT001', 'REP202504', 'UI/UX', 85, 4.2, 78.5, 21.5),
('CAT002', 'REP202504', 'パフォーマンス', 67, 3.8, 65.2, 34.8),
('CAT003', 'REP202504', '機能性', 45, 4.3, 82.1, 17.9),
('CAT004', 'REP202504', 'サポート', 30, 4.6, 90.3, 9.7),
('CAT005', 'REP202504', 'ドキュメント', 20, 3.2, 45.5, 54.5),
('CAT006', 'REP202503', 'UI/UX', 75, 3.9, 70.5, 29.5),
('CAT007', 'REP202503', 'パフォーマンス', 60, 3.5, 62.1, 37.9)
""")
print("feedback_categories テーブルにデータを挿入しました")

In [None]:
# テーブル一覧を確認
print("データベース内のテーブル一覧:")
spark.sql("SHOW TABLES IN glue_catalog.customer_feedback_db").show()

# 各テーブルのデータ確認
print("\nfeedback_reports テーブルのデータ:")
spark.sql("SELECT * FROM glue_catalog.customer_feedback_db.feedback_reports").show(truncate=False)

print("\ncustomer_feedback テーブルのデータ:")
spark.sql("SELECT * FROM glue_catalog.customer_feedback_db.customer_feedback").show(truncate=False)

print("\nimprovement_points テーブルのデータ:")
spark.sql("SELECT * FROM glue_catalog.customer_feedback_db.improvement_points").show(truncate=False)

# 特定の製品に関するフィードバックを取得
print("\n製品「PROD-A」に関するフィードバック:")
spark.sql("""
SELECT c.customer_name, c.feedback_date, c.rating, c.feedback_text
FROM glue_catalog.customer_feedback_db.customer_feedback c
WHERE c.product_id = 'PROD-A'
ORDER BY c.feedback_date DESC
""").show(truncate=False)

# 平均評価スコアが高い製品を取得
print("\n平均評価スコアが高い製品:")
spark.sql("""
SELECT m.product_id, AVG(m.score) as avg_score
FROM glue_catalog.customer_feedback_db.product_metrics m
JOIN glue_catalog.customer_feedback_db.feedback_reports r ON m.report_id = r.report_id
WHERE m.metric_type = 'satisfaction'
GROUP BY m.product_id
ORDER BY avg_score DESC
""").show()

# 改善が必要な項目とアクションアイテムの関連を確認
print("\n改善ポイントと関連するアクションアイテム:")
spark.sql("""
SELECT i.category, i.description as improvement_point, i.mention_count, a.description as action_item, a.status
FROM glue_catalog.customer_feedback_db.improvement_points i
JOIN glue_catalog.customer_feedback_db.action_items a ON i.point_id = a.related_point_id
WHERE i.report_id = 'REP202504'
ORDER BY i.priority
""").show(truncate=False)

In [None]:
# customer_feedback テーブルの履歴を確認
print("customer_feedback テーブルの履歴:")
spark.sql("SELECT * FROM glue_catalog.customer_feedback_db.customer_feedback.history").show(truncate=False)

# 新しいフィードバックを追加
spark.sql("""
INSERT INTO glue_catalog.customer_feedback_db.customer_feedback VALUES
('FB008', 'REP202504', 'CUST008', '松本潤', CAST('2025-04-25' AS DATE), 5, 'チュートリアル動画がわかりやすく、短時間で機能を理解できました。', 'PROD-A', 'app', false),
('FB009', 'REP202504', 'CUST009', '高橋真央', CAST('2025-04-26' AS DATE), 2, 'アプリが頻繁にクラッシュする問題を解決してほしいです。', 'PROD-C', 'support_call', false)
""")
print("新しいフィードバックを追加しました")

# テーブル履歴の確認（追加操作後）
print("追加操作後のテーブル履歴:")
spark.sql("SELECT * FROM glue_catalog.customer_feedback_db.customer_feedback.history").show(truncate=False)

# feedback_reports テーブルのデータを更新
spark.sql("""
UPDATE glue_catalog.customer_feedback_db.feedback_reports
SET total_feedback_count = 249, average_satisfaction = 4.1
WHERE report_id = 'REP202504'
""")
print("feedback_reports テーブルのデータを更新しました")

# 更新後のデータを確認
print("更新後の feedback_reports テーブル:")
spark.sql("SELECT * FROM glue_catalog.customer_feedback_db.feedback_reports WHERE report_id = 'REP202504'").show(truncate=False)

# フィードバックの削除
spark.sql("""
DELETE FROM glue_catalog.customer_feedback_db.customer_feedback
WHERE feedback_id = 'FB005'
""")
print("FB005のフィードバックを削除しました")

# 削除後のテーブル履歴を確認
print("削除操作後のテーブル履歴:")
spark.sql("SELECT * FROM glue_catalog.customer_feedback_db.customer_feedback.history").show(truncate=False)

In [None]:
# 現在のスナップショットを確認
print("customer_feedback テーブルのスナップショット情報:")
spark.sql("SELECT * FROM glue_catalog.customer_feedback_db.customer_feedback.snapshots").show(truncate=False)

# 最新のスナップショットIDを取得
latest_snapshot_id = spark.sql("""
SELECT snapshot_id FROM glue_catalog.customer_feedback_db.customer_feedback.snapshots 
ORDER BY committed_at DESC LIMIT 1
""").collect()[0][0]
print(f"最新のスナップショットID: {latest_snapshot_id}")

# 最新のスナップショットデータを確認
print("最新スナップショットのデータ:")
spark.sql(f"SELECT * FROM glue_catalog.customer_feedback_db.customer_feedback VERSION AS OF {latest_snapshot_id}").show(truncate=False)

# 最初の操作の後のスナップショットIDを取得（通常は最古のスナップショット）
first_snapshot_id = spark.sql("""
SELECT snapshot_id FROM glue_catalog.customer_feedback_db.customer_feedback.snapshots 
ORDER BY committed_at ASC LIMIT 1
""").collect()[0][0]
print(f"最初のスナップショットID: {first_snapshot_id}")

# 最初のスナップショットデータを確認
print("最初のスナップショットデータ:")
spark.sql(f"SELECT * FROM glue_catalog.customer_feedback_db.customer_feedback VERSION AS OF {first_snapshot_id}").show(truncate=False)

# 特定時点のデータとテーブルを比較
print("最初のスナップショットと現在のデータ量の比較:")
initial_count = spark.sql(f"SELECT COUNT(*) AS snapshot_rows FROM glue_catalog.customer_feedback_db.customer_feedback VERSION AS OF {first_snapshot_id}").collect()[0][0]
current_count = spark.sql("SELECT COUNT(*) AS current_rows FROM glue_catalog.customer_feedback_db.customer_feedback").collect()[0][0]
print(f"最初のスナップショット行数: {initial_count}, 現在の行数: {current_count}, 差分: {current_count - initial_count}")

In [None]:
# パーティション情報を確認
print("customer_feedback テーブルのパーティション情報:")
spark.sql("SELECT * FROM glue_catalog.customer_feedback_db.customer_feedback.partitions").show(truncate=False)

# 小さなファイルのコンパクション
print("小さなファイルを最適化します:")
spark.sql("CALL glue_catalog.system.rewrite_data_files(table => 'customer_feedback_db.customer_feedback')")

# ファイル情報を最適化前後で確認
print("ファイル最適化後の情報:")
spark.sql("SELECT file_path, file_size_in_bytes, record_count FROM glue_catalog.customer_feedback_db.customer_feedback.files").show(truncate=False)

# 特定日付のパーティションのみ取得
specific_date = '2025-04-15'
print(f"{specific_date}のフィードバックのみ取得:")
spark.sql(f"""
SELECT * FROM glue_catalog.customer_feedback_db.customer_feedback 
WHERE feedback_date = CAST('{specific_date}' AS DATE)
""").show(truncate=False)

In [None]:
# 特定日付のパーティションのみ取得
specific_date = '2025-04-15'
print(f"{specific_date}のフィードバックのみ取得:")
spark.sql(f"""
SELECT * FROM glue_catalog.customer_feedback_db.customer_feedback 
WHERE feedback_date = CAST('{specific_date}' AS DATE)
""").show(truncate=False)

# パーティション単位での統計情報を確認
print("日付別フィードバック数:")
spark.sql("""
SELECT feedback_date, COUNT(*) as feedback_count
FROM glue_catalog.customer_feedback_db.customer_feedback
GROUP BY feedback_date
ORDER BY feedback_date
""").show()

In [None]:
# テーブルプロパティの設定
print("テーブルプロパティを設定します:")
spark.sql("""
ALTER TABLE glue_catalog.customer_feedback_db.customer_feedback 
SET TBLPROPERTIES (
  'comment' = 'Contains customer feedback data partitioned by date',
  'write.format.default' = 'parquet',
  'write.parquet.compression-codec' = 'snappy',
  'read.split.target-size' = '134217728',
  'write.distribution-mode' = 'hash'
)
""")

# テーブルプロパティを確認
print("テーブルプロパティ:")
spark.sql("SHOW TBLPROPERTIES glue_catalog.customer_feedback_db.customer_feedback").show(truncate=False)

# テーブル構造の確認
print("テーブル構造:")
spark.sql("DESCRIBE TABLE glue_catalog.customer_feedback_db.customer_feedback").show(truncate=False)

# テーブル詳細情報の確認
print("テーブル詳細情報:")
spark.sql("DESCRIBE TABLE EXTENDED glue_catalog.customer_feedback_db.customer_feedback").show(truncate=False)

In [None]:
# 製品別の評価平均とフィードバック数を分析
print("製品別の評価分析:")
spark.sql("""
SELECT 
    cf.product_id,
    COUNT(*) as feedback_count,
    AVG(cf.rating) as avg_rating,
    MIN(cf.rating) as min_rating,
    MAX(cf.rating) as max_rating,
    COUNT(CASE WHEN cf.rating >= 4 THEN 1 END) as positive_count,
    COUNT(CASE WHEN cf.rating <= 2 THEN 1 END) as negative_count
FROM glue_catalog.customer_feedback_db.customer_feedback cf
GROUP BY cf.product_id
ORDER BY avg_rating DESC
""").show()

# 月ごとの満足度傾向分析
print("月ごとの満足度傾向:")
spark.sql("""
SELECT 
    fr.report_id,
    DATE_FORMAT(fr.period_start, 'yyyy-MM') as month,
    fr.average_satisfaction,
    fr.nps_score,
    fr.total_feedback_count
FROM glue_catalog.customer_feedback_db.feedback_reports fr
ORDER BY fr.period_start
""").show()

# 改善カテゴリごとのアクションアイテムステータス分布
print("改善カテゴリごとのアクションアイテムステータス:")
spark.sql("""
SELECT 
    ip.category,
    COUNT(*) as total_actions,
    COUNT(CASE WHEN ai.status = 'planned' THEN 1 END) as planned_count,
    COUNT(CASE WHEN ai.status = 'in_progress' THEN 1 END) as in_progress_count,
    COUNT(CASE WHEN ai.status = 'completed' THEN 1 END) as completed_count
FROM glue_catalog.customer_feedback_db.improvement_points ip
JOIN glue_catalog.customer_feedback_db.action_items ai ON ip.point_id = ai.related_point_id
GROUP BY ip.category
ORDER BY total_actions DESC
""").show()

# 低評価フィードバックと関連する改善ポイントの分析
print("低評価フィードバックと関連する改善ポイント:")
spark.sql("""
WITH low_ratings AS (
    SELECT 
        cf.feedback_id,
        cf.product_id,
        cf.feedback_text,
        cf.rating
    FROM glue_catalog.customer_feedback_db.customer_feedback cf
    WHERE cf.rating <= 3
)
SELECT 
    lr.product_id,
    COUNT(*) as low_rating_count,
    ip.category,
    ip.description,
    ip.mention_count
FROM low_ratings lr
JOIN glue_catalog.customer_feedback_db.customer_feedback cf ON lr.product_id = cf.product_id
JOIN glue_catalog.customer_feedback_db.feedback_reports fr ON cf.report_id = fr.report_id
JOIN glue_catalog.customer_feedback_db.improvement_points ip ON fr.report_id = ip.report_id
GROUP BY lr.product_id, ip.category, ip.description, ip.mention_count
ORDER BY low_rating_count DESC, ip.mention_count DESC
""").show(truncate=False)

In [None]:
# 日付ごとの評価平均の推移
print("日付ごとの評価平均の推移:")
spark.sql("""
SELECT 
    feedback_date,
    COUNT(*) as count,
    AVG(rating) as avg_rating
FROM glue_catalog.customer_feedback_db.customer_feedback
GROUP BY feedback_date
ORDER BY feedback_date
""").show()

# 製品ごとの評価スコア推移
print("製品ごとの評価スコア推移:")
spark.sql("""
SELECT 
    pm.product_id,
    fr.period_start,
    pm.metric_type,
    pm.score,
    pm.previous_score,
    pm.trend
FROM glue_catalog.customer_feedback_db.product_metrics pm
JOIN glue_catalog.customer_feedback_db.feedback_reports fr ON pm.report_id = fr.report_id
WHERE pm.metric_type = 'satisfaction'
ORDER BY pm.product_id, fr.period_start
""").show()

# カテゴリごとのポジティブ・ネガティブ比率の推移
print("カテゴリごとのポジティブ・ネガティブ比率の推移:")
spark.sql("""
SELECT 
    fc.category_name,
    fr.period_start,
    fc.average_rating,
    fc.positive_percentage,
    fc.negative_percentage
FROM glue_catalog.customer_feedback_db.feedback_categories fc
JOIN glue_catalog.customer_feedback_db.feedback_reports fr ON fc.report_id = fr.report_id
ORDER BY fc.category_name, fr.period_start
""").show()

In [None]:
# customer_feedback テーブルにカラムを追加
print("テーブルにカラムを追加します:")
spark.sql("""
ALTER TABLE glue_catalog.customer_feedback_db.customer_feedback
ADD COLUMNS (
    sentiment_score FLOAT COMMENT '感情分析スコア (-1.0〜1.0)',
    tags ARRAY<STRING> COMMENT 'フィードバックに関連するタグ'
)
""")

# 新しいカラムの確認
print("カラム追加後のテーブル構造:")
spark.sql("DESCRIBE TABLE glue_catalog.customer_feedback_db.customer_feedback").show(truncate=False)

# 既存データに新しいカラムの値を更新
print("既存データの新カラムを更新します:")
spark.sql("""
UPDATE glue_catalog.customer_feedback_db.customer_feedback
SET 
    sentiment_score = CASE 
        WHEN rating >= 4 THEN rating / 5.0 * 0.8 
        WHEN rating = 3 THEN 0.0
        ELSE (rating - 3) / 5.0 * 0.8
    END,
    tags = CASE
        WHEN product_id = 'PROD-A' THEN ARRAY('分析機能', 'データ可視化')
        WHEN product_id = 'PROD-B' THEN ARRAY('サポート', '技術')
        WHEN product_id = 'PROD-C' THEN ARRAY('モバイル', 'パフォーマンス')
        ELSE ARRAY('一般')
    END
""")

# 更新されたデータを確認
print("更新後のデータ:")
spark.sql("SELECT feedback_id, customer_name, rating, sentiment_score, tags FROM glue_catalog.customer_feedback_db.customer_feedback").show(truncate=False)

# 追加した列を使った集計クエリ
print("感情スコアの範囲別フィードバック数:")
spark.sql("""
SELECT
    CASE
        WHEN sentiment_score > 0.5 THEN '非常にポジティブ'
        WHEN sentiment_score > 0.0 THEN 'ポジティブ'
        WHEN sentiment_score = 0.0 THEN '中立'
        WHEN sentiment_score > -0.5 THEN 'ネガティブ'
        ELSE '非常にネガティブ'
    END AS sentiment_category,
    COUNT(*) as count
FROM glue_catalog.customer_feedback_db.customer_feedback
GROUP BY 
    CASE
        WHEN sentiment_score > 0.5 THEN '非常にポジティブ'
        WHEN sentiment_score > 0.0 THEN 'ポジティブ'
        WHEN sentiment_score = 0.0 THEN '中立'
        WHEN sentiment_score > -0.5 THEN 'ネガティブ'
        ELSE '非常にネガティブ'
    END
ORDER BY sentiment_category
""").show()

In [None]:
# テーブルのデータファイル情報を確認
print("customer_feedback テーブルのファイル情報:")
spark.sql("SELECT * FROM glue_catalog.customer_feedback_db.customer_feedback.files").show(truncate=False)

# 期限切れスナップショットの削除
print("期限切れスナップショットの有効期限を3日に設定:")
spark.sql("""
ALTER TABLE glue_catalog.customer_feedback_db.customer_feedback
SET TBLPROPERTIES (
    'history.expire.min-snapshots-to-keep' = '2',
    'history.expire.max-snapshot-age-ms' = '259200000'
)
""")

# 期限切れスナップショットのクリーンアップを実行
print("期限切れスナップショットをクリーンアップします:")
spark.sql("CALL glue_catalog.system.expire_snapshots(table => 'customer_feedback_db.customer_feedback')")

# データファイルを再構成してテーブルを最適化
print("データファイルを再構成します:")
spark.sql("CALL glue_catalog.system.rewrite_data_files(table => 'customer_feedback_db.customer_feedback', options => map('min-input-files','2'))")

# ファイルの最適化とコンパクションのステータスを確認
print("最適化後のファイル情報:")
spark.sql("""
SELECT 
    partition, 
    COUNT(*) as file_count, 
    SUM(file_size_in_bytes) as total_size, 
    MIN(file_size_in_bytes) as min_size, 
    MAX(file_size_in_bytes) as max_size,
    AVG(file_size_in_bytes) as avg_size
FROM glue_catalog.customer_feedback_db.customer_feedback.files
GROUP BY partition
""").show(truncate=False)

In [None]:
# 最新のスナップショットIDを取得
latest_snapshot_id = spark.sql("SELECT snapshot_id FROM glue_catalog.customer_feedback_db.customer_feedback.snapshots ORDER BY committed_at DESC LIMIT 1").collect()[0][0]
print(f"最新のスナップショットID: {latest_snapshot_id}")

# フィードバックデータをエクスポート
print("フィードバックデータをエクスポートします:")
feedback_export_df = spark.sql(f"""
SELECT 
    cf.feedback_id,
    cf.customer_id,
    cf.customer_name,
    cf.feedback_date,
    cf.rating,
    cf.feedback_text,
    cf.product_id,
    cf.sentiment_score,
    cf.tags,
    p.pdf_path
FROM glue_catalog.customer_feedback_db.customer_feedback FOR VERSION AS OF {latest_snapshot_id} cf
JOIN glue_catalog.customer_feedback_db.feedback_reports p ON cf.report_id = p.report_id
""")

# もし上記も動作しない場合は、以下の方法を試してください
# 方法2: スナップショットID指定の別の構文
if not feedback_export_df:
    feedback_export_df = spark.sql(f"""
    SELECT 
        cf.feedback_id,
        cf.customer_id,
        cf.customer_name,
        cf.feedback_date,
        cf.rating,
        cf.feedback_text,
        cf.product_id,
        cf.sentiment_score,
        cf.tags,
        p.pdf_path
    FROM glue_catalog.customer_feedback_db.customer_feedback/*$snapshot={latest_snapshot_id}*/ cf
    JOIN glue_catalog.customer_feedback_db.feedback_reports p ON cf.report_id = p.report_id
    """)

# 方法3: スナップショットIDを指定せずに最新データを使用
if not feedback_export_df:
    feedback_export_df = spark.sql("""
    SELECT 
        cf.feedback_id,
        cf.customer_id,
        cf.customer_name,
        cf.feedback_date,
        cf.rating,
        cf.feedback_text,
        cf.product_id,
        cf.sentiment_score,
        cf.tags,
        p.pdf_path
    FROM glue_catalog.customer_feedback_db.customer_feedback cf
    JOIN glue_catalog.customer_feedback_db.feedback_reports p ON cf.report_id = p.report_id
    """)

# 結果をParquetファイルとして出力
export_path = "s3://xxxxxxxxxxxxxxxxx/rag_data/customer_feedback/"
feedback_export_df.coalesce(1).write.mode("overwrite").parquet(export_path)
print(f"フィードバックデータを {export_path} にエクスポートしました")