In [ ]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._


In [ ]:
// Short alias for the session object supplied by the notebook runtime.
val spark: org.apache.spark.sql.SparkSession = sparkSession

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@7cfbbf77


In [ ]:
// Schema applied to btc_prices.csv. It has the same datetime/price columns as
// cryptoPriceSchema; the two volume columns are volume_btc and volume_currency.
val btcPriceSchema = new StructType()
  .add("datetime", StringType, nullable = false)
  .add("opening_price", DoubleType, nullable = false)
  .add("highest_price", DoubleType, nullable = false)
  .add("lowest_price", DoubleType, nullable = false)
  .add("closing_price", DoubleType, nullable = false)
  .add("volume_btc", DoubleType, nullable = false)
  .add("volume_currency", DoubleType, nullable = false)
  .add("currency_code", StringType, nullable = false)

// Schema applied to crypto_prices.csv. Here the two volume columns are
// volume_crypto followed by volume_btc.
val cryptoPriceSchema = new StructType()
  .add("datetime", StringType, nullable = false)
  .add("opening_price", DoubleType, nullable = false)
  .add("highest_price", DoubleType, nullable = false)
  .add("lowest_price", DoubleType, nullable = false)
  .add("closing_price", DoubleType, nullable = false)
  .add("volume_crypto", DoubleType, nullable = false)
  .add("volume_btc", DoubleType, nullable = false)
  .add("currency_code", StringType, nullable = false)

btcPriceSchema: org.apache.spark.sql.types.StructType = StructType(StructField(datetime,StringType,false), StructField(opening_price,DoubleType,false), StructField(highest_price,DoubleType,false), StructField(lowest_price,DoubleType,false), StructField(closing_price,DoubleType,false), StructField(volume_btc,DoubleType,false), StructField(volume_currency,DoubleType,false), StructField(currency_code,StringType,false))
cryptoPriceSchema: org.apache.spark.sql.types.StructType = StructType(StructField(datetime,StringType,false), StructField(opening_price,DoubleType,false), StructField(highest_price,DoubleType,false), StructField(lowest_price,DoubleType,false), StructField(closing_price,DoubleType,false), StructField(volume_crypto,DoubleType,false), StructField(volume_btc,DoubleType,false), S...

In [ ]:
// Directory that holds both CSV inputs. Kept in one place so the notebook can be
// pointed at another checkout by editing a single value.
val cryptoDataDir = "/home/dr0ff/hackspace/opensource/spark-notebooks-gallery/gallery/cryptocurrency-ts/crypto_data"

// BTC prices: `time` is the parsed `datetime` in epoch seconds and `date` is that
// instant formatted as yyyy-MM-dd.
// NOTE(review): from_unixtime formats in the session time zone, so `date` can
// differ from the calendar day written in `datetime` — confirm this is intended.
val btcPricesDF = spark.read
  .format("csv")
  .schema(btcPriceSchema)
  .load(s"$cryptoDataDir/btc_prices.csv")
  .withColumn("time", unix_timestamp($"datetime", "yyyy-MM-dd HH:mm:ssX"))
  .withColumn("date", from_unixtime($"time", "yyyy-MM-dd"))

// Alt-coin prices: same derived columns as above.
// The source timestamps are not zero-padded (e.g. "6/26/2017 20:00"), so the
// pattern uses single-letter fields. Those accept one or two digits under both the
// legacy SimpleDateFormat parser and the stricter Spark 3 parser, whereas "MM/dd"
// only matched through the legacy parser's leniency.
val cryptoPricesDF = spark.read
  .format("csv")
  .schema(cryptoPriceSchema)
  .load(s"$cryptoDataDir/crypto_prices.csv")
  .withColumn("time", unix_timestamp($"datetime", "M/d/yyyy H:mm"))
  .withColumn("date", from_unixtime($"time", "yyyy-MM-dd"))

btcPricesDF: org.apache.spark.sql.DataFrame = [datetime: string, opening_price: double ... 8 more fields]
cryptoPricesDF: org.apache.spark.sql.DataFrame = [datetime: string, opening_price: double ... 8 more fields]


In [ ]:
// Preview the first five BTC price rows, including the derived time/date columns.
btcPricesDF.show(numRows = 5)

+--------------------+-------------+-------------+------------+-------------+----------+---------------+-------------+----------+----------+
|            datetime|opening_price|highest_price|lowest_price|closing_price|volume_btc|volume_currency|currency_code|      time|      date|
+--------------------+-------------+-------------+------------+-------------+----------+---------------+-------------+----------+----------+
|2013-03-10 20:00:...|        60.56|        60.56|       60.56|        60.56|    0.1981|           12.0|          AUD|1362960000|2013-03-11|
|2013-03-11 20:00:...|        60.56|        60.56|       41.38|        47.78|     47.11|         2297.5|          AUD|1363046400|2013-03-12|
|2013-03-12 20:00:...|        49.01|        59.14|       46.49|        59.14|     49.64|        2501.39|          AUD|1363132800|2013-03-13|
|2013-03-13 20:00:...|        50.15|        59.14|        49.7|        51.16|     31.37|        1592.73|          AUD|1363219200|2013-03-14|
|2013-03-14 2

In [ ]:
// Preview the first five alt-coin price rows, including the derived time/date columns.
cryptoPricesDF.show(numRows = 5)

+---------------+-------------+-------------+------------+-------------+-------------+----------+-------------+----------+----------+
|       datetime|opening_price|highest_price|lowest_price|closing_price|volume_crypto|volume_btc|currency_code|      time|      date|
+---------------+-------------+-------------+------------+-------------+-------------+----------+-------------+----------+----------+
|6/26/2017 20:00|       5.7E-7|       5.7E-7|      5.7E-7|       5.7E-7|          0.0|       0.0|          EOC|1498496400|2017-06-26|
|6/26/2017 20:00|       1.6E-7|       1.6E-7|      1.6E-7|       1.6E-7|          0.0|       0.0|        RATIO|1498496400|2017-06-26|
|6/26/2017 20:00|     2.335E-4|      2.43E-4|     2.18E-4|     2.139E-4|     90175.87|     20.64|          CPC|1498496400|2017-06-26|
|6/26/2017 20:00|       1.6E-7|       1.6E-7|      1.6E-7|       1.6E-7|          0.0|       0.0|          RYC|1498496400|2017-06-26|
|6/26/2017 20:00|       4.2E-4|       4.2E-4|      4.2E-4|    

## OHLC charts

In [ ]:
// OHLC chart over the BTC rows whose currency_code is USD.
val ohlcInputDF = btcPricesDF.where($"currency_code" === "USD")

// Maps chart fields to DataFrame column names.
val ohlcColumnMapping = """{
                    x: 'date',
                    close: 'closing_price',
                    high: 'highest_price',
                    low: 'lowest_price',
                    open: 'opening_price'
                  }"""

CustomPlotlyChart(
  ohlcInputDF,
  layout = "{title: 'BTC price in USD', showlegend: false}",
  dataOptions = "{type: 'ohlc'}",
  dataSources = ohlcColumnMapping,
  maxPoints = 3000)

res9: notebook.front.widgets.charts.CustomPlotlyChart[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]] = <CustomPlotlyChart widget>


