In [None]:
# BigQuery-Spark Integration Test


# PySpark-BigQuery Integration Test

This notebook verifies that PySpark can successfully connect to and query Google BigQuery from within your development environment. It demonstrates:

1. Creating a SparkSession with the BigQuery connector
2. Verifying authentication and project information
3. Reading from a public BigQuery dataset
4. Reading from your authenticated project's datasets
5. Running a simple query through Spark

## Prerequisites

- Apache Spark 3.x installed (included in this container)
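- Google Cloud Application Default Credentials set up (via `gcloud auth application-default login` or a service account key file referenced by `GOOGLE_APPLICATION_CREDENTIALS`); `gcloud auth login` alone only authenticates the `gcloud` CLI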
- BigQuery connector JAR (referenced via URL in the notebook)
- Appropriate BigQuery permissions for datasets you want to access

## Packages Required

- pyspark
- google-cloud-bigquery
- pandas
- pyarrow (for data conversion)
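A quick way to confirm these packages are importable in the current kernel is a sketch like the following. It uses `importlib.util.find_spec` and assumes the usual import name for each distribution (for example, `google-cloud-bigquery` imports as `google.cloud.bigquery`).

```python
import importlib.util

# Map each pip distribution name to the module it is usually imported as.
# These import names are assumptions; adjust them if your environment differs.
REQUIRED_MODULES = {
    "pyspark": "pyspark",
    "google-cloud-bigquery": "google.cloud.bigquery",
    "pandas": "pandas",
    "pyarrow": "pyarrow",
}


def missing_modules(modules):
    """Return the distribution names whose import module cannot be found."""
    missing = []
    for dist_name, module_name in modules.items():
        try:
            found = importlib.util.find_spec(module_name) is not None
        except (ImportError, ValueError):
            # find_spec imports parent packages of dotted names and raises
            # if one of them (e.g. "google") is absent or broken
            found = False
        if not found:
            missing.append(dist_name)
    return missing


missing = missing_modules(REQUIRED_MODULES)
print("Missing packages:", ", ".join(missing) if missing else "none")
```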


In [None]:
# Configuration Variables
# -----------------------
# Replace these with your actual values

# Your GCP dataset and table to query
dataset_id = "my_dataset"  # Replace with your dataset name
table_id = "my_table"       # Replace with your table name

# A GCS bucket you have read/write access to (for temporary data)
bucket_id = "my_bucket"  # Replace with your bucket name

# Import required libraries
import subprocess
from pyspark.sql import SparkSession
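Because the later cells fail with fairly opaque connector errors when a placeholder is left in place, it can help to check the configuration first. This is a minimal, hypothetical helper: it only compares each setting against the placeholder strings used in the cell above and flags empty values.

```python
# Placeholder values copied from the configuration cell above; a setting that
# still equals its placeholder (or is empty) has not been customised yet.
PLACEHOLDERS = {
    "dataset_id": "my_dataset",
    "table_id": "my_table",
    "bucket_id": "my_bucket",
}


def unreplaced_settings(settings, placeholders=PLACEHOLDERS):
    """Return the names of settings that are empty or still hold their placeholder."""
    return [
        name
        for name, placeholder in placeholders.items()
        if not settings.get(name) or settings.get(name) == placeholder
    ]


# Example with hypothetical values; in the notebook you would pass
# {"dataset_id": dataset_id, "table_id": table_id, "bucket_id": bucket_id}.
example = {"dataset_id": "sales", "table_id": "my_table", "bucket_id": ""}
print(unreplaced_settings(example))  # ['table_id', 'bucket_id']
```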


## 1. GCP Authentication Information

First, let's check which account and default project the `gcloud` CLI is configured with:


In [None]:
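# Display the account and default project configured for the gcloud CLI.
# The BigQuery connector itself authenticates with Application Default
# Credentials, which may belong to a different account.
print("GCP Authentication Information:")
account = None
project = None  # defined up front so later cells can reference it even if gcloud fails
try:
    # Get currently authenticated account
    account = subprocess.check_output(
        ["gcloud", "config", "get-value", "account"], text=True
    ).strip() or None
    print(f"Authenticated as: {account or '(not set)'}")

    # Get current project (empty output means no default project is set)
    project = subprocess.check_output(
        ["gcloud", "config", "get-value", "project"], text=True
    ).strip() or None
    print(f"Current Project: {project or '(not set)'}")
    if project is None:
        print("Set a default project with `gcloud config set project PROJECT_ID`.")
except (OSError, subprocess.CalledProcessError) as e:
    # OSError covers a missing gcloud binary; CalledProcessError a failing command
    print(f"Error retrieving authentication info: {e}")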

print("\n" + "-" * 50 + "\n")
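The `gcloud` output above describes the CLI, but the BigQuery connector authenticates with Application Default Credentials (ADC), which `gcloud auth login` does not set up. The sketch below looks for an ADC file in two common places on Linux and macOS: the file named by `GOOGLE_APPLICATION_CREDENTIALS`, then the default file written by `gcloud auth application-default login` under `~/.config/gcloud/`. It is only a file-existence check: it does not validate the credentials, and it ignores other ADC sources such as the metadata server on GCE or GKE.

```python
import os
from pathlib import Path


def find_adc_file(environ=None, home=None):
    """Return the path of an Application Default Credentials file, or None.

    Checks GOOGLE_APPLICATION_CREDENTIALS first, then the default location used
    by `gcloud auth application-default login` on Linux/macOS. Only file
    existence is checked; the credentials themselves are not validated.
    """
    environ = os.environ if environ is None else environ
    home = Path.home() if home is None else Path(home)

    explicit = environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if explicit:
        # An explicit setting wins; report None if it points at a missing file
        return Path(explicit) if Path(explicit).is_file() else None

    well_known = home / ".config" / "gcloud" / "application_default_credentials.json"
    return well_known if well_known.is_file() else None


adc_path = find_adc_file()
if adc_path:
    print(f"Found ADC file: {adc_path}")
else:
    print("No ADC file found. Run `gcloud auth application-default login` "
          "or set GOOGLE_APPLICATION_CREDENTIALS to a service account key file.")
```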


## 2. Initialize Spark with BigQuery Connector

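Now we'll create a SparkSession with the BigQuery connector on its classpath. The connector JAR is loaded from a public Cloud Storage URL through the `spark.jars` setting. The `latest` JAR tracks the newest connector release, so pin a specific versioned JAR if you need reproducible results.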


In [None]:
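# Initialize Spark session with BigQuery connector.
# spark.jars is only read when the JVM starts: if a SparkSession already exists
# in this kernel, getOrCreate() returns it and the JAR setting has no effect.
print("Initializing Spark session with BigQuery connector...")
spark = (
    SparkSession.builder.appName("BigQuery Integration Test")
    .config(
        "spark.jars",
        # "latest" tracks the newest release; pin a versioned JAR for reproducibility
        "https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-latest_2.12.jar",
    )
    .getOrCreate()
)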

# Display Spark version for reference
spark_version = spark.version
print(f"Using Apache Spark version: {spark_version}")

print("\n" + "-" * 50 + "\n")
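The `_2.12` suffix in the JAR name is the Scala version the connector was built for, and it has to match the Scala version of your Spark build (printed by `spark-submit --version` as "Using Scala version ..."). The hypothetical helpers below extract the suffix from the configured JAR URL and compare it with a Scala version string; the regex assumes the `..._<major>.<minor>.jar` naming used by the URL in this notebook.

```python
import re

JAR_URL = "https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-latest_2.12.jar"


def jar_scala_version(jar_url):
    """Return the Scala binary version encoded as `_<major>.<minor>.jar`, or None."""
    match = re.search(r"_(\d+\.\d+)\.jar$", jar_url)
    return match.group(1) if match else None


def scala_matches(jar_url, spark_scala_version):
    """True if the JAR suffix matches a version like '2.12.15' from spark-submit --version."""
    jar_version = jar_scala_version(jar_url)
    if jar_version is None:
        return False
    # Compare only major.minor: Scala patch releases share a binary version
    return ".".join(spark_scala_version.split(".")[:2]) == jar_version


print(jar_scala_version(JAR_URL))         # 2.12
print(scala_matches(JAR_URL, "2.12.15"))  # True
print(scala_matches(JAR_URL, "2.13.8"))   # False
```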


## 3. Test with Public Dataset

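Let's first test the connector using a public dataset, which doesn't require any access grants on the dataset itself. You still need valid credentials, and the read is billed to your project, so the account needs permission to create BigQuery read sessions there.
We'll read the `samples.shakespeare` table from the `bigquery-public-data` project.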


In [None]:
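# Test with a public dataset (no dataset-level grants needed, but valid
# credentials and a project to bill the read to are still required)
print("Public dataset test:")
try:
    # Read the Shakespeare public dataset
    reader = spark.read.format("bigquery").option(
        "table", "bigquery-public-data:samples.shakespeare"
    )
    if project:
        # Bill the read to your project, since user credentials don't carry one
        reader = reader.option("parentProject", project)
    public_data = reader.load()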

    # Display schema and sample data
    print("Schema:")
    public_data.printSchema()

    print("\nSample data (5 rows):")
    public_data.show(5, truncate=False)

    print(f"Total rows in public dataset: {public_data.count()}")
except Exception as e:
    print(f"Error accessing public dataset: {e}")

print("\n" + "-" * 50 + "\n")
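If the public read fails with a permission or billing error, a common cause is that no billing project could be determined: the connector's `parentProject` defaults to the project of the service account in use, which user credentials may not provide. One way to keep this explicit is to build the reader options in one place, as in this sketch. The helper and its parameter names are illustrative; `table`, `parentProject`, and `filter` are connector option names.

```python
def bigquery_read_options(table, parent_project=None, row_filter=None):
    """Collect spark-bigquery-connector read options, skipping unset values.

    `table`, `parentProject`, and `filter` are connector option names; this
    helper and its parameter names are illustrative only.
    """
    options = {"table": table}
    if parent_project:
        # Project billed for the read; needed when it cannot be inferred from
        # the credentials (for example, with user credentials)
        options["parentProject"] = parent_project
    if row_filter:
        # Predicate handed to BigQuery so rows are filtered before reaching Spark
        options["filter"] = row_filter
    return options


opts = bigquery_read_options(
    "bigquery-public-data:samples.shakespeare",
    parent_project="my-billing-project",  # hypothetical project ID
    row_filter="corpus = 'hamlet'",
)
print(opts)

# In the notebook, with an active SparkSession named `spark`:
# hamlet = spark.read.format("bigquery").options(**opts).load()
```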


## 4. Test with Your Project's Dataset

Now let's try with a dataset in your authenticated project. Make sure you've set the correct values for `dataset_id` and `table_id` at the top of this notebook.


In [None]:
# Now try with your authenticated project
print("Authenticated project test:")

try:
    # Requires dataset_id and table_id to name an existing table you can read
    your_data = (
        spark.read.format("bigquery")
        .option("table", f"{project}.{dataset_id}.{table_id}")
        .option("parentProject", project)
        .load()
    )

    print("Schema of your dataset:")
    your_data.printSchema()

    print("\nSample data from your dataset (5 rows):")
    your_data.show(5, truncate=False)

    print(f"Total rows in your dataset: {your_data.count()}")
except Exception as e:
    print(f"Error accessing your project's dataset: {e}")
    print("\nIf this failed, check:")
    print("1. Application Default Credentials are set up (gcloud auth application-default login)")
    print("2. The dataset and table exist in your project")
    print(f"3. You have permissions to access {project}.{dataset_id}.{table_id}")
    print("4. You've updated the notebook with correct dataset_id and table_id values")

print("\n" + "-" * 50 + "\n")
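The table reference in this cell is built from three pieces, so a missing project (for example, when `gcloud` has no default project) produces a malformed ID that only fails once Spark contacts BigQuery. A hypothetical helper like this one catches that earlier by refusing empty parts. It does not validate BigQuery's naming rules.

```python
def qualified_table_id(project, dataset, table):
    """Join project, dataset, and table into `project.dataset.table`.

    Raises ValueError naming any missing part, instead of letting an ID like
    '.my_dataset.my_table' reach the connector.
    """
    parts = {"project": project, "dataset": dataset, "table": table}
    missing = [name for name, value in parts.items() if not (value or "").strip()]
    if missing:
        raise ValueError(f"Missing BigQuery identifier part(s): {', '.join(missing)}")
    return ".".join(value.strip() for value in parts.values())


print(qualified_table_id("my-project", "my_dataset", "my_table"))  # my-project.my_dataset.my_table

try:
    qualified_table_id(None, "my_dataset", "my_table")
except ValueError as err:
    print(err)  # Missing BigQuery identifier part(s): project
```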


## 5. Execute a Simple Query

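Finally, let's run a simple SQL query through the connector. The connector executes the query in BigQuery, writes the result to a temporary table in the `materializationDataset` (here, `dataset_id`), and then reads that table into Spark. This requires `viewsEnabled` to be `true` and permission to create tables in that dataset.
The temporary GCS bucket is not used for this read; the connector only stages data there when writing DataFrames with its indirect write method.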


In [None]:
# Additional test - simple query execution
print("Executing a simple BigQuery query through Spark:")
try:
    # Show which dataset will hold the temporary query-result table
    print(f"Using materialization dataset: {dataset_id}")

    # Staging bucket for indirect writes; this query read does not use it
    spark.conf.set("temporaryGcsBucket", bucket_id)

    # Execute the query; BigQuery writes the result to a temporary table in
    # materializationDataset, which Spark then reads
    query_result = (
        spark.read.format("bigquery")
        .option("viewsEnabled", "true")  # required for query reads
        .option(
            "query", "SELECT 'Success!' as message, CURRENT_TIMESTAMP() as timestamp"
        )
        .option("parentProject", project)
        .option("materializationDataset", dataset_id)
        .load()
    )

    query_result.show(truncate=False)
except Exception as e:
    print(f"Error executing query: {e}")

# Clean up
spark.stop()
print("\nSpark session closed. Test complete.")
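Query reads depend on two settings working together: `viewsEnabled` must be `true`, and `materializationDataset` must name a dataset where the connector's credentials can create tables. The sketch below gathers those options and rejects an empty query or dataset before anything reaches BigQuery. The helper itself is hypothetical; the option names are the connector's.

```python
def bigquery_query_options(sql, materialization_dataset, parent_project=None):
    """Build spark-bigquery-connector options for reading the result of a SQL query."""
    if not sql.strip():
        raise ValueError("Query text is empty")
    if not materialization_dataset:
        raise ValueError("materializationDataset is required to read query results")
    options = {
        "query": sql,
        "viewsEnabled": "true",  # the connector requires this for query reads
        "materializationDataset": materialization_dataset,  # holds the temp result table
    }
    if parent_project:
        options["parentProject"] = parent_project
    return options


opts = bigquery_query_options(
    "SELECT 'Success!' AS message, CURRENT_TIMESTAMP() AS timestamp",
    materialization_dataset="my_dataset",
)
print(sorted(opts))  # ['materializationDataset', 'query', 'viewsEnabled']

# With an active SparkSession named `spark`:
# spark.read.format("bigquery").options(**opts).load().show(truncate=False)
```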


## Troubleshooting

If you encounter errors in this notebook:

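1. **Authentication Issues**
   - Run `gcloud auth application-default login` in a terminal (or set `GOOGLE_APPLICATION_CREDENTIALS` to a service account key file); the connector uses Application Default Credentials, which `gcloud auth login` alone does not configure
   - Check that the account has the BigQuery roles it needs, such as BigQuery Read Session User (`roles/bigquery.readSessionUser`) on the billing project and BigQuery Job User (`roles/bigquery.jobUser`) for running queries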

2. **Dataset/Table Access**
   - Verify the dataset and table names are correct
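   - Ensure the account can read them (for example, BigQuery Data Viewer, `roles/bigquery.dataViewer`) and can create tables in the materialization dataset used in Section 5 (for example, BigQuery Data Editor, `roles/bigquery.dataEditor`)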
   - Check if you're using the correct project

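3. **Temporary Bucket Issues**
   - The bucket is only used when writing DataFrames with the connector's indirect write method; the reads in this notebook don't touch it
   - Verify the bucket exists and the credentials Spark uses have read/write access to it (for example, Storage Object Admin, `roles/storage.objectAdmin`)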

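4. **Connector Issues**
   - Check that the connector JAR URL is reachable from the container, and restart the kernel after changing `spark.jars` (the setting is ignored if a SparkSession is already running)
   - Make sure the connector's Scala version (`_2.12` in the JAR name) matches the Scala version of your Spark build, and that no other copy of the connector is on the classpath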

5. **Network Issues**
   - Verify your container has internet access
   - Check if any firewalls are blocking the connection to GCP
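For network issues, a quick TCP check against the Google endpoints involved can narrow things down. The host list is an assumption about what this notebook touches: `storage.googleapis.com` serves the connector JAR, `bigquery.googleapis.com` handles jobs and metadata, and `bigquerystorage.googleapis.com` serves the Storage Read API. A successful connection only shows the port is reachable, not that requests are authorized. If your container reaches the internet through an HTTP proxy, direct connections may fail even when the proxy path works.

```python
import socket

# Hosts assumed to be used by this notebook; adjust for private endpoints or proxies.
GCP_HOSTS = [
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "bigquerystorage.googleapis.com",
]


def check_endpoints(hosts, port=443, timeout=3.0):
    """Return {host: True/False} for whether a TCP connection to host:port succeeds."""
    results = {}
    for host in hosts:
        try:
            # create_connection resolves the name and opens a TCP socket
            with socket.create_connection((host, port), timeout=timeout):
                results[host] = True
        except OSError:
            # Covers DNS failures (socket.gaierror), refused connections, and timeouts
            results[host] = False
    return results


for host, ok in check_endpoints(GCP_HOSTS).items():
    print(f"{host}: {'reachable' if ok else 'NOT reachable'}")
```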

For more information on the BigQuery connector for Spark, see the [official documentation](https://github.com/GoogleCloudDataproc/spark-bigquery-connector).
