In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType
from pyspark.sql.functions import col, lag, from_unixtime, round
from pyspark.sql.window import Window

# Step 1: Create Spark session
spark = SparkSession.builder \
    .appName("CryptoDataProcessing") \
    .getOrCreate()

# Step 2: Define the schema
schema = StructType([
    StructField("price", DoubleType(), True),
    StructField("volume_24h", DoubleType(), True),
    StructField("volume_24h_change_24h", DoubleType(), True),
    StructField("market_cap", LongType(), True),
    StructField("market_cap_change_24h", DoubleType(), True),
    StructField("percent_change_15m", DoubleType(), True),
    StructField("percent_change_30m", DoubleType(), True),
    StructField("percent_change_1h", DoubleType(), True),
    StructField("percent_change_6h", DoubleType(), True),
    StructField("percent_change_12h", DoubleType(), True),
    StructField("percent_change_24h", DoubleType(), True),
    StructField("percent_change_7d", DoubleType(), True),
    StructField("percent_change_30d", DoubleType(), True),
    StructField("percent_change_1y", DoubleType(), True),
    StructField("symbol", StringType(), True),
    StructField("beta_value", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/23 18:10:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/09/23 18:10:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [12]:
import os
import re

file_list = os.listdir('data')

# Regular expression to match a year (four consecutive digits)
year_pattern = re.compile(r'\d{4}')

# Filter files that contain a year in their names
files_with_years = sorted(['data/' + file for file in file_list if year_pattern.search(file)])
files_with_years

['data/BTC-2017min.csv',
 'data/BTC-2018min.csv',
 'data/BTC-2019min.csv',
 'data/BTC-2020min.csv',
 'data/BTC-2021min.csv']

In [47]:
# Step 3: Load the CSV file into a DataFrame
df = spark.read.csv(files_with_years, header=True, inferSchema=True)

                                                                                

In [48]:
df.show(5)

+----------+-------------------+-------+--------+--------+--------+--------+----------+----------------+
|      unix|               date| symbol|    open|    high|     low|   close|Volume BTC|      Volume USD|
+----------+-------------------+-------+--------+--------+--------+--------+----------+----------------+
|1514764740|2017-12-31 23:59:00|BTC/USD|13913.28|13913.28|13867.18| 13880.0|0.59174759|    8213.4565492|
|1514764680|2017-12-31 23:58:00|BTC/USD|13913.26|13953.83|13884.69|13953.77|1.39878396|19518.3096575292|
|1514764620|2017-12-31 23:57:00|BTC/USD|13908.73|13913.26|13874.99|13913.26|0.77501206|10782.9442939156|
|1514764560|2017-12-31 23:56:00|BTC/USD| 13827.0|13908.69| 13827.0|13859.58|0.66645895|  9236.841134241|
|1514764500|2017-12-31 23:55:00|BTC/USD|13825.05|13825.05|13825.05|13825.05| 0.0655014|    905.56013007|
+----------+-------------------+-------+--------+--------+--------+--------+----------+----------------+
only showing top 5 rows



In [52]:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampType
from pyspark.sql.functions import col, sum as _sum, last, window, from_unixtime

# Step 3: Convert unix time to timestamp for resampling
df = df.withColumn("timestamp", from_unixtime(col("unix")).cast(TimestampType())).orderBy(col("timestamp"))

# Step 4: Resample the DataFrame to 5-minute intervals
df_resampled = df.groupBy(window(col("timestamp"), "5 minutes").alias("time_window")) \
    .agg(
        last("close").alias("price"),          # Get the last close price within the 5-minute window
        _sum("Volume BTC").alias("volume_BTC"), # Sum of Volume BTC for the 5-minute window
        _sum("Volume USD").alias("volume_USD")  # Sum of Volume USD for the 5-minute window
    )

# Step 5: Select the desired columns and rename window to timestamp
df_resampled = df_resampled.select(
    col("time_window.end").alias("timestamp"),  # Use the end of the time window as the timestamp
    col("price"),
    col("volume_BTC"),
    col("volume_USD")
)

# Step 6: Show the resampled DataFrame
df_resampled.orderBy(col("timestamp")).show(truncate=False)



+-------------------+------+-------------------+------------------+
|timestamp          |price |volume_BTC         |volume_USD        |
+-------------------+------+-------------------+------------------+
|2017-01-01 01:05:00|966.37|15.69737633        |15169.2452640221  |
|2017-01-01 01:10:00|966.58|0.43873245         |424.053961521     |
|2017-01-01 01:15:00|965.55|6.6617703299999995 |6434.2622184592   |
|2017-01-01 01:20:00|965.55|20.77338473        |20078.145180898802|
|2017-01-01 01:25:00|964.87|0.67092309         |647.7043203818    |
|2017-01-01 01:30:00|965.24|6.88               |6640.8512         |
|2017-01-01 01:35:00|965.24|0.0                |0.0               |
|2017-01-01 01:40:00|966.39|5.7117815          |5519.856963785    |
|2017-01-01 01:45:00|966.38|10.38096456        |10032.1113757011  |
|2017-01-01 01:50:00|966.97|34.87239491        |33706.5180215286  |
|2017-01-01 01:55:00|966.97|0.154              |148.91338000000002|
|2017-01-01 02:00:00|966.6 |0.24347781999999998|

                                                                                

In [69]:
window_spec = Window.orderBy("timestamp")

In [70]:
# Helper function to calculate percentage change
def calculate_percentage_change(current, previous):
    return (current - previous) / previous * 100

In [87]:
df_transformed = df_resampled.select(
    col("timestamp"),
    col("price"),
    col("Volume_USD"),
    # Calculate volume_24h_change_24h as the percentage change from the previous row
    round((col("Volume_USD") - lag("Volume_USD", 1).over(window_spec)) / lag("Volume_USD", 1).over(window_spec) * 100, 2)
    .alias("volume_24h_change_24h"),
        # Calculate percentage change for different intervals (15m, 30m, 1h, 6h, 12h, 24h, 7d, 30d, 1y)
    round(calculate_percentage_change(col("price"), lag("price", 3).over(window_spec)), 2).alias("percent_change_15m"),  # Assuming 1m intervals
    round(calculate_percentage_change(col("price"), lag("price", 6).over(window_spec)), 2).alias("percent_change_30m"),
    round(calculate_percentage_change(col("price"), lag("price", 12).over(window_spec)), 2).alias("percent_change_1h"),
    round(calculate_percentage_change(col("price"), lag("price", 72).over(window_spec)), 2).alias("percent_change_6h"),   # 72 mins is approx. 6h
    round(calculate_percentage_change(col("price"), lag("price", 144).over(window_spec)), 2).alias("percent_change_12h"), # 144 mins is approx. 12h
    round(calculate_percentage_change(col("price"), lag("price", 288).over(window_spec)), 2).alias("percent_change_24h"), # 288 mins is approx. 24h
    round(calculate_percentage_change(col("price"), lag("price", 2016).over(window_spec)), 2).alias("percent_change_7d"), # 2016 mins is approx. 7 days
    round(calculate_percentage_change(col("price"), lag("price", 8640).over(window_spec)), 2).alias("percent_change_30d"),# 8640 mins is approx. 30 days
    round(calculate_percentage_change(col("price"), lag("price", 105120).over(window_spec)), 2).alias("percent_change_1y"),# 105120 mins is approx. 1 year
)


In [88]:
df_transformed.show((40))

24/09/23 18:45:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/23 18:45:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/23 18:45:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/23 18:45:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/23 18:45:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------------------+------+------------------+---------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+
|          timestamp| price|        Volume_USD|volume_24h_change_24h|percent_change_15m|percent_change_30m|percent_change_1h|percent_change_6h|percent_change_12h|percent_change_24h|percent_change_7d|percent_change_30d|percent_change_1y|
+-------------------+------+------------------+---------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+
|2017-01-01 01:05:00|966.37|  15169.2452640221|                 NULL|              NULL|              NULL|             NULL|             NULL|              NULL|              NULL|             NULL|              NULL|             NULL|
|2017-01-01 01:10:00|966.58|     424.053961521|     

24/09/23 18:45:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/23 18:45:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/23 18:45:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/09/23 18:45:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [75]:
# Step 5: Transform the DataFrame according to the schema
df_transformed = df_resampled.select(
    col("price"),
    col("Volume_USD"),
    # Calculate volume_24h_change_24h as the percentage change from the previous row
    round((col("Volume_USD") - lag("Volume_USD", 1).over(window_spec)) / lag("Volume_USD", 1).over(window_spec) * 100, 2)
    .alias("volume_24h_change_24h"),

    # Calculate percentage change for different intervals (15m, 30m, 1h, 6h, 12h, 24h, 7d, 30d, 1y)
    round(calculate_percentage_change(col("price"), lag("price", 3).over(window_spec)), 2).alias("percent_change_15m"),  # Assuming 1m intervals
    round(calculate_percentage_change(col("price"), lag("price", 6).over(window_spec)), 2).alias("percent_change_30m"),
    round(calculate_percentage_change(col("price"), lag("price", 12).over(window_spec)), 2).alias("percent_change_1h"),
    round(calculate_percentage_change(col("price"), lag("price", 72).over(window_spec)), 2).alias("percent_change_6h"),   # 72 mins is approx. 6h
    round(calculate_percentage_change(col("price"), lag("price", 144).over(window_spec)), 2).alias("percent_change_12h"), # 144 mins is approx. 12h
    round(calculate_percentage_change(col("price"), lag("price", 288).over(window_spec)), 2).alias("percent_change_24h"), # 288 mins is approx. 24h
    round(calculate_percentage_change(col("price"), lag("price", 2016).over(window_spec)), 2).alias("percent_change_7d"), # 2016 mins is approx. 7 days
    round(calculate_percentage_change(col("price"), lag("price", 8640).over(window_spec)), 2).alias("percent_change_30d"),# 8640 mins is approx. 30 days
    round(calculate_percentage_change(col("price"), lag("price", 105120).over(window_spec)), 2).alias("percent_change_1y"),# 105120 mins is approx. 1 year
    
    # Map the `symbol` from CSV
    col("symbol"),
    
    # Placeholder for beta_value
    col("Volume USD").alias("beta_value"),
    
    # Convert unix timestamp to actual timestamp
    from_unixtime(col("unix")).alias("timestamp")
)

# Step 6: Show the transformed DataFrame
df_transformed.show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `symbol` cannot be resolved. Did you mean one of the following? [`price`, `timestamp`, `volume_BTC`, `volume_USD`].;
'Project [price#1660, Volume_USD#1664, round((((Volume_USD#1664 - lag(Volume_USD#1664, -1, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) / lag(Volume_USD#1664, -1, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1))) * cast(100 as double)), 2) AS volume_24h_change_24h#1862, round((((price#1660 - lag(price#1660, -3, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -3, -3))) / lag(price#1660, -3, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -3, -3))) * cast(100 as double)), 2) AS percent_change_15m#1863, round((((price#1660 - lag(price#1660, -6, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -6, -6))) / lag(price#1660, -6, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -6, -6))) * cast(100 as double)), 2) AS percent_change_30m#1864, round((((price#1660 - lag(price#1660, -12, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -12, -12))) / lag(price#1660, -12, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -12, -12))) * cast(100 as double)), 2) AS percent_change_1h#1865, round((((price#1660 - lag(price#1660, -72, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -72, -72))) / lag(price#1660, -72, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -72, -72))) * cast(100 as double)), 2) AS percent_change_6h#1866, round((((price#1660 - lag(price#1660, -144, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -144, -144))) / lag(price#1660, -144, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -144, -144))) * cast(100 as double)), 2) AS percent_change_12h#1867, round((((price#1660 - lag(price#1660, -288, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -288, -288))) / lag(price#1660, -288, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -288, -288))) * cast(100 as double)), 2) AS percent_change_24h#1868, round((((price#1660 - lag(price#1660, -2016, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2016, -2016))) / lag(price#1660, -2016, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -2016, -2016))) * cast(100 as double)), 2) AS percent_change_7d#1869, round((((price#1660 - lag(price#1660, -8640, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -8640, -8640))) / lag(price#1660, -8640, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -8640, -8640))) * cast(100 as double)), 2) AS percent_change_30d#1870, round((((price#1660 - lag(price#1660, -105120, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -105120, -105120))) / lag(price#1660, -105120, null) windowspecdefinition(timestamp#1670 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -105120, -105120))) * cast(100 as double)), 2) AS percent_change_1y#1871, 'symbol, 'Volume USD AS beta_value#1872, from_unixtime('unix, yyyy-MM-dd HH:mm:ss, Some(Europe/Brussels)) AS timestamp#1873]
+- Project [time_window#1648.end AS timestamp#1670, price#1660, volume_BTC#1662, volume_USD#1664]
   +- Aggregate [window#1665], [window#1665 AS time_window#1648, last(close#1380, false) AS price#1660, sum(Volume BTC#1381) AS volume_BTC#1662, sum(Volume USD#1382) AS volume_USD#1664]
      +- Project [named_struct(start, knownnullable(precisetimestampconversion(((precisetimestampconversion(timestamp#1636, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(timestamp#1636, TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN (((precisetimestampconversion(timestamp#1636, TimestampType, LongType) - 0) % 300000000) + 300000000) ELSE ((precisetimestampconversion(timestamp#1636, TimestampType, LongType) - 0) % 300000000) END) - 0), LongType, TimestampType)), end, knownnullable(precisetimestampconversion((((precisetimestampconversion(timestamp#1636, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(timestamp#1636, TimestampType, LongType) - 0) % 300000000) < cast(0 as bigint)) THEN (((precisetimestampconversion(timestamp#1636, TimestampType, LongType) - 0) % 300000000) + 300000000) ELSE ((precisetimestampconversion(timestamp#1636, TimestampType, LongType) - 0) % 300000000) END) - 0) + 300000000), LongType, TimestampType))) AS window#1665, unix#1374, date#1375, symbol#1376, open#1377, high#1378, low#1379, close#1380, Volume BTC#1381, Volume USD#1382, timestamp#1636]
         +- Filter isnotnull(timestamp#1636)
            +- Sort [timestamp#1636 ASC NULLS FIRST], true
               +- Project [unix#1374, date#1375, symbol#1376, open#1377, high#1378, low#1379, close#1380, Volume BTC#1381, Volume USD#1382, cast(from_unixtime(cast(unix#1374 as bigint), yyyy-MM-dd HH:mm:ss, Some(Europe/Brussels)) as timestamp) AS timestamp#1636]
                  +- Sort [timestamp#1555 ASC NULLS FIRST], true
                     +- Project [unix#1374, date#1375, symbol#1376, open#1377, high#1378, low#1379, close#1380, Volume BTC#1381, Volume USD#1382, cast(from_unixtime(cast(unix#1374 as bigint), yyyy-MM-dd HH:mm:ss, Some(Europe/Brussels)) as timestamp) AS timestamp#1555]
                        +- Project [unix#1374, date#1375, symbol#1376, open#1377, high#1378, low#1379, close#1380, Volume BTC#1381, Volume USD#1382, cast(from_unixtime(cast(unix#1374 as bigint), yyyy-MM-dd HH:mm:ss, Some(Europe/Brussels)) as timestamp) AS timestamp#1474]
                           +- Sort [timestamp#1439 ASC NULLS FIRST], true
                              +- Project [unix#1374, date#1375, symbol#1376, open#1377, high#1378, low#1379, close#1380, Volume BTC#1381, Volume USD#1382, cast(from_unixtime(cast(unix#1374 as bigint), yyyy-MM-dd HH:mm:ss, Some(Europe/Brussels)) as timestamp) AS timestamp#1439]
                                 +- Relation [unix#1374,date#1375,symbol#1376,open#1377,high#1378,low#1379,close#1380,Volume BTC#1381,Volume USD#1382] csv
