In [5]:
#Isolation Forest

# Before training, install the spark-iforest package as described at: https://github.com/titicaca/spark-iforest
# Additional installation reference: https://towardsdatascience.com/isolation-forest-and-spark-b88ade6c63ff

#packages
import sys
from pyspark.sql import SparkSession, functions, types
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, mean, stddev,stddev_pop,avg,max,to_timestamp,udf,desc
from pyspark.sql import Window
from pyspark import SparkConf, SparkContext

from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark_iforest.ml.iforest import *
from pyspark.ml.linalg import Vectors
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number



# Configure Spark
conf = SparkConf()
# Put the spark-iforest jar (JVM side of pyspark_iforest) on the classpath.
conf.set('spark.jars', '/Users/varnnithavinay/spark-iforest/target/spark-iforest-2.4.0.jar')
# Hand ``conf`` to the builder so ``spark.jars`` is actually applied; a
# SparkConf that is only constructed has no effect on the session.
# NOTE(review): getOrCreate() reuses an already-running session, so static
# settings such as spark.jars only take effect on a fresh kernel.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config(conf=conf)
    .config("spark.memory.fraction", 0.8)
    .config("spark.executor.memory", "25G")
    .config("spark.driver.memory", "25G")
    .config("spark.sql.shuffle.partitions", "80")
    .config("spark.memory.offHeap.enabled", 'true')
    .config("spark.memory.offHeap.size", "25G")
    .getOrCreate()
)

# UDF: unwrap the first element of an ML vector into a double rounded to
# 6 decimals (used to turn single-column scaler output back into a scalar).
unlist = udf(lambda x: round(float(x[0]), 6), types.DoubleType())

##OTHER FUNCTIONS

# Schema Set
def sensor_schema():
    """Return the explicit schema used when reading a raw sensor CSV.

    Field order: ``timestamp`` (kept as a string, parsed later), then the
    nullable double columns ``X``, ``Y`` and ``Z``.
    """
    fields = [types.StructField('timestamp', types.StringType())]
    fields.extend(
        types.StructField(column, types.DoubleType()) for column in ('X', 'Y', 'Z')
    )
    return types.StructType(fields)
     
## Standard Scaler - Z normalization
def z_norm(temp):
    """Standardize column ``Z`` of ``temp`` into a new double column ``Zscale``.

    ``Z`` is packed into a one-element vector, scaled by a ``StandardScaler``
    fitted on ``temp`` (mean-centred and scaled to unit standard deviation),
    then unpacked back to a plain double by ``unlist``. The intermediate
    vector column is dropped; the result is cached and registered as the
    temp view ``scaledData``.
    """
    vectorized = VectorAssembler(inputCols=["Z"], outputCol="Zvector").transform(temp)
    fitted = StandardScaler(
        inputCol="Zvector", outputCol="Zscale", withMean=True, withStd=True
    ).fit(vectorized)
    standardized = (
        fitted.transform(vectorized)
        .withColumn("Zscale", unlist("Zscale"))
        .drop("Zvector")
        .cache()
    )
    standardized.createOrReplaceTempView("scaledData")
    return standardized

def movingAverage(scaledData, order_col="timestamp"):
    """Add a ``movingAverage`` column: the mean of ``Zscale`` over a row window.

    The window is the current row plus up to 3 rows before and after it,
    restricted to rows that share the same ``timestamp1`` value. A ROWS frame
    is only meaningful with an ordering; without one, the neighbours depend on
    the physical row order after the shuffle and results are nondeterministic.
    Rows are therefore ordered by ``order_col`` inside each partition.

    Args:
        scaledData: DataFrame with ``Zscale`` and ``timestamp1`` columns (and
            ``order_col`` unless it is None).
        order_col: column giving the in-partition row order. The default is
            the raw ``timestamp`` string. This assumes the strings share a
            ``yyyy-MM-dd HH:mm:ss[.fraction]`` layout, so lexical order is
            chronological; TODO confirm against the input data. Pass None to
            get the previous unordered (nondeterministic) window.

    Returns:
        ``scaledData`` with the extra ``movingAverage`` column, cached.
    """
    spec = Window.partitionBy(scaledData["timestamp1"])
    if order_col is not None:
        # Ties in order_col are still broken arbitrarily.
        spec = spec.orderBy(scaledData[order_col])
    spec = spec.rowsBetween(-3, 3)
    movAvg = scaledData.withColumn(
        "movingAverage", avg(scaledData["Zscale"]).over(spec)
    ).cache()
    return movAvg

  
# Output locations for the saved iforest estimator and fitted model.
temp_path = '/Users/varnnithavinay/Desktop/'
# Strip any trailing "/" before joining so the paths never contain "//",
# whether or not temp_path ends with a separator.
iforest_path = temp_path.rstrip("/") + "/iforest"
model_path = temp_path.rstrip("/") + "/iforest_model"

########################################################

def iforest_model(features, *, contamination=0.015, max_depth=2, seed=43):
    """Fit an isolation forest on ``features``, persist it, and score ``features``.

    The estimator and the fitted model are saved to ``iforest_path`` and
    ``model_path``, overwriting any earlier run. The model is then reloaded
    from ``model_path``, and that reloaded model produces the returned
    predictions.

    Args:
        features: DataFrame holding the assembled vector column
            (``main`` builds it as ``features``).
        contamination: passed to ``IForest(contamination=...)``.
        max_depth: passed to ``IForest(maxDepth=...)``.
        seed: passed to ``IForest.setSeed`` for reproducibility.

    Returns:
        ``features`` transformed by the reloaded model. ``main`` reads its
        ``prediction`` and ``anomalyScore`` columns.
    """
    iforest = IForest(contamination=contamination, maxDepth=max_depth)
    iforest.setSeed(seed)  # for reproducibility

    model = iforest.fit(features)
    # A freshly trained model carries summary info; a reloaded one does not.
    print(model.hasSummary)

    # Plain ``save`` raises if the path already exists, which breaks any
    # re-run of this cell; overwrite explicitly instead.
    iforest.write().overwrite().save(iforest_path)
    model.write().overwrite().save(model_path)

    # Score with the model read back from disk to exercise the saved artifact.
    loaded_model = IForestModel.load(model_path)
    return loaded_model.transform(features)

    

