In [1]:
# Simplified BigQuery query tool
# Runs BigQuery queries and saves each result set as a CSV file

import csv
import os
from datetime import datetime
from google.cloud import bigquery
from tqdm import tqdm
from auth import BigQueryAuth

print("模块导入成功")


模块导入成功


In [2]:
# 定义查询执行器类
class QueryRunner:
    """Run a BigQuery query and stream its result set into a CSV file.

    Authentication is delegated to the project-local ``BigQueryAuth`` helper;
    from its use here it exposes ``authenticate()`` and ``get_client()``.
    Call :meth:`setup` once before :meth:`execute_query_and_save`.
    """

    # Placeholder tokens that are dropped from a filename pattern when the
    # pattern does not contain the canonical "yyyymm" placeholder.
    _DATE_TOKENS = ("mm", "yyyy", "mmyyyy")

    def __init__(self):
        self.auth = BigQueryAuth()
        # Stays None until setup() succeeds.
        self.client = None

    def setup(self):
        """Authenticate and create the BigQuery client.

        Returns:
            True when ``self.auth.authenticate()`` is truthy and the client
            has been stored on ``self.client``; False otherwise.
        """
        if self.auth.authenticate():
            self.client = self.auth.get_client()
            return True
        return False

    @staticmethod
    def _build_filename(filename_pattern, when=None):
        """Turn a filename pattern into a ``YYYYMM``-stamped ``.csv`` name.

        Args:
            filename_pattern: Pattern such as ``"yyyymm_appannie"``. When it
                contains ``"yyyymm"`` that placeholder is substituted in
                place; otherwise stand-alone ``mm`` / ``yyyy`` / ``mmyyyy``
                tokens are removed and the stamp is prefixed.
            when: ``datetime`` used for the stamp; defaults to
                ``datetime.now()``.

        Returns:
            The file name, including the ``.csv`` extension.
        """
        moment = datetime.now() if when is None else when
        stamp = moment.strftime("%Y%m")

        if "yyyymm" in filename_pattern:
            return f"{filename_pattern.replace('yyyymm', stamp)}.csv"

        # Drop placeholders only when they are whole underscore-delimited
        # tokens. A plain substring replace would also eat the "mm" inside
        # ordinary words such as "summary" or "comments".
        kept = [
            token
            for token in filename_pattern.split("_")
            if token not in QueryRunner._DATE_TOKENS
        ]
        clean_name = "_".join(kept).strip("_")

        # Avoid a dangling underscore ("202507_.csv") when nothing is left.
        return f"{stamp}_{clean_name}.csv" if clean_name else f"{stamp}.csv"

    def execute_query_and_save(self, sql_query, folder, filename_pattern, legacy=False):
        """Execute ``sql_query`` and write every result row to a CSV file.

        Args:
            sql_query: SQL text passed to ``self.client.query``.
            folder: Output directory; created if it does not exist.
            filename_pattern: Pattern resolved by :meth:`_build_filename`.
            legacy: Passed as ``use_legacy_sql`` in the query job config.

        Returns:
            Path of the CSV file that was written.

        Raises:
            RuntimeError: If :meth:`setup` has not produced a client yet.
        """
        if not self.client:
            raise RuntimeError("客户端未初始化，请先运行setup()")

        # Make sure the destination directory exists.
        os.makedirs(folder, exist_ok=True)

        # Select legacy or standard SQL for this job.
        job_config = bigquery.QueryJobConfig(use_legacy_sql=legacy)

        print(f"执行查询: {sql_query[:100]}...")
        query_job = self.client.query(sql_query, job_config=job_config)

        # Block until the job has finished.
        results = query_job.result()
        print("查询完成，正在保存结果...")

        filename = self._build_filename(filename_pattern)
        filepath = os.path.join(folder, filename)

        # Column headers come from the result schema.
        field_names = [field.name for field in results.schema]

        with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(field_names)

            # Give tqdm the row total when the iterator reports one, so the
            # bar can show a percentage and an ETA; None keeps the plain counter.
            progress = tqdm(
                results,
                total=getattr(results, "total_rows", None),
                desc=f"写入 {filename}",
                unit="行",
            )
            row_count = 0
            for row in progress:
                writer.writerow([row[name] for name in field_names])
                row_count += 1

        print(f"已成功保存 {row_count} 行数据到 {filepath}")
        return filepath


