# PySpark Partitioning  
## Bitcoin Price from Kraken API

In [29]:
import json
import os
from datetime import datetime, date, time

import pandas as pd
import requests

In [41]:
import pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

In [31]:
import warnings

# Silence every warning category for the rest of the notebook session.
warnings.simplefilter('ignore')

In [32]:
# Create the notebook's Spark session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("bitcoin_pyspark_kraken_json")
    .getOrCreate()
)
# Display the session summary as the cell output.
spark
# Uncomment to shut the session down when finished:
#spark.stop()

In [33]:
# Read data/btc_spark_df.csv, treating the first row as the header and
# letting Spark infer each column's type.
btc_spark_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("data/btc_spark_df.csv")
)

In [34]:
# Preview the first five rows of the loaded data.
btc_spark_df.show(n=5)

+-------------------+---------+---------+------+------+------+------+
|               date|     open|     high|   low| close|volume|trades|
+-------------------+---------+---------+------+------+------+------+
|2013-10-06 00:00:00|    122.0|    122.0| 122.0| 122.0|   0.1|     1|
|2013-10-07 00:00:00|   123.61|   123.61|123.61|123.61|   0.1|     1|
|2013-10-08 00:00:00|   123.91|   124.19| 123.9|124.18|3.9916|     4|
|2013-10-09 00:00:00|124.01687|124.01687|123.84|123.84| 2.823|     3|
|2013-10-10 00:00:00|   125.85|   125.86|125.85|125.86|   2.0|     2|
+-------------------+---------+---------+------+------+------+------+
only showing top 5 rows



In [35]:
# Print the column names and the types Spark inferred while reading the CSV.
btc_spark_df.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: double (nullable = true)
 |-- trades: integer (nullable = true)



In [43]:
# Convert the `date` column from a timestamp to a plain date, replacing it in place.
btc_spark_df = btc_spark_df.withColumn("date", to_date(btc_spark_df["date"]))
# Preview the result.
btc_spark_df.show(n=5)

+----------+---------+---------+------+------+------+------+
|      date|     open|     high|   low| close|volume|trades|
+----------+---------+---------+------+------+------+------+
|2013-10-06|    122.0|    122.0| 122.0| 122.0|   0.1|     1|
|2013-10-07|   123.61|   123.61|123.61|123.61|   0.1|     1|
|2013-10-08|   123.91|   124.19| 123.9|124.18|3.9916|     4|
|2013-10-09|124.01687|124.01687|123.84|123.84| 2.823|     3|
|2013-10-10|   125.85|   125.86|125.85|125.86|   2.0|     2|
+----------+---------+---------+------+------+------+------+
only showing top 5 rows



In [49]:
# Number of logical CPUs on this machine, for comparison with partition counts.
# Uses the standard library instead of `!sysctl -n hw.ncpu`, which only exists
# on macOS/BSD, so the cell also runs on Linux and Windows.
os.cpu_count()

8


In [44]:
# Number of partitions backing the DataFrame as loaded from the CSV file.
btc_spark_df.rdd.getNumPartitions()

1

### Partitioning By Size

In [45]:
# spark_partition_id is already in scope through the wildcard import of
# pyspark.sql.functions in the imports cell; the explicit form would be:
# from pyspark.sql.functions import spark_partition_id

#### repartition()
- can increase or decrease the number of partitions (performs a full shuffle)

In [101]:
# Redistribute the rows across four partitions.
btc_repartition_4 = btc_spark_df.repartition(numPartitions=4)

In [102]:
# Confirm the partition count after repartition(4).
btc_repartition_4.rdd.getNumPartitions()

4

In [103]:
# List the column names of the repartitioned DataFrame.
btc_repartition_4.columns

['date', 'open', 'high', 'low', 'close', 'volume', 'trades']

In [104]:
# Keep the column names in a list so later cells can select them.
col_names = btc_repartition_4.schema.names

In [105]:
# Select every stored column name and preview five rows; the row order now
# reflects the shuffle performed by repartition().
btc_repartition_4.select(*col_names).show(n=5)

