# BigQuery Natural Language Analysis with Gemini

このノートブックでは、Gemini APIを使用してBigQueryのSalesforceデータを自然言語で分析します。

## 使用モデル（2025年1月最新）
- **Gemini 2.5 Pro** (gemini-2.5-pro-exp-0117): データ分析最適化モデル
  - 最大200万トークンの長大なコンテキスト処理
  - 高精度なSQL生成と複雑な推論
  - 大規模データセットの一括分析
  - RAG（Retrieval-Augmented Generation）対応
  
- フォールバックモデル:
  - **Gemini 2.5 Flash** (gemini-2.5-flash-exp-0117): 高速バランス版
  - **Gemini 2.0 Flash** (gemini-2.0-flash-exp-01-21): 超高速処理版

## 対象テーブル
- `esperanto-drawer-prod.dm_business_planning.salesforce_account_mart`
- `esperanto-drawer-prod.dm_business_planning.salesforce_opportunity_mart`

## API Key設定方法

### Google Colabの場合（推奨）
1. Colabノートブックの左メニューから「🔑」（シークレット）アイコンをクリック
2. 「新しいシークレットを追加」をクリック
3. 名前に `GEMINI_API_KEY` を入力
4. 値にGemini API Keyを貼り付け
5. 「ノートブックからのアクセスを許可」をONにする

### ローカル環境の場合
環境変数 `GEMINI_API_KEY` に設定するか、実行時に入力プロンプトが表示されます。

## 1. 環境設定とライブラリのインストール

In [None]:
## 1. 環境設定とライブラリのインストール

## 2. Google Cloud認証

In [1]:
from google.colab import auth
auth.authenticate_user()
print('認証完了')

ModuleNotFoundError: No module named 'google.colab'

## 3. API設定と初期化

In [2]:
import os
from google.cloud import bigquery
import google.generativeai as genai
import pandas as pd
import json
from datetime import datetime, timedelta
import plotly.express as px
import plotly.graph_objects as go
from typing import Dict, List, Optional

# Gemini API キーの設定
# Colabのシークレットから取得 (Colab環境の場合)
try:
    from google.colab import userdata
    GEMINI_API_KEY = userdata.get('GEMINI_API_KEY')
    print('Colabシークレットから API Key を取得しました')
except:
    # Colabシークレット以外の環境変数から取得
    GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')
    if not GEMINI_API_KEY:
        # 環境変数にもない場合は手動入力
        from getpass import getpass
        GEMINI_API_KEY = getpass('Gemini API Key を入力してください: ')

genai.configure(api_key=GEMINI_API_KEY)

# BigQueryクライアントの初期化
PROJECT_ID = 'esperanto-drawer-prod'
client = bigquery.Client(project=PROJECT_ID)

# Geminiモデルの初期化
# gemini-2.5-pro-exp-0117: 2025年1月時点の最新プロモデル
# - 最大200万トークンのコンテキスト長
# - データ分析・SQL生成に最適
# - 複雑な推論と高精度な分析が可能
try:
    model = genai.GenerativeModel('gemini-2.5-pro-exp-0117')
    print('初期化完了')
    print(f'使用モデル: gemini-2.5-pro-exp-0117 (最新データ分析特化版)')
except:
    # フォールバック1: 2.5 Flash (高速性と精度のバランス)
    try:
        model = genai.GenerativeModel('gemini-2.5-flash-exp-0117')
        print('初期化完了')
        print(f'使用モデル: gemini-2.5-flash-exp-0117 (高速バランス版)')
    except:
        # フォールバック2: 2.0 Flash (さらに高速)
        model = genai.GenerativeModel('gemini-2.0-flash-exp-01-21')
        print('初期化完了')
        print(f'使用モデル: gemini-2.0-flash-exp-01-21 (超高速版)')

ModuleNotFoundError: No module named 'google.generativeai'

## 4. テーブルスキーマの取得

