In [56]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import when, avg, abs
from pyspark.sql.functions import expr


In [32]:

#    .config("spark.jars.packages", "org.postgresql:postgresql:42.7.1") \

# Create the Spark session for this notebook, or reuse one that is already
# running in this process.
spark = (
    SparkSession.builder
    .appName("binance")
    .getOrCreate()
)

# schema = StructType([
#     StructField("datetime", TimestampType(), True),
#     StructField("symbol", StringType(), True),
#     StructField("open", DecimalType(18, 2), True),
#     StructField("high", DecimalType(18, 2), True),
#     StructField("low", DecimalType(18, 2), True),
#     StructField("close", DecimalType(18, 2), True),
#     StructField("volume", DecimalType(18, 2), True),
#     StructField("QuoteAssetVolume", DecimalType(18, 2), True),
#     StructField("NumTrades", IntegerType(), True),
#     StructField("TakerBuyBaseAssetVolume", DecimalType(18, 2), True),
#     StructField("TakerBuyQuoteAssetVolume", DecimalType(18, 2), True),
#     StructField("Ignore", StringType(), True)
# ])


In [407]:
# Load the kline CSV: the first line is treated as the header and column
# types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("Btcusdt_kline_1d.csv")
)


In [196]:
def RSI(df, period=14):
    """Compute one linearly weighted RSI value over the earliest ``period`` rows.

    The frame is sorted by ``datetime`` and truncated to its first ``period``
    rows. Each row's gain or loss against the previous close is scaled by
    ``row_number / period`` (so later rows weigh more), the scaled gains and
    losses are averaged over the window, and the result is
    ``100 - 100 / (1 + AU / AD)``.

    Args:
        df: Spark DataFrame with at least ``datetime`` and ``close`` columns.
        period: Number of leading rows to use; also the weight denominator.

    Returns:
        The RSI value. ``100.0`` is returned when the window contains no
        losses (average loss of zero), instead of dividing by zero.

    Raises:
        ValueError: If ``period`` is smaller than 1 or ``df`` has no rows.

    Side effects:
        Shows the intermediate gain/loss columns and prints the two averages
        and the final RSI.
    """
    if period < 1:
        raise ValueError("period must be a positive integer")

    # A single window ordered by time drives both the lag and the weights, so
    # the weights follow chronological order regardless of partitioning.
    by_time = Window.orderBy("datetime")

    # Keep only the first `period` rows in chronological order.
    df = df.orderBy("datetime", ascending=True).limit(period)

    # Fetch the previous row's close and the change against it.
    # NOTE(review): the first row has no previous close, so its delta is null
    # and it contributes 0 to both gains and losses — confirm this is intended.
    df = df.withColumn("prev_close", F.lag("close").over(by_time))
    df = df.withColumn("delta", F.col("close") - F.col("prev_close"))

    # Split the change into non-negative gain and loss magnitudes.
    delta = F.col("delta")
    df = df.withColumn("ups", F.when(delta > 0, delta).otherwise(0))
    df = df.withColumn("downs", F.when(delta < 0, F.abs(delta)).otherwise(0))

    # Weight each row by its 1-based position divided by `period`.
    weight = F.row_number().over(by_time) / period
    df = df.withColumn("weighted_ups", weight * F.col("ups"))
    df = df.withColumn("weighted_downs", weight * F.col("downs"))

    df.select("datetime", "ups", "weighted_ups", "downs", "weighted_downs").show()

    # Compute both averages in one aggregation (one Spark job instead of two).
    averages = df.agg(F.avg("weighted_ups"), F.avg("weighted_downs")).collect()[0]
    AU, AD = averages[0], averages[1]
    if AU is None or AD is None:
        # avg() over zero rows yields null.
        raise ValueError("cannot compute RSI on an empty DataFrame")
    print(AU, AD)

    # RSI calculation; an average loss of zero maps to 100 by convention.
    if AD == 0:
        rsi = 100.0
    else:
        rsi = 100 - (100 / (1 + (AU / AD)))
    print(rsi)

    return rsi

In [197]:
# Evaluate the weighted RSI on the loaded klines with the default 14-row window.
RSI(df=df, period=14)

