In [1]:
import pyspark

# Record which PySpark release this notebook ran against, so the outputs
# below can be tied to a specific runtime.
pyspark_version = pyspark.__version__
print(pyspark_version)

3.5.6


In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
import os

In [3]:
# Cluster resources requested for this job. Values are passed to
# SparkConf.set, which stores them as strings.
# NOTE(review): spark.dynamicAllocation.maxExecutors only has an effect if
# dynamic allocation is enabled on the cluster; confirm in the YARN config.
spark_settings = [
    ("spark.executor.cores", 5),
    ("spark.sql.shuffle.partitions", 200),
    ("spark.default.parallelism", 200),
    ("spark.executor.memory", "7g"),
    ("spark.dynamicAllocation.maxExecutors", 20),
]
conf = SparkConf().setMaster("yarn").setAll(spark_settings)

# getOrCreate() reuses an already-running session in this kernel if there is one.
spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName("doge")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Example: build a DataFrame from the bronze-layer DOGE data stored in HDFS
# (CSV files with a header row).
# inferSchema lets Spark pick column types, at the cost of an extra pass over the data.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/datos/gittba26/gittba02/DOGE/doge_bronze")
)

print("Schema of the loaded data:")
df.printSchema()
print(f"Number of rows: {df.count()}")
print(f"Number of columns: {len(df.columns)}")

                                                                                

Schema of the loaded data:
root
 |-- datetime: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

Number of rows: 6576
Number of columns: 8


In [8]:
# Silver layer: the bronze rows rewritten as Parquet, laid out on disk in
# year=/month= partition directories. Existing output is replaced.
parquet_output = "/datos/gittba26/gittba02/DOGE/doge_silver"

silver_writer = df.write.mode("overwrite").partitionBy("year", "month")
silver_writer.parquet(parquet_output)

print(f"\nData successfully converted to Parquet format at '{parquet_output}/'")

                                                                                


Data successfully converted to Parquet format at '/datos/gittba26/gittba02/DOGE/doge_silver/'


In [9]:
# Verify the silver layer by reading the Parquet files back.
# The silver path is spelled out here on purpose: the indicators cell below
# rebinds `parquet_output` to the gold path, so re-running this cell after it
# would otherwise silently verify the wrong dataset.
silver_path = "/datos/gittba26/gittba02/DOGE/doge_silver"
df_parquet = spark.read.parquet(silver_path)

silver_rows = df_parquet.count()
print(f"Parquet verification - Rows: {silver_rows}, Columns: {len(df_parquet.columns)}")

# Make the verification an actual check rather than just a printout.
# Partition columns (year, month) come back at the end, so compare as sets.
assert silver_rows == df.count(), "row count differs between bronze CSV and silver Parquet"
assert set(df_parquet.columns) == set(df.columns), "column set differs between bronze and silver"

df_parquet.show(5)

Parquet verification - Rows: 6576, Columns: 8
+-------------------+------+------+------+------+-----------+----+-----+
|           datetime|  open|  high|   low| close|     volume|year|month|
+-------------------+------+------+------+------+-----------+----+-----+
|2025-08-01 02:00:00|0.2095|0.2113|0.2026|0.2097|1.2966565E7|2025|    8|
|2025-08-01 06:00:00|0.2082|0.2087|0.2038| 0.206|  5124193.0|2025|    8|
|2025-08-01 10:00:00|0.2059|0.2076|0.2005|0.2076|  6198772.0|2025|    8|
|2025-08-01 14:00:00|0.2075|0.2105|0.2022|  0.21|  3038858.0|2025|    8|
|2025-08-01 18:00:00|  0.21|0.2129|0.2039|0.2084|  4275527.0|2025|    8|
+-------------------+------+------+------+------+-----------+----+-----+
only showing top 5 rows



In [11]:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, StructType, StructField

# --- Config ---
# Silver layer in, gold layer out.
# NOTE: this rebinds `parquet_output`, which held the silver path in earlier cells.
parquet_input = "/datos/gittba26/gittba02/DOGE/doge_silver"
parquet_output = "/datos/gittba26/gittba02/DOGE/doge_gold"

df_parquet = spark.read.parquet(parquet_input)

# --- Base window ---
# One global time ordering. There is no partitionBy, so Spark evaluates every
# window below inside a single partition. That is acceptable for a few thousand
# rows but will not scale.
w_ord = Window.orderBy("datetime")