In [None]:
def get_table_schema(dataset_id: str, table_id: str) -> pd.DataFrame:
    """テーブルのスキーマ情報を取得"""
    # COLUMNS テーブルを使用（より標準的）
    query = f"""
    SELECT 
        column_name,
        data_type,
        IFNULL(description, '') as description
    FROM `{PROJECT_ID}.{dataset_id}.INFORMATION_SCHEMA.COLUMNS`
    WHERE table_name = '{table_id}'
    ORDER BY column_name
    """
    try:
        return client.query(query).to_dataframe()
    except Exception as e:
        print(f"COLUMNS使用時のエラー: {e}")
        # フォールバック: COLUMN_FIELD_PATHSを使用
        query_fallback = f"""
        SELECT 
            column_name,
            data_type,
            IFNULL(description, '') as description
        FROM `{PROJECT_ID}.{dataset_id}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS`
        WHERE table_name = '{table_id}'
        ORDER BY column_name
        """
        return client.query(query_fallback).to_dataframe()

# スキーマ情報を取得
account_schema = get_table_schema('dm_business_planning', 'salesforce_account_mart')
opportunity_schema = get_table_schema('dm_business_planning', 'salesforce_opportunity_mart')

print("Account Mart カラム数:", len(account_schema))
print("Opportunity Mart カラム数:", len(opportunity_schema))

# 主要カラムを表示
print("\n=== Account Mart 主要カラム ===")
display(account_schema.head(10))

print("\n=== Opportunity Mart 主要カラム ===")
display(opportunity_schema.head(10))

## 5. 自然言語→SQL変換クラス

In [None]:
class NLToBigQueryAnalyzer:
    def __init__(self, client: bigquery.Client, model):
        self.client = client
        self.model = model
        self.account_schema = account_schema
        self.opportunity_schema = opportunity_schema
        
    def generate_sql(self, natural_language_query: str) -> str:
        """自然言語クエリからSQLを生成"""
        
        # スキーマ情報を文字列化
        account_schema_str = self.account_schema[['column_name', 'data_type', 'description']].to_string()
        opportunity_schema_str = self.opportunity_schema[['column_name', 'data_type', 'description']].to_string()
        
        prompt = f"""
        あなたはBigQueryのSQLエキスパートです。
        以下のテーブルスキーマを使用して、ユーザーの質問に答えるSQLクエリを生成してください。
        
        利用可能なテーブル:
        1. `esperanto-drawer-prod.dm_business_planning.salesforce_account_mart`
        スキーマ:
        {account_schema_str[:2000]}  # 最初の2000文字のみ
        
        2. `esperanto-drawer-prod.dm_business_planning.salesforce_opportunity_mart`
        スキーマ:
        {opportunity_schema_str[:2000]}  # 最初の2000文字のみ
        
        重要な注意事項:
        - 日付フィールドは TIMESTAMP 型です
        - 金額フィールドは Amount, AnnualRevenue などです
        - ステージ名は StageName フィールドです
        - 日本語のカラム説明を参考にしてください
        - LIMIT 1000 を必ず付けてください（大量データ防止）
        - JOINする場合は AccountId で結合してください
        
        ユーザーの質問: {natural_language_query}
        
        SQLクエリのみを返してください。説明は不要です。
        SQLは```sql と ``` で囲んでください。
        """
        
        response = self.model.generate_content(prompt)
        
        # SQLを抽出
        sql = response.text
        if '```sql' in sql:
            sql = sql.split('```sql')[1].split('```')[0].strip()
        elif '```' in sql:
            sql = sql.split('```')[1].split('```')[0].strip()
            
        return sql
    
    def execute_query(self, sql: str) -> pd.DataFrame:
        """SQLを実行してDataFrameを返す"""
        try:
            # ドライラン
            job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
            dry_run_job = self.client.query(sql, job_config=job_config)
            
            # 処理バイト数を確認
            print(f"処理予定バイト数: {dry_run_job.total_bytes_processed:,} bytes")
            print(f"処理予定GB: {dry_run_job.total_bytes_processed / 1e9:.2f} GB")
            
            # 実際に実行
            query_job = self.client.query(sql)
            df = query_job.to_dataframe()
            
            return df
            
        except Exception as e:
            print(f"エラー: {e}")
            return pd.DataFrame()
    
    def analyze(self, natural_language_query: str) -> Dict:
        """自然言語クエリを分析して結果を返す"""
        print(f"質問: {natural_language_query}")
        print("\nSQL生成中...")
        
        # SQL生成
        sql = self.generate_sql(natural_language_query)
        print(f"\n生成されたSQL:\n{sql}")
        
        # SQL実行
        print("\nSQL実行中...")
        df = self.execute_query(sql)
        
        if df.empty:
            return {
                'query': natural_language_query,
                'sql': sql,
                'data': df,
                'summary': 'データが取得できませんでした'
            }
        
        # 結果のサマリーを生成
        summary = self.generate_summary(natural_language_query, df)
        
        return {
            'query': natural_language_query,
            'sql': sql,
            'data': df,
            'summary': summary
        }
    
    def generate_summary(self, query: str, df: pd.DataFrame) -> str:
        """結果のサマリーを生成"""
        
        # DataFrameの情報を文字列化（最大100行）
        df_str = df.head(100).to_string()
        
        prompt = f"""
        以下のデータ分析結果を日本語で要約してください。
        
        元の質問: {query}
        
        結果データ（最大100行）:
        {df_str[:3000]}  # 最初の3000文字のみ
        
        データ統計:
        - 行数: {len(df)}
        - カラム数: {len(df.columns)}
        
        以下の観点で要約してください：
        1. 主要な発見・インサイト
        2. 数値的な傾向
        3. 特筆すべきポイント
        
        簡潔に3-5文で要約してください。
        """
        
        response = self.model.generate_content(prompt)
        return response.text

