## Data Preprocessing

### Use Spark to get data

In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %timeout            Int           The number of minutes after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session.
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
    %reconnect          String        Specify a live session ID to switch/reconnect to the sessions.
----

## Selecting Session Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %session_type       String        Specify a session_type to be used. Supported values: streaming and etl.
----

## Glue Config Magic 
*(common across all session types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
    
    %%assume_role Dictionary, String  Specify a json-formatted dictionary or an IAM role ARN string to create a session 
                                      for cross account access.
                                      E.g. {valid arn}
                                      %%assume_role 
                                      'arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole' 
                                      E.g. {credentials}
                                      %%assume_role
                                      {
                                            "aws_access_key_id" : "XXXXXXXXXXXX",
                                            "aws_secret_access_key" : "XXXXXXXXXXXX",
                                            "aws_session_token" : "XXXXXXXXXXXX"
                                       }
----

                                      
## Magic for Spark Sessions (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
    %matplot      Matplotlib figure   Visualize your data using the matplotlib library.
                                      E.g. 
                                      import matplotlib.pyplot as plt
                                      # Set X-axis and Y-axis values
                                      x = [5, 2, 8, 4, 9]
                                      y = [10, 4, 8, 5, 2]
                                      # Create a bar chart 
                                      plt.bar(x, y) 
                                      # Show the plot
                                      %matplot plt    
    %plotly            Plotly figure  Visualize your data using the plotly library.
                                      E.g.
                                      import plotly.express as px
                                      #Create a graphical figure
                                      fig = px.line(x=["a","b","c"], y=[1,3,2], title="sample figure")
                                      #Show the figure
                                      %plotly fig

  
                
----



### Set up the session

In [1]:
import boto3
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window

spark = SparkSession.builder \
    .appName("CryptoDataProcessing") \
    .getOrCreate()

Trying to create a Glue session for the kernel.
Session Type: etl
Session ID: 2083d2d9-10b3-457d-9817-dfc9ed6da1ea
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 2083d2d9-10b3-457d-9817-dfc9ed6da1ea to get into ready status...
Session 2083d2d9-10b3-457d-9817-dfc9ed6da1ea has been created.



In [2]:
%session_id 

Current active Session ID: 2083d2d9-10b3-457d-9817-dfc9ed6da1ea


In [59]:
# Build the S3 glob that matches all BTCUSDT Parquet files for the chosen period and year
bucket_name = "crypto-marketdata-marketdatabucket-m13jkiz8o4p7"
period = "1h"
year = "2024"
btc_file_path = f"s3://{bucket_name}/marketdata/crypto/BTCUSDT-{period}-{year}-*.snappy.parquet"
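
The path above is a glob: the trailing `*` matches every file for the given ticker, period and year. As a minimal sketch, the same pattern could be wrapped in a small helper so other tickers or years reuse it. The function name `build_input_glob` and the bucket `example-bucket` are hypothetical and only serve the illustration.

```python
def build_input_glob(bucket_name: str, ticker: str, period: str, year: str) -> str:
    """Return an S3 glob matching one ticker's Parquet files for a period and year."""
    return (
        f"s3://{bucket_name}/marketdata/crypto/"
        f"{ticker}-{period}-{year}-*.snappy.parquet"
    )


# Hypothetical bucket name, used only for illustration.
example_path = build_input_glob("example-bucket", "BTCUSDT", "1h", "2024")
print(example_path)
```

The resulting string can be passed to `spark.read.parquet` in the same way as `btc_file_path`.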




In [60]:
# Load the data
btc_data = spark.read.parquet(btc_file_path)




In [29]:
btc_data.head(1)

[Row(open_time=datetime.datetime(2017, 12, 1, 1, 0), open=9685.02, high=9720.0, low=9550.0, close=9635.0, vol=279.578526, close_time=datetime.datetime(2017, 12, 1, 1, 59, 59, 999000), quote_asset_vol=2690971.25173209, number_of_trades=1488, taker_base_asset_vol=137.232568, taker_quote_asset_vol=1321250.32929446, ignore=34409.30772883, ticker='BTCUSDT')]


### Extra data: technical indicators

In [24]:
def calculate_sma(df, window_size):
    # Trailing window: the current row plus the (window_size - 1) rows before it.
    window = Window.orderBy("close_time").rowsBetween(-window_size + 1, 0)
    return df.withColumn("SMA_{}".format(window_size), F.avg("close").over(window))
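
To make the window semantics concrete, here is a small pure-Python reference (a sketch for sanity checks, not part of the Spark job). It mirrors `rowsBetween(-window_size + 1, 0)`: each value is averaged with up to `window_size - 1` preceding values, so the earliest rows average over fewer points. The helper name `trailing_sma` is hypothetical; the sample closes are the first three `close` values visible in the notebook output.

```python
def trailing_sma(values, window_size):
    """Mean of each value and up to window_size - 1 values before it."""
    result = []
    for i in range(len(values)):
        start = max(0, i - window_size + 1)  # window is shorter near the start
        window = values[start:i + 1]
        result.append(sum(window) / len(window))
    return result


# First three hourly closes from the 2024 data shown in this notebook.
closes = [42613.56, 42581.10, 42330.49]
sma = trailing_sma(closes, 240)
print(sma[:2])
```

This is why the first row's `SMA_240` and `SMA_480` both equal the close price itself: until 240 (or 480) rows are available, the average covers only the rows seen so far.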




In [61]:
# Calculate the Simple Moving Average (SMA) for BTC over 10 days (240 hours) and 20 days (480 hours)

btc_data = calculate_sma(btc_data, 240)
btc_data = calculate_sma(btc_data, 480)




In [63]:
btc_data.head(1)

[Row(close_time=datetime.datetime(2024, 1, 1, 1, 59, 59, 999000), open_time=datetime.datetime(2024, 1, 1, 1, 0), open=42475.23, high=42775.0, low=42431.65, close=42613.56, vol=1196.37856, quote_asset_vol=50984893.3481416, number_of_trades=50396, taker_base_asset_vol=712.32227, taker_quote_asset_vol=30355645.3482764, ignore=0, ticker='BTCUSDT', SMA_240=42613.56, SMA_480=42613.56, RSI=None)]


In [26]:
def calculate_rsi(df, window_size):
    # Work on a narrow copy so the intermediate columns do not end up in df
    columns = ["close_time", "close"]
    rsi = df.select(columns)
    # Price change from the previous candle (null for the first row)
    rsi = rsi.withColumn("delta", F.col("close") - F.lag("close").over(Window.orderBy("close_time")))
    rsi = rsi.withColumn("gain", F.when(F.col("delta") > 0, F.col("delta")).otherwise(0))
    rsi = rsi.withColumn("loss", F.when(F.col("delta") < 0, -F.col("delta")).otherwise(0))

    # Simple (unsmoothed) averages of gains and losses over the trailing window
    window = Window.orderBy("close_time").rowsBetween(-window_size + 1, 0)
    rsi = rsi.withColumn("avg_gain", F.avg("gain").over(window))
    rsi = rsi.withColumn("avg_loss", F.avg("loss").over(window))

    # RS is null when avg_loss is 0 (Spark's default division behaviour), so RSI is null for those rows
    rsi = rsi.withColumn("rs", F.col("avg_gain") / F.col("avg_loss"))
    rsi = rsi.withColumn("RSI", 100 - (100 / (1 + F.col("rs"))))

    # Attach RSI to the original rows by close_time
    return df.join(rsi.select(["RSI", "close_time"]), on="close_time", how="left")
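
The RSI logic above can be checked against a pure-Python reference. The sketch below follows the same steps (delta, gain/loss split, simple trailing averages, `100 - 100 / (1 + RS)`) and returns `None` where the average loss is zero, assuming Spark's default non-ANSI behaviour in which division by zero yields NULL. The name `simple_rsi` is hypothetical, and this is the simple-average variant used in this notebook rather than Wilder's smoothed RSI.

```python
def simple_rsi(closes, window_size):
    """RSI from simple trailing averages of gains and losses."""
    gains, losses, rsi = [], [], []
    for i, close in enumerate(closes):
        # The first row has no previous close, so its gain and loss are both 0.
        delta = close - closes[i - 1] if i > 0 else 0.0
        gains.append(delta if delta > 0 else 0.0)
        losses.append(-delta if delta < 0 else 0.0)

        start = max(0, i - window_size + 1)
        count = i + 1 - start
        avg_gain = sum(gains[start:i + 1]) / count
        avg_loss = sum(losses[start:i + 1]) / count

        if avg_loss == 0:
            rsi.append(None)  # mirrors a NULL result from dividing by zero
        else:
            rs = avg_gain / avg_loss
            rsi.append(100 - 100 / (1 + rs))
    return rsi


# First three hourly closes from the 2024 data shown in this notebook.
rsi_values = simple_rsi([42613.56, 42581.10, 42330.49], 336)
print(rsi_values)
```

With only falling prices at the start of the series, the average gain is zero, which explains the `RSI=0.0` on the second row of the notebook output and the `RSI=None` on the first.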




In [62]:
# Calculate the Relative Strength Index (RSI) for BTC over 14 days (336 hours)
btc_data = calculate_rsi(btc_data, 336)




In [64]:
btc_data.head(5)

[Row(close_time=datetime.datetime(2024, 1, 1, 1, 59, 59, 999000), open_time=datetime.datetime(2024, 1, 1, 1, 0), open=42475.23, high=42775.0, low=42431.65, close=42613.56, vol=1196.37856, quote_asset_vol=50984893.3481416, number_of_trades=50396, taker_base_asset_vol=712.32227, taker_quote_asset_vol=30355645.3482764, ignore=0, ticker='BTCUSDT', SMA_240=42613.56, SMA_480=42613.56, RSI=None), Row(close_time=datetime.datetime(2024, 1, 1, 2, 59, 59, 999000), open_time=datetime.datetime(2024, 1, 1, 2, 0), open=42613.57, high=42638.41, low=42500.0, close=42581.1, vol=685.2198, quote_asset_vol=29167377.9732375, number_of_trades=29863, taker_base_asset_vol=288.98864, taker_quote_asset_vol=12301017.299762, ignore=0, ticker='BTCUSDT', SMA_240=42597.33, SMA_480=42597.33, RSI=0.0), Row(close_time=datetime.datetime(2024, 1, 1, 3, 59, 59, 999000), open_time=datetime.datetime(2024, 1, 1, 3, 0), open=42581.09, high=42586.64, low=42230.08, close=42330.49, vol=794.80391, quote_asset_vol=33709050.2741136,

In [18]:
def calculate_bollinger_bands(df, window_size, k):
    window_spec = Window.orderBy("close_time").rowsBetween(-window_size + 1, 0)

    # Middle band: trailing mean of the close price
    df = df.withColumn("Middle_Band", F.avg("close").over(window_spec))

    # Sample standard deviation over the same window (null when the window holds a single row)
    df = df.withColumn("STD", F.stddev("close").over(window_spec))

    # Find the Upper and Lower Bollinger Bands
    df = df.withColumn("Upper_Band", F.col("Middle_Band") + (F.col("STD") * k))
    df = df.withColumn("Lower_Band", F.col("Middle_Band") - (F.col("STD") * k))

    return df
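
As a cross-check for the band arithmetic, the sketch below computes the same three values in pure Python: the middle band is the trailing mean, and the upper and lower bands sit `k` sample standard deviations above and below it. The helper name `bollinger_bands` is hypothetical. It assumes `F.stddev` is the sample standard deviation, which is undefined for a single observation, so the first row has no upper or lower band.

```python
import statistics


def bollinger_bands(values, window_size, k):
    """Return (middle, upper, lower) per value using a trailing window."""
    rows = []
    for i in range(len(values)):
        window = values[max(0, i - window_size + 1):i + 1]
        middle = statistics.fmean(window)
        if len(window) < 2:
            # Sample standard deviation needs at least two observations.
            rows.append((middle, None, None))
        else:
            std = statistics.stdev(window)
            rows.append((middle, middle + k * std, middle - k * std))
    return rows


bands = bollinger_bands([1.0, 3.0], window_size=20, k=2)
print(bands)
```

This matches the first row of the notebook output, where `Middle_Band` equals the close price while `STD`, `Upper_Band` and `Lower_Band` are `None`.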




In [65]:
# Calculate Bollinger Bands for BTC over 20 days (480 hours) with k = 2 standard deviations
btc_data = calculate_bollinger_bands(btc_data, 480, 2)




In [66]:
btc_data.head(1)

[Row(close_time=datetime.datetime(2024, 1, 1, 1, 59, 59, 999000), open_time=datetime.datetime(2024, 1, 1, 1, 0), open=42475.23, high=42775.0, low=42431.65, close=42613.56, vol=1196.37856, quote_asset_vol=50984893.3481416, number_of_trades=50396, taker_base_asset_vol=712.32227, taker_quote_asset_vol=30355645.3482764, ignore=0, ticker='BTCUSDT', SMA_240=42613.56, SMA_480=42613.56, RSI=None, Middle_Band=42613.56, STD=None, Upper_Band=None, Lower_Band=None)]


### Upload the processed data

In [67]:
btc_output_path = f"s3://{bucket_name}/marketdata/processed_crypto/BTCUSDT-{year}-processed.parquet"




In [68]:
btc_data.write.mode("overwrite").parquet(btc_output_path)




In [6]:
!aws s3 ls s3://crypto-marketdata-marketdatabucket-m13jkiz8o4p7/marketdata/processed_crypto --recursive

2024-11-30 15:06:13     427631 marketdata/processed_crypto/BTCUSDT-2017-processed.parquet/part-00000-d7a4f3ec-e756-48ca-a8aa-8ea2701993db-c000.snappy.parquet
2024-11-30 15:07:18    1077173 marketdata/processed_crypto/BTCUSDT-2018-processed.parquet/part-00000-878fe21e-900b-4430-a2ad-2dbd490879b3-c000.snappy.parquet
2024-11-30 15:07:49    1077146 marketdata/processed_crypto/BTCUSDT-2019-processed.parquet/part-00000-5db41e6a-3fa0-4af3-8e1e-0a6a02583f64-c000.snappy.parquet
2024-11-30 15:08:11    1087753 marketdata/processed_crypto/BTCUSDT-2020-processed.parquet/part-00000-9a461648-bdc4-4c97-ae82-cb769e491c50-c000.snappy.parquet
2024-11-30 15:08:29    1095992 marketdata/processed_crypto/BTCUSDT-2021-processed.parquet/part-00000-85957820-5333-4681-8281-018860e0fe9d-c000.snappy.parquet
2024-11-30 15:08:58    1095197 marketdata/processed_crypto/BTCUSDT-2022-processed.parquet/part-00000-0dd79008-0526-46f8-81ff-ac12bae5addb-c000.snappy.parquet
2024-11-30 15:09:30    1091682 marketdata/processed_

In [8]:
%stop_session 

Stopping session: 5c29d583-1396-4994-90fc-74ad9ce23eaa
Stopped session.
