In [1]:
##################################################
## SET ENV. PARAMETERS & CREATE INIT. SESSIONS  ##
##################################################

import os
import sys

os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda-4.4.0/bin/python"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-oracle/jre"
os.environ["SPARK_CLASSPATH"] = "/home/big-dama/pavol/moa-release-2018.6.0/lib/moa.jar"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# distributed mode - only "--deploy-mode client" is available in jupyter, you have to run the the script byt "submit spark" in order to have "--deploy-mode cluster"
# os.environ['PYSPARK_SUBMIT_ARGS'] = "--master yarn --deploy-mode client --jars /home/big-dama/pavol/sparkstreaming/elasticsearch-hadoop-6.3.2.jar --driver-class-path /home/big-dama/pavol/sparkstreaming/elasticsearch-hadoop-6.3.2.jar pyspark-shell"
# local testing mode
os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[2] --jars /home/big-dama/pavol/sparkstreaming/elasticsearch-hadoop-6.3.2.jar --driver-class-path /home/big-dama/pavol/sparkstreaming/elasticsearch-hadoop-6.3.2.jar pyspark-shell"

print("starting")

from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()#.set("spark.jars", "/home/big-dama/pavol/moa-release-2018.6.0/lib/moa.jar")
conf.setAppName('dev_n_testing')
sc = SparkContext(conf=conf)

print("done with startup")

starting
done with startup


In [3]:
######################
## DATASET LOADING  ##
######################

from scipy.io import arff
from pyspark.sql import SQLContext
import pandas as pd
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *

#loadarff creates scipy record array, [0] - data, [1] - labels; in the following I extract labels and everything except labels
data = arff.loadarff('/home/big-dama/pavol/mawi_datasets/mawi_data_1_long.arff')
pd_df = pd.DataFrame(data=data[0], columns=data[1])
pd_df.label = pd_df.label.astype(float)

#convert pandas data frame to spark dataframe
sqlContext = SQLContext(sc)
spark_df = sqlContext.createDataFrame(pd_df)

