In [105]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, FloatType
from pyspark.sql.functions import mean, col, to_date
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [28]:
spark = SparkSession.builder.appName("SockStream1").getOrCreate()

In [110]:
agg_data = []

schema = StructType([
    StructField("timestamp", StringType()),
    StructField("open", FloatType()),
    StructField("high", FloatType()),
    StructField("low", FloatType()),
    StructField("close", FloatType()),
    StructField("volume", FloatType())
])



In [109]:
listt = [
    {
        '2023-11-17 11:45:00': {
            '1. open': '153.0830',
            '2. high': '153.0900',
            '3. low': '153.0050',
            '4. close': '153.0750',
            '5. volume': '23015'
        },
        '2023-11-17 11:40:00': {
            '1. open': '153.1000',
            '2. high': '153.1050',
            '3. low': '153.0100',
            '4. close': '153.0850',
            '5. volume': '33288'
        },
        '2023-11-17 11:35:00': {
            '1. open': '152.8500', 
            '2. high': '153.1500', 
            '3. low': '152.8100', 
            '4. close': '153.1100', 
            '5. volume': '25972'
        },
        '2023-11-17 11:30:00': {
            '1. open': '152.9800', 
            '2. high': '152.9800', 
            '3. low': '152.8000', 
            '4. close': '152.8500', 
            '5. volume': '21931'
        },
        '2023-11-17 11:25:00': {'1. open': '153.0000', '2. high': '153.0300', '3. low': '152.9500', '4. close': '153.0000', '5. volume': '27809'},
        '2023-11-17 11:45:00': {'1. open': '153.0830', '2. high': '153.0900', '3. low': '153.0050', '4. close': '153.0750', '5. volume': '23015'}
    }
]
rows = []
for timestamp, values in listt[0].items():
    row_values = {
        'timestamp': timestamp,
        'open': float(values['1. open']),
        'high': float(values['2. high']),
        'low': float(values['3. low']),
        'close': float(values['4. close']),
        'volume': float(values['5. volume'])
        
    }
    rows.append(row_values)
print(rows[0])
df = spark.createDataFrame(rows, schema = schema)    
    
for column in df.columns:
    df = df.withColumn(column, col(column).cast("float"))

df = df.withColumn("date", to_date(col("timestamp"), 'yyyy-MM-dd HH:mm:ss'))
df.show()

{'timestamp': '2023-11-17 11:45:00', 'open': 153.083, 'high': 153.09, 'low': 153.005, 'close': 153.075, 'volume': 23015.0}
+---------+-------+-------+-------+-------+-------+----+
|timestamp|   open|   high|    low|  close| volume|date|
+---------+-------+-------+-------+-------+-------+----+
|     NULL|153.083| 153.09|153.005|153.075|23015.0|NULL|
|     NULL|  153.1|153.105| 153.01|153.085|33288.0|NULL|
|     NULL| 152.85| 153.15| 152.81| 153.11|25972.0|NULL|
|     NULL| 152.98| 152.98|  152.8| 152.85|21931.0|NULL|
|     NULL|  153.0| 153.03| 152.95|  153.0|27809.0|NULL|
+---------+-------+-------+-------+-------+-------+----+



In [100]:
#Calculate daily return - how much did the price change in percentage from opening to closing
df = df.withColumn("daily_return", (df.close - df.open) / df.open)
df.show()

+-------+-------+-------+-------+-------+--------------------+--------------------+
|   open|   high|    low|  close| volume|intra_day_volatility|        daily_return|
+-------+-------+-------+-------+-------+--------------------+--------------------+
|153.083| 153.09|153.005|153.075|23015.0|         0.084991455|-5.22305276070733...|
|  153.1|153.105| 153.01|153.085|33288.0|          0.09500122|-9.79711890951588...|
| 152.85| 153.15| 152.81| 153.11|25972.0|          0.33999634|0.001700978059888...|
| 152.98| 152.98|  152.8| 152.85|21931.0|          0.17999268|-8.49716483552215...|
|  153.0| 153.03| 152.95|  153.0|27809.0|          0.08000183|                 0.0|
+-------+-------+-------+-------+-------+--------------------+--------------------+



In [101]:
# 3. Calculate intra-day volatility - difference between the highest and lowest prices of the day
df = df.withColumn("intra_day_volatility", df.high - df.low)
df.show()

+-------+-------+-------+-------+-------+--------------------+--------------------+
|   open|   high|    low|  close| volume|intra_day_volatility|        daily_return|
+-------+-------+-------+-------+-------+--------------------+--------------------+
|153.083| 153.09|153.005|153.075|23015.0|         0.084991455|-5.22305276070733...|
|  153.1|153.105| 153.01|153.085|33288.0|          0.09500122|-9.79711890951588...|
| 152.85| 153.15| 152.81| 153.11|25972.0|          0.33999634|0.001700978059888...|
| 152.98| 152.98|  152.8| 152.85|21931.0|          0.17999268|-8.49716483552215...|
|  153.0| 153.03| 152.95|  153.0|27809.0|          0.08000183|                 0.0|
+-------+-------+-------+-------+-------+--------------------+--------------------+



In [102]:
big_list = [[150.61, 151.61, 150.51, 150.51, 130.0],[150.61, 150.61, 150.51, 150.51, 132.0], [150.61, 150.61, 150.51, 150.51, 130.0]]

agg_df = spark.createDataFrame(agg_data, schema = schema)
print(agg_df.columns)
for dlist in big_list:
    # agg_data.append(dlist)
    temp_df = spark.createDataFrame([dlist], schema = schema)
    agg_df = agg_df.union(temp_df)
    # rows = agg_df.collect()
    # for row in rows:
    #     print(row)
    mean_value = agg_df.agg(mean("volume")).collect()[0][0]



print(mean_value)

['open', 'high', 'low', 'close', 'volume']
130.66666666666666


In [6]:
agg_df.show()

+------+------+------+------+------+
|  open|  high|   low| close|volume|
+------+------+------+------+------+
|150.61|151.61|150.51|150.51| 130.0|
|150.61|150.61|150.51|150.51| 132.0|
|150.61|150.61|150.51|150.51| 130.0|
+------+------+------+------+------+



23/12/02 02:31:34 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
