In [1]:
# Print the Scala version of the environment; the connector jar loaded in the
# next cell carries a "_2.12" Scala suffix, so this is the value to compare it against.
!scala -version

Scala code runner version 2.12.10 -- Copyright 2002-2019, LAMP/EPFL and Lightbend, Inc.


In [2]:
import os

from pyspark.sql import SparkSession

# Build (or reuse) the Spark session and put the spark-bigquery connector jar
# on its classpath. The jar's "_2.12" suffix is the Scala build it targets and
# should agree with the Scala version printed by `scala -version`.
# NOTE(review): "latest" is not pinned, so a rerun may silently pick up a
# different connector release — consider pinning an explicit version.
spark = (
    SparkSession.builder
    .appName('BigQuery Storage & Spark DataFrames')
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar')
    .getOrCreate()
)

In [3]:
# Turn on eager evaluation so a DataFrame left as the last expression of a cell
# is rendered as a table instead of a bare DataFrame repr.
spark.conf.set(key="spark.sql.repl.eagerEval.enabled", value=True)


In [4]:
# Source: the public 2020 Wikipedia pageviews table in BigQuery.
table = "bigquery-public-data.wikipedia.pageviews_2020"

# Read through the spark-bigquery connector. The `filter` option limits the read
# to datehour values on 2020-03-01 (half-open interval [03-01, 03-02)).
df_wiki_pageviews = (
    spark.read
    .format("bigquery")
    .options(
        table=table,
        filter="datehour >= '2020-03-01' AND datehour < '2020-03-02'",
    )
    .load()
)

# Show the column names and types exposed by the connector.
df_wiki_pageviews.printSchema()

root
 |-- datehour: timestamp (nullable = true)
 |-- wiki: string (nullable = true)
 |-- title: string (nullable = true)
 |-- views: long (nullable = true)



In [5]:
# Expose the pageviews DataFrame to spark.sql() under the name "wiki_pageviews".
df_wiki_pageviews.createOrReplaceTempView(name="wiki_pageviews")

In [6]:
# Keep only rows from the 'en' and 'en.m' wikis whose `views` value exceeds 1000,
# projecting just the title, wiki and views columns.
wiki_en_sql = """
SELECT 
 title, wiki, views
FROM wiki_pageviews
WHERE views > 1000 AND wiki in ('en', 'en.m')
"""

# Marked for caching because this result is queried again (via the "wiki_en"
# view) by the aggregation that follows.
df_wiki_en = spark.sql(wiki_en_sql).cache()

df_wiki_en

title,wiki,views
-,en,143159
-,en,14969
-,en,186802
-,en,131686
-,en,213787
-,en,211910
-,en,186675
-,en,21901
-,en,163710
-,en,23527


In [7]:
# Expose the filtered English rows to spark.sql() under the name "wiki_en".
df_wiki_en.createOrReplaceTempView(name="wiki_en")

In [8]:
# Sum views per title (across both 'en' and 'en.m', since only title is grouped),
# most-viewed titles first.
# NOTE: wiki_en has already dropped every row with views <= 1000, so
# total_views only adds up rows that individually passed that threshold —
# it is not the title's complete total for the day.
wiki_en_totals_sql = """
SELECT 
 title, 
 SUM(views) as total_views
FROM wiki_en
GROUP BY title
ORDER BY total_views DESC
"""

df_wiki_en_totals = spark.sql(wiki_en_totals_sql)

df_wiki_en_totals

title,total_views
Main_Page,10939337
United_States_Senate,5619797
-,3852360
Special:Search,1538334
2019–20_coronavir...,407042
2020_Democratic_P...,260093
Coronavirus,254861
The_Invisible_Man...,233718
Super_Tuesday,201077
Colin_McRae,200219


In [None]:
# Destination settings for the aggregated results. Each one can be supplied via
# an environment variable so the notebook can be rerun by someone else without
# editing this cell. The fallbacks are the original author's resources —
# set the variables (or change the fallbacks) before running.

# GCS bucket the connector uses as temporary storage for the write.
gcs_bucket = os.environ.get("GCS_BUCKET") or 'eben-lab4'

# BigQuery dataset, as "project.dataset", that you created beforehand.
bq_dataset = os.environ.get("BQ_DATASET") or 'my-spark-project-442210.new_dataset'

# Table to create or overwrite. If it does not exist it is created by the write;
# if it does, mode('overwrite') replaces its contents.
bq_table = os.environ.get("BQ_TABLE") or 'wiki_total_pageviews_sql'

bq_table_id = "{}.{}".format(bq_dataset, bq_table)

(
    df_wiki_en_totals.write
    .format("bigquery")
    .option("table", bq_table_id)
    .option("temporaryGcsBucket", gcs_bucket)
    .mode('overwrite')
    .save()
)