# Job Performance Analytics

### Initialize Spark and Sonar Cassandra Session

In [None]:
# Open an authenticated Sonar Cassandra session. The SparkSession built in the
# next cell reads its hosts_string, username and token.
from sonar_auth.cassandra import SonarCassandraSession

sonar_hosts = ['rzsonar8']
session = SonarCassandraSession(sonar_hosts)

In [None]:
import os

# Environment for the local Spark driver. All of it must be in place before
# findspark.init() runs and before the SparkSession below is built.
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0/'
os.environ['SPARK_HOME'] = '/g/g13/wang109/spark-2.3.1-bin-hadoop2.7'
# NOTE(review): confirm this trustStore flag actually reaches the driver JVM;
# spark-submit normally takes driver JVM flags via SPARK_SUBMIT_OPTS or
# --driver-java-options rather than JAVA_OPTS.
os.environ['JAVA_OPTS'] = '-Djavax.net.ssl.trustStore=/etc/pki/ca-trust/extracted/java/cacerts'
# Run Spark locally on all cores and pull in the Cassandra connector package.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--master local[*] '
    '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 '
    'pyspark-shell'
)

# findspark.init() puts $SPARK_HOME's Python libraries on sys.path, so it has
# to run before the first `import pyspark`. Otherwise whatever pyspark was
# already importable gets loaded, possibly mismatching the Spark 2.3.1 JVM
# presumably launched from SPARK_HOME. The sonar_driver.spark modules are
# imported afterwards for the same reason, in case they import pyspark.
import findspark
findspark.init()

import pandas as pd

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType, StringType, TimestampType
from pyspark.sql.functions import col, lit, split, udf, explode, asc, desc

from sonar_driver.spark import analytics as analytics
from sonar_driver.spark import visuals as visuals

# SparkSession wired to the Sonar Cassandra cluster, using the hosts and
# credentials of the `session` object created in the previous cell.
spark = (
    SparkSession.builder
        .appName('cassandra')
        .config('spark.cassandra.connection.host', session.hosts_string)
        .config('spark.cassandra.auth.username', session.username)
        .config('spark.cassandra.auth.password', session.token)
        .getOrCreate()
)

### Read job data from Cassandra into a Spark DataFrame with appropriate column types

In [None]:
# Load the jobdata table and project it to typed columns:
# JobId (int), StartTime (timestamp), and EndTime (timestamp), RunTime and
# NodeList, the last three extracted by key from the `scontrol` column.
jobs_raw = (
    spark.read.format('org.apache.spark.sql.cassandra')
        .options(keyspace='lcstaff_k', table='jobdata')
        .load()
)
scontrol_col = col('scontrol')
sparkdf = jobs_raw.select(
    col('JobId').cast(IntegerType()).alias('JobId'),
    col('StartTime').cast(TimestampType()).alias('StartTime'),
    scontrol_col.getItem('EndTime').cast(TimestampType()).alias('EndTime'),
    scontrol_col.getItem('RunTime').alias('RunTime'),
    scontrol_col.getItem('NodeList').alias('NodeList'),
)

sparkdf.show()

### Filter jobs by time range, nodes, and users

In [None]:
# Filter arguments for analytics.query_jobs. time_range appears to be
# [start, end, column to compare against] — confirm against sonar_driver.
query_start = '2018-05-16T07:27:21'
query_end = '2018-05-17T07:27:21'
time_range = [query_start, query_end, 'EndTime']
# Node names; bracketed forms such as 'rztronal[10-13]' look like hostlist
# ranges (presumably expanded by query_jobs).
nodes = ['rztopaz', 'rzgenie36', 'rztronal[10-13]', 'rzalastor[10-15,20-24]']

queried_sparkdf = analytics.query_jobs(sparkdf, nodes=nodes, time_range=time_range)
queried_sparkdf.show()

### Calculate discrete derivatives based on window size and slide length

In [None]:
# Discrete derivatives of the filtered jobs along EndTime.
# NOTE(review): units of window_size / slide_length are not visible here.
derivatives = analytics.discrete_derivatives(
    queried_sparkdf,
    'EndTime',
    slide_length=300,
    window_size=300,
)
derivatives.show()

