In [None]:
# Import necessary libraries
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    to_date, first, last, max as _max, min as _min, sum as _sum, avg, 
    input_file_name, regexp_extract, col, log, exp,
    expr, lag, stddev
)
from pyspark.sql.window import Window

In [None]:
# Build (or reuse) the Spark session.
# The hadoop-aws and AWS SDK bundle packages provide the s3a:// filesystem;
# credentials are resolved through the AWS profile credentials provider.
spark = (
    SparkSession.builder
    .appName("Read from S3")
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:3.3.4,"
        "com.amazonaws:aws-java-sdk-bundle:1.12.568",
    )
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "com.amazonaws.auth.profile.ProfileCredentialsProvider",
    )
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)


In [None]:
# Read every Parquet file under the archive prefix on S3
raw_path = "s3a://cryptospark-dataset/archive/"
df = spark.read.parquet(raw_path)

# `symbol` is the source file's base name without the extension
# (e.g., 1INCH-BTC.parquet -> 1INCH-BTC); `date` is open_time converted to a date
symbol_from_path = regexp_extract(input_file_name(), r"([^/]+)\.parquet$", 1)
df = df.withColumn("symbol", symbol_from_path)
df = df.withColumn("date", to_date("open_time"))

# Inspect the resulting schema and a few sample rows
df.printSchema()
df.show(5)

### Basic Descriptive Statistics

In [None]:
# Summary statistics for the price and volume columns
numeric_cols = ["open", "high", "low", "close", "volume"]
df.describe(numeric_cols).show()

In [None]:
# Earliest and latest open_time present in the dataset
time_range = df.select(
    _min("open_time").alias("start_date"),
    _max("open_time").alias("end_date"),
)
time_range.show()

### Daily Aggregates

In [None]:
# Aggregate to daily OHLCV.
# first()/last() depend on row order, which Spark does not guarantee after the
# groupBy shuffle, so the day's open/close are picked explicitly: open comes from
# the earliest open_time and close from the latest (symbol breaks ties between rows
# that share an open_time, keeping reruns stable).
# NOTE(review): grouping by date alone pools every symbol loaded into df into a
# single series — confirm that is intended, or filter df to one symbol first.
candle_order = "struct(`open_time`, `symbol`)"
daily_df = df.groupBy("date").agg(
    expr("min_by(`open`, " + candle_order + ")").alias("open"),
    expr("max_by(`close`, " + candle_order + ")").alias("close"),
    _max("high").alias("high"),
    _min("low").alias("low"),
    avg("close").alias("avg_close"),
    _sum("volume").alias("total_volume")
).orderBy("date")

# Cache the result for faster access in future steps (avoiding recomputation)
daily_df.cache()

In [None]:
# Trailing 7-row window over the daily series (7 days when no dates are missing).
# daily_df holds one row per date, so the window must NOT be partitioned by date:
# that would leave a single row in every frame and make each "moving average"
# equal to the day's own value.
# An un-partitioned window is evaluated in one partition; this is applied to the
# already-aggregated daily frame, so no repartitioning is done beforehand.
window_spec = Window.orderBy("date").rowsBetween(-6, 0)

# Add 7-day moving averages for 'close' and 'volume'
daily_df = daily_df.withColumn("ma_close", avg("close").over(window_spec)) \
                   .withColumn("ma_volume", avg("total_volume").over(window_spec))

# Cache the results again to speed up future accesses
daily_df.cache()



In [None]:
# Row count of the daily frame, checked before collecting it to Pandas
daily_row_count = daily_df.count()
print("Daily count:", daily_row_count)

In [None]:
# To avoid memory issues, only the aggregated daily data is converted to Pandas,
# capped at 2000 rows for visualization.
# The frame is ordered by date BEFORE limit(): without an ordering, limit() may
# return any 2000 rows, which would leave gaps in the plotted series.
daily_pd = daily_df.orderBy("date").limit(2000).toPandas()
daily_pd["date"] = pd.to_datetime(daily_pd["date"])
daily_pd = daily_pd.sort_values("date")


In [None]:
# Five stacked panels sharing the date axis, one per daily price series.
# Each entry: (daily_pd column, legend label, panel title, line styling)
price_panels = [
    ("open", "Open", "Daily Open Price", {"color": "blue", "alpha": 0.7}),
    ("close", "Close", "Daily Close Price", {"color": "green", "alpha": 0.7}),
    ("high", "High", "Daily High Price", {"color": "orange", "alpha": 0.7}),
    ("low", "Low", "Daily Low Price", {"color": "red", "alpha": 0.7}),
    ("avg_close", "Average Close", "Daily Average Close Price",
     {"color": "purple", "linestyle": "--"}),
]

# Set up the figure layout
fig, axes = plt.subplots(len(price_panels), 1, figsize=(14, 18), sharex=True)

for ax, (column, label, title, line_style) in zip(axes, price_panels):
    ax.plot(daily_pd["date"], daily_pd[column], label=label, **line_style)
    ax.set_title(title)
    ax.set_ylabel("Price")
    ax.legend()
    ax.grid(True)

# The x axis is shared, so only the bottom panel carries its label
axes[-1].set_xlabel("Date")

plt.tight_layout()
plt.show()


In [None]:
# Keep only rows that have both a close price and its moving average
daily_pd = daily_pd.dropna(subset=["close", "ma_close"])

# Plot: Daily Close with 7-day Moving Average
fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(daily_pd["date"], daily_pd["close"], label="Close Price", alpha=0.6)
ax.plot(daily_pd["date"], daily_pd["ma_close"], label="7-Day MA", color="red")
ax.set_title("Daily Close Price and 7-Day Moving Average")
ax.set_xlabel("Date")
ax.set_ylabel("Price")
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()


