In [20]:
import pyspark
from pyspark.sql import SparkSession
import os
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as func
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf
from PIL import Image, ImageDraw, ImageFont
import io
import math
import pandas as pd
from pyspark.ml.feature import PCA, VectorAssembler, VectorSizeHint, StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import from_json, decode, monotonically_increasing_id
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import functions as func


In [2]:
pravega_jars = [
    'pravega-connectors-spark-0.4.0-SNAPSHOT.jar', 
    'pravega-keycloak-credentials-0.4.0-2030.d99411b-0.0.1-020.26736d2-shadow.jar']
spark_jars_conf = ','.join([os.path.join('/usr/local/share/java', j) for j in pravega_jars])
spark = (SparkSession
             .builder
             #.appName('test1')
             .config('spark.jars', spark_jars_conf)
             .getOrCreate()
             )
spark.conf.set('spark.sql.shuffle.partitions', '1')
spark.conf.set('spark.sql.execution.arrow.enabled', 'true')

In [13]:
#controller = 'tcp://nautilus-pravega-controller.nautilus-pravega.svc.cluster.local:9090'
controller = 'tcp://127.0.0.1:9090'
scope = 'examples'
# Read a batch dataframe.
df0 = (spark
      .read
      .format("pravega")
      .option("controller", controller)
      .option("scope", scope)
#       .option("stream", "ForceStrokeStream")
       .option('stream', 'SegStream')
#       .option("encoding", "chunked_v1")
      .load()
      )

In [23]:
df1 = df0.limit(10000).cache()

In [24]:
df1.show()

+--------------------+--------+---------+----------+------+
|               event|   scope|   stream|segment_id|offset|
+--------------------+--------+---------+----------+------+
|[7B 22 70 75 6E 6...|examples|SegStream|         0|     0|
|[7B 22 70 75 6E 6...|examples|SegStream|         0|    88|
|[7B 22 70 75 6E 6...|examples|SegStream|         0|   183|
|[7B 22 70 75 6E 6...|examples|SegStream|         0|   278|
|[7B 22 70 75 6E 6...|examples|SegStream|         0|   366|
|[7B 22 70 75 6E 6...|examples|SegStream|         0|   455|
|[7B 22 70 75 6E 6...|examples|SegStream|         0|   543|
|[7B 22 70 75 6E 6...|examples|SegStream|         0|   632|
|[7B 22 70 75 6E 6...|examples|SegStream|         0|   721|
|[7B 22 70 75 6E 6...|examples|SegStream|         0|   809|
|[7B 22 70 75 6E 6...|examples|SegStream|         0|   898|
|[7B 22 70 75 6E 6...|examples|SegStream|         0|   986|
|[7B 22 70 75 6E 6...|examples|SegStream|         0|  1075|
|[7B 22 70 75 6E 6...|examples|SegStream

In [25]:
df = df1
df = df.withColumnRenamed('event', 'raw_event')
df = df.select('*', func.decode('raw_event', 'UTF-8').alias('event_string'))
df.select('event_string').toPandas()['event_string']



0       {"punchID":1.0,"Force":161969.2174,"eventTime"...
1       {"punchID":1.0,"Force":161956.44968000002,"eve...
2       {"punchID":1.0,"Force":161943.68196000002,"eve...
3       {"punchID":1.0,"Force":161905.3788,"eventTime"...
4       {"punchID":1.0,"Force":161930.91424,"eventTime...
5       {"punchID":1.0,"Force":161905.3788,"eventTime"...
6       {"punchID":1.0,"Force":161918.14652,"eventTime...
7       {"punchID":1.0,"Force":161918.14652,"eventTime...
8       {"punchID":1.0,"Force":161905.3788,"eventTime"...
9       {"punchID":1.0,"Force":161879.84336,"eventTime...
10      {"punchID":1.0,"Force":161905.3788,"eventTime"...
11      {"punchID":1.0,"Force":161879.84336,"eventTime...
12      {"punchID":1.0,"Force":161854.30792000002,"eve...
13      {"punchID":1.0,"Force":161879.84336,"eventTime...
14      {"punchID":1.0,"Force":161867.07564,"eventTime...
15      {"punchID":1.0,"Force":161841.54020000002,"eve...
16      {"punchID":1.0,"Force":161816.00475999998,"eve...
17      {"punc

In [26]:
    schema = 'punchID double, Force double, eventTime timestamp'
    df = df.select('*', func.from_json('event_string', schema=schema).alias('event'))
    df = df.select('*', 'event.*')
    df = df.drop('raw_event', 'event_string', 'event')
    df.show()

+--------+---------+----------+------+-------+------------------+--------------------+
|   scope|   stream|segment_id|offset|punchID|             Force|           eventTime|
+--------+---------+----------+------+-------+------------------+--------------------+
|examples|SegStream|         0|     0|    1.0|       161969.2174|51378-04-13 21:41...|
|examples|SegStream|         0|    88|    1.0|161956.44968000002|51378-04-13 21:41...|
|examples|SegStream|         0|   183|    1.0|161943.68196000002|51378-04-13 21:41...|
|examples|SegStream|         0|   278|    1.0|       161905.3788|51378-04-13 21:41...|
|examples|SegStream|         0|   366|    1.0|      161930.91424|51378-04-13 21:41...|
|examples|SegStream|         0|   455|    1.0|       161905.3788|51378-04-13 21:41...|
|examples|SegStream|         0|   543|    1.0|      161918.14652|51378-04-13 21:41...|
|examples|SegStream|         0|   632|    1.0|      161918.14652|51378-04-13 21:41...|
|examples|SegStream|         0|   721|    1

In [27]:
list_to_vector_udf = func.udf(lambda l: Vectors.dense(l), VectorUDT())
# Define an ordered window.
# Ordering by segment_id and offset maintains the order in which events were written (per routing key).
window = Window.partitionBy('punchID').orderBy('segment_id', 'offset')
feature_df = df.groupby('punchID').agg(
    # Combine Force value from all records into a single vector.
    list_to_vector_udf(func.collect_list(df['Force'])).alias('force_list'),
)
feature_df.show()

+-------+--------------------+
|punchID|          force_list|
+-------+--------------------+
|    1.0|[161969.2174,1619...|
|    2.0|[161522.347200000...|
|    3.0|[161764.93388,161...|
|    4.0|[160947.7998,1609...|
|    5.0|[162058.591440000...|
+-------+--------------------+