+----------+-------+---------+--------+--------+-------------+------+
|      date|   open|     high|     low|   close|       volume|trades|
+----------+-------+---------+--------+--------+-------------+------+
|2017-10-25| 5545.5|   5777.7|  5400.0|  5762.8|4900.59050168| 19159|
|2019-03-23| 3982.1|   4000.4|  3961.0|  3982.5|1616.99006845|  4791|
|2014-07-27|  600.0|    600.0|   600.0|   600.0|       0.1545|     1|
|2016-01-10|  450.0|451.12999|  442.05|  449.13|  42.67798529|   100|
|2017-06-16|2428.72|  2526.35|2308.311|2470.303|8199.12396206| 17818|
+----------+-------+---------+--------+--------+-------------+------+
only showing top 5 rows



In [106]:
# Show each row together with the id of the partition that holds it.
(
    btc_repartition_4
    .select(
        'date', 'open', 'high', 'low', 'close', 'volume', 'trades',
        spark_partition_id(),
    )
    .show(n=5)
)

+----------+-------+---------+--------+--------+-------------+------+--------------------+
|      date|   open|     high|     low|   close|       volume|trades|SPARK_PARTITION_ID()|
+----------+-------+---------+--------+--------+-------------+------+--------------------+
|2017-10-25| 5545.5|   5777.7|  5400.0|  5762.8|4900.59050168| 19159|                   0|
|2019-03-23| 3982.1|   4000.4|  3961.0|  3982.5|1616.99006845|  4791|                   0|
|2014-07-27|  600.0|    600.0|   600.0|   600.0|       0.1545|     1|                   0|
|2016-01-10|  450.0|451.12999|  442.05|  449.13|  42.67798529|   100|                   0|
|2017-06-16|2428.72|  2526.35|2308.311|2470.303|8199.12396206| 17818|                   0|
+----------+-------+---------+--------+--------+-------------+------+--------------------+
only showing top 5 rows



In [107]:
# Store each row's partition id in a `partition` column so it can be filtered on.
btc_repartition_4 = btc_repartition_4.withColumn(
    colName='partition',
    col=spark_partition_id(),
)

In [108]:
# Preview the rows with the new `partition` column.
btc_repartition_4.show(n=5)

+----------+-------+---------+--------+--------+-------------+------+---------+
|      date|   open|     high|     low|   close|       volume|trades|partition|
+----------+-------+---------+--------+--------+-------------+------+---------+
|2017-10-25| 5545.5|   5777.7|  5400.0|  5762.8|4900.59050168| 19159|        0|
|2019-03-23| 3982.1|   4000.4|  3961.0|  3982.5|1616.99006845|  4791|        0|
|2014-07-27|  600.0|    600.0|   600.0|   600.0|       0.1545|     1|        0|
|2016-01-10|  450.0|451.12999|  442.05|  449.13|  42.67798529|   100|        0|
|2017-06-16|2428.72|  2526.35|2308.311|2470.303|8199.12396206| 17818|        0|
+----------+-------+---------+--------+--------+-------------+------+---------+
only showing top 5 rows



In [109]:
# Show three rows that landed in partition 1.
btc_repartition_4.where(col("partition") == 1).show(n=3)

+----------+------+------+---------+------+----------------+------+---------+
|      date|  open|  high|      low| close|          volume|trades|partition|
+----------+------+------+---------+------+----------------+------+---------+
|2015-03-15|287.25| 288.0|281.51071| 288.0|      0.14544647|     9|        1|
|2020-04-02|6662.9|7269.0|   6576.4|6803.8|18344.4551351203| 43096|        1|
|2019-03-29|4013.3|4100.0|   4005.9|4088.9|   4465.02001546|  9736|        1|
+----------+------+------+---------+------+----------------+------+---------+
only showing top 3 rows



In [116]:
# Show three rows that landed in partition 3.
btc_repartition_4.where(col("partition") == 3).show(n=3)

