Welcome to one of the collaborative Spark environments in ZHAW. You are not yet connected to Sparky by default. However, the necessary code template makes this a quick process. Keep in mind that you are sharing both the Jupyter environment and the Sparky cluster with others. Custom Python packages on the notebook/Spark driver side are installed with %pip install.

In [1]:
ticker = 'NESN.SW'
path = '/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/'

In [2]:
PORT = 10089

In [3]:
import os
import sparky

import pyspark
import slash
import pyspark.sql
sc = sparky.connect("sparknotebook-...", 2)
spark = pyspark.sql.SparkSession.builder.getOrCreate()

~~~ Sparky module loaded ~~~
[slash] initialised for indirect scaling
[slash] augmented context for app sparknotebook-..., cores limit 2
[slash:proxy] logging failed to 127.0.0.1
[slash] no ability to scale; proceeding at good luck


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/16 12:59:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


[slash] app running with 2 cores
Attached to Sparky cluster context from sparky-ext as sparknotebook-....
Requested 2 cores; real number might be less.


In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
import socket, time

# Too high share price increase in stock
#=======================================

# This code seeks through the NYSE stock exchange data to see which
# transactions are unusually priced, as compared to the overall previous price average.

# Look at the CSV input file to understand how the data is structured

# exchange company date price_open price_high price_low price_close stock_volume price_adj_close timestamp
# NYSE	ASP	2001-12-31	12.55	12.8	12.42	12.8	11300	6.91 2018-02-14 13:16:44.550444
   
  
# 1. Read input stream from socket (by default, sockets contain raw strings, which we must then parse in a structured format)

lines = spark.readStream.format("socket")\
  .option("host", "localhost")\
  .option("port", PORT)\
  .load()

# 2. Split the lines by comma (and assign each resulting column a proper name to reflect its contents, using the alias function)
# Note that wherever needed, we must also cast the resulting column to its proper type (e.g. Float for prices, instead of the default String)
structuredStream = lines.select(\
  split(lines.value, ",")[0].cast('Float').alias("open"),\
  split(lines.value, ",")[1].cast('Float').alias("high"),\
  split(lines.value, ",")[2].cast('Float').alias("low"),\
  split(lines.value, ",")[3].cast('Float').alias("close"),\
  split(lines.value, ",")[4].cast('Float').alias("adj_close"),\
  split(lines.value, ",")[5].cast('Int').alias("volume"),\
  split(lines.value, ",")[6].cast('Timestamp').alias("id"),\
  split(lines.value, ",")[7].cast('String').alias("date"),\
  split(lines.value, ",")[8].cast('String').alias("time"))

23/06/16 12:59:28 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


In [5]:
print(structuredStream)

DataFrame[open: float, high: float, low: float, close: float, adj_close: float, volume: int, id: timestamp, date: string, time: string]


In [6]:
# Add watermark to the structuredStream DataFrame
structuredStreamWithWatermark = structuredStream.withWatermark("id", "5 minutes")

# Define the windowed stream
windowedStream = structuredStreamWithWatermark.groupBy(window("id", "5 minutes", "5 minutes"))

print(windowedStream)

<pyspark.sql.group.GroupedData object at 0x7fd1e2e9ae10>


In [7]:
from pyspark.sql.functions import min, max, first, last

agregationsStream = windowedStream.agg(
    first("adj_close").alias("open"),
    max("high").alias("high"),
    min("low").alias("low"),
    last("adj_close").alias("close")
)

print(agregationsStream)

DataFrame[window: struct<start:timestamp,end:timestamp>, open: float, high: float, low: float, close: float]


In [8]:
# 'complete' outputMode for aggregations and 'append' outputMode to get complete records

streamingETLQuery =\
agregationsStream \
  .writeStream \
  .format("memory") \
  .queryName("aggDF") \
  .outputMode("append")\
  .start()

23/06/16 12:59:30 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-6fede734-20a4-4325-9348-be905d09fab9. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/06/16 12:59:30 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [9]:
from pyspark.sql.functions import desc
display(agregationsStream)

DataFrame[window: struct<start:timestamp,end:timestamp>, open: float, high: float, low: float, close: float]

In [10]:
import math
import sys
import pandas

from pyspark.sql.functions import desc, col, avg, when
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as spark_sum, lag


iter = 1
old_count = 0

#SQL statement required to get all the contents of the stream (use the name assigned earlier)
df = spark.sql("select * from aggDF")
print("Running SQL (initialization may take a while)...")