+-------------------+------------------+------------------+------------------+------------------+
|           datetime|               ups|      weighted_ups|             downs|    weighted_downs|
+-------------------+------------------+------------------+------------------+------------------+
|2017-08-17 00:00:00|               0.0|               0.0|               0.0|               0.0|
|2017-08-18 00:00:00|               0.0|               0.0|176.71000000000004|25.244285714285716|
|2017-08-19 00:00:00|31.609999999999673| 6.773571428571358|               0.0|               0.0|
|2017-08-20 00:00:00|               0.0|               0.0|  53.6899999999996|15.339999999999884|
|2017-08-21 00:00:00|               0.0|               0.0| 70.28999999999996|25.103571428571417|
|2017-08-22 00:00:00|              24.0|10.285714285714285|               0.0|               0.0|
|2017-08-23 00:00:00| 74.01000000000022| 37.00500000000011|               0.0|               0.0|
|2017-08-24 00:00:00

76.4612396047748

In [411]:
from pyspark.sql.functions import col, lag, when, collect_list, lit
from pyspark.sql.window import Window
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import pandas_udf


def RSI2(df, period=14):
    """Append a rolling RSI column computed from simple moving averages.

    For every row (ordered by ``datetime``) the gain and loss against the
    previous close are computed, averaged over the current row and the
    ``period - 1`` rows before it, and combined as
    ``100 - 100 / (1 + avg_up / avg_down)``.

    Args:
        df: Spark DataFrame with at least ``datetime`` and ``close`` columns.
        period: Number of rows in the rolling window, current row included.

    Returns:
        A new DataFrame with the added columns ``prev_close``, ``delta``,
        ``up``, ``down``, ``up_ewma``, ``down_ewma`` and ``rsi``. ``rsi`` is
        100 when the window has gains but no losses, and null when the window
        has neither (for example the very first row).
    """
    # Window ordering, not a DataFrame sort, determines lag and the rolling
    # frame, so no explicit orderBy on the DataFrame is needed here.
    # The window has no partitionBy, so Spark processes it in one partition.
    by_time = Window.orderBy("datetime")
    trailing = by_time.rowsBetween(-period + 1, 0)

    df = df.withColumn("prev_close", lag("close", 1).over(by_time))

    df = df.withColumn("delta", col("close") - col("prev_close"))
    df = df.withColumn("up", when(col("delta") > 0, col("delta")).otherwise(0))
    df = df.withColumn("down", when(col("delta") < 0, -col("delta")).otherwise(0))

    # Simple rolling means of gains and losses. The "_ewma" column names are
    # kept for compatibility, but no exponential weighting is applied.
    df = df.withColumn("up_ewma", F.avg("up").over(trailing))
    df = df.withColumn("down_ewma", F.avg("down").over(trailing))

    # Guard the division: with no losses in the window the ratio is undefined,
    # so map "gains only" to 100 and leave "no movement" as null.
    avg_up = col("up_ewma")
    avg_down = col("down_ewma")
    df = df.withColumn(
        "rsi",
        when(avg_down > 0, 100 - (100 / (1 + (avg_up / avg_down))))
        .when(avg_up > 0, lit(100.0)),
    )

    return df

In [412]:
# Keep the 14 most recent rows of the rolling-RSI frame, newest first.
rsi_df = (
    RSI2(df)
    .orderBy(F.col("datetime").desc())
    .limit(14)
)

# Display only the columns needed to sanity-check the RSI values.
rsi_df.select("datetime", "close", "up", "down", "rsi").show()


+-------------------+--------+------------------+-----------------+-----------------+
|           datetime|   close|                up|             down|              rsi|
+-------------------+--------+------------------+-----------------+-----------------+
|2024-03-14 00:00:00|73012.75|               0.0|59.66000000000349|75.72044397090468|
|2024-03-13 00:00:00|73072.41|1620.4000000000087|              0.0|71.85822804780895|
|2024-03-12 00:00:00|71452.01|               0.0|626.0900000000111|75.63635277140384|
|2024-03-11 00:00:00| 72078.1| 3122.220000000001|              0.0|79.28869018796605|
|2024-03-10 00:00:00|68955.88| 642.6100000000006|              0.0|79.02723301378214|
|2024-03-09 00:00:00|68313.27|189.08000000000175|              0.0| 78.6809573741784|
|2024-03-08 00:00:00|68124.19| 1301.020000000004|              0.0|79.13482183208993|
|2024-03-07 00:00:00|66823.17| 749.1300000000047|              0.0|76.71939998974895|
|2024-03-06 00:00:00|66074.04|2350.0299999999916|     