In [1]:
import time
from datetime import datetime
import pandas as pd          # pip install pandas
import pypyodbc as odbc      # pip install pypyodbc
from google.cloud import bigquery  # pip install google-cloud-bigquery
import io                    # Import the io module
import os
from google.oauth2 import service_account
from sqlserver import SQLServer

### Connect to SQL Server database
SERVER_NAME = r'DESKTOP-A1B4UHT\SQLEXPRESS'
DATABASE_NAME = 'AdventureWorks2022'
sql_server_instance = SQLServer(SERVER_NAME, DATABASE_NAME)
sql_server_instance.connect_to_sql_server()

### Set up Google Cloud credentials
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r"C:\Users\Admin\Downloads\DWH\seventh-jet-424513-h5-a426c383f9d7.json"

Connected


In [2]:
def load_data_to_bigquery(sql_query, table_id):
    # Step 2. Query the records from SQL Server
    columns, records = sql_server_instance.query(sql_query)
    df = pd.DataFrame(records, columns=columns)
    df["Created_dt"] = datetime.now()

    # Step 3. Define table schema
    client = bigquery.Client()

    # Configure job
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        write_disposition='WRITE_TRUNCATE'
    )

    # Convert the DataFrame to a CSV string
    csv_data = df.to_csv(index=False)

    # Load data to BigQuery from CSV string
    job = client.load_table_from_file(
        file_obj=io.StringIO(csv_data),
        destination=table_id,
        job_config=job_config
    )

    # Wait for the load job to complete
    job.result()
    print(f"Data loaded successfully to BigQuery table {table_id}")

# Define the SQL queries and corresponding BigQuery table IDs
tasks = [
    {"sql_query": "SELECT * FROM Sales.Customer", "table_id": "seventh-jet-424513-h5.STAGING.Customer"},

    {"sql_query": "SELECT * FROM Sales.CurrencyRate", "table_id": "seventh-jet-424513-h5.STAGING.CurrencyRate"},

    {"sql_query": "SELECT * FROM Sales.Currency", "table_id": "seventh-jet-424513-h5.STAGING.Currency"},

    {"sql_query": "SELECT * FROM Sales.SalesOrderHeader", "table_id": "seventh-jet-424513-h5.STAGING.SalesOrderHeader"},

    {"sql_query": "SELECT * FROM Sales.SalesOrderDetail", "table_id": "seventh-jet-424513-h5.STAGING.SalesOrderDetail"},

    {"sql_query": "SELECT * FROM Production.Product", "table_id": "seventh-jet-424513-h5.STAGING.Product"},

    {"sql_query": "SELECT * FROM Production.ProductSubcategory", "table_id": "seventh-jet-424513-h5.STAGING.ProductSubcategory"},

    {"sql_query": "SELECT * FROM Production.ProductCategory", "table_id": "seventh-jet-424513-h5.STAGING.ProductCategory"},

    {"sql_query": "SELECT * FROM Person.StateProvince", "table_id": "seventh-jet-424513-h5.STAGING.StateProvince"},

    {"sql_query": "SELECT * FROM Person.CountryRegion", "table_id": "seventh-jet-424513-h5.STAGING.CountryRegion"},

    {"sql_query": "SELECT * FROM Sales.SalesOrderHeaderSalesReason", "table_id": "seventh-jet-424513-h5.STAGING.SalesOrderHeaderSalesReason"},

    {"sql_query": "SELECT * FROM Sales.SalesReason", "table_id": "seventh-jet-424513-h5.STAGING.SalesReason"},
]

# Run the load process for each task
for task in tasks:
    load_data_to_bigquery(task["sql_query"], task["table_id"])

Data loaded successfully to BigQuery table seventh-jet-424513-h5.STAGING.Customer
Data loaded successfully to BigQuery table seventh-jet-424513-h5.STAGING.CurrencyRate
Data loaded successfully to BigQuery table seventh-jet-424513-h5.STAGING.Currency
Data loaded successfully to BigQuery table seventh-jet-424513-h5.STAGING.SalesOrderHeader
Data loaded successfully to BigQuery table seventh-jet-424513-h5.STAGING.SalesOrderDetail
Data loaded successfully to BigQuery table seventh-jet-424513-h5.STAGING.Product
Data loaded successfully to BigQuery table seventh-jet-424513-h5.STAGING.ProductSubcategory
Data loaded successfully to BigQuery table seventh-jet-424513-h5.STAGING.ProductCategory
Data loaded successfully to BigQuery table seventh-jet-424513-h5.STAGING.StateProvince
Data loaded successfully to BigQuery table seventh-jet-424513-h5.STAGING.CountryRegion
Data loaded successfully to BigQuery table seventh-jet-424513-h5.STAGING.SalesOrderHeaderSalesReason
Data loaded successfully to BigQu

In [3]:

# Step 2. Query the records from SQL Server
sql_statement = """
SELECT 
AddressID,
AddressLine1,
AddressLine2,
City,
StateProvinceID

FROM Person.Address 
"""

columns, records = sql_server_instance.query(sql_statement)
Address = pd.DataFrame(records, columns=columns)
Address["Created_dt"] = datetime.now()
Address["addressline1"] = Address["addressline1"].astype(str)
Address["addressline2"] = Address["addressline1"].astype(str)

# Step 3. Define table schema
client = bigquery.Client()

# If you don't want to specify column schema, set autodetect to True
job_config = bigquery.LoadJobConfig(
    autodetect=True,
    #schema=[
        #bigquery.SchemaField("orderdate", 'TIMESTAMP'),
        #bigquery.SchemaField("duedate", 'TIMESTAMP'),
        #bigquery.SchemaField("shipdate", 'TIMESTAMP'),
    #],
    # WRITE_APPEND, WRITE_EMPTY
    write_disposition='WRITE_TRUNCATE'
)

# Step 4. Load data to BigQuery
table_id = 'seventh-jet-424513-h5.STAGING.Address'  # Replace with your project and dataset ID

# Convert the DataFrame to a CSV string
csv_data = Address.to_csv(index=False)

# Load data to BigQuery from CSV string
job = client.load_table_from_file(
    file_obj=io.StringIO(csv_data),
    destination=table_id,
    job_config=job_config
)

# Wait for the load job to complete
job.result()

print("Data loaded successfully to BigQuery")


Data loaded successfully to BigQuery