In [3]:
# Run one specific query and export the result to CSV.

# Root directory for every export. Assigned before authentication so that
# later cells that read `output_dir` do not hit a NameError when this
# cell's setup attempt fails and they re-authenticate on their own.
output_dir = "output"

# Create the query runner.
runner = QueryRunner()

# Authenticate.
print("正在设置认证...")
if not runner.setup():
    print("设置失败。请运行 'gcloud auth application-default login'")
else:
    print("认证成功！")

    # Make sure the export root exists.
    os.makedirs(output_dir, exist_ok=True)

    # Query parameters. The bracketed [project:dataset.table] reference is
    # legacy-SQL syntax, hence legacy=True.
    query = "SELECT * FROM [wego-cloud:appannie.appannie_tableau_report_updated_2019_03_05]"
    folder_path = os.path.join(output_dir, "market_share")
    filename_pattern = "yyyymm_appannie"
    legacy = True

    try:
        # Run the query and write the rows to CSV.
        filepath = runner.execute_query_and_save(
            sql_query=query,
            folder=folder_path,
            filename_pattern=filename_pattern,
            legacy=legacy
        )
        print(f"查询成功完成！结果已保存到: {filepath}")
    except Exception as e:
        # Best-effort cell: report the failure and let the notebook continue.
        print(f"执行查询时出错: {e}")


正在设置认证...
认证成功！
执行查询: SELECT * FROM [wego-cloud:appannie.appannie_tableau_report_updated_2019_03_05]...
查询完成，正在保存结果...


写入 202507_appannie.csv: 458450行 [00:55, 8251.50行/s] 

已成功保存 458450 行数据到 output\market_share\202507_appannie.csv
查询成功完成！结果已保存到: output\market_share\202507_appannie.csv





In [4]:
# 定义查询执行器类
class QueryRunner:
    """Run a BigQuery query and stream its result set into a CSV file.

    Authentication is delegated to the project-local ``BigQueryAuth`` helper;
    from its use here it exposes ``authenticate()`` and ``get_client()``.
    Call :meth:`setup` once before :meth:`execute_query_and_save`.

    NOTE(review): an earlier cell of this notebook already declares a class
    named ``QueryRunner``; when the cells run in order this declaration
    replaces it. Keep a single copy to avoid the two drifting apart.
    """

    # Placeholder tokens that are dropped from a filename pattern when the
    # pattern does not contain the canonical "yyyymm" placeholder.
    _DATE_TOKENS = ("mm", "yyyy", "mmyyyy")

    def __init__(self):
        self.auth = BigQueryAuth()
        # Stays None until setup() succeeds.
        self.client = None

    def setup(self):
        """Authenticate and create the BigQuery client.

        Returns:
            True when ``self.auth.authenticate()`` is truthy and the client
            has been stored on ``self.client``; False otherwise.
        """
        if self.auth.authenticate():
            self.client = self.auth.get_client()
            return True
        return False

    @staticmethod
    def _build_filename(filename_pattern, when=None):
        """Turn a filename pattern into a ``YYYYMM``-stamped ``.csv`` name.

        Args:
            filename_pattern: Pattern such as ``"yyyymm_appannie"``. When it
                contains ``"yyyymm"`` that placeholder is substituted in
                place; otherwise stand-alone ``mm`` / ``yyyy`` / ``mmyyyy``
                tokens are removed and the stamp is prefixed.
            when: ``datetime`` used for the stamp; defaults to
                ``datetime.now()``.

        Returns:
            The file name, including the ``.csv`` extension.
        """
        moment = datetime.now() if when is None else when
        stamp = moment.strftime("%Y%m")

        if "yyyymm" in filename_pattern:
            return f"{filename_pattern.replace('yyyymm', stamp)}.csv"

        # Drop placeholders only when they are whole underscore-delimited
        # tokens. A plain substring replace would also eat the "mm" inside
        # ordinary words such as "summary" or "comments".
        kept = [
            token
            for token in filename_pattern.split("_")
            if token not in QueryRunner._DATE_TOKENS
        ]
        clean_name = "_".join(kept).strip("_")

        # Avoid a dangling underscore ("202507_.csv") when nothing is left.
        return f"{stamp}_{clean_name}.csv" if clean_name else f"{stamp}.csv"

    def execute_query_and_save(self, sql_query, folder, filename_pattern, legacy=False):
        """Execute ``sql_query`` and write every result row to a CSV file.

        Args:
            sql_query: SQL text passed to ``self.client.query``.
            folder: Output directory; created if it does not exist.
            filename_pattern: Pattern resolved by :meth:`_build_filename`.
            legacy: Passed as ``use_legacy_sql`` in the query job config.

        Returns:
            Path of the CSV file that was written.

        Raises:
            RuntimeError: If :meth:`setup` has not produced a client yet.
        """
        if not self.client:
            raise RuntimeError("客户端未初始化，请先运行setup()")

        # Make sure the destination directory exists.
        os.makedirs(folder, exist_ok=True)

        # Select legacy or standard SQL for this job.
        job_config = bigquery.QueryJobConfig(use_legacy_sql=legacy)

        print(f"执行查询: {sql_query[:100]}...")
        query_job = self.client.query(sql_query, job_config=job_config)

        # Block until the job has finished.
        results = query_job.result()
        print("查询完成，正在保存结果...")

        filename = self._build_filename(filename_pattern)
        filepath = os.path.join(folder, filename)

        # Column headers come from the result schema.
        field_names = [field.name for field in results.schema]

        with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(field_names)

            # Give tqdm the row total when the iterator reports one, so the
            # bar can show a percentage and an ETA; None keeps the plain counter.
            progress = tqdm(
                results,
                total=getattr(results, "total_rows", None),
                desc=f"写入 {filename}",
                unit="行",
            )
            row_count = 0
            for row in progress:
                writer.writerow([row[name] for name in field_names])
                row_count += 1

        print(f"已成功保存 {row_count} 行数据到 {filepath}")
        return filepath




