In [None]:
# ! pip install sqlalchemy==2.0.8 pybigquery==0.5

# 載入參數

In [3]:
# input:uid,start_lat,start_lng,created_at
#output: uid,start_lat,start_lng,hour_type,is_holiday,weekday
# start_lat,start_lng: round to 2 decimal points
# hour_type:convert created_at to taiwan time zone, convert to hour, rules to convert hour to hour type
# weekday: convert created_at to taiwan time zone,convert to weekday

import pandas as pd
from datetime import datetime
import pytz

def input_data(uid:int,start_lat: float,start_lng: float,created_at: datetime):
  uid=uid
  # # Convert to datetime
  # created_at_dt = datetime.strptime(created_at, '%Y-%m-%d %H:%M:%S')

# Define Taiwan timezone
  taiwan_tz = pytz.timezone('Asia/Taipei')

# Convert to Taiwan timezone
  created_at_taiwan = created_at.astimezone(taiwan_tz)

# Extract hour
  hour = created_at_taiwan.hour

# Hour to hour_type
  if 7 <= hour <= 9:
    category = '早尖峰'
  elif 10 <= hour < 12:
    category = '早離峰'
  elif 13 <= hour <= 16:
    category = '午離峰'
  elif 17 <= hour <= 19:
    category = '晚尖峰'
  elif 20 <= hour <= 22:
    category = '小晚尖'
  elif 2 <= hour <= 6:
    category = '凌晨'
  else: category='午夜'

  hour_type=category

# Extract weekday (0=Monday, 6=Sunday) and adjust to 1=Monday, 7=Sunday
  weekday = created_at_taiwan.weekday() + 1

# is hoilday
  if 1<=weekday<=5 :
    tag='0'
  else:
    tag='1'

  is_holiday=tag

# Round to 2 decimal places
  start_lat_rounded = round(start_lat, 2)
  start_lng_rounded = round(start_lng, 2)

# created_at

  created_at=created_at

  return uid,start_lat_rounded,start_lng_rounded,hour_type,is_holiday,weekday,created_at


In [8]:
# Create datetime objects in UTC
utc_dt1 = datetime(2024, 8, 13, 7, 0, tzinfo=pytz.utc)

start_time = time.time()
input_data(2184214,25.09202,121.46401,utc_dt1)
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

Execution time: 0.00069427490234375 seconds


# Direct BigQuery Client (Native API)

In [6]:
# from google.cloud import bigquery as bq
# from google.oauth2 import service_account
# from datetime import datetime
# import pandas as pd
# import numpy as np
# import pytz

# def raw_data_producer(uid):
#   #讀取權限金鑰
#   credentials= service_account.Credentials.from_service_account_file("/content/sample_data/bq-key.json" )
#     #設定BigQuery權限
#   client = bq.Client(credentials=credentials)

#   query = """
#       select *
#       from `william.address_uid_all`
#       where uid=
#     """
#   query=query+str(uid)
#   result = client.query(query).to_dataframe()
#   return result

In [9]:
import time
from google.cloud import bigquery
from google.oauth2 import service_account

def fetch_data_with_uid(uid: int):
    # Load credentials and create a BigQuery client
    key_path = "/content/sample_data/bq-key.json"
    project_id = 'taxigo-production'
    dataset_id = 'recommend_address'

    credentials = service_account.Credentials.from_service_account_file(key_path)
    client = bigquery.Client(credentials=credentials, project=project_id)


    # Write the SQL query to filter by uid
    query = f"""
    SELECT *
    FROM `{project_id}.{dataset_id}.address_v2_training_data`
    WHERE uid = @uid
    """

    # # Create a query job with parameter binding to avoid SQL injection
    # query_job = client.query(query, job_config=bigquery.QueryJobConfig(
    #     query_parameters=[bigquery.ScalarQueryParameter("uid", "INT64", uid)]
    # ))

   # Set up job configuration
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("uid", "INT64", uid)]
    )

    # Run the query
    query_job = client.query(query, job_config=job_config)

    # Fetch and return the results
    df = query_job.to_dataframe()
    return df

# Example usage


start_time = time.time()
df = fetch_data_with_uid(1274208)
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")


Execution time: 2.0330827236175537 seconds


In [10]:
import time
from google.cloud import bigquery
from google.oauth2 import service_account

def end_matching_data(uid: int):
    # Load credentials and create a BigQuery client
    key_path = "/content/sample_data/bq-key.json"
    project_id = 'taxigo-production'
    dataset_id = 'recommend_address'

    credentials = service_account.Credentials.from_service_account_file(key_path)
    client = bigquery.Client(credentials=credentials, project=project_id)


    # Write the SQL query to filter by uid
    query = f"""
    SELECT *
    FROM `{project_id}.{dataset_id}.address_v2_suggestion`
    WHERE uid = @uid
    """

   # Set up job configuration
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("uid", "INT64", uid)]
    )
    # Run the query
    query_job = client.query(query, job_config=job_config)

    # Fetch and return the results
    df = query_job.to_dataframe()
    return df

# Example usage

uid = 1274208
start_time = time.time()
df = fetch_data_with_uid(uid)
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

Execution time: 1.6812825202941895 seconds


#SQLAlchemy ORM

In [24]:
from google.oauth2 import service_account
from sqlalchemy import create_engine, Column, Integer, String, DateTime,text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os

def create_bq_engine(key_path: str, project_id: str, dataset_id: str):
    """
    Creates and returns a SQLAlchemy engine connected to BigQuery using a service account key.

    Parameters:
    - key_path (str): The file path to the service account key JSON file.
    - project_id (str): The Google Cloud project ID.
    - dataset_id (str): The BigQuery dataset ID.

    Returns:
    - SQLAlchemy engine connected to BigQuery.
    """
  # Set the environment variable for credentials
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"]=key_path

    # Construct the connection string using the credentials
    connection_string = f"bigquery://{project_id}/{dataset_id}"

    # Create the SQLAlchemy engine with the credentials
    engine = create_engine(connection_string)

    return engine

# Example usage
key_path = "/content/sample_data/bq-key.json"   # Replace with your actual key file path
project_id = 'taxigo-production'  # Replace with your Google Cloud project ID
dataset_id = 'william'  # Replace with your BigQuery dataset ID

engine = create_bq_engine(key_path, project_id, dataset_id)

Session = sessionmaker(bind=engine)
session = Session()

# Use raw SQL to query the table
query = text("SELECT * FROM `taxigo-production.william.address_uid_all`")
try:
        result = session.execute(text("SELECT * FROM `taxigo-production.william.address_uid_all` LIMIT 10"))
        for row in result:
            print(row)
except Exception as e:
        print(f"An error occurred: {e}")


  engine = create_engine(connection_string)
INFO:sqlalchemy.engine.Engine:BEGIN (implicit)


An error occurred: SQLCompiler.__init__() got multiple values for argument 'cache_key'


  result = session.execute(text("SELECT * FROM `taxigo-production.william.address_uid_all` LIMIT 10"))
