In [1]:
import databricks.koalas as ks
from pyspark.sql import SparkSession

import datetime

## Create the Spark session to distribute processing across a cluster

In [2]:
# Build (or reuse, via getOrCreate) the notebook's SparkSession, registering the
# spark-bigquery connector jar through the 'spark.jars' setting.
spark = (
    SparkSession.builder
    .appName('Jupyter BigQuery Storage')
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar')
    .getOrCreate()
)

## Cluster update function will allow us to add & remove nodes to/from the cluster

In [3]:
def data_proc_update_cluster(project_id, region, cluster_name, service_account_json, non_preemptibile, preemptibile):
    """Resize a Dataproc cluster's primary and secondary worker pools.

    Fetches the current cluster definition, sets the requested instance counts
    on it, submits an update restricted to those two fields, and blocks until
    the long-running operation finishes.

    Args:
        project_id: Google Cloud project that owns the cluster.
        region: Dataproc region; also used to build the regional API endpoint.
        cluster_name: Name of the cluster to resize.
        service_account_json: Path to a service-account key file used to
            authenticate the Dataproc client.
        non_preemptibile: Target ``num_instances`` for ``config.worker_config``.
        preemptibile: Target ``num_instances`` for
            ``config.secondary_worker_config``.

    Returns:
        Whatever ``operation.result()`` yields once the update has completed.
    """
    from google.cloud import dataproc_v1 as dataproc
    from google.oauth2 import service_account

    # The client is pointed at the region-specific Dataproc endpoint.
    client_options = {"api_endpoint": f"{region}-dataproc.googleapis.com:443"}

    credentials = service_account.Credentials.from_service_account_file(service_account_json)

    client = dataproc.ClusterControllerClient(credentials=credentials,
                                              client_options=client_options)

    cluster = client.get_cluster(project_id=project_id,
                                 region=region,
                                 cluster_name=cluster_name)

    # A field mask only *names* the fields to update, so `paths` is a list of
    # strings. The new values are taken from the cluster object set below.
    update_mask = {
        "paths": [
            "config.worker_config.num_instances",
            "config.secondary_worker_config.num_instances",
        ]
    }

    # Must update the cluster info itself: these are the values the mask refers to.
    cluster.config.worker_config.num_instances = non_preemptibile
    cluster.config.secondary_worker_config.num_instances = preemptibile

    operation = client.update_cluster(project_id=project_id,
                                      region=region,
                                      cluster=cluster,
                                      cluster_name=cluster_name,
                                      update_mask=update_mask)

    # Block until the resize has finished so callers can start work immediately.
    return operation.result()

## Extract data from BigQuery into a Spark dataframe

In [4]:
# NOTE: the first assignment is immediately overwritten, so only the second
# table name is actually read below.
table = '`fh-bigquery.reddit_comments.2019_12`'
table = "dustinspark.demos.reddit_comments"

# Load the table through the "bigquery" data source into a Spark dataframe.
sp_df = (
    spark.read
    .format("bigquery")
    .option("table", table)
    .option('viewsEnabled', True)
    .load()
)

## To preview data, use the toPandas() function
Be sure to limit the number of records before pulling them into Pandas, or you can exceed memory


In [34]:
# Count the rows in the Spark dataframe and print the total with thousands separators.
row_count = sp_df.count()
print(f'{row_count:,} rows in the data frame')

6738684849

In [5]:
# Preview only: limit() to 10 rows is applied before toPandas() converts the result.
sp_df.limit(10).toPandas()

Unnamed: 0,body
0,&gt;There is no rape in Ck2\n\n'Because she is...
1,"Take the ship channel tour - it's free, and ki..."
2,I never get my youth back now :'(
3,It's for the reasons I stated. If you feel th...
4,What an honor! :D\n\nIt was a bit weird to dri...
5,Also shouldn't it be step 1 cut and grill the ...
6,"Yeah, as far as I've been able to tell, the fo..."
7,HOW DO THE ROPES AFFECT THIS MATCH??
8,/r/titlegore
9,Hot Topic Guy - https://www.reddit.com/r/funko...


## Create a Koalas dataframe from the Spark dataframe
 It's very easy to change between a Spark, Koalas & Pandas dataframe
 
 Be mindful when using Pandas as that will pull all the data in to the master and could exceed memory

