In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

## Empezamos configurando Spark

In [2]:
conf = (SparkConf()
            .setMaster("yarn")
            .set("spark.executor.cores", 5)
            .set("spark.sql.shuffle.partitions", 200)
            .set("spark.default.parallelism", 200)
            .set("spark.executor.memory", "7g")
            .set("spark.dynamicAllocation.maxExecutors", 20)
        )

spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .appName("Test_PySpark") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Empezamos el análisis con los parquets de la capa Silver

### 1) PLATA → leer Parquet

In [23]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

SILVER_PATH = "/datos/gittba26/gittba05/Silver/BNB"
GOLD_PATH   = "/datos/gittba26/gittba05/Gold/BNB_kpis"

df = spark.read.parquet(SILVER_PATH)

df = (df
      .withColumn("datetime", F.to_timestamp("datetime"))
      .withColumn("close", F.col("close").cast("double"))
      .select("datetime","close","year","month")
      .orderBy("datetime")
     )

df.show(5, truncate=False)
df.printSchema()

[Stage 26:>                                                        (0 + 5) / 48]

+-------------------+-----+----+-----+
|datetime           |close|year|month|
+-------------------+-----+----+-----+
|2022-01-01 01:00:00|527.7|2022|1    |
|2022-01-02 01:00:00|531.4|2022|1    |
|2022-01-03 01:00:00|511.8|2022|1    |
|2022-01-04 01:00:00|507.0|2022|1    |
|2022-01-05 01:00:00|474.1|2022|1    |
+-------------------+-----+----+-----+
only showing top 5 rows

root
 |-- datetime: timestamp (nullable = true)
 |-- close: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)



                                                                                

### 2) KPI 1 — SMA 200 (exacta)

In [61]:
N_SMA = 200
w_sma200 = Window.orderBy("datetime").rowsBetween(-(N_SMA-1), 0)

df = df.withColumn("SMA_200", F.avg("close").over(w_sma200))

### 3) KPI 2 — EMA 50

In [62]:
# Creamos un índice ordenado
df_idx = df.withColumn("idx", F.row_number().over(Window.orderBy("datetime")))

alpha = 2/(50+1)

# Convertimos a una lista ordenada (collect_list sobre struct idx)
arr_df = df_idx.agg(
    F.sort_array(F.collect_list(F.struct("idx","datetime","close","year","month","SMA_200"))).alias("arr")
)

In [65]:
# Calcular EMA50 recursiva con aggregate
from pyspark.sql import functions as F
from pyspark.sql.window import Window

alpha50 = 2/(50+1)

df_idx = df.withColumn("idx", F.row_number().over(Window.orderBy("datetime")))

arr_df = df_idx.agg(
    F.sort_array(
        F.collect_list(F.struct("idx","datetime","close","year","month","SMA_200"))
    ).alias("arr")
)

# acumulador tipado: cast(array() as array<double>)
ema50_expr = F.expr(f"""
aggregate(
  arr,
  cast(array() as array<double>),
  (acc, x) -> concat(
    acc,
    array(
      case
        when size(acc)=0 then x.close
        else {alpha50}*x.close + (1-{alpha50})*element_at(acc, -1)
      end
    )
  )
)
""")

out_df = (arr_df
    .select("arr", ema50_expr.alias("ema50_arr"))
    .select(
        F.posexplode("arr").alias("pos","x"),
        F.col("ema50_arr")[F.col("pos")].alias("EMA_50")
    )
    .select(
        F.col("x.datetime").alias("datetime"),
        F.col("x.close").alias("close"),
        F.col("x.SMA_200").alias("SMA_200"),
        F.col("EMA_50"),
        F.col("x.year").alias("year"),
        F.col("x.month").alias("month")
    )
    .orderBy("datetime")
)

out_df.show(5, truncate=False)

[Stage 125:>                                                        (0 + 1) / 1]

+-------------------+-----+-----------------+-----------------+----+-----+
|datetime           |close|SMA_200          |EMA_50           |year|month|
+-------------------+-----+-----------------+-----------------+----+-----+
|2022-01-01 01:00:00|527.7|527.7            |527.7            |2022|1    |
|2022-01-02 01:00:00|531.4|529.55           |527.8450980392157|2022|1    |
|2022-01-03 01:00:00|511.8|523.6333333333333|527.2158785082661|2022|1    |
|2022-01-04 01:00:00|507.0|519.4749999999999|526.4230989589223|2022|1    |
|2022-01-05 01:00:00|474.1|510.3999999999999|524.371212725239 |2022|1    |
+-------------------+-----+-----------------+-----------------+----+-----+
only showing top 5 rows



                                                                                

### 4) KPI 3 — RSI (14)

In [67]:
w_lag = Window.orderBy("datetime")
w14 = Window.orderBy("datetime").rowsBetween(-13, 0)

out_df = (out_df
    .withColumn("prev_close", F.lag("close", 1).over(w_lag))
    .withColumn("delta", F.col("close") - F.col("prev_close"))
    .withColumn("gain", F.when(F.col("delta") > 0, F.col("delta")).otherwise(F.lit(0.0)))
    .withColumn("loss", F.when(F.col("delta") < 0, -F.col("delta")).otherwise(F.lit(0.0)))
    .withColumn("avg_gain_14", F.avg("gain").over(w14))
    .withColumn("avg_loss_14", F.avg("loss").over(w14))
    .withColumn(
        "RSI_14",
        F.when(F.col("avg_loss_14") == 0, F.lit(100.0))
         .otherwise(100 - (100 / (1 + (F.col("avg_gain_14") / F.col("avg_loss_14")))))
    )
)

### 5) KPI 4 — MACD

