In [51]:
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, LongType, StructType, StructField, StringType, DoubleType
import pyspark.sql.functions as f
from pyspark.sql import Window
import findspark

In [180]:
# Make the local Spark installation importable, then start (or reuse) the session.
findspark.init()
spark = SparkSession.builder.appName('Crypto').getOrCreate()

# Column layout of the header-less, tab-separated log file:
# one long timestamp, three string keys, then four double price/quantity fields.
SCHEMA = StructType(
    [StructField("dt", LongType(), False)]
    + [StructField(name, StringType(), False) for name in ("base", "quote", "exchange")]
    + [
        StructField(name, DoubleType(), False)
        for name in ("bidPrice", "bidQty", "askPrice", "askQty")
    ]
)

df = (
    spark.read
    .schema(SCHEMA)
    .option("delimiter", "\t")
    .option("header", False)
    .csv("../logs9.tsv")
)

In [181]:
# All quotes that share a timestamp and trading pair are compared with each other.
w = Window.partitionBy("dt", "base", "quote")

# Rows carrying the highest bid of their (dt, base, quote) group; ties are kept.
bids = (
    df.withColumn("maxBid", f.max("bidPrice").over(w))
    .filter(f.col("bidPrice") == f.col("maxBid"))
    .withColumnRenamed("exchange", "bidExchange")
    .drop("maxBid", "askPrice", "askQty")
)

# Rows carrying the lowest ask of their (dt, base, quote) group; ties are kept.
asks = (
    df.withColumn("minAsk", f.min("askPrice").over(w))
    .filter(f.col("askPrice") == f.col("minAsk"))
    .withColumnRenamed("exchange", "askExchange")
    .drop("minAsk", "bidPrice", "bidQty")
)

# Pair best bids with best asks; the tradable quantity is the smaller of the two
# sides and revenue is the price spread times that quantity.
test = bids.join(asks, on=["dt", "base", "quote"])
test = test.withColumn("Qty", f.least("bidQty", "askQty"))
test = test.withColumn(
    "revenue", (f.col("bidPrice") - f.col("askPrice")) * f.col("Qty")
)

# Keep only rows where the bid exceeds the ask (strictly positive revenue).
test = test.filter(f.col("revenue") > 0)

In [182]:
# Group the positive-revenue rows by trading pair and by the exchanges on each
# side, collecting every group's (dt, bidPrice, askPrice, Qty) records into a
# single array column named "data".
# NOTE: this reads `test` (built in the previous cell). The earlier `test2` was
# never defined in this notebook, so the cell failed on a fresh kernel.
test3 = test.groupBy(["base", "quote", "bidExchange", "askExchange"]) \
    .agg(f.collect_list(f.struct("dt", "bidPrice", "askPrice", "Qty")).alias("data"))

In [183]:
def get_values(row):
    """Return the ``dt``, ``bidPrice``, ``askPrice`` and ``Qty`` fields of ``row`` as a tuple.

    ``row`` may be any object that supports lookup by those string keys.
    """
    fields = ("dt", "bidPrice", "askPrice", "Qty")
    return tuple(row[name] for name in fields)


def find(rows, max_gap=10**8):
    """Split the records' timestamps into runs and return each run's duration.

    The ``dt`` values are sorted, then cut into runs wherever two consecutive
    timestamps are more than ``max_gap`` apart. Each run contributes
    ``last_dt - first_dt + max_gap`` to the result, so a run made of a single
    record lasts exactly ``max_gap``.

    Args:
        rows: Sequence of records supporting ``row["dt"]`` with integer
            timestamps. The input is not modified.
        max_gap: Largest distance between neighbouring timestamps that still
            belongs to the same run; also the tail added to every run.
            Same unit as ``dt`` (unit not visible here — TODO confirm).

    Returns:
        List of integer durations, one per run, in chronological order.
        Empty (or ``None``) input yields an empty list.
    """
    if not rows:
        return []

    timestamps = sorted(row["dt"] for row in rows)
    durations = []
    start = prev = timestamps[0]
    for dt in timestamps[1:]:
        if dt - prev > max_gap:
            # Gap too large: close the current run (end minus start, plus tail)
            # and open a new one at this timestamp.
            durations.append(prev - start + max_gap)
            start = dt
        prev = dt
    # Always close the run that is still open when the data ends, whether it
    # holds one record or many.
    durations.append(prev - start + max_gap)
    return durations

# Wrap `find` as a Spark UDF whose result is an array of longs.
func = f.udf(f=find, returnType=ArrayType(LongType()))

In [184]:
# Apply the UDF to each group's collected records and store the result
# in the "arbitrations" column.
test3 = test3.withColumn("arbitrations", func(f.col("data")))

In [185]:
def calc_avg(x):
    """Return the arithmetic mean of the numbers in ``x``.

    Args:
        x: Sequence of numbers, possibly empty or ``None``.

    Returns:
        The mean as a float, or ``None`` when ``x`` is empty or ``None``
        (there is nothing to average; as a UDF result this becomes a null).
    """
    if not x:
        return None
    return sum(x) / len(x)
    

# Wrap `calc_avg` as a Spark UDF returning a double, then store the per-row
# mean of the "arbitrations" array in "avg_arb".
func2 = f.udf(f=calc_avg, returnType=DoubleType())
test3 = test3.withColumn("avg_arb", func2(f.col("arbitrations")))

In [186]:
# Print a preview of the grouped result; long cell values are truncated in the output.
test3.show()

+----+-----+-----------+-----------+--------------------+--------------------+-------+
|base|quote|bidExchange|askExchange|                data|        arbitrations|avg_arb|
+----+-----+-----------+-----------+--------------------+--------------------+-------+
| ETH|  EUR|     kraken|    binance|[{167143305097501...|[100000000, 10000...|  1.0E8|
| SOL|  BTC|   poloniex|    binance|[{167143304946357...|[100000000, 10000...|  1.0E8|
| DOT|  BTC|     kraken|   poloniex|[{167143305228024...|[100000000, 10000...|  1.0E8|
| BTC| USDT|    binance|   poloniex|[{167143304805604...|[100000000, 10000...|  1.0E8|
| BTC|  EUR|     kraken|    binance|[{167143304936261...|[100000000, 10000...|  1.0E8|
| PSG| USDT|      huobi|    binance|[{167143304996973...|[100000000, 10000...|  1.0E8|
| XCN| USDT|   poloniex|       gate|[{167143305137660...|[100000000, 10000...|  1.0E8|
|USDT|  USD|       gate|     kraken|[{167143305228024...|[100000000, 10000...|  1.0E8|
| REP|  ETH|     kraken|       gate|[{16714

In [178]:
# Average the per-group "avg_arb" values over the whole frame and print the result.
test3.agg(f.avg("avg_arb")).show()

+------------+
|avg(avg_arb)|
+------------+
|       1.0E8|
+------------+