#create features column for further machine learning processing
assembler = VectorAssembler(inputCols=list(pd_df)[:-1], outputCol="features")
spark_df_final = assembler.transform(spark_df)
spark_df_final.select("features", "label").show(n=1,truncate=True)
#the label column is detected as string and needs to be change to doubletype for further processing
spark_df_final = spark_df_final.withColumn("label", spark_df_final["label"].cast(DoubleType()))

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[3.6619E7,86533.0...|  0.0|
+--------------------+-----+
only showing top 1 row



In [4]:
##################
## KMEANS BATCH ##
##################


from pyspark.ml.clustering import KMeans

(trainingData, testData) = spark_df_final.randomSplit([0.8, 0.2])

kmeans = KMeans(k=2, seed=2, initMode="random")
model = kmeans.fit(trainingData.select("features"))

predictions = model.transform(testData.select("features"))

wssse = model.computeCost(trainingData.select("features"))
print("Within Set Sum of Squared Errors = " + str(wssse))

centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

predictions.select("features", "prediction").show()

Within Set Sum of Squared Errors = 5.79140325166e+24
Cluster Centers: 
[  6.50167161e+07   1.24757310e+05   6.00000000e+01   5.16285419e+02
   1.51400000e+03   4.11933226e+05   6.41333806e+02   6.00000000e+01
   6.00000000e+01   6.00000000e+01   6.00000000e+01   6.00000000e+01
   6.00000000e+01   6.00129032e+01   6.80000000e+01   1.36383226e+03
   1.51352258e+03   1.51400000e+03   1.51400000e+03   1.51400000e+03
   3.65824065e-01   7.23870968e+00   1.00000000e+00   6.04364710e+00
   6.67096774e+01   2.86910194e+01   5.12838710e+00   1.00000000e+00
   1.00000000e+00   1.00000000e+00   1.00000000e+00   1.19354839e+00
   3.61290323e+00   4.25806452e+00   6.00000000e+00   6.00000000e+00
   7.84516129e+00   1.71935484e+01   1.90903226e+01   2.45741935e+01
   4.13891548e-01   2.79290323e+01   4.58255548e+02   2.13845806e+03
   3.97764129e+05   6.30037742e+02   3.19483871e+01   3.20000000e+01
   3.20000000e+01   3.20000000e+01   3.24387097e+01   3.65677419e+01
   3.74516129e+01   5.24903226e+

+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[2.7612E7,77582.0...|         1|
|[2.9189E7,78524.0...|         1|
|[2.9621E7,75075.0...|         1|
|[3.0465E7,76962.0...|         1|
|[3.0801E7,69624.0...|         1|
|[3.0902E7,76576.0...|         1|
|[3.1039E7,76399.0...|         1|
|[3.1099E7,81844.0...|         1|
|[3.1692E7,76181.0...|         1|
|[3.2057E7,82251.0...|         1|
|[3.2448E7,73984.0...|         1|
|[3.2525E7,77413.0...|         1|
|[3.2741E7,81946.0...|         1|
|[3.2795E7,77418.0...|         1|
|[3.2828E7,78132.0...|         1|
|[3.2981E7,82120.0...|         1|
|[3.3035E7,80597.0...|         1|
|[3.3174E7,80055.0...|         1|
|[3.3185E7,79492.0...|         1|
|[3.3414E7,82803.0...|         1|
+--------------------+----------+
only showing top 20 rows



In [5]:
######################
## KMEANS STREAMING ##
######################

from pyspark.mllib.linalg import Vectors

from pyspark.mllib.clustering import StreamingKMeans
from pyspark.streaming import StreamingContext

(trainingData, testData) = spark_df_final.randomSplit([0.8, 0.2])

#sc = SparkContext()
ssc = StreamingContext(sc, 1)

# from https://stackoverflow.com/questions/36142973/how-to-convert-type-row-into-vector-to-feed-to-the-kmeans/43666133#43666133
# Turn your dataframe into a dense vector RDD. This step is the one 
# that had me banging my head against a wall. Prior answers above didn't 
# spell this out. When you turn your dataframe into an RDD, it is a 
# bunch of rows with vectors inside them. These need to be converted 
# into just dense vectors, outside of their row containers. This can be
# accomplished like this:

trainingDataRDD = trainingData.rdd.map(lambda row: Vectors.dense([x for x in row['features']]))
testDataRDD = testData.rdd.map(lambda row: Vectors.dense([x for x in row['features']]))

trainingDataRDD = [trainingDataRDD]
testDataRDD = [testDataRDD]

trainingStream = ssc.queueStream(trainingDataRDD)
testingStream = ssc.queueStream(testDataRDD)

model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(245, 1.0, 0)

model.trainOn(trainingStream)

result = model.predictOn(testingStream)
result.pprint()

ssc.start()
ssc.stop(stopSparkContext=False, stopGraceFully=True)

print("Final centers: " + str(model.latestModel().centers))

-------------------------------------------
Time: 2018-08-07 15:19:34
-------------------------------------------
1
1
1
1
1
1
1
1
1
1
...

-------------------------------------------
Time: 2018-08-07 15:19:35
-------------------------------------------

-------------------------------------------
Time: 2018-08-07 15:19:36
-------------------------------------------

Final centers: [[  1.76405235e+00   4.00157208e-01   9.78737984e-01   2.24089320e+00
    1.86755799e+00  -9.77277880e-01   9.50088418e-01  -1.51357208e-01
   -1.03218852e-01   4.10598502e-01   1.44043571e-01   1.45427351e+00
    7.61037725e-01   1.21675016e-01   4.43863233e-01   3.33674327e-01
    1.49407907e+00  -2.05158264e-01   3.13067702e-01  -8.54095739e-01
   -2.55298982e+00   6.53618595e-01   8.64436199e-01  -7.42165020e-01
    2.26975462e+00  -1.45436567e+00   4.57585173e-02  -1.87183850e-01
    1.53277921e+00   1.46935877e+00   1.54947426e-01   3.78162520e-01
   -8.87785748e-01  -1.98079647e+00  -3.47912149e-01   1

In [7]:
###########################
## ML MODEL PREDICTIONS  ##
###########################

from datetime import datetime
from pyspark.ml.classification import DecisionTreeClassifier

# Split te data to training/testing
(trainingData, testData) = spark_df_final.randomSplit([0.8, 0.2])

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=3)
start_training = datetime.now()

# Train model with Training Data
dtModel = dt.fit(trainingData)

end_training = datetime.now()
print ("training time : " + str(end_training - start_training))
print "numNodes = ", dtModel.numNodes
print "depth = ", dtModel.depth

start_test = datetime.now()
predictions = dtModel.transform(testData)
end_test = datetime.now()
print ("testing time : " + str(end_test - start_test))

# Predictions check
selected = predictions.select("label", "prediction", "probability")
selected.show()  

training time : 0:00:20.396457
numNodes =  15
depth =  3
testing time : 0:00:00.496904
+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.63333333333333...|
|  1.0|       1.0|[0.16641679160419...|
|  1.0|       1.0|[0.16641679160419...|
|  0.0|       1.0|[0.37106918238993...|
|  0.0|       1.0|[0.37106918238993...|
|  0.0|       0.0|[0.63333333333333...|
|  1.0|       0.0|[0.63333333333333...|
|  0.0|       0.0|[0.63333333333333...|
|  1.0|       1.0|[0.16641679160419...|
|  0.0|       0.0|[0.63333333333333...|
|  1.0|       1.0|[0.37106918238993...|
|  0.0|       0.0|[0.63333333333333...|
|  0.0|       0.0|[0.63333333333333...|
|  0.0|       1.0|[0.37106918238993...|
|  0.0|       0.0|[0.63333333333333...|
|  1.0|       0.0|[0.63333333333333...|
|  0.0|       0.0|[0.63333333333333...|
|  0.0|       1.0|[0.37106918238993...|
|  0.0|       0.0|[0.63333333333333...|
|  0.0|       0.0|[0.633333333333

In [8]:
################################
## 10-CV ML MODEL PREDICTIONS ##
################################

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator()
#evaluator.evaluate(predictions)

# Create ParamGrid for Cross Validation
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1,2])
             .build())

# Create 10-fold CrossValidator
cv = CrossValidator(estimator=dt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=10)

# Run cross validations
cvModel = cv.fit(trainingData)
predictions = cvModel.transform(testData)

selected = predictions.select("label", "prediction", "probability")
selected.show()  

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.64330543933054...|
|  1.0|       1.0|[0.11358024691358...|
|  1.0|       1.0|[0.11358024691358...|
|  0.0|       0.0|[0.64330543933054...|
|  0.0|       0.0|[0.64330543933054...|
|  0.0|       0.0|[0.64330543933054...|
|  1.0|       0.0|[0.64330543933054...|
|  0.0|       0.0|[0.64330543933054...|
|  1.0|       1.0|[0.11358024691358...|
|  0.0|       0.0|[0.64330543933054...|
|  1.0|       0.0|[0.64330543933054...|
|  0.0|       0.0|[0.64330543933054...|
|  0.0|       0.0|[0.64330543933054...|
|  0.0|       0.0|[0.64330543933054...|
|  0.0|       0.0|[0.64330543933054...|
|  1.0|       0.0|[0.64330543933054...|
|  0.0|       0.0|[0.64330543933054...|
|  0.0|       0.0|[0.64330543933054...|
|  0.0|       0.0|[0.64330543933054...|
|  0.0|       0.0|[0.64330543933054...|
+-----+----------+--------------------+
only showing top 20 rows



In [9]:
########################
## EVALUATION METRICS ##
########################

# Metrics calculation is only included in former RDD API - dataFrame needs to be converted to RDD
from pyspark.mllib.evaluation import BinaryClassificationMetrics, MulticlassMetrics
# prediction or probability?
predictionAndLabels = predictions.select("prediction","label")

metrics_bin = BinaryClassificationMetrics(predictionAndLabels.rdd)
metrics_multi = MulticlassMetrics(predictionAndLabels.rdd)

# Binary metrics
#	.areaUnderPR - Computes the area under the precision-recall curve.
#	.areaUnderROC - Computes the area under the receiver operating characteristic (ROC) curve.
#	.unpersist() - Unpersists intermediate RDDs used in the computation.
print("AUC : " + str(metrics_bin.areaUnderROC))
# Multiclass metrics
#	.accuracy - Returns accuracy (equals to the total number of correctly classified instances out of the total number of instances).
#	.confusionMatrix() - Returns confusion matrix: predicted classes are in columns, they are ordered by class label ascending, as in “labels”.
#	.fMeasure(label=None, beta=None) - Returns f-measure or f-measure for a given label (category) if specified.
#	.falsePositiveRate(label) - Returns false positive rate for a given label (category).
#	.precision(label=None) - Returns precision or precision for a given label (category) if specified.
#	.recall(label=None) - Returns recall or recall for a given label (category) if specified.
#	.truePositiveRate(label) - Returns true positive rate for a given label (category).
#	.weightedFMeasure(beta=None) - Returns weighted averaged f-measure.
#	.weightedFalsePositiveRate - Returns weighted false positive rate.
#	.weightedPrecision - Returns weighted averaged precision.
#	.weightedRecall - Returns weighted averaged recall. (equals to precision, recall and f-measure)
#	.weightedTruePositiveRate - Returns weighted true positive rate. (equals to precision, recall and f-measure)
print("ACC : " + str(metrics_multi.accuracy))
print("Confustion matrix : \n" + str(metrics_multi.confusionMatrix()))

AUC : 0.738129713424
ACC : 0.741020793951
Confustion matrix : 
DenseMatrix([[ 148.,   56.],
             [  81.,  244.]])


In [None]:
## TBD ###################################
##########################################
## INITIATE STREAMING CONTEXT for Kafka ##
##########################################
# according to gudie : https://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html

from pyspark.streaming.kafka import KafkaUtils
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

In [1]:
###############################################
## STREAMING EXAMPLE                         ##
## COPIED/ADJUSTED/COMMENTED FROM SOMEWHERE? ##
## NOT SURE                                  ##
###############################################
import os
import sys

os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda-4.4.0/bin/python"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-oracle/jre"
os.environ["SPARK_CLASSPATH"] = "/home/big-dama/pavol/moa-release-2018.6.0/lib/moa.jar"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

os.environ['PYSPARK_SUBMIT_ARGS'] = "--master yarn pyspark-shell"

from __future__ import print_function

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans

if __name__ == "__main__":
    sc = SparkContext(appName="StreamingKMeansExample")  # SparkContext
    ssc = StreamingContext(sc, 1)

    # $example on$
    # we make an input stream of vectors for training,
    # as well as a stream of vectors for testing
    def parse(lp):
        label = float(lp[lp.find('(') + 1: lp.find(')')])
        vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))

        return LabeledPoint(label, vec)

    trainingData = sc.textFile("/user/big-dama/pavol/kmeans_data.txt")\
        .map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))

    testingData = sc.textFile("/user/big-dama/pavol/streaming_kmeans_data_test.txt").map(parse)
    # 'PipelinedRDD' object has no attribute 'pprint' - testingData.pprint()
    # for testing purpose
    #print(trainingData.take(2))
    #print(testingData.take(2))
    
    trainingQueue = [trainingData]
    testingQueue = [testingData]

    trainingStream = ssc.queueStream(trainingQueue)
    testingStream = ssc.queueStream(testingQueue)

    # We create a model with random clusters and specify the number of clusters to find
    model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)

    # Now register the streams for training and testing and start the job,
    # printing the predicted cluster assignments on new data points as they arrive.
    model.trainOn(trainingStream)

    result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
    result.pprint()

    ssc.start()
    ssc.stop(stopGraceFully=True)

print("Final centers: " + str(model.latestModel().centers))

-------------------------------------------
Time: 2018-08-07 15:40:42
-------------------------------------------
(1.0, 0)
(2.0, 1)

-------------------------------------------
Time: 2018-08-07 15:40:43
-------------------------------------------

Final centers: [[ 4.19486462  4.00002246  4.08267685]
 [ 2.2408932   1.86755799 -0.97727788]]


In [None]:
#########################################
##WORK IN PROGRESS - KIBANA + SPARK ML ##
#########################################

In [None]:
# you have to ru this script in terminal to send/stream contents of the mawi dataset to socket -> localhost:12345
# if your scripts succesfully listen to socket you will see following output:
#big-dama@bigdama-vworker1-phy7:~/pavol/sparkstreaming$ python stream_mawi_csv.py
#('\nListening for a client at', 'localhost', 12345)
#('\nConnected by', ('127.0.0.1', 49952))
#
#Reading file...
#
#('Sending line', '36619000,86533,60,423.18,1514,354200,595.14,60,60,60,60,60,60,60,66,807,1514,1514,1514,1514,0.38138,9,1,6.1937,89,26.069,5.1058,1,1,1,1,1,1,1,6,6,17,17,17,17,0.43022,28,370.58,44862,355240,596.02,32,32,32,32,32,32,32,52,201,1500,1500,1500,1500,0.43156,0,79.034,255,2190.7,46.804,1,46,49,51,52,53,54,59,115,120,228,245,251,0.64655,0.95671,0.043269,1258,3318,80,0.40762,80,0.26323,15,23146,65494,706740000,26585,23,80,80,80,80,80,80,873,54447,59995,62324,64103,65013,0.49774,22,27394,65468,673120000,25945,80,80,80,80,80,80,80,21554,53964,59995,64103,64103,64712,0.57312,10,2,15.715,194,16.009,4.0012,2,2,16,16,16,16,16,16,16,16,17,24,24,0.1924,1.0082,0.00014,0.004575,0.00018,0,0.042199,0.056585,0,0,523.86,1460,456810,675.88,0,0,0,0,0,0,0,0,1402,1448,1460,1460,1460,0.26715,0,13897,2097100,1358900000,36863,34,72,129,260,514,725,1031,4172,14174,64436,65450,65535,65535,0.66349,2001,699,62979,0.18476,52342,0.18476,19,28531,65486,667210000,25830,22,53,53,1044,1044,1044,1781,18327,58009,62979,62979,62979,64688,0.57283,19,25635,65476,611230000,24723,53,53,53,123,123,1044,1044,7273,52342,54756,58009,61804,64125,0.5089,9034,20371,3417100000,0.1857,2516700000,0.2287,0.26248,0.57838,0.12935,0.001098,0\r\n')

%%bash
python /home/big-dama/pavol/sparkstreaming/stream_mawi_csv.py

In [1]:
import os
import sys

os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda-4.4.0/bin/python"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-oracle/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[2] --jars /home/big-dama/pavol/sparkstreaming/elasticsearch-hadoop-6.3.2.jar --driver-class-path /home/big-dama/pavol/sparkstreaming/elasticsearch-hadoop-6.3.2.jar pyspark-shell"

print("starting")

from pyspark import SparkConf
from pyspark import SparkContext
conf = SparkConf()
conf.setAppName('mawi_dataset_kibana')
sc = SparkContext(conf=conf)

print("done with startup")

starting
done with startup


In [3]:
###############################
## STATIC EXAMPLE THAT WORKS ##
###############################
##
## send the data to Kibana and I am able to find it under "testindex" index 
import json

rdd = sc.parallelize([{"key1": ["val1", "val2"]}])
final_rdd = rdd.map(json.dumps).map(lambda x: ('key', x))

es_write_conf = {
    # specify the node that we are sending data to (this should be the master)
    "es.nodes" : 'localhost',
    # specify the port in case it is not the default port
    # "es.port" : '5601',
    # specify a resource in the form 'index/doc-type'
    "es.resource" : 'testindex/testdoc',
    # is the input JSON?
    "es.input.json" : "yes",
    # is there a field in the mapping that should be used to specify the ES document ID
    #"es.mapping.id": "doc_id"
}

final_rdd.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)

In [None]:
# I tried the same approach as with previous example by use of .foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopFile(...)), but I couldn't ge it to work
# the problem is with converting/writing DStream object to Elastic search, it says it 
# https://stackoverflow.com/questions/41385293/pyspark-write-dstream-data-to-elasticsearch-using-saveasnewapihadoopfile/50524458
# , then I suceeded with following approach:
# https://stackoverflow.com/questions/49115508/spark-streaming-write-dataframe-to-elasticsearch
# this should be structured streaming approach which is not supported in Cloudera yet? but its working?
# https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

import os
import sys

os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda-4.4.0/bin/python"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-oracle/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[2] --jars /home/big-dama/pavol/sparkstreaming/elasticsearch-hadoop-6.3.2.jar --driver-class-path /home/big-dama/pavol/sparkstreaming/elasticsearch-hadoop-6.3.2.jar pyspark-shell"

print("starting")

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

print("done with startup")   

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 12345) \
    .load()

# regarding the "checkpointLocation" option
#    Py4JJavaError: An error occurred while calling o68.start.
#: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Could not determine path for the Elasticsearch commit log. Specify the commit log location by setting the [checkpointLocation] option on your DataStreamWriter. If you do not want to persist the Elasticsearch commit log in the regular checkpoint location for your streaming query then you can specify a location to store the log with [es.spark.sql.streaming.sink.log.path], or disable the commit log by setting [es.spark.sql.streaming.sink.log.enabled] to false.
        
query = lines \
.writeStream \
.outputMode("append") \
.format("org.elasticsearch.spark.sql") \
.option("checkpointLocation", "/tmp/") \
.option("es.resource", "mawitestindex/mawitestdoc") \
.option("es.nodes", "localhost") \
.start()

query.awaitTermination()

starting
done with startup
