In [None]:
!pip install schedule pytz google-cloud-bigquery

Collecting schedule
  Downloading schedule-1.2.2-py3-none-any.whl.metadata (3.8 kB)
Downloading schedule-1.2.2-py3-none-any.whl (12 kB)
Installing collected packages: schedule
Successfully installed schedule-1.2.2


In [None]:
import requests
import pandas as pd
from urllib.parse import quote
import time
import datetime
import pytz
import schedule
from google.cloud import bigquery
from google.oauth2 import service_account
from google.colab import userdata


# API 키 설정 (본인의 인증키로 변경 필요)
service_key = userdata.get('SEOUL_API_KEY')
encoded_key = quote(service_key)

# BigQuery 설정
# 서비스 계정 키 파일 경로 설정
credentials_path = "/content/bike-analysis-project-452711-b1805df1f871.json"
credentials = service_account.Credentials.from_service_account_file(credentials_path)

# BigQuery 클라이언트 생성
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# BigQuery 데이터셋 및 테이블 정보
dataset_id = "bike_analysis_project"  # 예: "bicycle_data"
table_id = "seoul_bike_stations"  # 테이블 이름

# 전체 테이블 ID
table_full_id = f"{credentials.project_id}.{dataset_id}.{table_id}"

In [None]:
# 데이터셋 생성 함수
def create_dataset_if_not_exists():
    try:
        # 데이터셋 참조 생성
        dataset_ref = client.dataset(dataset_id)

        # 데이터셋이 존재하는지 확인
        try:
            client.get_dataset(dataset_ref)
            print(f"데이터셋 {dataset_id}가 이미 존재합니다.")
        except Exception:
            # 데이터셋이 없으면 생성
            dataset = bigquery.Dataset(dataset_ref)
            dataset.location = "asia-northeast3"  # 한국 리전으로 설정
            dataset = client.create_dataset(dataset)
            print(f"데이터셋 {dataset_id}가 생성되었습니다.")
    except Exception as e:
        print(f"데이터셋 생성 오류: {e}")

# 현재 시간을 한국 시간으로 가져오는 함수
def get_korea_time():
    # 한국 시간대로 현재 시간 가져오기
    korea_tz = pytz.timezone('Asia/Seoul')
    return datetime.datetime.now(korea_tz)

def collect_and_upload_bike_data():
    # 한국 시간으로 현재 시간 가져오기
    korea_tz = pytz.timezone('Asia/Seoul')
    current_time = datetime.datetime.now(korea_tz)
    print(f"데이터 수집 시작: {current_time}")

    # 데이터를 저장할 리스트
    bike_stations = []

    # 범위 설정 (1~1000, 1001~2000, 2001~3000)
    ranges = [(1, 1000), (1001, 2000), (2001, 3000)]

    # 각 범위마다 API 호출하여 데이터 수집
    for start_index, end_index in ranges:
        # API URL 설정
        url = f"http://openapi.seoul.go.kr:8088/{encoded_key}/json/bikeList/{start_index}/{end_index}"

        # 재시도 메커니즘 추가
        max_retries = 3
        retry_delay = 5  # 초

        for retry in range(max_retries):
            try:
                # API 요청 및 응답 받기 (타임아웃 설정)
                response = requests.get(url, timeout=30)
                result = response.json()

                # 응답에서 정류소 데이터 추출
                if 'rentBikeStatus' in result and 'row' in result['rentBikeStatus']:
                    bike_stations.extend(result['rentBikeStatus']['row'])
                    print(f"{start_index}~{end_index} 구간 데이터 {len(result['rentBikeStatus']['row'])}개 수집 완료")
                else:
                    print(f"{start_index}~{end_index} 구간 데이터 없음 또는 오류 발생")

                # 성공했으면 재시도 루프 종료
                break

            except Exception as e:
                if retry < max_retries - 1:
                    print(f"API 요청 오류, {retry_delay}초 후 재시도 ({retry+1}/{max_retries}): {e}")
                    time.sleep(retry_delay)
                else:
                    print(f"최대 재시도 횟수 초과, 이 구간 건너뜀: {e}")

        # 연속 호출 사이에 지연 추가
        time.sleep(1)

    # 판다스 데이터프레임으로 변환
    if bike_stations:
        try:
            df_bikes = pd.DataFrame(bike_stations)

            # 방법 1: 한국 시간을 문자열로 저장 (KST 시간대 표시 포함)
            # 시간 포맷: YYYY-MM-DD HH:MM:SS+09:00
            df_bikes['collection_time_str'] = current_time.strftime('%Y-%m-%d %H:%M:%S+09:00')

            # 방법 2: 한국 시간을 UTC로 변환하여 저장하고, kst_hour 컬럼 추가
            # UTC 시간 저장 (BigQuery는 내부적으로 UTC로 저장)
            df_bikes['collection_time'] = current_time

            # 한국 시간 시간/분/초를 별도 컬럼으로 저장 (쿼리할 때 유용)
            df_bikes['collection_hour_kst'] = current_time.hour
            df_bikes['collection_minute'] = current_time.minute
            df_bikes['collection_date'] = current_time.strftime('%Y-%m-%d')

            # 결과 확인
            print(f"수집된 정류소 수: {len(df_bikes)}")

            # BigQuery에 데이터 업로드
            upload_to_bigquery(df_bikes)

            return df_bikes
        except Exception as e:
            print(f"데이터프레임 생성 오류: {e}")
            return None
    else:
        print("수집된 데이터가 없습니다.")
        return None