In [68]:
# Helper para EMA recursiva exacta
def add_ema_recursive_exact(df_base, period, out_col):
    a = 2/(period+1)

    df_idx = df_base.withColumn("idx", F.row_number().over(Window.orderBy("datetime")))

    arr_df = df_idx.agg(
        F.sort_array(
            F.collect_list(
                F.struct(*df_idx.columns)   # guarda todas las columnas por fila
            )
        ).alias("arr")
    )

    ema_expr = F.expr(f"""
    aggregate(
      arr,
      cast(array() as array<double>),
      (acc, x) -> concat(
        acc,
        array(
          case
            when size(acc)=0 then x.close
            else {a}*x.close + (1-{a})*element_at(acc, -1)
          end
        )
      )
    )
    """)

    # reconstruimos dataframe (todas cols + nueva EMA)
    df_out = (arr_df
        .select("arr", ema_expr.alias("ema_arr"))
        .select(F.posexplode("arr").alias("pos","x"), F.col("ema_arr")[F.col("pos")].alias(out_col))
        .select("x.*", out_col)
        .drop("idx")  # por si lo traía
        .orderBy("datetime")
    )
    return df_out

In [69]:
# Calcular EMA12 y EMA26 y MACD_line
tmp = add_ema_recursive_exact(out_df, 12, "EMA_12")
tmp = add_ema_recursive_exact(tmp, 26, "EMA_26")

df_kpis = (tmp
    .withColumn("MACD_line", F.col("EMA_12") - F.col("EMA_26"))
    .select("datetime","close","SMA_200","EMA_50","RSI_14","MACD_line","year","month")
    .orderBy("datetime")
)

df_kpis.show(5, truncate=False)

[Stage 132:>                                                        (0 + 1) / 1]

+-------------------+-----+-----------------+-----------------+------------------+-------------------+----+-----+
|datetime           |close|SMA_200          |EMA_50           |RSI_14            |MACD_line          |year|month|
+-------------------+-----+-----------------+-----------------+------------------+-------------------+----+-----+
|2022-01-01 01:00:00|527.7|527.7            |527.7            |100.0             |0.0                |2022|1    |
|2022-01-02 01:00:00|531.4|529.55           |527.8450980392157|100.0             |0.29515669515672016|2022|1    |
|2022-01-03 01:00:00|511.8|523.6333333333333|527.2158785082661|15.879828326180032|-1.0404915544517053|2022|1    |
|2022-01-04 01:00:00|507.0|519.4749999999999|526.4230989589223|13.167259786476663|-2.4579881425556778|2022|1    |
|2022-01-05 01:00:00|474.1|510.3999999999999|524.371212725239 |6.06557377049171  |-6.165052556112187 |2022|1    |
+-------------------+-----+-----------------+-----------------+------------------+------

                                                                                

### 6) Guardar en ORO (Parquet particionado por year/month)

In [70]:
GOLD_PATH = "/datos/gittba26/gittba05/Gold/BNB_kpis"

!hdfs dfs -mkdir -p /datos/gittba26/gittba05/Gold

(df_kpis.write
   .mode("overwrite")
   .partitionBy("year","month")
   .parquet(GOLD_PATH))

!hdfs dfs -ls {GOLD_PATH}

                                                                                

Found 5 items
-rw-r--r--   3 gittba_bdt24 supergroup          0 2026-02-09 18:23 /datos/gittba26/gittba05/Gold/BNB_kpis/_SUCCESS
drwxr-xr-x   - gittba_bdt24 supergroup          0 2026-02-09 18:23 /datos/gittba26/gittba05/Gold/BNB_kpis/year=2022
drwxr-xr-x   - gittba_bdt24 supergroup          0 2026-02-09 18:23 /datos/gittba26/gittba05/Gold/BNB_kpis/year=2023
drwxr-xr-x   - gittba_bdt24 supergroup          0 2026-02-09 18:23 /datos/gittba26/gittba05/Gold/BNB_kpis/year=2024
drwxr-xr-x   - gittba_bdt24 supergroup          0 2026-02-09 18:23 /datos/gittba26/gittba05/Gold/BNB_kpis/year=2025


### 7) Checks rápidos

In [71]:
# RSI rango
df_kpis.select(F.min("RSI_14").alias("min_RSI"), F.max("RSI_14").alias("max_RSI")).show()

# últimas filas para comparar con TradingView
df_kpis.orderBy(F.desc("datetime")).show(10, truncate=False)

                                                                                

+-----------------+-------+
|          min_RSI|max_RSI|
+-----------------+-------+
|3.520456707897182|  100.0|
+-----------------+-------+



[Stage 160:>                                                        (0 + 1) / 1]

+-------------------+-----+-----------------+-----------------+------------------+-------------------+----+-----+
|datetime           |close|SMA_200          |EMA_50           |RSI_14            |MACD_line          |year|month|
+-------------------+-----+-----------------+-----------------+------------------+-------------------+----+-----+
|2025-12-31 01:00:00|863.4|884.7305000000005|890.5370639151233|57.8052550231839  |-9.788927321705728 |2025|12   |
|2025-12-30 01:00:00|860.9|883.6445000000004|891.6446991769651|45.386064030131806|-11.487619802160566|2025|12   |
|2025-12-29 01:00:00|851.5|882.6180000000005|892.8995848576576|47.81043791241753 |-13.234131007364226|2025|12   |
|2025-12-28 01:00:00|859.2|881.6365000000004|894.5893638314395|44.87612612612614 |-14.313727040080835|2025|12   |
|2025-12-27 01:00:00|844.8|880.6790000000004|896.0338276612941|35.74165298303228 |-16.267484492164954|2025|12   |
|2025-12-26 01:00:00|834.3|879.8230000000004|898.1250043005306|37.31980779498131 |-17.05

                                                                                