########################################################


def main(inputs):
    """Run the isolation-forest anomaly pipeline on the CSV file at ``inputs``.

    Steps:
      1. Read the CSV (with header).
      2. Parse ``timestamp`` into ``timestamp1``.
      3. Compute the moving average of ``Zscale`` and round it.
      4. Assemble it into a ``features`` vector.
      5. Fit and score an isolation forest via ``iforest_model``.
      6. Print the row count per prediction value and the scored rows.

    Intermediate frames are registered as temp views (temp1, movAvg,
    scaledNorm, data, features, model_val, test_df) so they can be queried
    with SQL.
    """
    def as_view(frame, view_name):
        # Register ``frame`` under ``view_name`` and hand it back for chaining.
        frame.createOrReplaceTempView(view_name)
        return frame

    sens_schema = sensor_schema()
    # Alternative path for the full raw sensor file (kept for reference):
    #   raw = spark.read.csv(inputs, schema=sens_schema).repartition(100)
    #   raw.createOrReplaceTempView("temp")
    #   raw = spark.sql("select timestamp,(Y*-1) as Y from temp where timestamp between '2018-07-09 12:00:00' and '2018-07-09 14:00:00'")
    #   scaledData = z_norm(raw)
    #   ...then continue from the `timestamp1` step below.
    # NOTE(review): that filter keeps only `timestamp` and `Y`, but z_norm
    # scales column `Z` -- confirm which column is intended before re-enabling.

    # Training sample.
    raw = as_view(spark.read.csv(inputs, header=True), "temp1")
    raw.show()

    with_ts = raw.withColumn(
        "timestamp1", to_timestamp("timestamp", 'yyyy-MM-dd HH:mm:ss')
    ).cache()

    as_view(movingAverage(with_ts), "movAvg")

    # Keep the columns of interest, with the moving average rounded.
    rounded = spark.sql(
        'SELECT timestamp1,Z,Zscale,round(movingAverage,0) as movingAverage_round from movAvg'
    ).cache()
    print("Scaled Data\n")
    rounded.show()
    as_view(rounded, "scaledNorm")

    # The single metric column that is modelled.
    metric = as_view(
        spark.sql(
            "select cast(movingAverage_round as decimal)  as  movingAverage_round from scaledNorm"
        ).cache(),
        "data",
    )

    assembler = VectorAssembler(inputCols=metric.columns, outputCol="features")
    features = assembler.transform(metric).select("features")
    features.show()
    print(features.count())
    as_view(features, "features")

    # Isolation forest scoring.
    scored = as_view(iforest_model(features).cache(), "model_val")
    scored.show()
    print("#(1.0 means anomalous/ outlier, 0.0 normal/ inlier)")
    print("Now in main\n")
    spark.sql(
        'select prediction,count(prediction) from model_val group by prediction'
    ).show()

    # Unpack the feature vector back into a scalar column for reporting.
    vector_to_list = functions.udf(
        lambda vec: vec.toArray().tolist(), types.ArrayType(types.FloatType())
    )
    unpacked = scored.withColumn('MvngAvg_feature', vector_to_list('features'))
    unpacked = unpacked.withColumn(
        "movingAverage_round", unpacked["MvngAvg_feature"].getItem(0)
    )
    as_view(unpacked, "test_df")

    report = as_view(
        spark.sql('select movingAverage_round, prediction, anomalyScore from test_df').cache(),
        "test_df",
    )
    report.show()

    # To export the report:
    #     report.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("iForest_Out")
    
    
if __name__ == "__main__":
    # Prepared sample CSV used for training and scoring in this notebook;
    # kept as a module-level name so later cells can reuse it.
    inputs = 'Lift1_sample.csv'
    main(inputs=inputs)

+--------------------+--------+---------+-------------------+
|           timestamp|       Z|   Zscale|         timestamp1|
+--------------------+--------+---------+-------------------+
|2018-07-11 09:04:...|0.785156| 0.039606|2018-07-11 09:04:00|
|2018-07-11 09:04:...| 0.78125| -0.07135|2018-07-11 09:04:00|
|2018-07-11 09:04:...|0.785156| 0.039606|2018-07-11 09:04:00|
|2018-07-11 09:04:...|0.792969| 0.261546|2018-07-11 09:04:00|
|2018-07-11 09:04:...|0.785156| 0.039606|2018-07-11 09:04:00|
|2018-07-11 09:04:...|0.773438|-0.293261|2018-07-11 09:04:00|
|2018-07-11 09:04:...|0.785156| 0.039606|2018-07-11 09:04:00|
|2018-07-11 09:04:...|0.792969| 0.261546|2018-07-11 09:04:00|
|2018-07-11 09:04:...|0.789063|  0.15059|2018-07-11 09:04:00|
|2018-07-11 09:04:...|0.777344|-0.182306|2018-07-11 09:04:00|
|2018-07-11 09:04:...|0.761719|-0.626157|2018-07-11 09:04:00|
|2018-07-11 09:04:...|    0.75|-0.959053|2018-07-11 09:04:00|
|2018-07-11 09:04:...|0.710938|-2.068667|2018-07-11 09:04:00|
|2018-07