In [3]:
NOTEBOOK = 'dataproc'
REGION = 'us-central1'

# GCS_BUCKET is derived from PROJECT_ID, so the project has to be known before
# this point. Reuse a value that is already in the kernel; otherwise prompt for
# it so a fresh "Restart & Run All" does not stop here with a NameError.
if 'PROJECT_ID' not in globals():
    PROJECT_ID = input('Enter your GCP project ID here').strip()

# The bucket is named after the project; notebook files go under demos/<notebook>.
GCS_BUCKET = PROJECT_ID
GCS_FOLDER = f'demos/{NOTEBOOK}'

In [5]:
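# Optional: uncomment to (re-)enter the project ID interactively.
# GCS_BUCKET in the configuration cell above is derived from PROJECT_ID,
# so re-run that cell after changing the project here.
# PROJECT_ID = input('Enter your GCP project ID here')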

In [None]:
# BigQuery dataset that receives the job output. It is referenced by the
# dataset-creation and job-submission cells, so it must be defined here.
# Defaults to the notebook name, which is the dataset the recorded run used.
# To pick a different dataset interactively, use instead:
#   BQ_DATASET = input('Enter your BQ dataset name here')
BQ_DATASET = NOTEBOOK

In [4]:
from google.cloud import bigquery
from google.cloud import storage

In [5]:
# Pin both clients to the same project so they do not silently fall back to
# whatever default project the ambient credentials carry.
bq = bigquery.Client(project=PROJECT_ID)
gcs = storage.Client(project=PROJECT_ID)

In [6]:
DIR = NOTEBOOK
!rm -rf {NOTEBOOK}
!mkdir -p {NOTEBOOK}

In [7]:
buckets = !gsutil list -p {PROJECT_ID}
if f"gs://{GCS_BUCKET}/" not in buckets:
    ! gsutil mb -l us -c standard gs://{GCS_BUCKET}
else: print(f"Bucket gs://{GCS_BUCKET} already exists")

Creating gs://fast-envoy-329104/...


### BigQuery Dataset

In [8]:
# Describe the target dataset in the US multi-region.
ds = bigquery.Dataset(f"{PROJECT_ID}.{BQ_DATASET}")
ds.location = 'US'

# exists_ok=True makes this cell safe to re-run when the dataset is already there.
ds = bq.create_dataset(ds, exists_ok=True)

### Dataproc

In [9]:
status = !gcloud compute networks subnets describe default --region={REGION} --format="get(privateIpGoogleAccess)"
if status[0] == 'False':
  !gcloud compute networks subnets update default --region={REGION} --enable-private-ip-google-access
  status = !gcloud compute networks subnets describe default --region={REGION} --format="get(privateIpGoogleAccess)"
print(f"Private Google Access is Enable = {status[0]}")

Updated [https://www.googleapis.com/compute/v1/projects/fast-envoy-329104/regions/us-central1/subnetworks/default].


To take a quick anonymous survey, run:
  $ gcloud survey

Private Google Access is Enable = True


In [6]:
# Read the SQL text from the local file into the notebook variable `sql`.
# NOTE: this variable lives in the notebook kernel only; a script written with
# %%writefile and submitted with gcloud runs separately and cannot see it.
with open('gdc_str.sql') as sql_file:
    sql = sql_file.read()

### PySpark Job

In [10]:
%%writefile {DIR}/myjob.py
#!/usr/bin/python
"""BigQuery I/O PySpark example."""
from pyspark.sql import SparkSession
import sys

print("Number of Arguments: {0} arguments.".format(len(sys.argv)))
print("Arguments List: {0}".format(str(sys.argv)))

# create a session
spark = SparkSession.builder.appName('spark-bigquery').getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used by the connector.
spark.conf.set('temporaryGcsBucket', sys.argv[1])

# Perform word count.
query = spark.sql(sql)
query.show(n=5)
query.printSchema()

# Saving the data to BigQuery
query.write.format('bigquery').option('table', sys.argv[2]).mode('overwrite').save()

Writing dataproc/myjob.py


In [14]:
# Submit the generated script as a Dataproc batch in PROJECT_ID / REGION.
#   --deps-bucket : bucket gcloud uses to stage the local script before the run
#   --jars        : Spark BigQuery connector jar (version pinned in the path)
# Everything after the bare `--` is forwarded to the script itself:
#   sys.argv[1] = GCS location for the connector's temporary data
#   sys.argv[2] = destination BigQuery table (project:dataset.table)
!gcloud dataproc batches submit pyspark {DIR}/myjob.py \
--project={PROJECT_ID} \
--region={REGION} \
--deps-bucket={GCS_BUCKET} \
--jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.24.2.jar \
-- {GCS_BUCKET}/{GCS_FOLDER} \
    {PROJECT_ID}:{BQ_DATASET}.myjob_output

Batch [6836e1c7abcf4466aa81ef762b654d27] submitted.
Using the default container image
unpacking docker.io/library/serverless-spark-default:1.0 (sha256:59f351b542666f6eb713c61aa9e38292daf610ba5489196fbb420c4cbd8f1283)...done
Waiting for container log creation
PYSPARK_PYTHON=/opt/dataproc/conda/bin/python
JAVA_HOME=/usr/lib/jvm/temurin-11-jdk-amd64
SPARK_EXTRA_CLASSPATH=
:: loading settings :: file = /etc/spark/conf/ivysettings.xml
Number of Arguments: 3 arguments.
Arguments List: ['/tmp/srvls-batch-d7cbb60e-0508-4594-b6d1-bbc36932b23b/myjob.py', 'fast-envoy-329104/demos/dataproc', 'fast-envoy-329104:dataproc.myjob_output']
22/08/22 06:19:23 INFO DirectBigQueryRelation: Querying table bigquery-public-data.samples.shakespeare, parameters sent from Spark: requiredColumns=[word,word_count], filters=[]
22/08/22 06:19:23 INFO DirectBigQueryRelation: Going to read from bigquery-public-data.samples.shakespeare columns=[word, word_count], filter=''
22/08/22 06:19:25 INFO DirectBigQueryRelation: 

In [15]:
# Read back the five most frequent words from the table the job wrote.
# The dataset is BQ_DATASET (the one passed to the job), not the notebook name,
# and the path is backtick-quoted because project IDs may contain hyphens.
bq.query(query = f"SELECT * FROM `{PROJECT_ID}.{BQ_DATASET}.myjob_output` ORDER BY word_count DESC LIMIT 5").to_dataframe()

Unnamed: 0,word,word_count
0,the,25568
1,I,21028
2,and,19649
3,to,17361
4,of,16438
