In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql.types import *
import os

In [21]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [2]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--executor-memory 1g --driver-memory 10g --num-executors 2 pyspark-shell'
conf = pyspark.SparkConf()
conf.set("spark.sql.shuffle.partitions", "400")
sc = pyspark.SparkContext(appName="arb", conf = conf)
sql = pyspark.SQLContext(sc)

In [17]:
schema = StructType([
    StructField("ts", LongType(), True),
    StructField("exchange", StringType(), True),
    StructField("tradingPair", StringType(), True),
    StructField("orderType", StringType(), True),
    StructField("price", DecimalType(20,10), True),
    StructField("amount", DecimalType(20,10), True)
])

data = sql.read.csv(
    "20171210/*.gz", header=False, mode="DROPMALFORMED", schema=schema
).cache()

In [47]:
def findProfits(tradingPair):
    pairData = data.where(data.tradingPair == tradingPair)
    pairData.createOrReplaceTempView(tradingPair)

    query = """
      SELECT 
        from_unixtime(cast(a.ts/1000 as BIGINT), "yyyMMdd:HHmm") AS ts1,
        from_unixtime(cast(a.ts/1000 as BIGINT), "yyyMMdd:HHmm") AS ts2,
        a.tradingPair, 
        a.exchange AS a_ex, 
        b.exchange AS b_ex, 
        a.orderType AS a_o, 
        b.orderType AS b_o, 
        a.price AS a_price, 
        b.price AS b_price,
        a.amount AS a_amount,
        b.amount AS b_amount,
        ABS(a.price - b.price) AS abs_price_diff,
        a.price * a.amount AS a_cost,
        b.price * b.amount AS b_cost,   
        IF(a.orderType = "SELL", 
            (b.price * CASE WHEN a.amount < b.amount THEN a.amount ELSE b.amount END) - (a.price * CASE WHEN a.amount < b.amount THEN a.amount ELSE b.amount END), 
          (a.price * CASE WHEN a.amount < b.amount THEN a.amount ELSE b.amount END) - (b.price * CASE WHEN a.amount < b.amount THEN a.amount ELSE b.amount END)
        ) AS net_profit,
        FROM %s a 
        JOIN %s b 
        ON a.exchange != b.exchange 
        AND a.orderType != b.orderType
        AND from_unixtime(cast(a.ts/1000 as BIGINT), "yyyMMdd:HHmm") = from_unixtime(cast(b.ts/1000 as BIGINT), "yyyMMdd:HHmm")
        ORDER BY profit DESC
        """ % (tradingPair, tradingPair)

    result = sql.sql(query)
    print(result.count())
    result.show()
    import datetime
    #result.write.csv("profits/%s_%s.csv" % (tradingPair, datetime.datetime.now()))

In [48]:
findProfits("BTCUSD")

0
+---+---+-----------+----+----+---+---+-------+-------+--------+--------+--------------+------+------+------+
|ts1|ts2|tradingPair|a_ex|b_ex|a_o|b_o|a_price|b_price|a_amount|b_amount|abs_price_diff|a_cost|b_cost|profit|
+---+---+-----------+----+----+---+---+-------+-------+--------+--------+--------------+------+------+------+
+---+---+-----------+----+----+---+---+-------+-------+--------+--------+--------------+------+------+------+



In [40]:
tradingPairs = [i.tradingPair for i in data.select("tradingPair").distinct().collect()]
for t in tradingPairs:
    findProfits(t)

886
+-------------------+-----------+-------+-------+----+----+------------+------------+------------+------------+--------------+--------------------+--------------------+-------+
|                 ts|tradingPair|   a_ex|   b_ex| a_o| b_o|     a_price|     b_price|    a_amount|    b_amount|abs_price_diff|              a_cost|              b_cost| profit|
+-------------------+-----------+-------+-------+----+----+------------+------------+------------+------------+--------------+--------------------+--------------------+-------+
|2017-12-09 16:00:49|    DASHBTC|BITTREX| COINGI| BUY|SELL|0.0475573100|0.0886000000|0.0365666700|    1.000E-7|  0.0410426900|0.001739012460857...|    8.86000000000E-9|-4.1E-9|
|2017-12-09 16:02:20|    DASHBTC|BITTREX| COINGI| BUY|SELL|0.0475284100|0.0886000000|0.0365666500|    1.000E-7|  0.0410715900|0.001737954733526...|    8.86000000000E-9|-4.1E-9|
|2017-12-09 16:00:40|    DASHBTC| COINGI|BITTREX|SELL| BUY|0.0886000000|0.0475500200|    1.000E-7|0.0365666100|

0
+---+-----------+----+----+---+---+-------+-------+--------+--------+--------------+------+------+------+
| ts|tradingPair|a_ex|b_ex|a_o|b_o|a_price|b_price|a_amount|b_amount|abs_price_diff|a_cost|b_cost|profit|
+---+-----------+----+----+---+---+-------+-------+--------+--------+--------------+------+------+------+
+---+-----------+----+----+---+---+-------+-------+--------+--------+--------------+------+------+------+

0
+---+-----------+----+----+---+---+-------+-------+--------+--------+--------------+------+------+------+
| ts|tradingPair|a_ex|b_ex|a_o|b_o|a_price|b_price|a_amount|b_amount|abs_price_diff|a_cost|b_cost|profit|
+---+-----------+----+----+---+---+-------+-------+--------+--------+--------------+------+------+------+
+---+-----------+----+----+---+---+-------+-------+--------+--------+--------------+------+------+------+

0
+---+-----------+----+----+---+---+-------+-------+--------+--------+--------------+------+------+------+
| ts|tradingPair|a_ex|b_ex|a_o|b_o|a_p

In [None]:
data.where(data.tradingPair == tradingPair).groupBy(data.exchange, data.orderType).count().show()

In [None]:
sc.stop()