# --- SMA 200 ---
# Trailing mean over the current row and up to 199 preceding rows. Early rows
# average fewer than 200 closes; no warm-up masking is applied.
w_sma200 = w_ord.rowsBetween(-199, 0)
df_indicators = df_parquet.withColumn("SMA_200", F.avg("close").over(w_sma200))

# --- Precompute delta, gain, loss (used by RSI) ---
# The first row has no previous close, so its delta is null. Gain/loss are left
# null there (no `otherwise`). collect_list skips nulls, so no fake
# zero-change observation is fed into the RSI seed average.
df_indicators = (
    df_indicators
    .withColumn("_delta", F.col("close") - F.lag("close", 1).over(w_ord))
    .withColumn(
        "_gain",
        F.when(F.col("_delta") > 0, F.col("_delta")).when(F.col("_delta") <= 0, F.lit(0.0)),
    )
    .withColumn(
        "_loss",
        F.when(F.col("_delta") < 0, -F.col("_delta")).when(F.col("_delta") >= 0, F.lit(0.0)),
    )
)

# --- Collect lists for EMA/RSI/MACD in a single pass per window size ---
# The recursive indicators are approximated by replaying their recursion over a
# bounded trailing window. The recursion is re-seeded at each window's first
# element. The UDFs assume each list is in window-frame order (oldest first).
ema_span_50 = 50
w_ema50 = w_ord.rowsBetween(-ema_span_50 * 4, 0)  # current row + up to 200 previous

rsi_period = 14
w_rsi = w_ord.rowsBetween(-rsi_period * 10, 0)  # current row + up to 140 previous

w_macd = w_ord.rowsBetween(-200, 0)  # same frame size as w_ema50

df_indicators = (
    df_indicators
    .withColumn("_close_list_ema", F.collect_list("close").over(w_ema50))
    .withColumn("_gain_list", F.collect_list("_gain").over(w_rsi))
    .withColumn("_loss_list", F.collect_list("_loss").over(w_rsi))
    .withColumn("_close_list_macd", F.collect_list("close").over(w_macd))
)

# --- UDFs ---
# Smoothing factors: 2 / (span + 1) for standard EMAs, 1 / period for the RSI.
alpha_50 = 2.0 / (ema_span_50 + 1)
alpha_rsi = 1.0 / rsi_period
alpha_12 = 2.0 / 13.0  # MACD fast EMA, span 12
alpha_26 = 2.0 / 27.0  # MACD slow EMA, span 26
alpha_9 = 2.0 / 10.0   # MACD signal EMA, span 9

@F.udf(DoubleType())
def compute_ema(close_list, alpha):
    """Exponential moving average of a list of prices.

    The average is seeded with the first element, and each later price is
    blended in with weight ``alpha``.

    Args:
        close_list: prices in the order they should be folded in.
        alpha: smoothing factor in (0, 1].

    Returns:
        The final EMA value as a float, or None for a missing/empty list.
    """
    if close_list is None or len(close_list) == 0:
        return None
    prices = iter(close_list)
    running = float(next(prices))
    for price in prices:
        running = alpha * float(price) + (1.0 - alpha) * running
    return running

@F.udf(DoubleType())
def compute_rsi(gain_list, loss_list, alpha):
    """Relative Strength Index from trailing per-row gains and losses.

    The first ``rsi_period`` observations seed the average gain/loss with a
    simple mean. Every later observation is folded in with smoothing factor
    ``alpha`` (1 / rsi_period in this notebook).

    Args:
        gain_list: non-negative per-row gains, assumed oldest first.
        loss_list: non-negative per-row losses, aligned with ``gain_list``.
        alpha: smoothing factor for the running averages.

    Returns:
        RSI in [0, 100]. Returns None when either list is missing or there are
        fewer than ``rsi_period`` aligned observations. Returns 100.0 when the
        average loss is zero, including the flat-price case where both
        averages are zero.
    """
    # Use the notebook's configured period rather than a hard-coded 14.
    period = rsi_period
    if not gain_list or not loss_list:
        return None
    # Only walk positions present in both lists, so a length mismatch cannot
    # raise IndexError.
    n_obs = min(len(gain_list), len(loss_list))
    if n_obs < period:
        return None

    avg_gain = sum(float(g) for g in gain_list[:period]) / float(period)
    avg_loss = sum(float(x) for x in loss_list[:period]) / float(period)
    for i in range(period, n_obs):
        avg_gain = alpha * float(gain_list[i]) + (1.0 - alpha) * avg_gain
        avg_loss = alpha * float(loss_list[i]) + (1.0 - alpha) * avg_loss

    if avg_loss == 0:
        return 100.0
    rs = avg_gain / avg_loss
    return 100.0 - (100.0 / (1.0 + rs))