In [None]:
# Plot: daily volume as bars, with its 7-day moving average drawn on top
fig, ax = plt.subplots(figsize=(12, 5))
ax.bar(daily_pd["date"], daily_pd["total_volume"], label="Volume", alpha=0.6)
ax.plot(daily_pd["date"], daily_pd["ma_volume"], label="7-Day MA Volume", color="orange")
ax.set_title("Daily Volume and 7-Day Moving Average")
ax.set_xlabel("Date")
ax.set_ylabel("Volume")
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()


# Preprocessing Section

In [None]:
# Show the first 5 rows of the raw frame (including the derived symbol and date columns)
df.show(5)

In [None]:
# Columns that the later steps do not use.
# NOTE(review): open_time is dropped here, so later steps can only order rows by
# `date` — confirm there is at most one row per symbol per date.
unused_columns = [
    "quote_asset_volume",
    "number_of_trades",
    "taker_buy_base_asset_volume",
    "taker_buy_quote_asset_volume",
    "open_time",
]

# Columns that must be non-null for a row to be kept
required_columns = ["open", "high", "low", "close", "volume", "date"]

processed_df = df.drop(*unused_columns).dropna(subset=required_columns)

In [None]:
# Number of partitions backing processed_df at this point
processed_df.rdd.getNumPartitions()

In [None]:
# How many distinct symbols the raw frame contains
distinct_symbols = df.select("symbol").distinct()
distinct_symbols.count()

In [None]:
# Per-row return (close vs. open) and range (high vs. low), both relative to open
open_price = col("open")
return_expr = (col("close") - open_price) / open_price
range_expr = (col("high") - col("low")) / open_price
processed_df = processed_df.withColumn("daily_return", return_expr)
processed_df = processed_df.withColumn("volatility", range_expr)

# Repartition by symbol and cache the result
processed_df = processed_df.repartition("symbol").cache()
processed_df.count()  # Forces cache population

In [None]:
# Number of partitions backing processed_df after the repartition by symbol
processed_df.rdd.getNumPartitions()

In [None]:
# Show the first 5 rows of the processed DataFrame
# (now including the daily_return and volatility columns)
processed_df.show(5)

In [None]:
# Write the processed frame to S3 as Parquet, one partition directory per symbol,
# replacing whatever is already stored under the target prefix
returns_path = "s3a://cryptospark-dataset/daily_return_volatility/"
returns_writer = processed_df.write.mode("overwrite").partitionBy("symbol")
returns_writer.parquet(returns_path)

In [None]:
# Read the processed data back from S3
processed_df1 = spark.read.parquet("s3a://cryptospark-dataset/daily_return_volatility/")

# Co-locate each symbol's rows ahead of the per-symbol window operations, then cache
processed_df1 = processed_df1.repartition("symbol")
processed_df1 = processed_df1.cache()

In [None]:
# Trailing windows per symbol, ordered by date:
# the current row plus the 6 (window_7) or 29 (window_30) rows before it
symbol_by_date = Window.partitionBy("symbol").orderBy("date")
window_7 = symbol_by_date.rowsBetween(-6, 0)
window_30 = symbol_by_date.rowsBetween(-29, 0)

# Moving averages of close over the 7- and 30-row windows
processed_df1 = processed_df1.withColumn("ma_7", avg("close").over(window_7))
processed_df1 = processed_df1.withColumn("ma_30", avg("close").over(window_30))

# Cumulative return is currently disabled. The earlier draft computed
# exp(sum(log(1 + daily_return))) - 1 over a per-symbol window running from
# Window.unboundedPreceding to Window.currentRow.

In [None]:
# Lag features: close price and volume of the previous row within each symbol
# (the previous day when there is one row per symbol per date).
# lag() is an offset function and Spark rejects it over a window that carries an
# explicit ROWS frame such as window_7, so it gets its own ordered window with
# no frame.
lag_window = Window.partitionBy("symbol").orderBy("date")
processed_df1 = processed_df1.withColumn("prev_close", lag("close", 1).over(lag_window)) \
                             .withColumn("prev_volume", lag("volume", 1).over(lag_window))

In [None]:
# Absolute change of close relative to the previous row's close (prev_close)
price_delta = col("close") - col("prev_close")
processed_df1 = processed_df1.withColumn("price_change", price_delta)

In [None]:
# Change of close relative to prev_close, expressed as a percentage
prev_close = col("prev_close")
pct_change_expr = ((col("close") - prev_close) / prev_close) * 100
processed_df1 = processed_df1.withColumn("percent_change", pct_change_expr)

In [None]:
# Volume divided by its average over window_7 (the trailing 7-row window)
avg_volume_7 = avg("volume").over(window_7)
processed_df1 = processed_df1.withColumn("volume_normalized", col("volume") / avg_volume_7)

In [None]:
# Bollinger Bands: ma_7 plus/minus two standard deviations of close over window_7.
# stddev_7 is stored as its own column; the band columns are derived from it.
band_offset = 2 * col("stddev_7")
processed_df1 = processed_df1.withColumn("stddev_7", stddev("close").over(window_7)) \
                             .withColumn("bollinger_upper", col("ma_7") + band_offset) \
                             .withColumn("bollinger_lower", col("ma_7") - band_offset)

In [None]:
# Show the first 5 rows of the final DataFrame with the new feature columns
processed_df1.show(5)

In [None]:
# Repartition by symbol again; the write that follows is partitioned by the same column
processed_df1 = processed_df1.repartition("symbol")

In [None]:
# Write the feature-enriched frame to S3 as Parquet, partitioned by symbol,
# replacing any existing output under the target prefix
features_path = "s3a://cryptospark-dataset/processed-data/"
features_writer = processed_df1.write.mode("overwrite").partitionBy("symbol")
features_writer.parquet(features_path)