In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [108]:
# Create (or reuse) the notebook's Spark session; master 'local' runs Spark
# in-process with a single worker thread.
spark = (
    SparkSession.builder
    .master('local')
    .appName('bci')
    .getOrCreate()
)

In [109]:
# Show the active SparkSession as this cell's output.
spark

## Load and transform the data

In [110]:
# Batch-read the raw recording: first line is the header, column types are
# inferred from the data, fields are comma-separated.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", 'True')
    .option("delimiter", ',')
    .csv("data/sub3_comp.csv")
)

                                                                                

In [111]:
# Window used to number rows in the order they were read from the CSV.
# Ordering by a constant (F.lit(1)) gives Spark no defined order, so
# row_number() was free to hand out indices in any order.
# monotonically_increasing_id() grows with partition index and with row
# position inside a partition, so it should follow the read order.
# NOTE(review): verify this still holds if the input is ever split across
# several files.
# There is still no partitionBy: a global row number needs every row in one
# partition, so the "No Partition Defined for Window operation" WARN is expected.
windowSpec = Window.orderBy(F.monotonically_increasing_id())

In [112]:
# Attach a 1-based running row index, named "ms", computed over windowSpec.
df = df.withColumn(colName="ms", col=F.row_number().over(windowSpec))

In [113]:
# Group consecutive rows into fixed-size steps. Since ms starts at 1, rows
# 1..ROWS_PER_STEP form step 1, the next ROWS_PER_STEP rows form step 2, etc.
ROWS_PER_STEP = 100
df = df.withColumn("step", F.ceil(F.col("ms") / ROWS_PER_STEP))

In [114]:
# Preview the first 200 rows of the index columns next to one signal column.
df.select(["ms", "step", "thumb"]).show(n=200)

23/12/11 07:31:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/11 07:31:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/11 07:31:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/11 07:31:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/11 07:31:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+---+----+--------------------+
| ms|step|               thumb|
+---+----+--------------------+
|  1|   1|-0.41384039298833086|
|  2|   1|-0.41384039298833086|
|  3|   1|-0.41384039298833086|
|  4|   1|-0.41384039298833086|
|  5|   1|-0.41384039298833086|
|  6|   1|-0.41384039298833086|
|  7|   1|-0.41384039298833086|
|  8|   1|-0.41384039298833086|
|  9|   1|-0.41384039298833086|
| 10|   1|-0.41384039298833086|
| 11|   1|-0.41384039298833086|
| 12|   1|-0.41384039298833086|
| 13|   1|-0.41384039298833086|
| 14|   1|-0.41384039298833086|
| 15|   1|-0.41384039298833086|
| 16|   1|-0.41384039298833086|
| 17|   1|-0.41384039298833086|
| 18|   1|-0.41384039298833086|
| 19|   1|-0.41384039298833086|
| 20|   1|-0.41384039298833086|
| 21|   1|-0.41384039298833086|
| 22|   1|-0.41384039298833086|
| 23|   1|-0.41384039298833086|
| 24|   1|-0.41384039298833086|
| 25|   1|-0.41384039298833086|
| 26|   1|-0.41384039298833086|
| 27|   1|-0.41384039298833086|
| 28|   1|-0.41384039298833086|
| 29|   

In [13]:
# How many CSV part files the dataset is split into for the streaming replay.
num_files = 4_000


In [14]:
# Split the dataset into `num_files` CSV part files. The file stream below
# replays these files one per trigger.
# Round-robin repartition(num_files) scattered consecutive samples across every
# file. Range-partitioning on the row index `ms`, then sorting inside each
# partition, makes each file a contiguous, ordered slice of the recording.
# `df` itself is not rebound, so later cells keep using the unshuffled frame.
(
    df.repartitionByRange(num_files, "ms")
    .sortWithinPartitions("ms")
    .write.csv("data/processed_bci_csv",
               mode="overwrite",
               header=True)
)

23/12/10 11:45:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/10 11:45:33 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/10 11:45:33 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
23/12/10 11:45:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/12/10 11:45:43 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

## Get Data Schema

In [115]:
# Capture the batch DataFrame's schema, including the added `ms` and `step`
# columns. The streaming reader is then given this schema explicitly via
# .schema(dataSchema).
dataSchema = df.schema

## Set up stream

In [116]:
# Streaming source over the part files written above. It uses the explicit
# schema captured from the batch DataFrame, skips each file's header line,
# and admits at most one new file per micro-batch.
signals = (
    spark.readStream
    .schema(dataSchema)
    .format('csv')
    .options(header=True, maxFilesPerTrigger=1)
    .load(r"data/processed_bci_csv/")
)

In [117]:
# Sanity check: a DataFrame created via readStream reports isStreaming == True.
signals.isStreaming

True

In [118]:
# Print the stream's schema; it is the one passed in as dataSchema.
signals.printSchema()