### Convert to TimeSeriesRDD

In [ ]:
import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration._

import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration._


In [ ]:
// Flint time series built from the USD rows; `time` is interpreted as seconds.
// NOTE(review): isSorted = true presumes the filtered rows are already in
// ascending time order — confirm against the CSV.
val btcUSDPricesTsRdd = {
  val usdPriceRows = btcPricesDF.where($"currency_code" === "USD")
  TimeSeriesRDD.fromDF(dataFrame = usdPriceRows)(isSorted = true, timeUnit = SECONDS)
}

btcUSDPricesTsRdd: com.twosigma.flint.timeseries.TimeSeriesRDD = com.twosigma.flint.timeseries.TimeSeriesRDDImpl@4fcd104a


## Generate a clock TimeSeriesRDD for the time bins

In [ ]:
import com.twosigma.flint.timeseries.Summarizers

import com.twosigma.flint.timeseries.Summarizers


In [ ]:
// Width of one aggregation bin: 14 days expressed in seconds.
val timeBin = Duration(14, DAYS).toSeconds.toInt

// Earliest and latest `time` (epoch seconds) among the USD rows.
val minMaxTs = btcPricesDF.where($"currency_code" === "USD").select(min($"time"), max($"time")).head

// min/max are null when there are no USD rows or no timestamp could be parsed.
// Fail loudly here: casting the null with asInstanceOf[Long] would silently
// produce 0 and a meaningless clock.
require(
  !minMaxTs.isNullAt(0) && !minMaxTs.isNullAt(1),
  "No USD rows with a parsable timestamp found in btcPricesDF")

val (minTs, maxTs) = (minMaxTs.getLong(0), minMaxTs.getLong(1))