# アナライザーのインスタンス作成
analyzer = NLToBigQueryAnalyzer(client, model)
print("アナライザー初期化完了")

## 6. 分析実行関数

In [None]:
def run_analysis(question: str, show_chart: bool = True):
    """自然言語で分析を実行"""
    
    # 分析実行
    result = analyzer.analyze(question)
    
    # 結果表示
    print("\n" + "="*50)
    print("📊 分析結果")
    print("="*50)
    
    print(f"\n📝 サマリー:\n{result['summary']}")
    
    print(f"\n📈 データ（上位10行）:")
    display(result['data'].head(10))
    
    # グラフ表示（可能な場合）
    if show_chart and not result['data'].empty:
        df = result['data']
        
        # 数値カラムを特定
        numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns.tolist()
        
        if len(numeric_cols) > 0 and len(df) > 1:
            # 最初の数値カラムでグラフ作成
            if len(df) <= 20:  # 20行以下なら棒グラフ
                fig = px.bar(
                    df.head(20), 
                    y=numeric_cols[0],
                    x=df.index if df.index.name else range(len(df)),
                    title=f"{question}の結果"
                )
            else:  # それ以上なら線グラフ
                fig = px.line(
                    df.head(100),
                    y=numeric_cols[0],
                    x=df.index if df.index.name else range(len(df)),
                    title=f"{question}の結果"
                )
            
            fig.show()
    
    return result

print("分析関数の準備完了")

## 7. 分析例の実行

以下のセルで自然言語による分析を実行できます。

In [None]:
# 分析例1: 売上上位の企業
result1 = run_analysis("年間売上が最も高い上位10社を教えて")

In [None]:
# 分析例2: 商談の分析
result2 = run_analysis("2024年のClosedWonの商談金額の合計を月別に集計して")

In [None]:
# 分析例3: 業界別分析
result3 = run_analysis("業界別の企業数と平均従業員数を教えて")

In [None]:
# 分析例4: 商談パイプライン
result4 = run_analysis("現在進行中の商談をステージ別に金額集計して")

In [None]:
# 分析例5: 顧客セグメント分析
result5 = run_analysis("ENTフラグがTrueの企業の商談の勝率を計算して")

## 8. インタラクティブな分析

自由に質問を入力して分析できます。

In [None]:
# 自由に質問を入力
question = input("分析したい内容を自然言語で入力してください: ")
result = run_analysis(question)

## 9. 複数質問の一括分析