In [6]:
# Wrap the Spark dataframe in a Koalas dataframe.
koalas_df = ks.DataFrame(sp_df)
# Conversions in the other directions, kept for reference:
# koala_to_spark_df = koalas_df.to_spark()
# spark_to_pandas_df = sp_df.toPandas()

## Most of the Pandas API is available with Koalas
In this example you can see how easy it is to work with user defined functions (UDFs) 

## Process some data using 64 CPUs and save it to a GCS bucket

In [None]:
# Benchmark run 1: derive three columns from `body`, write the result to GCS
# as parquet, and report the elapsed wall-clock time.
start = datetime.datetime.now()

# Column names match the scaled-up run and the BigQuery query in this notebook.
# Because koalas_df is modified in place, a differently named column here would
# also be carried into the later run's output.
koalas_df['word_array'] = koalas_df['body'].str.split(' ')
koalas_df['string_length'] = koalas_df['body'].str.len()
koalas_df['body_lower'] = koalas_df['body'].str.lower()

koala_to_spark_df = koalas_df.to_spark()

# save() is what triggers the DAG to execute
koala_to_spark_df.write.format("parquet").option("path", "gs://dusty-study/koalas_run_1").save()

end = datetime.datetime.now()
run_time = end - start
rt_secs = round(run_time.total_seconds())
rt_mins = round(rt_secs / 60)
print(f'64 Core dataframe ran for {rt_secs} seconds / {rt_mins} minutes')

Pandas dataframe ran for 9487 seconds / 158 minutes


## Let's scale up the cluster to 980 CPUs to increase performance
Preemptible instances cost 40% as much as non-preemptible ones

## Costs

https://cloud.google.com/products/calculator#id=0ad8bb87-2d0d-446c-8533-9fee94cb677d

1   x Master

2   x Workers non-preemptable

980 x Workers preemptable

1 Hour

$79.09


### VERSUS

https://cloud.google.com/products/calculator#id=409ff1ba-8e54-4ede-9f5e-f999e19988b8

1   x Master

982 x Workers non-preemptable


1 Hour

$226.09

## SAVINGS

### **$147**

## Total run cost for 980 preemptible CPUs at \\$1.33 per minute for 20 minutes ≈ \\$26.60

In [37]:
# Sizing inputs for the scale-up. Worker counts are derived from a CPU target.
CPUS_PER_WORKER = 4            # assumes each worker node has 4 CPUs — TODO confirm against the cluster's machine type
TARGET_PREEMPTIBLE_CPUS = 980  # CPU budget for the secondary worker pool

non_preemptibile = 2
preemptibile = round(TARGET_PREEMPTIBLE_CPUS / CPUS_PER_WORKER)
# NOTE(review): absolute, user-specific key path; consider reading it from configuration.
service_account_json = '/home/dustinwilliams/dataproc-jupyter-srvc-acct.json'
username = 'dustinwilliams'
project_id = 'dusty-study'
region = 'us-central1'
cluster_name = 'koalas'
total_cpus = (non_preemptibile + preemptibile) * CPUS_PER_WORKER

# The timer starts before the resize, so the elapsed time reported later
# includes the time spent scaling the cluster.
start = datetime.datetime.now()

update_result = data_proc_update_cluster(project_id, region, cluster_name, service_account_json, non_preemptibile, preemptibile)

In [None]:
%%time
koalas_df['word_array']    = koalas_df['body'].str.split(' ')
koalas_df['string_length'] = koalas_df['body'].str.len()
koalas_df['body_lower']    = koalas_df['body'].str.lower()

koala_to_spark_df = koalas_df.to_spark()

koala_to_spark_df.write.format("parquet").option("path", "gs://dusty-study/koalas_run_2").save() 



CPU times: user 288 ms, sys: 105 ms, total: 394 ms
Wall time: 18min 32s


In [None]:
# Scale the cluster back down: keep 2 primary workers and drop the secondary pool to 0.
non_preemptibile = 2
preemptibile = 0

update_result = data_proc_update_cluster(project_id, region, cluster_name, service_account_json, non_preemptibile, preemptibile)


# `start` was set in the scale-up cell before its resize call, so this elapsed
# time covers the scale-up, the processing run, and the scale-down above.
# `total_cpus` also comes from the scale-up cell and is not recomputed here.
end = datetime.datetime.now()
run_time = end - start
rt_secs = round(run_time.total_seconds())
rt_mins = round(rt_secs / 60)
print(f'DataProc cluster scaled up to {total_cpus} CPUs and ran for {rt_secs} seconds / {rt_mins} minutes')