In [5]:
# Run one specific query and export the result to CSV.
# NOTE(review): this cell repeats an earlier cell of the notebook line for
# line; consider keeping only one of them.

# Root directory for every export. Assigned before authentication so that
# later cells that read `output_dir` do not hit a NameError when this
# cell's setup attempt fails and they re-authenticate on their own.
output_dir = "output"

# Create the query runner.
runner = QueryRunner()

# Authenticate.
print("正在设置认证...")
if not runner.setup():
    print("设置失败。请运行 'gcloud auth application-default login'")
else:
    print("认证成功！")

    # Make sure the export root exists.
    os.makedirs(output_dir, exist_ok=True)

    # Query parameters. The bracketed [project:dataset.table] reference is
    # legacy-SQL syntax, hence legacy=True.
    query = "SELECT * FROM [wego-cloud:appannie.appannie_tableau_report_updated_2019_03_05]"
    folder_path = os.path.join(output_dir, "market_share")
    filename_pattern = "yyyymm_appannie"
    legacy = True

    try:
        # Run the query and write the rows to CSV.
        filepath = runner.execute_query_and_save(
            sql_query=query,
            folder=folder_path,
            filename_pattern=filename_pattern,
            legacy=legacy
        )
        print(f"查询成功完成！结果已保存到: {filepath}")
    except Exception as e:
        # Best-effort cell: report the failure and let the notebook continue.
        print(f"执行查询时出错: {e}")

正在设置认证...
认证成功！
执行查询: SELECT * FROM [wego-cloud:appannie.appannie_tableau_report_updated_2019_03_05]...
查询完成，正在保存结果...


写入 202507_appannie.csv: 458450行 [00:57, 8042.68行/s] 

已成功保存 458450 行数据到 output\market_share\202507_appannie.csv
查询成功完成！结果已保存到: output\market_share\202507_appannie.csv





In [10]:
# Try out the "App Annie App Ratings" template query with the date format
# that matches its template.

# SQL template helpers (project-local module).
import sys  # no longer used in this cell; kept in case other cells rely on it
from sql_processor import load_query_template, get_date_params_by_template

# Settings for this query.
query_config = {
    "name": "App Annie App Ratings",
    "template": "appannie_app_ratings.sql",
    "filename_pattern": "yyyymm_appannie_app_ratings",
    "legacy": False,
    "folder": "key_brands"
}

