## This notebook is part of the Apache Spark training delivered by CERN-IT
### Demo of Spark instrumentation on CERN SWAN
Contact: Luca.Canali@cern.ch

Run this notebook from Jupyter with a Python kernel.
- When using CERN SWAN, do not attach the notebook to a Spark cluster; run it locally in the SWAN container instead.
- When running outside CERN SWAN, please make sure PySpark is installed: `pip install pyspark`

### First let's create a Spark Session

In [None]:
# Do not run this cell when running on CERN SWAN.
# Use the Spark connector (the "star" button) instead.
#
# When not running on SWAN, additional steps are needed
# to see Spark metrics in the Spark dashboard,
# see https://github.com/cerndb/spark-dashboard

from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .appName("my demo app")  \
        .master("yarn") \
        .config("spark.executor.memory","4g") \
        .config("spark.executor.cores","4") \
        .config("spark.jars.packages","ch.cern.sparkmeasure:spark-measure_2.12:0.22") \
        .config("spark.ui.showConsoleProgress", "false") \
        .getOrCreate()


In [1]:
spark

## Run a TPCDS benchmark query
The query generates some load, which lets us demonstrate the available Spark monitoring tools:
the Spark monitor, the Spark Web UI, and the Spark dashboard integration.

In [2]:
# This uses TPCDS data stored on the CERN Hadoop cluster
path="hdfs://analytix/project/spark/TPCDS/tpcds_10000_parquet_1.12.2/"
tables = ["date_dim", "store_sales", "item"]

for t in tables:
  print(f"Creating temporary view {t}")
  spark.read.parquet(path + t).createOrReplaceTempView(t)



Creating temporary view date_dim
22/11/02 16:49:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
Creating temporary view store_sales
Creating temporary view item


In [None]:
## TPCDS benchmark query Q3
q3 = """
SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand,SUM(ss_ext_sales_price) sum_agg
FROM  date_dim dt, store_sales, item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
  AND store_sales.ss_item_sk = item.i_item_sk
  AND item.i_manufact_id = 128
  AND dt.d_moy=11
GROUP BY dt.d_year, item.i_brand, item.i_brand_id
ORDER BY dt.d_year, sum_agg desc, brand_id
LIMIT 100
"""

In [None]:
# Run TPCDS query Q3
result = spark.sql(q3).collect()
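
To see what Q3 computes without access to the cluster, the following minimal sketch runs the same query text against a toy dataset using Python's built-in `sqlite3` module instead of Spark. The table schemas are reduced to the columns Q3 touches, and all rows are invented for illustration; they are not TPCDS data.

```python
import sqlite3

# Toy stand-ins for the three TPCDS tables used by Q3.
# Only the columns referenced by the query are created; all rows are made up.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE date_dim (d_date_sk INTEGER, d_year INTEGER, d_moy INTEGER);
CREATE TABLE item (i_item_sk INTEGER, i_brand_id INTEGER, i_brand TEXT, i_manufact_id INTEGER);
CREATE TABLE store_sales (ss_sold_date_sk INTEGER, ss_item_sk INTEGER, ss_ext_sales_price REAL);
""")
conn.executemany("INSERT INTO date_dim VALUES (?, ?, ?)", [
    (1, 2000, 11),
    (2, 2000, 12),   # December: excluded by d_moy = 11
    (3, 2001, 11),
])
conn.executemany("INSERT INTO item VALUES (?, ?, ?, ?)", [
    (10, 1001, "brand_a", 128),
    (11, 1002, "brand_b", 128),
    (12, 1003, "brand_c", 999),  # excluded by i_manufact_id = 128
])
conn.executemany("INSERT INTO store_sales VALUES (?, ?, ?)", [
    (1, 10, 5.0),
    (1, 10, 7.0),
    (1, 11, 20.0),
    (2, 10, 100.0),  # sold in December
    (3, 10, 3.0),
    (3, 12, 50.0),   # item from another manufacturer
])

# Same query text as Q3 in this notebook
q3_toy = """
SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand, SUM(ss_ext_sales_price) sum_agg
FROM  date_dim dt, store_sales, item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
  AND store_sales.ss_item_sk = item.i_item_sk
  AND item.i_manufact_id = 128
  AND dt.d_moy = 11
GROUP BY dt.d_year, item.i_brand, item.i_brand_id
ORDER BY dt.d_year, sum_agg DESC, brand_id
LIMIT 100
"""

rows = conn.execute(q3_toy).fetchall()
for row in rows:
    print(row)
```

Each output row is the total November sales of one brand from manufacturer 128 in one year, ordered by year and then by decreasing sales.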

- Demo:  
   - While the query runs, open the Spark Web UI.  
   - Also look at the Spark metrics visualized in the Spark dashboard.  
    

## Additional tools: sparkMeasure
SparkMeasure is an external tool that simplifies the collection and analysis of Spark performance metrics.  
See: https://github.com/LucaCanali/sparkMeasure

In [None]:
# Install the Python wrapper API.
# Note: to use sparkMeasure, the Spark session must also be configured to fetch and load the jar, for example:
# spark.jars.packages=ch.cern.sparkmeasure:spark-measure_2.12:0.22

!pip install --user sparkmeasure

In [None]:
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)


In [None]:
stagemetrics.runandmeasure(globals(), 'spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()')

In [None]:
stagemetrics.runandmeasure(globals(), f'spark.sql("""{q3}""").collect()')
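
The `runandmeasure` calls above take the code to measure as a string. As an alternative, the sparkMeasure Python wrapper also provides `begin()`, `end()` and `print_report()`, which can bracket ordinary Python code. The sketch below wraps that pattern in a small helper; the name `measure_query` is hypothetical and not part of sparkMeasure, and it assumes a Spark session configured with the sparkMeasure jar as described earlier.

```python
def measure_query(spark, query):
    """Run `query` once and print sparkMeasure stage metrics for it.

    Hypothetical helper for illustration; not part of sparkMeasure.
    """
    # Imported lazily so the helper can be defined before the package is installed
    from sparkmeasure import StageMetrics

    stagemetrics = StageMetrics(spark)
    stagemetrics.begin()             # start collecting stage metrics
    try:
        result = spark.sql(query).collect()
    finally:
        stagemetrics.end()           # stop collecting, even if the query fails
    stagemetrics.print_report()      # report metrics for the measured interval
    return result
```

With the session and query defined in this notebook, it would be called as `result = measure_query(spark, q3)`.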

## Increase logging verbosity

In [None]:
# increase logging verbosity
spark.sparkContext.setLogLevel("INFO")

In [None]:
# Run TPCDS query Q3
result = spark.sql(q3).collect()

In [None]:
# Set logging verbosity back to normal
spark.sparkContext.setLogLevel("WARN")
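
The raise, run and restore steps above can also be grouped so that the log level is restored even if the query fails. The following is a minimal sketch using a context manager; the name `log_level` and its `restore_to` parameter are hypothetical, and the level to restore is passed explicitly (defaulting to `"WARN"`, as used in this notebook) rather than read back from Spark.

```python
from contextlib import contextmanager

@contextmanager
def log_level(spark, level, restore_to="WARN"):
    """Temporarily set the Spark log level (hypothetical helper for illustration)."""
    sc = spark.sparkContext
    sc.setLogLevel(level)            # e.g. "INFO" for more verbose logging
    try:
        yield
    finally:
        sc.setLogLevel(restore_to)   # restore the level, even after an error

# Usage with the session and query defined in this notebook:
# with log_level(spark, "INFO"):
#     result = spark.sql(q3).collect()
```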

In [None]:
spark.stop()