DataProc cluster scaled up to 988 CPUs and ran for 1210 seconds / 20 minutes


## BigQuery can be used to offload processing

On-demand pricing
### 1TB = $5

Slot reservations can reduce this even more

In [None]:
# Record the wall-clock start time; a later cell subtracts it to time the BigQuery query.
start = datetime.datetime.now()

In [None]:
%%bigquery 

-- Build the same three derived columns as the Koalas runs (word array, length,
-- lower-cased text) inside BigQuery and store them in a table.
CREATE OR REPLACE TABLE `dusty-study.temp.koalas_demo`
AS 
SELECT  body, 
        SPLIT(body, ' ')        AS word_array,
        CHARACTER_LENGTH(body)  AS string_length,
        LOWER(body)             AS body_lower 
FROM    dustinspark.demos.reddit_comments

In [None]:
# Elapsed wall-clock time since `start`, reported in whole seconds and whole minutes.
end = datetime.datetime.now()
run_time = end - start
rt_secs = round(run_time.total_seconds())
rt_mins = round(rt_secs / 60)
print(f'Bigquery query ran for {rt_secs} seconds / {rt_mins} minutes')

Bigquery query ran for 615 seconds / 10 minutes


In [23]:
%%bigquery

-- Train a logistic-regression model. The label is 1 when the session has any
-- transactions and 0 otherwise; NULL feature values get defaults ("" or 0).
CREATE OR REPLACE MODEL `temp.bqml_example`
OPTIONS(model_type='logistic_reg') AS
SELECT  IF(totals.transactions IS NULL, 0, 1) AS label,
        IFNULL(device.operatingSystem, "") AS os,
        device.isMobile AS is_mobile,
        IFNULL(geoNetwork.country, "") AS country,
        IFNULL(totals.pageviews, 0) AS pageviews
FROM   `bigquery-public-data.google_analytics_sample.ga_sessions_*`

In [24]:
%%bigquery bqml_df_eval

-- Evaluate the model on up to 10,000 rows built with the same label and feature
-- expressions as training; the result is stored in `bqml_df_eval`.
-- NOTE(review): the rows come from the same source tables used for training, and
-- LIMIT has no ORDER BY, so which rows are evaluated is not fixed — confirm this is intended.
SELECT  *
FROM    ML.EVALUATE(MODEL `temp.bqml_example`, 
          (SELECT  IF(totals.transactions IS NULL, 0, 1)  AS label,
                   IFNULL(device.operatingSystem, "")     AS os,
                   device.isMobile                        AS is_mobile,
                   IFNULL(geoNetwork.country, "")         AS country,
                   IFNULL(totals.pageviews, 0)            AS pageviews
           FROM   `bigquery-public-data.google_analytics_sample.ga_sessions_*`
           LIMIT 10000))

In [25]:
# Show the evaluation metrics returned by the ML.EVALUATE query.
bqml_df_eval.head()

Unnamed: 0,precision,recall,accuracy,f1_score,log_loss,roc_auc
0,0.5,0.141176,0.9915,0.220183,0.030425,0.980737


In [26]:
%%bigquery bqml_predictions

-- Score every session with the trained model, then total the predicted labels
-- per country, highest first; the result is stored in `bqml_predictions`.
WITH ga_data AS (
SELECT  IFNULL(device.operatingSystem, "")  AS os,
        device.isMobile                     AS is_mobile,
        IFNULL(totals.pageviews, 0)         AS pageviews,
        IFNULL(geoNetwork.country, "")      AS country
FROM    `bigquery-public-data.google_analytics_sample.ga_sessions_*`)

SELECT  country,
        SUM(predicted_label) AS total_predicted_purchases
FROM    ML.PREDICT(MODEL `temp.bqml_example`, TABLE ga_data)
GROUP BY country
ORDER BY total_predicted_purchases DESC


In [27]:
# Show the first rows of the per-country prediction totals (already sorted descending by the query).
bqml_predictions.head()

Unnamed: 0,country,total_predicted_purchases
0,United States,3548
1,Canada,94
2,Venezuela,60
3,Japan,34
4,Taiwan,30