### Plot discrete derivatives

In [None]:
# Keep window_size / slide_length in sync with the discrete_derivatives call.
visuals.plot_derivatives(
    column='EndTime',
    sparkdf=derivatives,
    slide_length=300,
    window_size=300,
)

### Calculate discrete integrals based on slide length

In [None]:
# Discrete integrals of the filtered jobs with a slide length of 10.
integrals = analytics.discrete_integrals(
    queried_sparkdf,
    slide_length=10,
)
integrals.show()

### Plot discrete integrals

In [None]:
# slide_length must match the discrete_integrals call.
visuals.plot_integrals(
    slide_length=10,
    sparkdf=integrals,
)

### Verify that the original DataFrame is unaltered

In [None]:
# Spark DataFrames are immutable: the filtering/analytics above returned new
# DataFrames, so this still prints the unfiltered job data (default 20 rows).
sparkdf.show(n=20)

# Memory Allocations Analytics

### Read the allocations file

In [None]:
# Load the allocation trace: a headerless comma-separated file whose columns
# are (address, size, alloc_time, free_time).
allocs_file = 'allocs_file.txt'

allocdf = (
    spark.read
        .format('com.databricks.spark.csv')
        .option('delimiter', ',')
        .option('header', 'false')
        .load(allocs_file)
        .toDF('address', 'size', 'alloc_time', 'free_time')
        # size is routed through DoubleType before IntegerType, presumably so
        # sizes written in float notation still parse.
        # NOTE(review): IntegerType cannot hold sizes >= 2**31 bytes; consider
        # LongType if traces can contain allocations that large.
        .withColumn('size', col('size').cast(DoubleType()).cast(IntegerType()))
        .withColumn('alloc_time', col('alloc_time').cast(DoubleType()))
        .withColumn('free_time', col('free_time').cast(DoubleType()))
)

# A free_time of 0 (presumably "never freed") is replaced by the end of the
# trace. The end is the latest alloc_time OR free_time: using max(free_time)
# alone would give an allocation made after the last free a free_time earlier
# than its alloc_time, i.e. a negative lifetime.
# `when` is evaluated natively by Spark, unlike a Python UDF, so rows are not
# shipped to a Python worker just for this substitution.
from pyspark.sql.functions import max as sql_max, when

max_alloc_time, max_free_time = allocdf.agg(
    sql_max('alloc_time'), sql_max('free_time')
).first()
known_times = [t for t in (max_alloc_time, max_free_time) if t is not None]
trace_end = max(known_times) if known_times else None

allocdf = allocdf.withColumn(
    'free_time',
    when(col('free_time') == 0, lit(trace_end).cast(DoubleType()))
        .otherwise(col('free_time')),
)

In [None]:
# Preview the typed allocation table (default 20 rows).
allocdf.show(n=20)

### Calculate metrics of unpooled and pooled allocations

In [None]:
# Print each allocation metric from sonar_driver's analytics module, in order.
alloc_metrics = (
    ('max_memory_unpooled:', analytics.max_memory_unpooled),
    ('max_memory_pooled:', analytics.max_memory_pooled),
    ('total_bytesecs_unpooled:', analytics.total_bytesecs_unpooled),
    ('total_bytesecs_pooled:', analytics.total_bytesecs_pooled),
)
for metric_label, metric_fn in alloc_metrics:
    print(metric_label, metric_fn(allocdf))

### Calculate the minimum number of pools for each unique allocation size

In [None]:
# Order the pool records by their 'count' field, largest first. sorted() with
# reverse=True stays stable, so ties keep pool_counts' original order.
pool_records = analytics.pool_counts(allocdf)
pools = sorted(pool_records, key=lambda pool: pool['count'], reverse=True)
pools

### Plot allocation pools

In [None]:
# Plot the allocation trace (with unfreed allocations extended to trace end).
alloc_plot_df = allocdf
visuals.plot_allocs(sparkdf=alloc_plot_df)