The following 3 code boxes are used to load the dataset. The first one is used to load the API key!

In [9]:
from google.colab import files
uploaded = files.upload()

Saving kaggle.json to kaggle.json


In [16]:
import os
os.makedirs('/root/.kaggle', exist_ok=True)
os.rename('kaggle.json', '/root/.kaggle/kaggle.json')

# To protect API key
!chmod 600 /root/.kaggle/kaggle.json

FileNotFoundError: [Errno 2] No such file or directory: 'kaggle.json' -> '/root/.kaggle/kaggle.json'

In [14]:
import os
from kaggle.api.kaggle_api_extended import KaggleApi

# Initialize the Kaggle API
api = KaggleApi()
api.authenticate()

# Download the file
api.dataset_download_files('olegshpagin/crypto-coins-prices-ohlcv', path='/content/data/Bitcoin +233 Crypto Coins Prices/', unzip=True)

Dataset URL: https://www.kaggle.com/datasets/olegshpagin/crypto-coins-prices-ohlcv


In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType, DoubleType, StringType
from pyspark.sql.functions import col, to_date, input_file_name, split, regexp_replace

In [17]:
spark = SparkSession.builder.appName('Crypto Coins Prices').getOrCreate()

In [19]:
# Define the schema
schema = StructType([
    StructField("pair", StringType(), True),  # This will be populated later
    StructField("datetime", TimestampType(), True),
    StructField("open", DoubleType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("close", DoubleType(), True),
    StructField("volume", DoubleType(), True),
])

In [20]:
# Define your schema (replace with your actual schema)
schema = "datetime STRING, open FLOAT, high FLOAT, low FLOAT, close FLOAT, volume FLOAT"
# Read the CSV files with the defined schema
df = spark.read.csv('/content/data/Bitcoin +233 Crypto Coins Prices/M15/*.csv', header=True, schema=schema)

# Add the file name column
df = df.withColumn("file_name", input_file_name())

# Extract the actual filename without the path
df = df.withColumn("actual_filename", regexp_replace(col("file_name"), r"^.*\/(.*)$", "$1"))

# Extract the part before the first underscore
df = df.withColumn("pair", split(col("actual_filename"), "_").getItem(0))

# Select only the original datetime and the before_underscore columns
result_df = df.select("pair", "datetime", "open", "high", "low", "close", "volume")

# Show the final DataFrame with only the desired columns
result_df.show(truncate=False)

+------+-------------------+--------+--------+--------+--------+-------+
|pair  |datetime           |open    |high    |low     |close   |volume |
+------+-------------------+--------+--------+--------+--------+-------+
|ETHBTC|2017-07-14 04:00:00|0.08    |0.0864  |0.08    |0.0864  |8.752  |
|ETHBTC|2017-07-14 04:15:00|0.085289|0.086   |0.085128|0.085811|61.042 |
|ETHBTC|2017-07-14 04:30:00|0.085811|0.08638 |0.085811|0.086314|53.769 |
|ETHBTC|2017-07-14 04:45:00|0.086314|0.08638 |0.086309|0.086347|42.818 |
|ETHBTC|2017-07-14 05:00:00|0.085874|0.086205|0.084608|0.08468 |16.52  |
|ETHBTC|2017-07-14 05:15:00|0.084581|0.086196|0.084581|0.08585 |22.144 |
|ETHBTC|2017-07-14 05:30:00|0.08585 |0.086   |0.085367|0.085669|42.024 |
|ETHBTC|2017-07-14 05:45:00|0.085669|0.085957|0.085341|0.085399|33.304 |
|ETHBTC|2017-07-14 06:00:00|0.085399|0.087001|0.085398|0.086973|162.135|
|ETHBTC|2017-07-14 06:15:00|0.087097|0.087097|0.086883|0.086883|35.316 |
|ETHBTC|2017-07-14 06:30:00|0.08678 |0.087117|0.086

In [21]:
result_df.count()

25942097

In [22]:
result_df.describe().show()

+-------+------------+-------------------+-----------------+-----------------+------------------+------------------+-------------------+
|summary|        pair|           datetime|             open|             high|               low|             close|             volume|
+-------+------------+-------------------+-----------------+-----------------+------------------+------------------+-------------------+
|  count|    25942097|           25942097|         25942097|         25942097|          25942097|          25942097|           25942097|
|   mean|        NULL|               NULL|315.3085654019528|316.3616548518669|314.22169202798875|315.30973962842415| 2612941.5000165906|
| stddev|        NULL|               NULL|3018.146470177234|3028.201832675217|3007.7704022336616|3018.1641784770873|6.717937442660275E7|
|    min|1000SATSUSDT|2017-07-14 04:00:00|          6.95E-5|          6.95E-5|           6.55E-5|           6.63E-5|                0.0|
|    max|     ZRXUSDT|2024-03-14 19:15:00

Query number 1: Filter with a Single Condition
Goal: Find all records where the volume is greater than 1 million.



In [23]:
# Filter where volume is greater than 1 million
result_df.filter(result_df["volume"] > 1000000).show(10)

+-------+-------------------+-------+-------+-------+-------+-----------+
|   pair|           datetime|   open|   high|    low|  close|     volume|
+-------+-------------------+-------+-------+-------+-------+-----------+
|VETUSDT|2018-07-25 04:00:00| 0.0225|0.02489| 0.0203|0.02106|3.7407276E7|
|VETUSDT|2018-07-25 04:15:00|0.02152|0.02269|0.02147|0.02226|3.4112624E7|
|VETUSDT|2018-07-25 04:30:00|0.02235| 0.0225|0.02165|0.02246|2.8341142E7|
|VETUSDT|2018-07-25 04:45:00|0.02236| 0.0224|  0.022|  0.022| 1.129151E7|
|VETUSDT|2018-07-25 05:00:00|  0.022|0.02206| 0.0213|0.02189|1.3751765E7|
|VETUSDT|2018-07-25 05:15:00| 0.0219| 0.0219| 0.0212|0.02139|  6160434.0|
|VETUSDT|2018-07-25 05:30:00|0.02139|0.02189|0.02135|0.02177|  7470119.0|
|VETUSDT|2018-07-25 05:45:00|0.02178|0.02225|0.02178|0.02218|  8955663.0|
|VETUSDT|2018-07-25 06:00:00|0.02219|0.02229|0.02159|0.02159|1.2489702E7|
|VETUSDT|2018-07-25 06:15:00|0.02159|0.02167| 0.0215|0.02164|  5894843.0|
+-------+-------------------+-------+-


Query number 2: Filter with a Single Condition on Date


In [24]:
from pyspark.sql.functions import col

# Filter where datetime is after 2022-01-01
result_df.filter(col("datetime") > "2022-01-01").show(10)

+------+-------------------+--------+--------+--------+--------+---------+
|  pair|           datetime|    open|    high|     low|   close|   volume|
+------+-------------------+--------+--------+--------+--------+---------+
|ETHBTC|2022-01-01 00:00:00|0.079544|0.079757|0.079541|0.079672|1307.2555|
|ETHBTC|2022-01-01 00:15:00|0.079672|0.079762|0.079667|0.079737| 396.1373|
|ETHBTC|2022-01-01 00:30:00|0.079736|0.079736|  0.0796|0.079696| 485.6153|
|ETHBTC|2022-01-01 00:45:00|0.079695|0.079808|0.079682|0.079805| 603.3572|
|ETHBTC|2022-01-01 01:00:00|0.079805|0.079805| 0.07969|0.079714| 601.3664|
|ETHBTC|2022-01-01 01:15:00|0.079709|0.079866|0.079705|0.079837|1219.3774|
|ETHBTC|2022-01-01 01:30:00|0.079837|0.079886|0.079754|0.079834| 663.1523|
|ETHBTC|2022-01-01 01:45:00|0.079833|0.079841| 0.07957|0.079637| 517.7116|
|ETHBTC|2022-01-01 02:00:00|0.079628|0.079673|0.079527|0.079538|  478.672|
|ETHBTC|2022-01-01 02:15:00|0.079541|0.079677| 0.07954|0.079635| 482.1326|
+------+-----------------

Query number 3: Multiple Conditions — AND fucntion

In [26]:
# High price greater than 50000 AND volume > 2,000,000
result_df.filter((col("high") > 50000) & (col("volume") > 2000000)).show(10)

+----+--------+----+----+---+-----+------+
|pair|datetime|open|high|low|close|volume|
+----+--------+----+----+---+-----+------+
+----+--------+----+----+---+-----+------+



Query number 4: Multiple Conditions — OR

In [27]:
# Open price OR close price is less than 100
result_df.filter((col("open") < 100) | (col("close") < 100)).show(10)

+------+-------------------+--------+--------+--------+--------+-------+
|  pair|           datetime|    open|    high|     low|   close| volume|
+------+-------------------+--------+--------+--------+--------+-------+
|ETHBTC|2017-07-14 04:00:00|    0.08|  0.0864|    0.08|  0.0864|  8.752|
|ETHBTC|2017-07-14 04:15:00|0.085289|   0.086|0.085128|0.085811| 61.042|
|ETHBTC|2017-07-14 04:30:00|0.085811| 0.08638|0.085811|0.086314| 53.769|
|ETHBTC|2017-07-14 04:45:00|0.086314| 0.08638|0.086309|0.086347| 42.818|
|ETHBTC|2017-07-14 05:00:00|0.085874|0.086205|0.084608| 0.08468|  16.52|
|ETHBTC|2017-07-14 05:15:00|0.084581|0.086196|0.084581| 0.08585| 22.144|
|ETHBTC|2017-07-14 05:30:00| 0.08585|   0.086|0.085367|0.085669| 42.024|
|ETHBTC|2017-07-14 05:45:00|0.085669|0.085957|0.085341|0.085399| 33.304|
|ETHBTC|2017-07-14 06:00:00|0.085399|0.087001|0.085398|0.086973|162.135|
|ETHBTC|2017-07-14 06:15:00|0.087097|0.087097|0.086883|0.086883| 35.316|
+------+-------------------+--------+--------+-----

 Query number 5: Combined AND + OR

In [28]:
# Open < 100 AND (volume > 1 million OR high > 90000)
result_df.filter((col("open") < 100) & ((col("volume") > 1000000) | (col("high") > 90000))).show(10)


+-------+-------------------+-------+-------+-------+-------+-----------+
|   pair|           datetime|   open|   high|    low|  close|     volume|
+-------+-------------------+-------+-------+-------+-------+-----------+
|VETUSDT|2018-07-25 04:00:00| 0.0225|0.02489| 0.0203|0.02106|3.7407276E7|
|VETUSDT|2018-07-25 04:15:00|0.02152|0.02269|0.02147|0.02226|3.4112624E7|
|VETUSDT|2018-07-25 04:30:00|0.02235| 0.0225|0.02165|0.02246|2.8341142E7|
|VETUSDT|2018-07-25 04:45:00|0.02236| 0.0224|  0.022|  0.022| 1.129151E7|
|VETUSDT|2018-07-25 05:00:00|  0.022|0.02206| 0.0213|0.02189|1.3751765E7|
|VETUSDT|2018-07-25 05:15:00| 0.0219| 0.0219| 0.0212|0.02139|  6160434.0|
|VETUSDT|2018-07-25 05:30:00|0.02139|0.02189|0.02135|0.02177|  7470119.0|
|VETUSDT|2018-07-25 05:45:00|0.02178|0.02225|0.02178|0.02218|  8955663.0|
|VETUSDT|2018-07-25 06:00:00|0.02219|0.02229|0.02159|0.02159|1.2489702E7|
|VETUSDT|2018-07-25 06:15:00|0.02159|0.02167| 0.0215|0.02164|  5894843.0|
+-------+-------------------+-------+-

Task B
creating a new column called price_range, which shows the difference between the high and low prices for each row — a very common metric in financial/crypto data analysis.

In [29]:
from pyspark.sql.functions import col

# Create a new column called 'price_range' = high - low
result_df = result_df.withColumn("price_range", col("high") - col("low"))

# Show a few rows to confirm the new column is added correctly
result_df.select("pair", "datetime", "high", "low", "price_range").show(10)

+------+-------------------+--------+--------+------------+
|  pair|           datetime|    high|     low| price_range|
+------+-------------------+--------+--------+------------+
|ETHBTC|2017-07-14 04:00:00|  0.0864|    0.08| 0.006400004|
|ETHBTC|2017-07-14 04:15:00|   0.086|0.085128|8.7200105E-4|
|ETHBTC|2017-07-14 04:30:00| 0.08638|0.085811|5.6900084E-4|
|ETHBTC|2017-07-14 04:45:00| 0.08638|0.086309| 7.099658E-5|
|ETHBTC|2017-07-14 05:00:00|0.086205|0.084608|0.0015969947|
|ETHBTC|2017-07-14 05:15:00|0.086196|0.084581|0.0016149953|
|ETHBTC|2017-07-14 05:30:00|   0.086|0.085367| 6.330013E-4|
|ETHBTC|2017-07-14 05:45:00|0.085957|0.085341| 6.159991E-4|
|ETHBTC|2017-07-14 06:00:00|0.087001|0.085398|0.0016029999|
|ETHBTC|2017-07-14 06:15:00|0.087097|0.086883|2.1399558E-4|
+------+-------------------+--------+--------+------------+
only showing top 10 rows



Explanation
withColumn("price_range",) creates a new column.

col("high") - col("low") subtracts values row-by-row.

This helps visualize how volatile a crypto pair was during that period.

Task C: Perform Aggregate Functions in PySpark
We’ll run at least two aggregation queries using:

avg() for average close price
max() for maximum volume

In [30]:
from pyspark.sql.functions import avg, max, min, sum

 Query number 1 — Average Close Price for All Pairs

In [31]:
# Calculate average close price
avg_close = result_df.agg(avg("close").alias("avg_close_price"))
avg_close.show()

+------------------+
|   avg_close_price|
+------------------+
|315.30973962842415|
+------------------+



Query number 2 — Mininmum open price of the entire data set means all the pairs

In [34]:
result_df.agg(min("open").alias("min_open_price")).show()



+--------------+
|min_open_price|
+--------------+
|       6.95E-5|
+--------------+



Task D
Group by pair and calculate average close price

In [35]:
from pyspark.sql.functions import avg

# Group by trading pair and calculate average close price
avg_close_per_pair = result_df.groupBy("pair") \
    .agg(avg("close").alias("avg_close_price"))

# Show the result
avg_close_per_pair.show(10, truncate=False)

+--------+--------------------+
|pair    |avg_close_price     |
+--------+--------------------+
|VETUSDT |0.033351619857801926|
|TRXUSDT |0.05289777105559504 |
|BTCUSDT |21807.118113527038  |
|BNBUSDT |172.21060789427383  |
|ADAUSDT |0.48033505673984955 |
|ETHUSDT |1251.8066989186987  |
|ETHBTC  |0.05157799774743873 |
|XLMUSDT |0.16146420606724426 |
|LTCUSDT |96.91862814289583   |
|IOTAUSDT|0.4964321518321508  |
+--------+--------------------+
only showing top 10 rows



explanation
groupBy("pair"): groups all rows by the pair value

.agg(avg("close")): calculates the average closing price for each group

.alias(...): renames the resulting column for clarity



Task E: Top 10 Highest Volume Trades Across All Pairs

In [36]:
# Sort the DataFrame by volume in descending order
sorted_volume_df = result_df.orderBy(result_df["volume"].desc())

# Show the top 10 highest volume records
sorted_volume_df.select("pair", "datetime", "volume").show(10, truncate=False)

+------------+-------------------+-------------+
|pair        |datetime           |volume       |
+------------+-------------------+-------------+
|1000SATSUSDT|2023-12-12 12:00:00|1.24054217E11|
|1000SATSUSDT|2023-12-12 12:15:00|4.2619228E10 |
|1000SATSUSDT|2023-12-12 12:30:00|4.2222641E10 |
|1000SATSUSDT|2023-12-14 16:00:00|3.5824419E10 |
|1000SATSUSDT|2023-12-18 09:15:00|3.32747162E10|
|1000SATSUSDT|2023-12-14 01:00:00|3.27737324E10|
|1000SATSUSDT|2023-12-15 07:15:00|3.26273085E10|
|1000SATSUSDT|2023-12-15 12:15:00|3.16866376E10|
|1000SATSUSDT|2024-01-03 12:00:00|3.03175864E10|
|1000SATSUSDT|2024-03-12 01:00:00|2.90051011E10|
+------------+-------------------+-------------+
only showing top 10 rows



Explanation:
orderBy(...desc()): sorts from highest to lowest

This helps find which coins had the most trading activity during a specific time

Task F: We'll create a Left Join between:

Your existing DataFrame result_df
A second DataFrame containing the average volume per pair

In [37]:
 #Create a DataFrame with Average Volume Per Pair
 from pyspark.sql.functions import avg

avg_volume_df = result_df.groupBy("pair") \
    .agg(avg("volume").alias("avg_volume"))

Performimng a Left Join on pair

In [38]:
joined_df = result_df.join(avg_volume_df, on="pair", how="left")

# Show sample result
joined_df.select("pair", "datetime", "volume", "avg_volume").show(10, truncate=False)


+------+-------------------+-------+------------------+
|pair  |datetime           |volume |avg_volume        |
+------+-------------------+-------+------------------+
|ETHBTC|2017-07-14 04:00:00|8.752  |1674.6737597779027|
|ETHBTC|2017-07-14 04:15:00|61.042 |1674.6737597779027|
|ETHBTC|2017-07-14 04:30:00|53.769 |1674.6737597779027|
|ETHBTC|2017-07-14 04:45:00|42.818 |1674.6737597779027|
|ETHBTC|2017-07-14 05:00:00|16.52  |1674.6737597779027|
|ETHBTC|2017-07-14 05:15:00|22.144 |1674.6737597779027|
|ETHBTC|2017-07-14 05:30:00|42.024 |1674.6737597779027|
|ETHBTC|2017-07-14 05:45:00|33.304 |1674.6737597779027|
|ETHBTC|2017-07-14 06:00:00|162.135|1674.6737597779027|
|ETHBTC|2017-07-14 06:15:00|35.316 |1674.6737597779027|
+------+-------------------+-------+------------------+
only showing top 10 rows



result_df contains individual trades

avg_volume_df provides summary per pair
The Left Join adds a new column avg_volume to each record, letting you compare individual volume vs average volume

Task G Window Functions[link text](https://)
Import Required Modules

In [39]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, dense_rank, lag, lead


Query number 1: Add rank() by pair based on volume (most volume = rank 1)

In [40]:
# Define window: partition by pair, order by volume descending
window_spec_rank = Window.partitionBy("pair").orderBy(result_df["volume"].desc())

# Apply rank
ranked_df = result_df.withColumn("volume_rank", rank().over(window_spec_rank))

# Show ranked results
ranked_df.select("pair", "datetime", "volume", "volume_rank").show(10)


+--------+-------------------+-----------+-----------+
|    pair|           datetime|     volume|volume_rank|
+--------+-------------------+-----------+-----------+
|AGIXUSDT|2023-03-03 01:30:00|2.5388476E7|          1|
|AGIXUSDT|2024-02-16 09:30:00| 1.873388E7|          2|
|AGIXUSDT|2023-03-14 17:00:00|1.8614436E7|          3|
|AGIXUSDT|2023-05-25 16:00:00|1.8072368E7|          4|
|AGIXUSDT|2023-03-14 18:45:00|1.7988518E7|          5|
|AGIXUSDT|2023-06-22 10:00:00|1.6398441E7|          6|
|AGIXUSDT|2023-02-28 04:00:00|  1.61187E7|          7|
|AGIXUSDT|2023-03-14 19:15:00| 1.561941E7|          8|
|AGIXUSDT|2023-03-22 18:00:00|1.5500232E7|          9|
|AGIXUSDT|2024-02-21 22:45:00|1.5429726E7|         10|
+--------+-------------------+-----------+-----------+
only showing top 10 rows



Query number 2: Use lag() to compare close price to previous row (per pair)

In [41]:
# Define window: partition by pair, order by datetime
window_spec_lag = Window.partitionBy("pair").orderBy("datetime")

# Add lag column (previous row's close)
lag_df = result_df.withColumn("previous_close", lag("close", 1).over(window_spec_lag))

# Show result with previous close comparison
lag_df.select("pair", "datetime", "close", "previous_close").show(10)

+--------+-------------------+-------+--------------+
|    pair|           datetime|  close|previous_close|
+--------+-------------------+-------+--------------+
|AGIXUSDT|2023-02-17 08:00:00|0.43829|          NULL|
|AGIXUSDT|2023-02-17 08:15:00|0.42992|       0.43829|
|AGIXUSDT|2023-02-17 08:30:00|0.43236|       0.42992|
|AGIXUSDT|2023-02-17 08:45:00|0.43561|       0.43236|
|AGIXUSDT|2023-02-17 09:00:00|0.43194|       0.43561|
|AGIXUSDT|2023-02-17 09:15:00|0.43121|       0.43194|
|AGIXUSDT|2023-02-17 09:30:00|0.43136|       0.43121|
|AGIXUSDT|2023-02-17 09:45:00|0.43351|       0.43136|
|AGIXUSDT|2023-02-17 10:00:00|0.43262|       0.43351|
|AGIXUSDT|2023-02-17 10:15:00| 0.4336|       0.43262|
+--------+-------------------+-------+--------------+
only showing top 10 rows



Explanation: rank(): assigns volume-based rank within each pair

lag(): brings in previous row’s close price for comparison

Task H: Aggregate Window Functions
These combine aggregates (like avg, sum) with windowing, allowing you to calculate running totals, moving averages, and group-based aggregates without collapsing rows (unlike groupBy()).

In [42]:
from pyspark.sql.functions import avg, sum
from pyspark.sql.window import Window

Query number  1: Moving Average of Close Price (Per Pair Ordered by Date)

In [43]:
# Define a rolling window: current row and 2 previous rows, ordered by datetime
window_spec_avg = Window.partitionBy("pair").orderBy("datetime").rowsBetween(-2, 0)

# Add 3-row moving average column
moving_avg_df = result_df.withColumn("moving_avg_close", avg("close").over(window_spec_avg))

# Show result
moving_avg_df.select("pair", "datetime", "close", "moving_avg_close").show(10)


+--------+-------------------+-------+-------------------+
|    pair|           datetime|  close|   moving_avg_close|
+--------+-------------------+-------+-------------------+
|AGIXUSDT|2023-02-17 08:00:00|0.43829|  0.438289999961853|
|AGIXUSDT|2023-02-17 08:15:00|0.42992| 0.4341049939393997|
|AGIXUSDT|2023-02-17 08:30:00|0.43236| 0.4335233271121979|
|AGIXUSDT|2023-02-17 08:45:00|0.43561|0.43262999256451923|
|AGIXUSDT|2023-02-17 09:00:00|0.43194| 0.4333033263683319|
|AGIXUSDT|2023-02-17 09:15:00|0.43121| 0.4329199989636739|
|AGIXUSDT|2023-02-17 09:30:00|0.43136|0.43150333563486737|
|AGIXUSDT|2023-02-17 09:45:00|0.43351| 0.4320266743501027|
|AGIXUSDT|2023-02-17 10:00:00|0.43262|0.43249666690826416|
|AGIXUSDT|2023-02-17 10:15:00| 0.4336| 0.4332433342933655|
+--------+-------------------+-------+-------------------+
only showing top 10 rows



 Query number 2: Running Total of Volume (Cumulative Sum per Pair)

In [44]:
# Define cumulative window ordered by datetime per pair
window_spec_sum = Window.partitionBy("pair").orderBy("datetime").rowsBetween(Window.unboundedPreceding, 0)

# Add cumulative volume column
running_total_df = result_df.withColumn("cumulative_volume", sum("volume").over(window_spec_sum))

# Show result
running_total_df.select("pair", "datetime", "volume", "cumulative_volume").show(10)

+--------+-------------------+---------+-----------------+
|    pair|           datetime|   volume|cumulative_volume|
+--------+-------------------+---------+-----------------+
|AGIXUSDT|2023-02-17 08:00:00|3256768.0|        3256768.0|
|AGIXUSDT|2023-02-17 08:15:00|1236058.0|        4492826.0|
|AGIXUSDT|2023-02-17 08:30:00| 840131.0|        5332957.0|
|AGIXUSDT|2023-02-17 08:45:00| 377821.0|        5710778.0|
|AGIXUSDT|2023-02-17 09:00:00| 672372.0|        6383150.0|
|AGIXUSDT|2023-02-17 09:15:00| 200868.0|        6584018.0|
|AGIXUSDT|2023-02-17 09:30:00| 395115.0|        6979133.0|
|AGIXUSDT|2023-02-17 09:45:00| 434877.0|        7414010.0|
|AGIXUSDT|2023-02-17 10:00:00| 437388.0|        7851398.0|
|AGIXUSDT|2023-02-17 10:15:00| 665352.0|        8516750.0|
+--------+-------------------+---------+-----------------+
only showing top 10 rows



Explanation
rowsBetween(-2, 0) = current row and 2 previous → perfect for moving average

Window.unboundedPreceding = start from beginning → perfect for running totals
The result keeps row granularity, unlike groupBy()