In [0]:
import sys
print(sys.version)

In [0]:
"""
Module that contains the get trades functionality. This module will generate a random set of dummy positions.
"""
import random
import logging
import datetime
import uuid
import numpy as np
import pandas as pd


from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, current_timestamp

spark = SparkSession.builder.getOrCreate()

def check_if_valid_date(date: str):
    """
    Verify that the date format matches d/m/y
    :param date: str date in d/m/y format
    :return: True or False
    """
    date_format = "%d/%m/%Y"

    """ Warning to any non python devs reading this code..
        In Python the only way to test a valid date is with a try catch. Yep, it sux.
    """
    if not isinstance(date, str):
        return False

    try:
        datetime.datetime.strptime(date, date_format)
        valid_date = True
    except ValueError:
        valid_date = False

    return valid_date


def random_nan(x):
    """
    Replace x with a nan, if the random number == 1
    """
    if random.randrange(0, 15) == 1:
        x = np.nan

    return x


def generate_new_random_trade_position(date: str):
    """ Generates a new random trade position with the date, period sequence and volume sequence
    :param date: Date in d/m/y format
    :return: dict with data
    """

    period_list = [random_nan(i.strftime("%H:%M")) for i in pd.date_range("00:00", "23:59", freq="5min").time]
    volume = [random_nan(x) for x in random.sample(range(0, 500), len(period_list))]

    open_trade_position = {"date": date,
                           "time": period_list,
                           "volume": volume,
                           "id": uuid.uuid4().hex
                           }

    return open_trade_position


def get_trades(date: str):
    """
    Generate some random number of open trade positions
    :param date: date in d/m/y format
    :return:
    """

    if not check_if_valid_date(date=date):
        error_msg = "The supplied date {} is invalid.Please supply a date in the format d/m/Y.".format(date)
        logging.error(error_msg)
        raise ValueError(error_msg)

    # a randomly chosen number of open trades
    number_of_open_trades = random.randint(1, 101)
    logging.info("Generated" + str(number_of_open_trades) + " open trades randomly.")

    open_trades_list = []
    # Generate a list of open trade dicts
    for open_trade in range(0, number_of_open_trades):
        open_trades_list.append(generate_new_random_trade_position(date=date))

    return open_trades_list

In [0]:
trades = get_trades(date='01/03/2022')
df = pd.DataFrame(trades[0])
df.tail()

Unnamed: 0,date,time,volume,id
283,01/03/2022,23:35,55.0,daaa0078827d43f0a35267a75fea0bf5
284,01/03/2022,23:40,107.0,daaa0078827d43f0a35267a75fea0bf5
285,01/03/2022,23:45,475.0,daaa0078827d43f0a35267a75fea0bf5
286,01/03/2022,23:50,440.0,daaa0078827d43f0a35267a75fea0bf5
287,01/03/2022,,249.0,daaa0078827d43f0a35267a75fea0bf5


In [0]:
from pyspark.sql.functions import coalesce, lit, concat, to_timestamp, lead, unix_timestamp
from pyspark.sql.functions import lit, monotonically_increasing_id, row_number, max
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, DoubleType
from pyspark.sql.window import Window
from datetime import datetime, timedelta
import pandas as pd
from pyspark.sql import functions as F



schema = StructType(
    [StructField("date", StringType(), True),
     StructField("time", StringType(), True),
     StructField("volume", DoubleType(), True),
     StructField("id", StringType(), True)
    ])

sdf = spark.createDataFrame(df, schema)
sdf = sdf.withColumn("timestamp", to_timestamp(concat(sdf["date"],lit(' '),sdf["time"]),"dd/MM/yyyy HH:mm"))
sdf1 = sdf.withColumn("idx1", monotonically_increasing_id())
# Create the window specification
w1 = Window.orderBy("idx1")
w2 = Window.orderBy("idx2")

# Use row number with the window specification
sdf1 = sdf1.withColumn("id", row_number().over(w1))

# Drop the created increasing data column
sdf1 = sdf1.drop("idx1")


dates_range = (sdf.groupBy("date")
               .agg(F.max(F.col("timestamp")).alias("max_timestamp"),F.min(F.col("timestamp")).alias("min_timestamp"))
               .select("date",F.expr("sequence(min_timestamp, max_timestamp, interval 5 minutes)").alias("new_timestamp"))
               .withColumn("new_timestamp", F.explode("new_timestamp"))
               .withColumn("idx2", monotonically_increasing_id())
              )
dates_range = dates_range.withColumn("id", row_number().over(w2)).drop("idx2")

In [0]:
w = Window.partitionBy("date").orderBy("id")
result = (dates_range
          .join(sdf1, ["id", "date"], "left")
          .select("id","date","new_timestamp",
                  *[F.last(F.col(c), ignorenulls=True).over(w).alias(c)
              for c in sdf.columns if c not in ("id", "date", "time","volume","new_timestamp")],"volume")
          .fillna(0, subset=['volume'])
          .withColumn("hour", F.date_trunc('hour',"new_timestamp"))
          .groupby("hour").sum("volume")
         )

In [0]:
capStart = 23 #11 pm, 
capEnd = 1 # 1am


result = result.withColumn('start_unixtime', (F.unix_timestamp(F.col("hour").cast("timestamp"))))
result = result.withColumn('end_unixtime', (F.unix_timestamp(F.col("hour").cast("timestamp"))) + 3600)
result = result.withColumn('match', (result.start_unixtime >= capStart*60*60) & (result.end_unixtime <= capEnd*60*60))
# result = result.filter(result.match == True)


In [0]:
result.display()

hour,sum(volume),start_unixtime,end_unixtime,match
2022-03-01T00:00:00.000+0000,2744.0,1646092800,1646096400,False
2022-03-01T01:00:00.000+0000,3294.0,1646096400,1646100000,False
2022-03-01T02:00:00.000+0000,3012.0,1646100000,1646103600,False
2022-03-01T03:00:00.000+0000,2648.0,1646103600,1646107200,False
2022-03-01T04:00:00.000+0000,2203.0,1646107200,1646110800,False
2022-03-01T05:00:00.000+0000,3082.0,1646110800,1646114400,False
2022-03-01T06:00:00.000+0000,2736.0,1646114400,1646118000,False
2022-03-01T07:00:00.000+0000,2757.0,1646118000,1646121600,False
2022-03-01T08:00:00.000+0000,3282.0,1646121600,1646125200,False
2022-03-01T09:00:00.000+0000,3372.0,1646125200,1646128800,False


In [0]:
df = spark.sql("SELECT * FROM trade.power_position")
df.show()

In [0]:
dbutils.fs.ls('dbfs/tmp/')

In [0]:
read_df = spark.read.format("csv") \
                        .option("inferSchema", "false") \
                        .option("delimiter", ",") \
                        .option("header", "true") \
                        .option("escape","\"") \
                        .option("dateFormat", "yyyy-MM-dd") \
                        .load("dbfs:/dbfs/tmp/PowerPosition_20220923_0102.csv") 

In [0]:
read_df.display()

TimeStamp,Aggregated_Volume
2021-03-01T00:00:00.000Z,3190.0
2021-03-01T01:00:00.000Z,1440.0
2021-03-01T02:00:00.000Z,2268.0
2021-03-01T03:00:00.000Z,3165.0
2021-03-01T04:00:00.000Z,2348.0
2021-03-01T05:00:00.000Z,3293.0
2021-03-01T06:00:00.000Z,2235.0
2021-03-01T07:00:00.000Z,2513.0
2021-03-01T08:00:00.000Z,3263.0
2021-03-01T09:00:00.000Z,3012.0
