In [None]:
from maystreet_data.cluster import client

# IMPORTANT: replace the placeholder below with the name of your own cluster before running.
cluster_client = client("YourClusterNameHere")

# Query parameters shared by the cells below: which feeds and products to pull,
# and which calendar days (of `year`-`month`) to fetch.
feeds = ["bats_edga"]
products = ["IBM", "AAPL", "TSLA", "V"]
year, month = 2022, 1
days = [5, 6, 7]

In [None]:
import datetime

from dask.dataframe import to_datetime
import maystreet_data


# Position of each product in `products` (first -> 0, second -> 1, ...); used as the numeric x coordinate.
product_index_map = dict(zip(products, range(len(products))))


def _sql_string_literal(value):
    """Render ``value`` as a single-quoted SQL string literal, doubling any embedded single quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def get_daily_price_summary_query(requested_day):
    """
    Build a query returning one row per (product, minute) with min/max/open/close trade prices.

    The query covers the notebook-level ``feeds`` and ``products`` on the date
    ``year``-``month``-``requested_day``. Those globals are read at call time.

    Parameters
    ----------
    requested_day : int
        Day of the month to query.

    Returns
    -------
    str
        SQL text to submit to the data lake.

    Raises
    ------
    ValueError
        If ``feeds`` or ``products`` is empty (an empty ``IN ()`` list is invalid SQL),
        or if ``year``/``month``/``requested_day`` do not form a valid date.
    """
    if not feeds or not products:
        raise ValueError("feeds and products must be non-empty")

    dt = datetime.date(year, month, requested_day).isoformat()
    table = '"prod_lake.p_mst_data_lake".mt_trade'

    feeds_filter = ", ".join(_sql_string_literal(feed) for feed in feeds)
    products_filter = ", ".join(_sql_string_literal(product) for product in products)

    # A single filter shared by the aggregate and both correlated look-ups, so the
    # open/close prices are always drawn from the same rows as min/max.
    trade_filter = f"dt = '{dt}' AND product IN ({products_filter}) AND f IN ({feeds_filter})"

    # The division by 1e9 treats exchangetimestamp as nanoseconds since the epoch.
    # NOTE(review): rows are bucketed into minutes by exchangetimestamp, but open/close
    # are picked by the earliest/latest receipttimestamp; confirm this mix is intended.
    # When several trades share that receipt timestamp, MIN/MAX(price) breaks the tie.
    return f"""
    WITH
        price_and_time AS (
            SELECT
                product,
                DATE_TRUNC('minute', FROM_UNIXTIME(exchangetimestamp / 1000000000)) AS dp_minute,
                MIN(price) AS min_price,
                MAX(price) AS max_price,
                MIN(receipttimestamp) AS min_receipt,
                MAX(receipttimestamp) AS max_receipt
            FROM
                {table}
            WHERE
                {trade_filter}
            GROUP BY 1, 2
        )
    SELECT
        product,
        dp_minute,
        min_price,
        max_price,
        (SELECT
            MIN(price)
        FROM
            {table}
        WHERE
            {trade_filter}
            AND product = price_and_time.product
            AND receipttimestamp = min_receipt
        ) AS open_price,
        (SELECT
            MAX(price)
        FROM
            {table}
        WHERE
            {trade_filter}
            AND product = price_and_time.product
            AND receipttimestamp = max_receipt
        ) AS close_price
    FROM
        price_and_time
    ORDER BY dp_minute, product
    """


# Now we create a distributed dataframe, with one partition per query.
# If your cluster has at least 3 workers, each partition can reside in a different worker.
# Queries are launched by the workers concurrently, but they all run in the same Data Lake cluster.
raw_daily_summaries = maystreet_data.data_lake_distributed_dataframe(
    queries=[get_daily_price_summary_query(day) for day in days]
)

# This (trivial) computation is automatically distributed in the cluster.
# `meta` gives dask the output name/dtype up front, so it does not have to infer it
# by running the mapper on dummy data (which emits a "You did not provide metadata" warning).
daily_summaries = raw_daily_summaries.assign(
    # NOTE(review): unit="ms" assumes the data lake returns dp_minute as epoch milliseconds; confirm.
    dp_minute=to_datetime(raw_daily_summaries["dp_minute"], unit="ms"),
    # Every product returned is in product_index_map (the query filters on `products`),
    # so the mapped values are integers.
    product_index=raw_daily_summaries["product"].map(
        product_index_map, meta=("product_index", "int64")
    ),
)

# datetime.toordinal() returns the proleptic Gregorian *day* number: all rows from
# the same day share one time_ordinal (the time of day is dropped).
daily_summaries = daily_summaries.assign(
    time_ordinal=daily_summaries["dp_minute"].map(
        datetime.datetime.toordinal, meta=("time_ordinal", "int64")
    )
)

# Persist all the rows in the cluster memory.
daily_summaries = cluster_client.persist(daily_summaries)


In [None]:
%matplotlib widget

import datetime

import matplotlib.pyplot as plt
import matplotlib.dates as mdates


# Materialise only the plotted columns, and do it once. Handing the dask series straight
# to matplotlib would convert (compute) each of them separately.
plot_data = daily_summaries[["product_index", "dp_minute", "max_price"]].compute()

fig, ax = plt.subplots(subplot_kw={"projection": "3d"})

x = plot_data["product_index"]
y = mdates.date2num(plot_data["dp_minute"])
z = plot_data["max_price"]

ax.scatter(x, y, z)
ax.yaxis.set_major_formatter(mdates.DateFormatter("%d %H:%M"))

# product_index_map assigns 0..len(products)-1 in `products` order, so label ticks with the names.
ax.set_xticks(range(len(products)))
ax.set_xticklabels(products)

ax.set_xlabel("product")
ax.set_ylabel("day hh:mm")
ax.set_zlabel("max price")
ax.set_title("Per-minute max trade price")
plt.show()