In [None]:
# 複数の質問を一括で分析
questions = [
    "MIDフラグがTrueの企業数は？",
    "2024年の新規商談数を月別に集計",
    "Company_FitがHighの企業の特徴は？",
    "商談の平均クローズ期間は？",
    "地域別の企業分布を教えて"
]

results = {}
for q in questions:
    print(f"\n{'='*60}")
    print(f"分析中: {q}")
    print('='*60)
    try:
        results[q] = run_analysis(q, show_chart=False)
    except Exception as e:
        print(f"エラー: {e}")
        results[q] = None

# 結果サマリー
print("\n" + "="*60)
print("📊 全体サマリー")
print("="*60)
for q, r in results.items():
    if r:
        print(f"\n【{q}】")
        print(r['summary'][:200] + "..." if len(r['summary']) > 200 else r['summary'])

## 10. 結果のエクスポート

In [None]:
# 分析結果をCSVとしてダウンロード
from google.colab import files
import io

def export_results(result_dict, filename="analysis_result.csv"):
    """分析結果をCSVファイルとしてエクスポート"""
    if result_dict and 'data' in result_dict and not result_dict['data'].empty:
        # DataFrameをCSVに変換
        result_dict['data'].to_csv(filename, index=False, encoding='utf-8-sig')
        print(f"✅ {filename} を作成しました")
        
        # ダウンロード
        files.download(filename)
    else:
        print("エクスポートするデータがありません")

# 最後の分析結果をエクスポート
if 'result' in locals():
    export_results(result, "last_analysis.csv")

## 11. よく使う分析テンプレート

In [None]:
# よく使う分析のテンプレート
analysis_templates = {
    "営業パフォーマンス": [
        "今月のClosedWon商談の合計金額は？",
        "営業担当者別の商談成約率を計算して",
        "平均商談期間をステージ別に集計"
    ],
    "顧客分析": [
        "従業員数1000人以上の大企業リストを表示",
        "過去1年間で最も取引金額が多い上位20社",
        "業界別の顧客分布と平均取引額"
    ],
    "パイプライン分析": [
        "現在のパイプライン総額をステージ別に表示",
        "今四半期にクローズ予定の商談リスト",
        "商談の勝率を金額レンジ別に分析"
    ],
    "トレンド分析": [
        "過去12ヶ月の新規商談数の推移",
        "月別の受注金額トレンド",
        "新規顧客獲得数の四半期別推移"
    ]
}

# テンプレート選択
print("利用可能なテンプレート:")
for i, category in enumerate(analysis_templates.keys(), 1):
    print(f"{i}. {category}")

# カテゴリ選択
category_num = int(input("\nカテゴリ番号を選択 (1-4): "))
selected_category = list(analysis_templates.keys())[category_num - 1]

print(f"\n{selected_category}の分析を実行します")
print("="*60)

# 選択したカテゴリの分析を実行
for question in analysis_templates[selected_category]:
    print(f"\n▶ {question}")
    try:
        result = run_analysis(question, show_chart=True)
    except Exception as e:
        print(f"エラー: {e}")

## 12. カスタムダッシュボード生成

In [None]:
import plotly.subplots as sp