+----------+--------+-------+-------+--------+----------------+------+---------+
|      date|    open|   high|    low|   close|          volume|trades|partition|
+----------+--------+-------+-------+--------+----------------+------+---------+
|2020-11-20| 17827.4|18824.0|17780.0| 18680.2| 6868.2597017003| 33716|        3|
|2022-01-27| 36827.7|37242.5|35548.5| 37200.5| 4276.4801738801| 29424|        3|
|2017-07-20|2254.998|2934.44|2254.64|2854.882|16465.5437299603| 27528|        3|
+----------+--------+-------+-------+--------+----------------+------+---------+
only showing top 3 rows



#### coalesce()
- can only reduce the number of partitions (avoids a full shuffle)

In [118]:
# Merge the four partitions back down into a single one.
btc_coalesce_1 = btc_repartition_4.coalesce(numPartitions=1)

In [119]:
# Confirm the partition count after coalesce(1).
btc_coalesce_1.rdd.getNumPartitions()

1

### Partition by Column

In [120]:
# Baseline: the source DataFrame's partition count before repartitioning by column.
btc_spark_df.rdd.getNumPartitions()

1

In [122]:
# Repartition on the `date` column without giving a partition count, so
# Spark chooses the count itself.
# NOTE(review): the count reported further down is 1 — presumably adaptive
# query execution coalescing the shuffle partitions; confirm against the
# session's spark.sql.adaptive.* settings.
btc_repartition_col = btc_spark_df.repartition(col("date"))

In [123]:
# Show which partition each date was assigned to.
btc_repartition_col.select(col("date"), spark_partition_id()).show(n=5)

+----------+--------------------+
|      date|SPARK_PARTITION_ID()|
+----------+--------------------+
|2014-11-12|                   0|
|2015-03-09|                   0|
|2015-05-19|                   0|
|2016-03-01|                   0|
|2017-08-11|                   0|
+----------+--------------------+
only showing top 5 rows



In [124]:
# Store each row's partition id in a `partition` column.
btc_repartition_col = btc_repartition_col.withColumn(
    colName="partition",
    col=spark_partition_id(),
)

In [125]:
# Preview the column-partitioned rows with their partition ids.
btc_repartition_col.show(n=5)

+----------+---------+---------+---------+---------+---------------+------+---------+
|      date|     open|     high|      low|    close|         volume|trades|partition|
+----------+---------+---------+---------+---------+---------------+------+---------+
|2014-11-12|383.33812|430.35988|   379.99|423.48202|   219.46096288|   195|        0|
|2015-03-09|270.12774|297.50744|270.12774|296.99983|    18.77793687|    34|        0|
|2015-05-19|233.31987|233.90998|231.43364|232.78497|    23.40544345|    50|        0|
|2016-03-01|  437.893|  439.406|  422.794|   431.48|  1807.94366283|  1178|        0|
|2017-08-11| 3434.099| 3735.199| 3425.006|   3670.0|5731.5368379203| 22244|        0|
+----------+---------+---------+---------+---------+---------------+------+---------+
only showing top 5 rows



In [127]:
# Partition count produced by repartition("date") with no explicit count.
btc_repartition_col.rdd.getNumPartitions()

1

In [128]:
# Repartition on the `date` column, this time requesting four partitions.
btc_repartition_date_4 = btc_spark_df.repartition(4, col("date"))

In [129]:
# Store each row's partition id in a `partition` column.
btc_repartition_date_4 = btc_repartition_date_4.withColumn(
    colName="partition",
    col=spark_partition_id(),
)

In [131]:
# Show five rows whose date was assigned to partition 3.
btc_repartition_date_4.where(col("partition") == 3).show(n=5)

+----------+---------+---------+---------+---------+------+------+---------+
|      date|     open|     high|      low|    close|volume|trades|partition|
+----------+---------+---------+---------+---------+------+------+---------+
|2013-10-07|   123.61|   123.61|   123.61|   123.61|   0.1|     1|        3|
|2013-10-09|124.01687|124.01687|   123.84|   123.84| 2.823|     3|        3|
|2013-10-10|   125.85|   125.86|   125.85|   125.86|   2.0|     2|        3|
|2013-10-12|    127.5|    127.5|    127.0|    127.0|   4.0|     3|        3|
|2013-10-15|    135.8|    153.0|133.87975|133.87975|   4.3|     5|        3|
+----------+---------+---------+---------+---------+------+------+---------+
only showing top 5 rows