while True:

  # only start computing once some data has been collected (note that the dataframe is automatically updated by Spark)
  if(df.count() != 0):
    print('New # of rows in df is: ' + str(df.count() - old_count) + '\n')
    while(old_count == df.count()):
      # don't do anything while there is no new data
      continue
      
    # update the total count in order to be able to use the condition above in the next iteration
    old_count = df.count()
    
    print("**************************************")
    print("\nIteration no. "+ str(iter) + "\n")
    print("Sample data from streaming dataframe:\n")
    
    # set the time to be the end of the window (just to have a visual indication of time)
    df = df.withColumn("id", df.window.end)
       
    # we sort by time first, because by default there is no ordering in the streaming dataframe and we want to only show the most recent results
    # the following allows showing the most recent 5 windows (the False parameter instructs Spark not to truncate the output)
    df = df.sort(desc("id"))
    
    # Define the window specification for the moving average calculation
    #ma_window_1 = Window.partitionBy("id").orderBy(desc("id")).rowsBetween(-4, 0)
    # Calculate the moving average and add it as a new column
    #df = df.withColumn("MA5", avg(col("close")).over(ma_window_1))
    
    # Define the window specification for the moving average calculation
    #ma_window_2 = Window.partitionBy("id").orderBy(desc("id")).rowsBetween(-14, 0)
    # Calculate the moving average and add it as a new column
    #df = df.withColumn("MA15", avg(col("close")).over(ma_window_2))
    
    # Add a column indicating whether close is larger than open
    df = df.withColumn("close_larger_open", when(col("close") > col("open"), True).otherwise(False))
    # Define the window specification for counting consecutive windows where close_larger_open is True
    consecutive_window = Window.partitionBy("id").orderBy(desc("id")).rowsBetween(Window.unboundedPreceding, 0)
    # Add a column counting consecutive windows where close_larger_open is True
    df = df.withColumn("count_close_larger_open", spark_sum(when(col("close_larger_open"), 1).otherwise(0)).over(consecutive_window))
    df = df.withColumn("count_open_larger_close", spark_sum(when(col("close_larger_open"), 0).otherwise(1)).over(consecutive_window))
    
    # Calculate the lower shadow length and add it as a new column
    df = df.withColumn("upper_shadow_length", when(col("close_larger_open"), col("high") - col("close")).otherwise(col("high") - col("open")))
    # Calculate the lower shadow length and add it as a new column
    df = df.withColumn("lower_shadow_length", when(col("close_larger_open"), col("open") - col("low")).otherwise(col("close") - col("low")))

    df = df.withColumn("body_length", when(col("close_larger_open"), col("close") - col("open")).otherwise(col("open") - col("close")))
       
    # Select the columns you want to save
    # Convert the 'window' struct column to a string representation
    #df = df.withColumn("window_str", concat(
    #    to_utc_timestamp(col("window.start"), "GMT"),
    #    to_utc_timestamp(col("window.end"), "GMT")
    #))

    df.show(2, False)
        
    # Remove the original 'window' column if desired
    #df = df.drop("window")
    
    # Convert Spark DF to Pandas DF
    pandas_df = df.toPandas()
    # Extract the timestamp from the id column
    pandas_df['time_stamp'] = pandas_df['id'].apply(lambda x: x.strftime("%Y-%m-%d %H:%M:%S"))
    # Get the most recent time_stamp value
    time_stamp = pandas_df['time_stamp'].iloc[0]
    
    # Calculate a Moving Average
    pandas_df['MA5'] = pandas_df['open'].rolling(window=5).mean()
    pandas_df['MA15'] = pandas_df['open'].rolling(window=15).mean()
    
    # Reorder the columns in the Pandas DataFrame
    pandas_df = pandas_df[['window', 'open', 'high', 'low', 'close', 'id', 'MA5', 'MA15', 'close_larger_open',
                           'count_close_larger_open', 'count_open_larger_close', 'upper_shadow_length',
                           'lower_shadow_length', 'body_length', 'time_stamp']]
    
    # Drop window for readability of csv file
    pandas_df = pandas_df.drop('window', axis=1)

    # Save the file - create the directory if it doesn't exist
    stock_directory = os.path.join(path, ticker)
    if not os.path.exists(stock_directory):
        os.makedirs(stock_directory)

    file_name = '/' + str(iter) + '_' + ticker + '_window.csv'
    print('Start writing file... \n')
    print(stock_directory + file_name)
    pandas_df.to_csv(stock_directory + file_name, index=False) 
    print('File written to drive \n')

    print("Current number of windows processed in stream: "+ str(df.count()) + "\n")
    
    iter += 1

Running SQL (initialization may take a while)...


                                                                                

New # of rows in df is: 340

**************************************

Iteration no. 1

Sample data from streaming dataframe:



                                                                                

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |107.82|107.88|2023-06-12 09:10:00|true             |1                      |0                     

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/1_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 432



                                                                                

New # of rows in df is: 0

**************************************

Iteration no. 2

Sample data from streaming dataframe:



                                                                                

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |107.82|107.88|2023-06-12 09:10:00|true             |1                      |0                     

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/2_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 433

New # of rows in df is: 0



                                                                                

**************************************

Iteration no. 3