# Make sure the runner created in an earlier cell has a client.
if getattr(runner, 'client', None) is None:
    print("重新设置认证...")
    if not runner.setup():
        # Raise instead of calling sys.exit(1): inside a notebook SystemExit is
        # intercepted by the interactive shell, whereas an ordinary exception
        # stops the cell with a traceback that carries the remediation hint.
        raise RuntimeError("设置失败。请运行 'gcloud auth application-default login'")
    print("认证成功！")

try:
    print(f"\n开始执行 {query_config['name']} 查询...")

    # Template file for this query.
    template_path = query_config.get('template')

    # Date parameters chosen by template type (per the original note,
    # App Annie expects dates without dashes).
    params = get_date_params_by_template(template_path)
    print(f"根据模板类型计算出的日期范围: {params['start_date']} 到 {params['end_date']}")

    # Show which template file is being loaded.
    print(f"加载模板文件: {template_path}")

    # Load the template and substitute the parameters.
    query = load_query_template(template_path, params)
    if query is None:
        print(f"加载模板 {template_path} 失败")
    else:
        print(f"成功替换模板中的参数: {params}")

        # Echo the generated SQL for inspection.
        print("\n--- 生成的SQL查询 ---")
        print(query)
        print("--- 查询结束 ---\n")

        # Output folder; `output_dir` comes from an earlier cell.
        folder_path = os.path.join(output_dir, query_config['folder'])

        # Run the query and write the rows to CSV.
        filepath = runner.execute_query_and_save(
            sql_query=query,
            folder=folder_path,
            filename_pattern=query_config['filename_pattern'],
            legacy=query_config['legacy']
        )

        print(f"{query_config['name']} 查询成功完成！结果已保存到: {filepath}")
except Exception as e:
    # Best-effort cell: report the failure and let the notebook continue.
    print(f"执行 {query_config['name']} 查询时出错: {e}")



开始执行 App Annie App Ratings 查询...
根据模板类型计算出的日期范围: 20230601 到 20250601
加载模板文件: appannie_app_ratings.sql
成功替换模板中的参数: {'start_date': '20230601', 'end_date': '20250601'}

--- 生成的SQL查询 ---
-- App Annie App Ratings SQL Template
-- Replace the date values when pasting your SQL from Google Sheets:
-- WHERE month BETWEEN "20230601" AND "20250601"

-- Paste your SQL content below this line: 