# BigQuery 업로드 함수
def upload_to_bigquery(df):
    try:
        # 테이블이 존재하는지 확인
        try:
            client.get_table(table_full_id)
            table_exists = True
        except Exception:
            table_exists = False

        # 테이블이 없으면 생성
        if not table_exists:
            print(f"테이블 {table_full_id}가 존재하지 않습니다. 새로 생성합니다.")

            # 스키마 설정 - 한국 시간 관련 필드 추가
            schema = [
                bigquery.SchemaField("stationId", "STRING"),
                bigquery.SchemaField("stationName", "STRING"),
                bigquery.SchemaField("parkingBikeTotCnt", "INTEGER"),
                bigquery.SchemaField("rackTotCnt", "INTEGER"),
                bigquery.SchemaField("shared", "INTEGER"),
                bigquery.SchemaField("stationLatitude", "FLOAT"),
                bigquery.SchemaField("stationLongitude", "FLOAT"),
                bigquery.SchemaField("collection_time", "TIMESTAMP"),  # UTC 기준 시간
                bigquery.SchemaField("collection_time_str", "STRING"),  # 문자열 형태의 KST 시간
                bigquery.SchemaField("collection_hour_kst", "INTEGER"),  # KST 시간
                bigquery.SchemaField("collection_minute", "INTEGER"),  # 분
                bigquery.SchemaField("collection_date", "STRING")  # YYYY-MM-DD 형식의 날짜
            ]

            # 테이블 생성
            table = bigquery.Table(table_full_id, schema=schema)
            table = client.create_table(table)
            print(f"테이블 {table_full_id}가 생성되었습니다.")

        # 데이터 타입 변환 (BigQuery와 호환되도록)
        df['parkingBikeTotCnt'] = df['parkingBikeTotCnt'].astype(int)
        df['rackTotCnt'] = df['rackTotCnt'].astype(int)
        df['shared'] = df['shared'].astype(int)
        df['stationLatitude'] = df['stationLatitude'].astype(float)
        df['stationLongitude'] = df['stationLongitude'].astype(float)
        df['collection_hour_kst'] = df['collection_hour_kst'].astype(int)
        df['collection_minute'] = df['collection_minute'].astype(int)

        # 데이터 업로드 설정
        job_config = bigquery.LoadJobConfig(
            schema=client.get_table(table_full_id).schema,
            write_disposition="WRITE_APPEND"  # 기존 데이터에 추가
        )

        # 데이터 업로드
        job = client.load_table_from_dataframe(df, table_full_id, job_config=job_config)
        job.result()  # 업로드 완료될 때까지 대기

        print(f"BigQuery 테이블 {table_full_id}에 {len(df)} 행이 추가되었습니다.")

    except Exception as e:
        print(f"BigQuery 업로드 오류: {e}")

# 데이터셋 생성 확인 및 생성
create_dataset_if_not_exists()

# 처음 한 번 실행
collect_and_upload_bike_data()

# 10분마다 실행하도록 스케줄 설정
schedule.every(10).minutes.do(collect_and_upload_bike_data)

# 무한 루프로 스케줄 실행
print("10분마다 자동 데이터 수집 및 BigQuery 업로드를 시작합니다...")
try:
    while True:
        schedule.run_pending()
        time.sleep(1)
except KeyboardInterrupt:
    print("데이터 수집이 중단되었습니다.")

데이터셋 bike_analysis_project가 이미 존재합니다.
데이터 수집 시작: 2025-03-08 14:00:32.264160+09:00
1~1000 구간 데이터 1000개 수집 완료
1001~2000 구간 데이터 1000개 수집 완료
2001~3000 구간 데이터 715개 수집 완료
수집된 정류소 수: 2715
BigQuery 테이블 bike-analysis-project-452711.bike_analysis_project.seoul_bike_stations에 2715 행이 추가되었습니다.
10분마다 자동 데이터 수집 및 BigQuery 업로드를 시작합니다...
데이터 수집 시작: 2025-03-08 14:10:42.731246+09:00
1~1000 구간 데이터 1000개 수집 완료
1001~2000 구간 데이터 1000개 수집 완료
2001~3000 구간 데이터 715개 수집 완료
수집된 정류소 수: 2715
BigQuery 테이블 bike-analysis-project-452711.bike_analysis_project.seoul_bike_stations에 2715 행이 추가되었습니다.
데이터 수집 시작: 2025-03-08 14:20:52.641746+09:00
1~1000 구간 데이터 1000개 수집 완료
1001~2000 구간 데이터 1000개 수집 완료
2001~3000 구간 데이터 715개 수집 완료
수집된 정류소 수: 2715
BigQuery 테이블 bike-analysis-project-452711.bike_analysis_project.seoul_bike_stations에 2715 행이 추가되었습니다.
데이터 수집 시작: 2025-03-08 14:31:03.192630+09:00
1~1000 구간 데이터 1000개 수집 완료
1001~2000 구간 데이터 1000개 수집 완료
2001~3000 구간 데이터 715개 수집 완료
수집된 정류소 수: 2715
BigQuery 테이블 bike-analysis-project-452711.b