#  <span style="font-family: Latin Modern Roman; font-size: 35px; font-weight: bold;"> TradeData Project: Spark</span>

---

## <span style="font-family: Latin Modern Roman; font-size: 25px;"> Sprint 3 (Historias de Usuario  4) </span>

In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [2]:
conf = (SparkConf()
            .setMaster("yarn")
            .set("spark.executor.cores", 5)
            .set("spark.sql.shuffle.partitions", 200)
            .set("spark.default.parallelism", 200)
            .set("spark.executor.memory", "7g")
            .set("spark.dynamicAllocation.maxExecutors", 20)
        )

spark = SparkSession \
    .builder \
    .config(conf = conf) \
    .appName("Spark") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### <span style="font-family: Latin Modern Roman; font-size: 23px;"> Silver Layer </span>

In [3]:
cryptos = [
    "BTCUSDT", "ETHUSDT", "XRPUSDT", "SOLUSDT", 
    "DOGEUSDT", "ADAUSDT", "SHIBUSDT", "DOTUSDT", 
    "AAVEUSDT", "XLMUSDT"
]

for symbol in cryptos:
    # Load data from HDFS
    df = spark.read \
              .option("header", "true") \
              .option("inferSchema", "true") \
              .csv(f"/datos/gittba/gittba04/{symbol}")

    # Transform to Parquet and Load to HDFS again
    df.write.mode("overwrite").partitionBy("year").parquet(f"/datos/gittba/gittba04/{symbol}_Silver")

print("Silver Layer Completed")

                                                                                

Silver Layer Completed


### <span style="font-family: Latin Modern Roman; font-size: 23px;"> Gold Layer </span>

In [4]:
from pyspark.sql.functions import col, avg, lit, lag, when, expr, year
from pyspark.sql.window import Window

years = [2021, 2022, 2023, 2024]

for symbol in cryptos:
    for year_value in years:

        df_current = spark.read.parquet(f"/datos/gittba/gittba04/{symbol}_Silver/year={year_value}")
        if year_value > 2021:
            try:
                df_previous = spark.read.parquet(f"/datos/gittba/gittba04/{symbol}_Silver/year={year_value-1}")
                df_previous = df_previous.orderBy(col("date").desc()).limit(200)  # Keep only the last 200 days
                df = df_previous.union(df_current)  # Merge previous & current year data
            except Exception as e:
                print(f"Warning: Could not load previous year's data for {symbol} {year_value}: {e}")
                df = df_current
        else:
            df = df_current

        df = df.orderBy("date")

        # SMA200 Calculation (Simple Moving Average)
        window_spec_200 = Window.orderBy("date").rowsBetween(-199, 0)  # 200-day rolling window
        df = df.withColumn("SMA200", avg(col("close")).over(window_spec_200))

        # EMA50 Calculation (Exponential Moving Average 50)
        window_spec_50 = Window.orderBy("date").rowsBetween(-49, 0)
        df = df.withColumn("EMA50", avg(col("close")).over(window_spec_50))

        # MACD Calculation (12-day EMA, 26-day EMA, Signal Line)
        df = df.withColumn("EMA12", avg(col("close")).over(Window.orderBy("date").rowsBetween(-11, 0)))
        df = df.withColumn("EMA26", avg(col("close")).over(Window.orderBy("date").rowsBetween(-25, 0)))
        df = df.withColumn("MACD", col("EMA12") - col("EMA26"))
        df = df.withColumn("MACD_Signal", avg(col("MACD")).over(Window.orderBy("date").rowsBetween(-8, 0)))

        # RSI Calculation (Relative Strength Index - 14)
        window_rsi = Window.orderBy("date").rowsBetween(-14, -1)
        df = df.withColumn("delta", col("close") - lag(col("close")).over(Window.orderBy("date")))
        df = df.withColumn("gain", when(col("delta") > 0, col("delta")).otherwise(lit(0)))
        df = df.withColumn("loss", when(col("delta") < 0, -col("delta")).otherwise(lit(0)))
        df = df.withColumn("avg_gain", avg(col("gain")).over(window_rsi))
        df = df.withColumn("avg_loss", avg(col("loss")).over(window_rsi))
        df = df.withColumn("RS", col("avg_gain") / col("avg_loss"))
        df = df.withColumn("RSI", 100 - (100 / (1 + col("RS"))))

        # Select only relevant columns
        df = df.select("date", "SMA200", "EMA50", "RSI", "MACD")

        # Add 'year' column before filtering**
        df = df.withColumn("year", year(col("date")))

        # Filter only the correct year's data
        df = df.filter(col("year") == year_value)

        # Store results in Gold Layer
        output_path = f"/datos/gittba/gittba04/{symbol}_Gold"
        df.write.mode("append").partitionBy("year").parquet(output_path)

print("Gold Layer Completed")


                                                                                

Gold Layer Completed


In [5]:
# Load the Gold Layer Parquet for all years
df_btcusdt_gold = spark.read.parquet("/datos/gittba/gittba04/BTCUSDT_Gold")

# Show the first 50 rows
df_btcusdt_gold.show()

# Print the schema to verify column types
df_btcusdt_gold.printSchema()

+-------------------+------------------+------------------+------------------+-------------------+----+
|               date|            SMA200|             EMA50|               RSI|               MACD|year|
+-------------------+------------------+------------------+------------------+-------------------+----+
|2024-01-01 01:00:00| 32026.42679999998| 40620.74779999999| 55.35557511757836| 219.40762820511736|2024|
|2024-01-02 01:00:00|32119.436349999978| 40790.42739999999| 58.36250432756507| 245.45769230768929|2024|
|2024-01-03 01:00:00| 32201.07754999998| 40936.30819999999| 64.08089243162314| 202.79762820513133|2024|
|2024-01-04 01:00:00| 32290.13319999998|        41062.1662| 45.95945821323388| 223.38237179486168|2024|
|2024-01-05 01:00:00|32376.636999999984|        41221.7982| 51.27941595302999|   305.839615384597|2024|
|2024-01-06 01:00:00|32454.938649999982| 41368.88620000001|  50.7856977747711| 234.10211538460862|2024|
|2024-01-07 01:00:00|32524.614299999987|41516.104600000006| 51.1

In [6]:
df_gold = spark.read.parquet("/datos/gittba/gittba04/BTCUSDT_Gold")
row_count = df_gold.count()

print(f"Total rows in BTCUSDT_Gold: {row_count}")

Total rows in BTCUSDT_Gold: 1461