SELECT	
date,	
store,	
app_id,	
country_code,	
brand,	
CASE rating	
WHEN '1' THEN 'One'	
WHEN '2' THEN 'Two'	
WHEN '3' THEN 'Three'	
WHEN '4' THEN 'Four'	
WHEN '5' THEN 'Five'	
END AS rating,	
total_count	
FROM (	
SELECT	
date,	
store,	
app_id,	
country_code,	
brand,	
one AS total_count, '1' AS rating	
	
FROM `wego-cloud.appannie.app_ratings_over_time*`	
WHERE _TABLE_SUFFIX BETWEEN "20230601" AND "20250601"
UNION ALL	
	
SELECT	
date,	
store,	
app_id,	
country_code,	
brand,	
two AS total_count,	
'2' AS rating	
FROM `wego-cloud.appannie.app_ratings_over_time*`	
WHERE _TABLE_SUFFIX BETWEEN "20230601" AND "202

写入 202507_appannie_app_ratings.csv: 49260行 [00:06, 8036.02行/s] 

已成功保存 49260 行数据到 output\key_brands\202507_appannie_app_ratings.csv
App Annie App Ratings 查询成功完成！结果已保存到: output\key_brands\202507_appannie_app_ratings.csv





In [7]:
# Try out the "Web Traffic" template query with the date format that matches
# its template.

# SQL template helpers (project-local module).
import sys  # no longer used in this cell; kept in case other cells rely on it
from sql_processor import load_query_template, get_date_params_by_template

# Settings for this query.
query_config = {
    "name": "Web Traffic",
    "template": "web_traffic.sql",
    "filename_pattern": "yyyymm_web_traffic",
    "legacy": False,
    "folder": "key_brands"
}

# Make sure the runner created in an earlier cell has a client.
if getattr(runner, 'client', None) is None:
    print("重新设置认证...")
    if not runner.setup():
        # Raise instead of calling sys.exit(1): inside a notebook SystemExit is
        # intercepted by the interactive shell, whereas an ordinary exception
        # stops the cell with a traceback that carries the remediation hint.
        raise RuntimeError("设置失败。请运行 'gcloud auth application-default login'")
    print("认证成功！")

try:
    print(f"\n开始执行 {query_config['name']} 查询...")

    # Template file for this query.
    template_path = query_config.get('template')

    # Date parameters chosen by template type (per the original note,
    # Web Traffic expects dates with dashes).
    params = get_date_params_by_template(template_path)
    print(f"根据模板类型计算出的日期范围: {params['start_date']} 到 {params['end_date']}")

    # Show which template file is being loaded.
    print(f"加载模板文件: {template_path}")

    # Load the template and substitute the parameters.
    query = load_query_template(template_path, params)
    if query is None:
        print(f"加载模板 {template_path} 失败")
    else:
        print(f"成功替换模板中的参数: {params}")

        # Echo the generated SQL for inspection.
        print("\n--- 生成的SQL查询 ---")
        print(query)
        print("--- 查询结束 ---\n")

        # Output folder; `output_dir` comes from an earlier cell.
        folder_path = os.path.join(output_dir, query_config['folder'])

        # Run the query and write the rows to CSV.
        filepath = runner.execute_query_and_save(
            sql_query=query,
            folder=folder_path,
            filename_pattern=query_config['filename_pattern'],
            legacy=query_config['legacy']
        )

        print(f"{query_config['name']} 查询成功完成！结果已保存到: {filepath}")
except Exception as e:
    # Best-effort cell: report the failure and let the notebook continue.
    print(f"执行 {query_config['name']} 查询时出错: {e}")



开始执行 Web Traffic 查询...
根据模板类型计算出的日期范围: 2023-06-01 到 2025-06-01
加载模板文件: web_traffic.sql
成功替换模板中的参数: {'start_date': '2023-06-01', 'end_date': '2025-06-01'}

--- 生成的SQL查询 ---
-- Web Traffic SQL Template
-- Replace the date values when pasting your SQL from Google Sheets:
-- WHERE month BETWEEN "2023-06-01" AND "2025-06-01"

SELECT * from SimilarWeb.traffic_data
WHERE month BETWEEN "2023-06-01" AND "2025-06-01"
AND company_type IN ('Meta','OTA')
AND company IN ('Agoda', 'Almosafer', 'Aviasales', 'Booking.com', 'Cleartrip', 'Expedia', 'HotelsCombined', 'Ixigo', 'Jetcost', 'Kayak', 'Kojaro', 'MakeMyTrip', 'Momondo', 'Neredekal', 'Rome2Rio', 'Safarmarket', 'Skyscanner', 'Swoodoo', 'Travelstart', 'TripAdvisor', 'Trivago', 'Turismocity', 'Wego')

--- 查询结束 ---

执行查询: -- Web Traffic SQL Template
-- Replace the date values when pasting your SQL from Google Sheets:
-- ...
查询完成，正在保存结果...


写入 202507_web_traffic.csv: 759530行 [01:47, 7080.74行/s] 

已成功保存 759530 行数据到 output\key_brands\202507_web_traffic.csv
Web Traffic 查询成功完成！结果已保存到: output\key_brands\202507_web_traffic.csv





In [11]:
import pandas as pd

# Compare a CSV downloaded by hand with the CSV this notebook generated.
# NOTE(review): both paths are absolute and machine-specific; consider
# deriving them from a configurable base directory.
reference_csv = r"C:\Users\gzh\Downloads\bq-results-20250711-032348-1752204240551.csv"
generated_csv = r"D:\GitHubProjects\GBQ-automation\output\key_brands\202507_appannie_app_ratings.csv"

df1 = pd.read_csv(reference_csv)
df2 = pd.read_csv(generated_csv)

# DataFrame.equals requires matching shape, values and dtypes.
print("数据完全一致" if df1.equals(df2) else "数据不同")


数据完全一致