# Return type of compute_macd: three nullable doubles, in this field order.
macd_schema = StructType(
    [
        StructField(field_name, DoubleType())
        for field_name in ("MACD", "MACD_Signal", "MACD_Histogram")
    ]
)


@F.udf(macd_schema)
def compute_macd(close_list, a12, a26, a9):
    """MACD line, signal line and histogram for the last element of a price list.

    Fast and slow EMAs (factors ``a12`` / ``a26``) are both seeded with the
    first price and replayed over the whole list. Their difference at each
    step forms the MACD series. The signal line is an EMA (factor ``a9``) of
    that series, seeded with its first value.

    Returns:
        ``(macd, signal, macd - signal)`` for the final position, or None when
        the list is missing or holds fewer than 26 prices.
    """
    if not close_list or len(close_list) < 26:
        return None

    prices = [float(value) for value in close_list]
    fast = slow = prices[0]
    spread = []
    for price in prices:
        fast = a12 * price + (1.0 - a12) * fast
        slow = a26 * price + (1.0 - a26) * slow
        spread.append(fast - slow)

    signal = spread[0]
    for point in spread:
        signal = a9 * point + (1.0 - a9) * signal

    latest = spread[-1]
    return (latest, signal, latest - signal)

# --- Apply UDFs and extract MACD fields ---
df_indicators = (
    df_indicators
    .withColumn("EMA_50", compute_ema(F.col("_close_list_ema"), F.lit(alpha_50)))
    .withColumn("RSI_14", compute_rsi(F.col("_gain_list"), F.col("_loss_list"), F.lit(alpha_rsi)))
    .withColumn(
        "_macd",
        compute_macd(F.col("_close_list_macd"), F.lit(alpha_12), F.lit(alpha_26), F.lit(alpha_9)),
    )
    .withColumn("MACD", F.col("_macd.MACD"))
    .withColumn("MACD_Signal", F.col("_macd.MACD_Signal"))
    .withColumn("MACD_Histogram", F.col("_macd.MACD_Histogram"))
)

# --- Clean up and write ---
temp_cols = ["_delta", "_gain", "_loss", "_close_list_ema", "_gain_list", "_loss_list", "_close_list_macd", "_macd"]

# df_final feeds two actions (the write and the preview below). Persisting it
# lets the preview reuse the materialized rows instead of re-running the
# window pipeline and every Python UDF a second time.
df_final = df_indicators.drop(*temp_cols).orderBy("datetime").persist()

(
    df_final.write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(parquet_output)
)

print("Technical Indicators Calculated: SMA_200, EMA_50, RSI_14, MACD, MACD_Signal, MACD_Histogram")
df_final.select(
    "datetime", "close", "SMA_200", "EMA_50", "RSI_14", "MACD", "MACD_Signal", "MACD_Histogram"
).show(20, truncate=False)


                                                                                

Technical Indicators Calculated: SMA_200, EMA_50, RSI_14, MACD, MACD_Signal, MACD_Histogram


[Stage 36:>                                                         (0 + 1) / 1]

+-------------------+------+-------------------+-------------------+------------------+----+-----------+--------------+
|datetime           |Close |SMA_200            |EMA_50             |RSI_14            |MACD|MACD_Signal|MACD_Histogram|
+-------------------+------+-------------------+-------------------+------------------+----+-----------+--------------+
|2023-01-01 01:00:00|0.0697|0.0697             |0.0697             |NULL              |NULL|NULL       |NULL          |
|2023-01-01 05:00:00|0.0694|0.06955            |0.06968823529411763|NULL              |NULL|NULL       |NULL          |
|2023-01-01 09:00:00|0.0696|0.06956666666666667|0.06968477508650518|NULL              |NULL|NULL       |NULL          |
|2023-01-01 13:00:00|0.0699|0.06965            |0.06969321527919126|NULL              |NULL|NULL       |NULL          |
|2023-01-01 17:00:00|0.0702|0.06976            |0.0697130891898112 |NULL              |NULL|NULL       |NULL          |
|2023-01-01 21:00:00|0.0703|0.0698500000

                                                                                