In [None]:
from pyspark.sql import SparkSession
import urllib.request
import os

# NOTE(review): kept from the original setup; this opts PySpark into an insecure
# gateway mode — confirm the PySpark version in use still needs this flag.
os.environ["PYSPARK_ALLOW_INSECURE_GATEWAY"] = "1"

# Jars for the GCS and BigQuery connectors, plus the OpenLineage Spark listener.
gc_jars = ['https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.1.1/gcs-connector-hadoop3-2.1.1-shaded.jar',
           'https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/bigquery-connector/hadoop3-1.2.0/bigquery-connector-hadoop3-1.2.0-shaded.jar',
           'https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/0.22.2/spark-bigquery-with-dependencies_2.12-0.22.2.jar',
           'https://repo1.maven.org/maven2/io/openlineage/openlineage-spark_2.12/1.25.0/openlineage-spark_2.12-1.25.0.jar']

# Stable cache directory so re-running the notebook does not re-download every jar.
jar_cache_dir = os.path.expanduser('~/.cache/openlineage-spark-demo/jars')


def fetch_jar(url: str, cache_dir: str) -> str:
    """Return a local path to the jar at ``url``, downloading it only if not cached.

    The file is stored in ``cache_dir`` under the last path segment of ``url``
    (its Maven file name, e.g. ``gcs-connector-hadoop3-2.1.1-shaded.jar``).
    """
    os.makedirs(cache_dir, exist_ok=True)
    jar_path = os.path.join(cache_dir, url.rsplit('/', 1)[-1])
    if not os.path.exists(jar_path):
        # Download to a side file and rename atomically, so an interrupted download
        # never leaves a truncated jar that a later run would treat as cached.
        partial_path = jar_path + '.part'
        urllib.request.urlretrieve(url, partial_path)
        os.replace(partial_path, jar_path)
    return jar_path


files = [fetch_jar(url, jar_cache_dir) for url in gc_jars]

# Set these to your own project and bucket, and to the service-account key file.
# The same key file is used by both the Hadoop GCS connector and the executor env.
project_id = 'YOUR-PROJECT-ID'
gcs_bucket = 'YOUR-GCS-BUCKET'
credentials_file = '/home/jovyan/notebooks/YOUR-CREDENTIAL-KEY-FILE-FOR-SERVICE-ACCOUNT.json'

spark = (SparkSession.builder.master('local').appName('openlineage_spark_test')
         .config('spark.jars', ",".join(files))

         # OpenLineage listener (its jar is included in gc_jars above); lineage events
         # are sent over HTTP to http://marquez-api:5000 under 'spark_integration'.
         .config('spark.extraListeners', 'io.openlineage.spark.agent.OpenLineageSparkListener')
         .config('spark.openlineage.transport.type', 'http')
         .config('spark.openlineage.transport.url', 'http://marquez-api:5000')
         .config('spark.openlineage.namespace', 'spark_integration')

         # Google credentials and project id. The executor credentials env var points
         # at credentials_file, the same key the Hadoop connector is configured with.
         .config('spark.executorEnv.GCS_PROJECT_ID', project_id)
         .config('spark.executorEnv.GOOGLE_APPLICATION_CREDENTIALS', credentials_file)
         .config('spark.hadoop.google.cloud.auth.service.account.enable', 'true')
         .config('spark.hadoop.google.cloud.auth.service.account.json.keyfile', credentials_file)
         .config('spark.hadoop.fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
         .config('spark.hadoop.fs.AbstractFileSystem.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS')
         .config("spark.hadoop.fs.gs.project.id", project_id)
         .getOrCreate())

In [2]:
from pyspark.sql.functions import expr, col

def read_bq_table(table):
    """Load a BigQuery table as a Spark DataFrame via the spark-bigquery connector.

    ``table`` is a fully qualified ``project.dataset.table`` identifier; the
    notebook's ``project_id`` is passed as the connector's ``parentProject``.
    """
    return (spark.read.format('bigquery')
            .option('parentProject', project_id)
            .option('table', table)
            .load())


# Collapse the mask-usage categories per county into 'frequent' (always + frequently)
# and 'rare' (never + rarely), keyed by county FIPS code.
mask_use = (read_bq_table('bigquery-public-data.covid19_nyt.mask_use_by_county')
            .select(expr("always + frequently").alias("frequent"),
                    expr("never + rarely").alias("rare"),
                    "county_fips_code"))

# US rows for 2021-10-31: deaths per 100k population and the fully-vaccinated share
# of the population excluding ages 0-9. subregion2_code is renamed to
# county_fips_code so it can serve as the join key against mask_use.
opendata = (read_bq_table('bigquery-public-data.covid19_open_data.covid19_open_data')
            .filter("country_name == 'United States of America'")
            .filter("date == '2021-10-31'")
            .select("location_key",
                    expr('cumulative_deceased/(population/100000)').alias('deaths_per_100k'),
                    expr('cumulative_persons_fully_vaccinated/(population - population_age_00_09)').alias('vaccination_rate'),
                    col('subregion2_code').alias('county_fips_code')))

# Default (inner) join: only counties present in both datasets are kept.
joined = mask_use.join(opendata, 'county_fips_code')

output_path = f'gs://{gcs_bucket}/demodata/covid_deaths_and_mask_usage/'
joined.write.mode('overwrite').parquet(output_path)

In [3]:
# Read the Parquet output back from GCS and display its row count as a sanity check.
(spark.read
      .parquet(f'gs://{gcs_bucket}/demodata/covid_deaths_and_mask_usage/')
      .count())

3142