Sample data from streaming dataframe:



                                                                                

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |107.82|107.88|2023-06-12 09:10:00|true             |1                      |0                     

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/3_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 434

New # of rows in df is: 0



                                                                                

**************************************

Iteration no. 4

Sample data from streaming dataframe:



                                                                                

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |107.82|107.88|2023-06-12 09:10:00|true             |1                      |0                     

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/4_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 435

New # of rows in df is: 0



                                                                                

**************************************

Iteration no. 5

Sample data from streaming dataframe:



                                                                                

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |107.82|107.88|2023-06-12 09:10:00|true             |1                      |0                     

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/5_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 436

New # of rows in df is: 0



                                                                                

**************************************

Iteration no. 6

Sample data from streaming dataframe:



                                                                                

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |107.82|107.88|2023-06-12 09:10:00|true             |1                      |0                     

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/6_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 437



                                                                                

New # of rows in df is: 0



                                                                                

**************************************

Iteration no. 7

Sample data from streaming dataframe:



                                                                                

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |107.82|107.88|2023-06-12 09:10:00|true             |1                      |0                     

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/7_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 438

New # of rows in df is: 0



                                                                                

**************************************

Iteration no. 8

Sample data from streaming dataframe:



                                                                                

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |107.82|107.88|2023-06-12 09:10:00|true             |1                      |0                     

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/8_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 439

New # of rows in df is: 0



23/06/16 13:33:30 WARN TransportChannelHandler: Exception in connection from /103.178.229.195:58856
java.lang.IllegalArgumentException: Too large frame: 4849910940755296249
	at org.sparkproject.guava.base.Preconditions.checkArgument(Preconditions.java:119)
	at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:98)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.ne

**************************************

Iteration no. 9

Sample data from streaming dataframe:



                                                                                

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |107.82|107.88|2023-06-12 09:10:00|true             |1                      |0                     

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/9_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 440

New # of rows in df is: 0



                                                                                

**************************************

Iteration no. 10

Sample data from streaming dataframe:

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |1

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/10_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 441

New # of rows in df is: 0



23/06/16 13:42:10 WARN TransportChannelHandler: Exception in connection from /103.178.229.175:47628
java.lang.IllegalArgumentException: Too large frame: 4849910940755296249
	at org.sparkproject.guava.base.Preconditions.checkArgument(Preconditions.java:119)
	at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:98)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.ne

**************************************

Iteration no. 11

Sample data from streaming dataframe:



                                                                                

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |107.82|107.88|2023-06-12 09:10:00|true             |1                      |0                     

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/11_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 442



                                                                                

New # of rows in df is: 0



                                                                                

**************************************

Iteration no. 12

Sample data from streaming dataframe:



                                                                                

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |107.82|107.88|2023-06-12 09:10:00|true             |1                      |0                     

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/12_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 443

New # of rows in df is: 0



                                                                                

**************************************

Iteration no. 13

Sample data from streaming dataframe:



                                                                                

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |107.82|107.88|2023-06-12 09:10:00|true             |1                      |0                     

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/13_NESN.SW_window.csv
File written to drive 



                                                                                

Current number of windows processed in stream: 444

New # of rows in df is: 0



                                                                                

**************************************

Iteration no. 14

Sample data from streaming dataframe:

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |1

23/06/16 14:01:14 WARN TransportChannelHandler: Exception in connection from /185.170.144.3:64951
java.lang.IllegalArgumentException: Too large frame: 216172984696569848
	at org.sparkproject.guava.base.Preconditions.checkArgument(Preconditions.java:119)
	at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:148)
	at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:98)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty

Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/14_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 445

New # of rows in df is: 0



                                                                                

**************************************

Iteration no. 15

Sample data from streaming dataframe:

+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|window                                    |open  |high  |low   |close |id                 |close_larger_open|count_close_larger_open|count_open_larger_close|upper_shadow_length|lower_shadow_length|body_length|
+------------------------------------------+------+------+------+------+-------------------+-----------------+-----------------------+-----------------------+-------------------+-------------------+-----------+
|{2023-06-12 09:00:00, 2023-06-12 09:05:00}|107.84|107.86|107.52|107.84|2023-06-12 09:05:00|false            |0                      |1                      |0.020004272        |0.3199997          |0.0        |
|{2023-06-12 09:05:00, 2023-06-12 09:10:00}|107.82|107.9 |1

  series = series.astype(t, copy=False)


Start writing file... 

/home/ubuntu/work/abd-fs23/fuxseb01/project/stock_data/NESN.SW/15_NESN.SW_window.csv
File written to drive 

Current number of windows processed in stream: 446



                                                                                

New # of rows in df is: 0



ERROR:root:KeyboardInterrupt while sending command.0:>              (0 + 0) / 1]
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt

KeyboardInterrupt: 

In [None]:
# sc.stop()