def create_dashboard():
    """複数の分析結果を組み合わせたダッシュボードを作成"""
    
    # 4つの主要分析を実行
    analyses = {
        "月別売上": "2024年の月別受注金額を集計",
        "ステージ別商談": "現在の商談をステージ別に件数集計",
        "業界別顧客": "業界別の顧客数を上位10件",
        "商談勝率": "過去6ヶ月の月別商談勝率"
    }
    
    results = {}
    for title, query in analyses.items():
        try:
            print(f"分析中: {title}")
            results[title] = analyzer.analyze(query)
        except Exception as e:
            print(f"エラー ({title}): {e}")
            results[title] = None
    
    # ダッシュボード作成
    fig = sp.make_subplots(
        rows=2, cols=2,
        subplot_titles=list(analyses.keys()),
        specs=[
            [{'type': 'bar'}, {'type': 'bar'}],
            [{'type': 'bar'}, {'type': 'scatter'}]
        ]
    )
    
    # 各グラフを追加
    positions = [(1, 1), (1, 2), (2, 1), (2, 2)]
    
    for (title, result), (row, col) in zip(results.items(), positions):
        if result and not result['data'].empty:
            df = result['data'].head(20)
            
            # 最初のカラムをX軸、2番目の数値カラムをY軸として使用
            if len(df.columns) >= 2:
                x_col = df.columns[0]
                
                # 数値カラムを探す
                numeric_cols = df.select_dtypes(include=['int64', 'float64']).columns
                if len(numeric_cols) > 0:
                    y_col = numeric_cols[0]
                    
                    if row == 2 and col == 2:  # 最後は線グラフ
                        fig.add_trace(
                            go.Scatter(
                                x=df[x_col].astype(str),
                                y=df[y_col],
                                mode='lines+markers',
                                name=title
                            ),
                            row=row, col=col
                        )
                    else:  # その他は棒グラフ
                        fig.add_trace(
                            go.Bar(
                                x=df[x_col].astype(str),
                                y=df[y_col],
                                name=title
                            ),
                            row=row, col=col
                        )
    
    # レイアウト設定
    fig.update_layout(
        height=800,
        showlegend=False,
        title_text="Salesforceデータ分析ダッシュボード",
        title_font_size=20
    )
    
    fig.show()
    
    return results

# ダッシュボード作成
dashboard_results = create_dashboard()

## 13. 分析履歴の保存

In [None]:
# 分析履歴を保存
import pickle
from datetime import datetime

# 分析履歴クラス
class AnalysisHistory:
    def __init__(self):
        self.history = []
    
    def add(self, query, sql, result_shape, summary):
        self.history.append({
            'timestamp': datetime.now(),
            'query': query,
            'sql': sql,
            'result_shape': result_shape,
            'summary': summary
        })
    
    def show_history(self, last_n=10):
        """最近の分析履歴を表示"""
        for item in self.history[-last_n:]:
            print(f"\n📅 {item['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"❓ 質問: {item['query']}")
            print(f"📊 結果: {item['result_shape']}")
            print(f"📝 サマリー: {item['summary'][:100]}...")
    
    def save(self, filename='analysis_history.pkl'):
        with open(filename, 'wb') as f:
            pickle.dump(self.history, f)
        print(f"履歴を {filename} に保存しました")
    
    def load(self, filename='analysis_history.pkl'):
        try:
            with open(filename, 'rb') as f:
                self.history = pickle.load(f)
            print(f"履歴を {filename} から読み込みました")
        except FileNotFoundError:
            print("履歴ファイルが見つかりません")

# 履歴インスタンス作成
history = AnalysisHistory()

# これまでの分析を履歴に追加（もし結果があれば）
if 'results' in locals():
    for q, r in results.items():
        if r:
            history.add(
                query=r['query'],
                sql=r['sql'],
                result_shape=str(r['data'].shape) if 'data' in r else 'N/A',
                summary=r.get('summary', 'N/A')
            )

# 履歴表示
print("=== 分析履歴 ===")
history.show_history()

# 履歴保存
history.save()

## まとめ

このノートブックでは、以下の機能を提供しています：

1. **自然言語でのクエリ**: 日本語でデータに関する質問をするだけで分析可能
2. **自動SQL生成**: Gemini APIがスキーマを理解してSQLを生成
3. **結果の可視化**: Plotlyによる自動グラフ生成
4. **サマリー生成**: AIによる分析結果の要約
5. **ダッシュボード**: 複数分析の統合表示
6. **履歴管理**: 分析履歴の保存と参照

### 使い方のヒント

- 具体的な数値や期間を含めると、より正確な分析ができます
- 「上位10件」「月別」「業界別」などの集計キーワードを使うと効果的です
- 複雑な分析は段階的に実行することをお勧めします

### 注意事項

- 大量データのクエリはコストがかかるため、LIMIT句が自動追加されます
- Gemini APIの利用制限に注意してください
- 機密情報の取り扱いには十分注意してください