In [4]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ohlcv").getOrCreate()
sc = spark.sparkContext

In [44]:
import requests

coin = "ethereum"
period = "365"
url = f"https://api.coingecko.com/api/v3/coins/{coin}/ohlc?vs_currency=usd&days={period}"
caption = ["time", "open", "high", "low", "close"]

data = requests.get(url).json()

file_name = f"{coin}_{period}.csv" 
with open(file_name, "w") as file:
    line = ",".join(caption) + "\n"
    file.write(line)
    for record in data:
        line = ",".join(map(str, record)) + "\n"
        file.write(line)

In [45]:
df = spark.read.option("header", "true").csv(file_name, inferSchema=True)
df.printSchema()

root
 |-- time: long (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)



In [46]:
df.show()

+-------------+-------+-------+-------+-------+
|         time|   open|   high|    low|  close|
+-------------+-------+-------+-------+-------+
|1607299200000| 571.19| 601.97| 571.19| 601.97|
|1607644800000| 592.39| 592.39| 554.33| 560.45|
|1607990400000| 545.98| 590.32| 545.98| 585.54|
|1608336000000| 589.07| 654.42| 589.07| 654.42|
|1608681600000| 659.32| 659.32| 610.43| 634.98|
|1609027200000| 587.96| 636.74| 587.96| 636.74|
|1609372800000| 689.66| 752.86| 689.66| 752.86|
|1609632000000| 738.62|  777.7| 730.15|  777.7|
|1609977600000|  967.0|1208.58|  967.0|1208.58|
|1610323200000|1229.47|1282.98|1223.73|1267.73|
|1610668800000|1092.91|1216.91|1045.41|1216.91|
|1611014400000|1171.86|1255.98|1171.86|1255.98|
|1611360000000|1383.48|1385.85|1122.91|1236.68|
|1611705600000|1231.18|1392.54|1231.18|1355.23|
|1612051200000|1253.14|1380.28|1253.14|1372.43|
|1612310400000|1317.05|1514.23|1317.05|1514.23|
|1612656000000| 1661.0|1724.86| 1587.8|1683.94|
|1613001600000|1608.64|1769.05|1608.64|1

In [133]:
from pyspark.sql.functions import lit, avg, row_number, udf
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType


@udf(returnType=IntegerType())
def define_group(value, sma_num):
    return value // sma_num


sma_num = 30

order_by_time = Window.orderBy("time")
partition_by_group = Window.partitionBy("group")

# нумерум строки
df = df.withColumn("row_num", row_number().over(order_by_time))
# разбиваем на группы по sma_num (30)
df = df.withColumn("group", define_group(df["row_num"], lit(sma_num)))
# считаем sma для каждой группы
df = df.withColumn("sma", avg("close").over(partition_by_group))

df.show()

21/12/04 11:22:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/04 11:22:26 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------------+-------+-------+-------+-------+-------+-----+------------------+
|         time|   open|   high|    low|  close|row_num|group|               sma|
+-------------+-------+-------+-------+-------+-------+-----+------------------+
|1607299200000| 571.19| 601.97| 571.19| 601.97|      1|    0|1311.5003448275866|
|1607644800000| 592.39| 592.39| 554.33| 560.45|      2|    0|1311.5003448275866|
|1607990400000| 545.98| 590.32| 545.98| 585.54|      3|    0|1311.5003448275866|
|1608336000000| 589.07| 654.42| 589.07| 654.42|      4|    0|1311.5003448275866|
|1608681600000| 659.32| 659.32| 610.43| 634.98|      5|    0|1311.5003448275866|
|1609027200000| 587.96| 636.74| 587.96| 636.74|      6|    0|1311.5003448275866|
|1609372800000| 689.66| 752.86| 689.66| 752.86|      7|    0|1311.5003448275866|
|1609632000000| 738.62|  777.7| 730.15|  777.7|      8|    0|1311.5003448275866|
|1609977600000|  967.0|1208.58|  967.0|1208.58|      9|    0|1311.5003448275866|
|1610323200000|1229.47|1282.

In [137]:
columns_to_drop = ["row_num", "group"]
df = df.drop(*columns_to_drop)

In [141]:
result_df = df.filter(df["close"] > df["sma"])
result_df.show()

21/12/04 11:24:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/04 11:24:40 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-------------+-------+-------+-------+-------+------------------+
|         time|   open|   high|    low|  close|               sma|
+-------------+-------+-------+-------+-------+------------------+
|1611705600000|1231.18|1392.54|1231.18|1355.23|1311.5003448275866|
|1612051200000|1253.14|1380.28|1253.14|1372.43|1311.5003448275866|
|1612310400000|1317.05|1514.23|1317.05|1514.23|1311.5003448275866|
|1612656000000| 1661.0|1724.86| 1587.8|1683.94|1311.5003448275866|
|1613001600000|1608.64|1769.05|1608.64|1739.16|1311.5003448275866|
|1613347200000|1782.51| 1841.2|1782.51|1804.98|1311.5003448275866|
|1613692800000|1775.76|1938.57|1775.76|1938.57|1311.5003448275866|
|1614038400000|1969.98|1969.98|1788.62|1788.62|1311.5003448275866|
|1614384000000|1563.92|1628.39|1450.99|1450.99|1311.5003448275866|
|1614470400000|1480.13| 1570.4|1416.66|1497.09|1311.5003448275866|
|1615075200000|1579.43|1661.93|1539.05|1661.93|1311.5003448275866|
|1615420800000|1727.46|1869.33|1727.46|1802.31|1311.5003448275

In [145]:
result_df.drop("sma").write.csv("result")

21/12/04 11:30:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/12/04 11:30:55 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