timeBin: Int = 1209600
minMaxTs: org.apache.spark.sql.Row = [1279324800,1498521600]
minTs: Long = 1279324800
maxTs: Long = 1498521600


In [ ]:
// Bin boundaries: every `timeBin` seconds from minTs up to and including maxTs.
val clockTs = (minTs to maxTs).by(timeBin.toLong)

clockTs: scala.collection.immutable.NumericRange[Long] = NumericRange(1279324800, 1280534400, 1281744000, 1282953600, 1284163200, 1285372800, 1286582400, 1287792000, 1289001600, 1290211200, 1291420800, 1292630400, 1293840000, 1295049600, 1296259200, 1297468800, 1298678400, 1299888000, 1301097600, 1302307200, 1303516800, 1304726400, 1305936000, 1307145600, 1308355200, 1309564800, 1310774400, 1311984000, 1313193600, 1314403200, 1315612800, 1316822400, 1318032000, 1319241600, 1320451200, 1321660800, 1322870400, 1324080000, 1325289600, 1326499200, 1327708800, 1328918400, 1330128000, 1331337600, 1332547200, 1333756800, 1334966400, 1336176000, 1337385600, 1338595200, 1339804800, 1341014400, 1342224000, 1343433600, 1344643200, 1345852800, 1347062400, 1348272000, 1349481600, 1350691200, 1351900...

In [ ]:
// Clock time series: a single-column ("time") DataFrame of the bin boundaries,
// with values interpreted as seconds.
val clockTsRdd = {
  val clockDF = spark.sparkContext.parallelize(clockTs).toDF("time")
  TimeSeriesRDD.fromDF(dataFrame = clockDF)(isSorted = true, timeUnit = SECONDS)
}

clockTsRdd: com.twosigma.flint.timeseries.TimeSeriesRDD = com.twosigma.flint.timeseries.TimeSeriesRDDImpl@6d839506


In [ ]:
// Mean of closing_price for the USD series within each interval of the clock.
val meanClosingPricesByTwoWeeks = {
  val meanOfClosingPrice = Summarizers.mean("closing_price")
  btcUSDPricesTsRdd.summarizeIntervals(clockTsRdd, meanOfClosingPrice)
}

meanClosingPricesByTwoWeeks: com.twosigma.flint.timeseries.TimeSeriesRDD = com.twosigma.flint.timeseries.TimeSeriesRDDImpl@6f6a6626


In [ ]:
// Scatter chart of the per-interval mean closing price.
// NOTE(review): dividing `time` by 1e9 before from_unixtime suggests the
// TimeSeriesRDD exposes time in nanoseconds — confirm against Flint.
val meanCloseWithDateDF = meanClosingPricesByTwoWeeks.toDF
  .withColumn("date", from_unixtime(($"time" / 1e9), "yyyy-MM-dd"))

val meanCloseLayout = """{
                    title: 'BTC mean closing price in USD over the last seven years (in 14 day intervals)', 
                    showlegend: false, 
                    yaxis: {title: 'Mean Closing Price (BTC/USD)'}}"""

val meanCloseColumnMapping = """{
                    x: 'date',
                    y: 'closing_price_mean'
                  }"""

CustomPlotlyChart(
  meanCloseWithDateDF,
  layout = meanCloseLayout,
  dataOptions = "{type: 'scatter'}",
  dataSources = meanCloseColumnMapping)

res18: notebook.front.widgets.charts.CustomPlotlyChart[org.apache.spark.sql.DataFrame] = <CustomPlotlyChart widget>


## Window Functions

In [ ]:
// Expose the USD rows to Spark SQL under the view name "btc_prices".
val usdRowsForSql = btcPricesDF.filter(col("currency_code") === "USD")
usdRowsForSql.createOrReplaceTempView("btc_prices")

In [ ]:
// For each distinct (time, closing_price) pair in btc_prices, divide the closing
// price by the next row's closing price in descending-time order, i.e. by the
// chronologically preceding one.
val dailyReturnsQuery = """
SELECT time,
       closing_price / lead(closing_price) over prices AS daily_factor
FROM (
   SELECT time,
          closing_price
   FROM btc_prices
   WHERE currency_code = 'USD'
   GROUP BY 1,2
) sub window prices AS (ORDER BY time DESC)
"""

val dailyBTCreturnsDF = spark.sql(dailyReturnsQuery)

dailyBTCreturnsDF: org.apache.spark.sql.DataFrame = [time: bigint, daily_factor: double]


In [ ]:
// Line chart of daily_factor on a logarithmic y axis; `time` is formatted into a
// yyyy-MM-dd `date` column for the x axis.
val dailyReturnsWithDateDF = dailyBTCreturnsDF
  .withColumn("date", from_unixtime($"time", "yyyy-MM-dd"))

val dailyReturnsLayout = """{
                    title: 'BTC daily return (as a factor of the previous day’s rate) over the last seven years', 
                    showlegend: false, 
                    yaxis: {title: 'Daily Return (BTC/USD)', type: 'log'}}"""

val dailyReturnsColumnMapping = """{
                    x: 'date',
                    y: 'daily_factor'
                  }"""

CustomPlotlyChart(
  dailyReturnsWithDateDF,
  layout = dailyReturnsLayout,
  dataOptions = "{type: 'scatter', line: {width: 1}}",
  dataSources = dailyReturnsColumnMapping,
  maxPoints = 3000)

res23: notebook.front.widgets.charts.CustomPlotlyChart[org.apache.spark.sql.DataFrame] = <CustomPlotlyChart widget>


## Volumes by currency

In [ ]:
// Flint time series over all currencies. The rows are declared unsorted
// (isSorted = false) and `time` is interpreted as seconds.
val btcPricesTsRdd = TimeSeriesRDD
  .fromDF(btcPricesDF)(isSorted = false, timeUnit = SECONDS)

btcPricesTsRdd: com.twosigma.flint.timeseries.TimeSeriesRDD = com.twosigma.flint.timeseries.TimeSeriesRDDImpl@5453a7b8


In [ ]:
// Sum of volume_btc per clock interval, computed separately for each currency_code.
val btcVolumeByCurrencyByInterval = {
  val totalBtcVolume = Summarizers.sum("volume_btc")
  btcPricesTsRdd.summarizeIntervals(clockTsRdd, totalBtcVolume, key = Seq("currency_code"))
}

btcVolumeByCurrencyByInterval: com.twosigma.flint.timeseries.TimeSeriesRDD = com.twosigma.flint.timeseries.TimeSeriesRDDImpl@588693a2


In [ ]:
// Stacked bar chart of the summed BTC volume per interval, one series per currency_code.
// NOTE(review): dividing `time` by 1e9 before from_unixtime suggests the
// TimeSeriesRDD exposes time in nanoseconds — confirm against Flint.
val volumeWithDateDF = btcVolumeByCurrencyByInterval.toDF
  .withColumn("date", from_unixtime(($"time" / 1e9), "yyyy-MM-dd"))

val stackedVolumeLayout = """{
                    title: 'Volume of BTC in different fiat currencies over the last seven years (in 14 day intervals, stacked)',
                    barmode: 'stack',
                    yaxis: {title: 'Volume(BTC/fiat)'}}"""

val stackedVolumeColumnMapping = """{
                    x: 'date',
                    y: 'volume_btc_sum'
                  }"""

CustomPlotlyChart(
  volumeWithDateDF,
  layout = stackedVolumeLayout,
  dataOptions = "{type: 'bar', splitBy: 'currency_code'}",
  dataSources = stackedVolumeColumnMapping,
  maxPoints = 3000)

res103: notebook.front.widgets.charts.CustomPlotlyChart[org.apache.spark.sql.DataFrame] = <CustomPlotlyChart widget>


In [ ]:
// Scatter chart of the summed BTC volume per interval, restricted to CNY.
// The y axis plots volume_btc_sum, so it is labelled as a volume; the previous
// label ('Mean Closing Price (BTC/USD)') was carried over from the price chart.
// NOTE(review): dividing `time` by 1e9 before from_unixtime suggests the
// TimeSeriesRDD exposes time in nanoseconds — confirm against Flint.
CustomPlotlyChart(btcVolumeByCurrencyByInterval.toDF.where($"currency_code" === "CNY").withColumn("date", from_unixtime(($"time" / 1e9), "yyyy-MM-dd")),
                  layout="""{
                    title: 'Volume of BTC in CNY over the last year (in 14 day intervals)', 
                    showlegend: false, 
                    yaxis: {title: 'Volume (BTC/CNY)'}}""",
                  dataOptions="{type: 'scatter'}",
                  dataSources="""{
                    x: 'date',
                    y: 'volume_btc_sum'
                  }""")

res107: notebook.front.widgets.charts.CustomPlotlyChart[org.apache.spark.sql.DataFrame] = <CustomPlotlyChart widget>
