<img src="https://swan.web.cern.ch/sites/swan.web.cern.ch/files/pictures/logo_swan_letters.png" alt="SWAN" style="float: left; width: 25%; margin-right: 15%; margin-left: 15%; margin-bottom: 2.0em;">
<img src="http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png" alt="EP-SFT" style="float: left; width: 25%; margin-right: 10%; margin-bottom: 2.0em;">
<p style="clear: both;">
# **Integration of SWAN with Spark clusters**
<hr style="border-top-width: 4px; border-top-color: #34609b;">

This notebook demonstrates how a SWAN server can offload computations to an external Spark cluster. The code below requires Spark 2.2.0 or later. We will connect to the Spark cluster that was previously selected in the SWAN web form.

In this example, we will query an HBase database and show some results. We will first acquire the necessary credentials to access the database.

In [1]:
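import getpass
import os, sys
import subprocess

print("Please enter your password")
# Pipe the password to kinit through stdin instead of a shell command line,
# so special characters are not interpreted by the shell
proc = subprocess.Popen(["kinit"], stdin=subprocess.PIPE)
proc.communicate((getpass.getpass() + "\n").encode())
ret = proc.returncode

if ret == 0:
    print("Credentials created successfully")
else:
    sys.stderr.write('Error creating credentials, return code: %s\n' % ret)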

Please enter your password
········
Credentials created successfully
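
To confirm that a ticket is actually in place before contacting the cluster, one option is to ask `klist`. The sketch below is not part of the original notebook. It assumes the MIT Kerberos `klist` command is on the `PATH`; its `-s` flag suppresses output and reports ticket validity through the exit status. `has_valid_ticket` is a hypothetical helper name.

```python
import subprocess

def has_valid_ticket():
    """Return True if `klist -s` reports a readable, unexpired credentials cache."""
    try:
        # -s: run silently; only the exit status tells whether a valid ticket exists
        return subprocess.call(["klist", "-s"]) == 0
    except OSError:
        # klist is not installed or not on the PATH
        return False

print("Valid Kerberos ticket found" if has_valid_ticket() else "No valid Kerberos ticket")
```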


Next, some Spark imports.

In [2]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

Now we will create the SparkContext, configuring the Spark driver to connect to the previously selected cluster. We will also list the necessary jars (the HBase jars are already located on CVMFS). This cell takes some time to run because it triggers the first connection to the cluster.

It is worth pointing out that **this configuration step will be automated**: a graphical interface will hide most of these configuration options and allow other parameters, such as the number of executors, to be set.
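
The configuration cell reads several environment variables (`SERVER_HOSTNAME`, `SPARK_PORT_1` to `SPARK_PORT_3` and `LCG_VIEW`), and a missing one surfaces only as a `KeyError` halfway through the cell. As a minimal sketch, assuming these variables are exported by the SWAN session, a quick check can be run first. `REQUIRED_VARS` and `missing_variables` are hypothetical names introduced for illustration.

```python
import os

# Variables read by the configuration cell; assumed to be exported by the SWAN session
REQUIRED_VARS = ["SERVER_HOSTNAME", "SPARK_PORT_1", "SPARK_PORT_2", "SPARK_PORT_3", "LCG_VIEW"]

def missing_variables(environ=os.environ):
    """Return the required variable names that are unset or empty."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]

missing = missing_variables()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
else:
    print("All required environment variables are set")
```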

In [3]:
conf = SparkConf()

conf.set('spark.driver.host', os.environ['SERVER_HOSTNAME'])
conf.set('spark.driver.port', os.environ['SPARK_PORT_1'])
conf.set('spark.blockManager.port', os.environ['SPARK_PORT_2'])
conf.set('spark.ui.port', os.environ['SPARK_PORT_3'])
conf.set('spark.master', 'yarn')
conf.set('spark.authenticate', True)
conf.set('spark.network.crypto.enabled', True)
conf.set('spark.authenticate.enableSaslEncryption', True)

# Build the comma-separated jar list: the data access jar plus everything in dependency/
accsoft_dir = os.environ['LCG_VIEW'] + '/lib/accsoft/'
dependency_jars = """
accsoft-nxcals-common-0.1.4.jar accsoft-nxcals-service-client-0.1.4.jar activation-1.1.jar
apacheds-i18n-2.0.0-M15.jar apacheds-kerberos-codec-2.0.0-M15.jar api-asn1-api-1.0.0-M20.jar
api-util-1.0.0-M20.jar aspectjrt-1.8.10.jar aspectjweaver-1.8.10.jar avro-1.8.1.jar
commons-beanutils-1.7.0.jar commons-beanutils-core-1.8.0.jar commons-cli-1.2.jar
commons-codec-1.10.jar commons-collections-3.2.2.jar commons-compress-1.8.1.jar
commons-configuration-1.6.jar commons-daemon-1.0.13.jar commons-digester-1.8.jar
commons-el-1.0.jar commons-httpclient-3.1.jar commons-io-2.5.jar commons-lang-2.6.jar
commons-lang3-3.5.jar commons-logging-1.2.jar commons-math-2.1.jar commons-math3-3.4.1.jar
commons-net-2.2.jar commons-pool2-2.4.2.jar config-1.3.1.jar curator-client-2.7.1.jar
curator-framework-2.7.1.jar curator-recipes-2.4.0.jar disruptor-3.3.0.jar
findbugs-annotations-1.3.9-1.jar gson-2.2.4.jar guava-16.0.jar
hadoop-annotations-2.6.0-cdh5.7.5.jar hadoop-auth-2.6.0-cdh5.7.5.jar
hadoop-common-2.6.0-cdh5.7.5.jar hadoop-hdfs-2.6.0-cdh5.7.5.jar
hadoop-hdfs-2.6.0-cdh5.7.5-tests.jar hamcrest-core-1.3.jar
hbase-annotations-1.2.0-cdh5.7.5.jar hbase-client-1.2.0-cdh5.7.5.jar
hbase-common-1.2.0-cdh5.7.5.jar hbase-common-1.2.0-cdh5.7.5-tests.jar
hbase-hadoop2-compat-1.2.0-cdh5.7.5.jar hbase-hadoop-compat-1.2.0-cdh5.7.5.jar
hbase-prefix-tree-1.2.0-cdh5.7.5.jar hbase-procedure-1.2.0-cdh5.7.5.jar
hbase-protocol-1.2.0-cdh5.7.5.jar hbase-server-1.2.0-cdh5.7.5.jar high-scale-lib-1.1.1.jar
hortonworks-shc-core-cern-1.0.3-2.1-s_2.11-CERN.jar htrace-core-3.2.0-incubating.jar
htrace-core4-4.0.1-incubating.jar httpclient-4.5.2.jar httpcore-4.4.4.jar
jackson-annotations-2.6.0.jar jackson-core-2.6.5.jar jackson-core-asl-1.9.13.jar
jackson-databind-2.6.5.jar jackson-jaxrs-1.8.8.jar jackson-mapper-asl-1.9.13.jar
jackson-xc-1.8.3.jar jamon-runtime-2.4.1.jar jasper-compiler-5.5.23.jar
jasper-runtime-5.5.23.jar javax.servlet-api-3.1.0.jar jaxb-api-2.2.2.jar
jaxb-impl-2.2.3-1.jar jcodings-1.0.8.jar jersey-core-1.9.jar jersey-json-1.9.jar
jersey-server-1.9.jar jets3t-0.7.1.jar jettison-1.1.jar jetty-6.1.26.cloudera.4.jar
jetty-sslengine-6.1.26.cloudera.4.jar jetty-util-6.1.26.cloudera.4.jar joni-2.1.2.jar
jsch-0.1.42.jar jsp-2.1-6.1.14.jar jsp-api-2.1-6.1.14.jar leveldbjni-all-1.8.jar
log4j-1.2-api-2.6.2.jar log4j-api-2.6.2.jar log4j-core-2.6.2.jar log4j-slf4j-impl-2.6.2.jar
metrics-core-2.2.0.jar netty-all-4.0.23.Final.jar paranamer-2.7.jar protobuf-java-2.5.0.jar
scala-library-2.11.8.jar slf4j-api-1.7.21.jar snappy-java-1.1.1.3.jar stax-api-1.0-2.jar
xmlenc-0.52.jar xz-1.5.jar zookeeper-3.4.5-cdh5.7.5.jar
""".split()
jars = [accsoft_dir + 'accsoft-nxcals-data-access-0.1.4.jar']
jars += [accsoft_dir + 'dependency/' + jar for jar in dependency_jars]
conf.set('spark.jars', ','.join(jars))
conf.set('spark.driver.extraJavaOptions', '-Dservice.url=http://cs-ccr-nxcalsstr1.cern.ch:19093')
conf.set('spark.driver.memory', '8g')

sc = SparkContext(conf = conf)
spark = SparkSession(sc)
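
Listing every jar by hand is error-prone. As a hedged alternative, the value of `spark.jars` could be assembled by scanning the directory. The sketch below is an assumption rather than what the notebook does: it picks up every `.jar` file under the given root, which may include more than the curated list above. `collect_jars` is a hypothetical helper.

```python
import glob
import os

def collect_jars(root):
    """Return a comma-separated, sorted list of every .jar file under root."""
    # "**" with recursive=True matches root itself and all of its subdirectories
    pattern = os.path.join(root, "**", "*.jar")
    return ",".join(sorted(glob.glob(pattern, recursive=True)))
```

It would be called before the SparkContext is created, for example as `conf.set('spark.jars', collect_jars(os.path.join(os.environ['LCG_VIEW'], 'lib', 'accsoft')))`.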

Once Spark is configured, we can go ahead and query the database.

In [4]:
from cern.accsoft.nxcals.pyquery.builders import *

In [5]:
query = DevicePropertyQueryBuilder().system("CMW").device("RADMON.PS-1").property("ExpertMonitoringAcquisition") \
    .start_time("2017-06-10 00:00:00.000").end_time("2017-06-11 00:00:00.000").build_query()
        
df1 = spark.read.options(**query.get_map()).format("cern.accsoft.nxcals.data.access.api").load()

df1.printSchema()

root
 |-- acqStamp: long (nullable = true)
 |-- nxcals_entity_id: long (nullable = true)
 |-- cyclestamp: long (nullable = true)
 |-- voltage_18V: double (nullable = true)
 |-- current_8V5: double (nullable = true)
 |-- voltage_3V3: double (nullable = true)
 |-- temperatureSensorBoard: double (nullable = true)
 |-- current_18V: double (nullable = true)
 |-- PinDiodeVoltageOutOfRange: boolean (nullable = true)
 |-- property: string (nullable = true)
 |-- selector: string (nullable = true)
 |-- temperatureDeported: double (nullable = true)
 |-- class: string (nullable = true)
 |-- current_radfet: double (nullable = true)
 |-- Radfet2VoltageOutOfRange: boolean (nullable = true)
 |-- voltage_memoryBank2: double (nullable = true)
 |-- voltage_sensorAdcRef: double (nullable = true)
 |-- __record_version__: long (nullable = true)
 |-- current_memBanks: double (nullable = true)
 |-- voltage_3VNeg: double (nullable = true)
 |-- Radfet1VoltageOutOfRange: boolean (nullable = true)
 |-- nxcals_tim

In [6]:
df1.select("selector","nxcals_timestamp","pt100Value").show()

+--------+-------------------+------------------+
|selector|   nxcals_timestamp|        pt100Value|
+--------+-------------------+------------------+
|    null|1497052884872473000|      108.57187849|
|    null|1497053192139704000|108.11673318999999|
|    null|1497053290561304000|      108.07121866|
|    null|1497053364977589000|      108.05604715|
|    null|1497053464999073000|      108.20776225|
|    null|1497053591426205000|      108.79945114|
|    null|1497053904294788000|108.23810526999999|
|    null|1497054133944520000|       108.4353349|
|    null|1497054209160665000|108.60222150999999|
|    null|1497054548434291000|      108.42016339|
|    null|1497054558836559000|      108.61739302|
|    null|1497054726072146000|      108.08639017|
|    null|1497054741275405000|      108.81462265|
|    null|1497054930116250000|      108.10156168|
|    null|1497055383813460000|      108.29879131|
|    null|1497055669475059000|      108.08639017|
|    null|1497055808704900000|      108.08639017|


In [7]:
df1.createOrReplaceTempView("temp")

In [8]:
df2 = spark.sql("SELECT acqStamp, voltage_18V, current_18V FROM temp ")
df2.toPandas()

              acqStamp  voltage_18V  current_18V
0  1497052884872473000    19.981064    39.828047
1  1497053192139704000    19.976493    40.071830
2  1497053290561304000    19.984111    39.873756
3  1497053364977589000    19.984111    39.888993
4  1497053464999073000    19.987158    40.026121
5  1497053591426205000    19.984111    40.330850
6  1497053904294788000          NaN    40.285140
7  1497054133944520000    19.944496    39.797574
8  1497054209160665000    19.985635    39.888993
9  1497054548434291000    20.013060    39.873756
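
The `acqStamp` values are large integers. Assuming they are nanoseconds since the Unix epoch in UTC, which is consistent with the query window starting at 2017-06-10 00:00:00, they can be converted to readable timestamps as in the following sketch. `ns_to_datetime` is a hypothetical helper, and the result is truncated to microsecond precision because that is the resolution of Python's `datetime`.

```python
from datetime import datetime, timezone

def ns_to_datetime(ns):
    """Convert integer nanoseconds since the Unix epoch to a UTC datetime."""
    seconds, remainder_ns = divmod(ns, 10**9)
    # Integer arithmetic avoids the rounding a float division would introduce
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=remainder_ns // 1000)

print(ns_to_datetime(1497052884872473000))  # 2017-06-10 00:01:24.872473+00:00
```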


Now a time series plot with matplotlib.

In [9]:
import matplotlib
%matplotlib notebook
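
The notebook does not show the plotting call itself. A minimal sketch of such a time series plot follows, using the first `acqStamp` and `voltage_18V` values from the output above and assuming `acqStamp` holds nanoseconds since the Unix epoch. The `Agg` backend is chosen only so the sketch runs outside a notebook.

```python
import matplotlib
matplotlib.use("Agg")  # headless backend for this sketch; not needed with %matplotlib notebook
import matplotlib.pyplot as plt
from datetime import datetime, timezone

# First rows of acqStamp / voltage_18V from the output above
samples = [
    (1497052884872473000, 19.981064),
    (1497053192139704000, 19.976493),
    (1497053290561304000, 19.984111),
    (1497053364977589000, 19.984111),
    (1497053464999073000, 19.987158),
]

# Convert nanosecond stamps to datetimes so matplotlib draws a proper time axis
times = [datetime.fromtimestamp(ns / 1e9, tz=timezone.utc) for ns, _ in samples]
voltages = [v for _, v in samples]

fig, ax = plt.subplots()
ax.plot(times, voltages, marker="o")
ax.set_xlabel("acqStamp (UTC)")
ax.set_ylabel("voltage_18V")
fig.autofmt_xdate()  # tilt the tick labels so the timestamps do not overlap
```

With the real data, the two lists would come from the columns of `df2.toPandas()` instead of the hard-coded sample.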



In [10]:
tgm_query = DevicePropertyQueryBuilder().system("CMW").device("CPS.TGM").property("FULL-TELEGRAM.STRC") \
    .start_time("2017-06-10 00:00:00.000").end_time("2017-06-11 00:00:00.000").build_query()

tgm_df = spark.read.options(**tgm_query.get_map()).format("cern.accsoft.nxcals.data.access.api").load()


In [11]:
tgm_df.createOrReplaceTempView("tgm")

In [12]:
query2 = DevicePropertyQueryBuilder().system("CMW").device("PR.QTRJ-DB-A").property("Acquisition") \
    .start_time("2017-06-10 00:00:00.000").end_time("2017-06-11 00:00:00.000") \
    .build_query()
    
data2 = spark.read.options(**query2.get_map()).format("cern.accsoft.nxcals.data.access.api").load()
  
data2.printSchema()

root
 |-- acqStamp: long (nullable = true)
 |-- nxcals_entity_id: long (nullable = true)
 |-- current_unit: string (nullable = true)
 |-- current_max: float (nullable = true)
 |-- cyclestamp: long (nullable = true)
 |-- acq_2: float (nullable = true)
 |-- current_tolr: float (nullable = true)
 |-- __record_version__: long (nullable = true)
 |-- current: float (nullable = true)
 |-- current_min: float (nullable = true)
 |-- property: string (nullable = true)
 |-- nxcals_timestamp: long (nullable = true)
 |-- current_status: integer (nullable = true)
 |-- selector: string (nullable = true)
 |-- current_tola: float (nullable = true)
 |-- class: string (nullable = true)
 |-- device: string (nullable = true)



In [13]:
data2.count()

54535

In [14]:
tgm_df_dump = tgm_df.select("cyclestamp", "USER","DEST").where("DEST = 'PS_DUMP' AND USER != 'ZERO'")
tgm_df_dump.show()

+-------------------+----+-------+
|         cyclestamp|USER|   DEST|
+-------------------+----+-------+
|1497085612300000000|  AD|PS_DUMP|
|1497090597100000000|  AD|PS_DUMP|
|1497082643500000000|  AD|PS_DUMP|
|1497082895500000000|  AD|PS_DUMP|
|1497084150700000000|  AD|PS_DUMP|
|1497087124300000000|  AD|PS_DUMP|
|1497090395500000000|  AD|PS_DUMP|
|1497082744300000000|  AD|PS_DUMP|
|1497085813900000000|  AD|PS_DUMP|
|1497084453100000000|  AD|PS_DUMP|
|1497084805900000000|  AD|PS_DUMP|
|1497083596300000000|  AD|PS_DUMP|
|1497084604300000000|  AD|PS_DUMP|
|1497085511500000000|  AD|PS_DUMP|
|1497085561900000000|  AD|PS_DUMP|
|1497083999500000000|  AD|PS_DUMP|
|1497082996300000000|  AD|PS_DUMP|
|1497085259500000000|  AD|PS_DUMP|
|1497083349100000000|  AD|PS_DUMP|
|1497081678700000000|  AD|PS_DUMP|
+-------------------+----+-------+
only showing top 20 rows



In [15]:
result_df = tgm_df_dump.join(data2.select("cyclestamp","current","device","property","current_min"),"cyclestamp")
result_df.show()

+-------------------+----+-------+---------+------------+-----------+-----------+
|         cyclestamp|USER|   DEST|  current|      device|   property|current_min|
+-------------------+----+-------+---------+------------+-----------+-----------+
|1497090597100000000|  AD|PS_DUMP|443.10712|PR.QTRJ-DB-A|Acquisition|       null|
|1497083797900000000|  AD|PS_DUMP|  442.819|PR.QTRJ-DB-A|Acquisition|       null|
|1497082693900000000|  AD|PS_DUMP| 443.3792|PR.QTRJ-DB-A|Acquisition|       null|
|1497083999500000000|  AD|PS_DUMP|443.26718|PR.QTRJ-DB-A|Acquisition|       null|
|1497086368300000000|  AD|PS_DUMP| 443.3312|PR.QTRJ-DB-A|Acquisition|       null|
|1497087124300000000|  AD|PS_DUMP|443.15512|PR.QTRJ-DB-A|Acquisition|       null|
|1497083949100000000|  AD|PS_DUMP| 443.3792|PR.QTRJ-DB-A|Acquisition|       null|
|1497084301900000000|  AD|PS_DUMP| 443.0431|PR.QTRJ-DB-A|Acquisition|       null|
|1497081174700000000|  AD|PS_DUMP|443.20316|PR.QTRJ-DB-A|Acquisition|       null|
|149708324830000

In [16]:
result_df.count()

112

In [17]:
import pyspark.sql.functions as func

result_df.agg(func.avg("current")).show()

+-----------------+
|     avg(current)|
+-----------------+
|443.2936128888811|
+-----------------+



In [18]:
from cern.accsoft.nxcals.pyquery.builders.VariableQueryBuilder import *

var = VariableQueryBuilder().variable("PR.QTRJ-DB-A:CURRENT") \
    .start_time("2017-06-10 00:00:00.000").end_time("2017-06-11 00:00:00.000").build_query()
    
var_df = spark.read.options(**var.get_map()).format('cern.accsoft.nxcals.data.access.api').load()

In [19]:
var_df.show()

+------------+--------------------+----------------+-------------------+
|nxcals_value|nxcals_variable_name|nxcals_entity_id|   nxcals_timestamp|
+------------+--------------------+----------------+-------------------+
|   443.61932|PR.QTRJ-DB-A:CURRENT|             236|1497052804300000000|
|   371.51016|PR.QTRJ-DB-A:CURRENT|             236|1497053037100000000|
|   1.6966858|PR.QTRJ-DB-A:CURRENT|             236|1497053194300000000|
|    459.9299|PR.QTRJ-DB-A:CURRENT|             236|1497053388700000000|
|   1.6806793|PR.QTRJ-DB-A:CURRENT|             236|1497055018300000000|
|     460.138|PR.QTRJ-DB-A:CURRENT|             236|1497055176700000000|
|   1.7447052|PR.QTRJ-DB-A:CURRENT|             236|1497055403500000000|
|   1.8247375|PR.QTRJ-DB-A:CURRENT|             236|1497055933900000000|
|    1.568634|PR.QTRJ-DB-A:CURRENT|             236|1497056273500000000|
|   460.23404|PR.QTRJ-DB-A:CURRENT|             236|1497056367100000000|
|   460.34607|PR.QTRJ-DB-A:CURRENT|             236

In [20]:
var_df.count()

54535

In [21]:
var_df.createOrReplaceTempView("var_data")
tgm_df_dump.createOrReplaceTempView("tgm_dump")

In [22]:
res2 = spark.sql("SELECT nxcals_variable_name as VARIABLE, nxcals_value as VALUE, t1.nxcals_timestamp as STAMP " \
                 "FROM var_data t1 JOIN tgm_dump t2 ON t1.nxcals_timestamp = t2.cyclestamp")
res2.show()

+--------------------+---------+-------------------+
|            VARIABLE|    VALUE|              STAMP|
+--------------------+---------+-------------------+
|PR.QTRJ-DB-A:CURRENT|443.10712|1497090597100000000|
|PR.QTRJ-DB-A:CURRENT|  442.819|1497083797900000000|
|PR.QTRJ-DB-A:CURRENT| 443.3792|1497082693900000000|
|PR.QTRJ-DB-A:CURRENT|443.26718|1497083999500000000|
|PR.QTRJ-DB-A:CURRENT| 443.3312|1497086368300000000|
|PR.QTRJ-DB-A:CURRENT|443.15512|1497087124300000000|
|PR.QTRJ-DB-A:CURRENT| 443.3792|1497083949100000000|
|PR.QTRJ-DB-A:CURRENT| 443.0431|1497084301900000000|
|PR.QTRJ-DB-A:CURRENT|443.20316|1497081174700000000|
|PR.QTRJ-DB-A:CURRENT|443.26718|1497083248300000000|
|PR.QTRJ-DB-A:CURRENT| 443.3152|1497085007500000000|
|PR.QTRJ-DB-A:CURRENT|443.39523|1497083898700000000|
|PR.QTRJ-DB-A:CURRENT|443.42725|1497082794700000000|
|PR.QTRJ-DB-A:CURRENT|  442.819|1497083097100000000|
|PR.QTRJ-DB-A:CURRENT|443.39523|1497084654700000000|
|PR.QTRJ-DB-A:CURRENT|443.15512|14970860155000

In [23]:
res2.count()

112

In [24]:
spark.sql("SELECT nxcals_variable_name as VARIABLE, max(nxcals_value) as VALUE " \
                 "FROM var_data t1 JOIN tgm_dump t2 ON t1.nxcals_timestamp = t2.cyclestamp " \
                 "GROUP BY nxcals_variable_name").show()

+--------------------+--------+
|            VARIABLE|   VALUE|
+--------------------+--------+
|PR.QTRJ-DB-A:CURRENT|443.6353|
+--------------------+--------+
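
To see why the join keeps only a small subset of rows, the same two SQL statements can be tried on a toy dataset. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL, and the rows are made up for illustration (`DEMO:CURRENT` and the timestamps 100 to 400 are not real data). Only measurements whose timestamp matches a cycle in `tgm_dump` survive the inner join.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE var_data (nxcals_variable_name TEXT, nxcals_value REAL, nxcals_timestamp INTEGER)")
con.execute("CREATE TABLE tgm_dump (cyclestamp INTEGER, DEST TEXT)")

con.executemany("INSERT INTO var_data VALUES (?, ?, ?)", [
    ("DEMO:CURRENT", 443.1, 100),
    ("DEMO:CURRENT", 1.7, 200),   # no matching cycle in tgm_dump, dropped by the join
    ("DEMO:CURRENT", 443.6, 300),
])
con.executemany("INSERT INTO tgm_dump VALUES (?, ?)", [
    (100, "PS_DUMP"),
    (300, "PS_DUMP"),
    (400, "PS_DUMP"),             # no matching measurement, also dropped
])

# Inner join on the timestamp, as in the res2 query
joined = con.execute(
    "SELECT nxcals_variable_name AS VARIABLE, nxcals_value AS VALUE, t1.nxcals_timestamp AS STAMP "
    "FROM var_data t1 JOIN tgm_dump t2 ON t1.nxcals_timestamp = t2.cyclestamp "
    "ORDER BY STAMP"
).fetchall()

# Maximum value per variable over the joined rows, as in the last query
maximum = con.execute(
    "SELECT nxcals_variable_name AS VARIABLE, max(nxcals_value) AS VALUE "
    "FROM var_data t1 JOIN tgm_dump t2 ON t1.nxcals_timestamp = t2.cyclestamp "
    "GROUP BY nxcals_variable_name"
).fetchall()

print(joined)   # [('DEMO:CURRENT', 443.1, 100), ('DEMO:CURRENT', 443.6, 300)]
print(maximum)  # [('DEMO:CURRENT', 443.6)]
```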