root
 |-- thumb: double (nullable = true)
 |-- index: double (nullable = true)
 |-- middle: double (nullable = true)
 |-- ring: double (nullable = true)
 |-- little: double (nullable = true)
 |-- channel_1: double (nullable = true)
 |-- channel_2: double (nullable = true)
 |-- channel_3: double (nullable = true)
 |-- channel_4: double (nullable = true)
 |-- channel_5: double (nullable = true)
 |-- channel_6: double (nullable = true)
 |-- channel_7: double (nullable = true)
 |-- channel_8: double (nullable = true)
 |-- channel_9: double (nullable = true)
 |-- channel_10: double (nullable = true)
 |-- channel_11: double (nullable = true)
 |-- channel_12: double (nullable = true)
 |-- channel_13: double (nullable = true)
 |-- channel_14: double (nullable = true)
 |-- channel_15: double (nullable = true)
 |-- channel_16: double (nullable = true)
 |-- channel_17: double (nullable = true)
 |-- channel_18: double (nullable = true)
 |-- channel_19: double (nullable = true)
 |-- channel_20: dou

## Create transformation

In [121]:
# Running mean of every column in the stream. Each output column is named
# "avg_" followed by its source column name.
average_readings = signals.select(
    *(F.avg(name).alias('avg_' + name) for name in signals.columns)
)

In [14]:
import requests

In [15]:
def send_batch_to_http_server(batch_df, epoch_id,
                              url="http://127.0.0.1:5000/post_data",
                              timeout=10.0):
    """POST every row of a streaming micro-batch to an HTTP endpoint.

    The (batch_df, epoch_id) signature matches what
    ``DataStreamWriter.foreachBatch`` passes to its callback.

    Args:
        batch_df: Micro-batch DataFrame. Each row is serialised with
            ``toJSON()`` and sent as its own POST request.
        epoch_id: Micro-batch id supplied by Spark (not used here).
        url: Endpoint that receives the rows. The previous hard-coded literal
            misspelled the scheme as ``ttp://``.
        timeout: Seconds to wait for each request before giving up.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.RequestException: on connection failures or timeouts.
    """
    import requests  # local import keeps the callback self-contained

    # toJSON() already yields one JSON *string* per row. Passing it through
    # ``json=`` would encode it a second time, and the server would receive a
    # quoted string instead of an object. Send the text as the body with a
    # JSON content type instead.
    headers = {"Content-Type": "application/json"}
    for record in batch_df.toJSON().collect():
        response = requests.post(url, data=record.encode("utf-8"),
                                 headers=headers, timeout=timeout)
        # Fail the micro-batch rather than silently dropping rows.
        response.raise_for_status()

In [125]:
# Start the streaming query. It fires every 1000 ms and writes the updated
# averages to an in-memory table named "avg_readings"; the memory sink exposes
# the query name as a table name.
# NOTE(review): send_batch_to_http_server is defined but never used. To push
# results to the HTTP endpoint instead, replace the memory sink with
# .foreachBatch(send_batch_to_http_server).
query = (
    average_readings.writeStream
    .queryName("avg_readings")
    .format("memory")
    .outputMode("update")
    .trigger(processingTime='1000 milliseconds')
    .start()
)

23/12/11 07:34:51 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-65bc2e65-0969-4499-aef3-fc4f55c596e8. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/11 07:34:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/12/11 07:35:37 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 45526 milliseconds
23/12/11 07:36:20 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 43049 milliseconds
23/12/11 07:36:47 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 26673 milliseconds
23/12/11 07:37:

In [126]:
# Stop the streaming query; an in-flight micro-batch is aborted (see the
# "is aborting" / "aborted" errors in the output).
query.stop()

23/12/11 07:45:09 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 20, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@7b9c438d] is aborting.
23/12/11 07:45:09 ERROR WriteToDataSourceV2Exec: Data source write support MicroBatchWrite[epoch: 20, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@7b9c438d] aborted.
23/12/11 07:45:09 WARN TaskSetManager: Lost task 165.0 in stage 46.0 (TID 4192) (172.18.210.214 executor driver): TaskKilled (Stage cancelled: Job 24 cancelled part of cancelled job group 868169ef-e180-414c-b6e3-03c73b9793f5)


In [127]:
# Shut down the Spark session. The StateStore "SparkEnv not active" error in
# the output is logged by a maintenance thread after the session has stopped.
spark.stop()

23/12/11 07:45:53 WARN StateStore: Error running maintenance thread
java.lang.IllegalStateException: SparkEnv not active, cannot do maintenance on StateStores
	at org.apache.spark.sql.execution.streaming.state.StateStore$.doMaintenance(StateStore.scala:632)
	at org.apache.spark.sql.execution.streaming.state.StateStore$.$anonfun$startMaintenanceIfNeeded$1(StateStore.scala:610)
	at org.apache.spark.sql.execution.streaming.state.StateStore$MaintenanceTask$$anon$1.run(StateStore.scala:453)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor