In [1]:
import pandas as pd
import datetime
import numpy as np

from forex.pre_training_data_prep.config import config

import pyspark.sql.functions as f
#from pyspark.sql.types import BooleanType, IntegerType, ArrayType, FloatType

In [2]:
# Display the pipeline configuration imported from forex.pre_training_data_prep.config.
config

{'database_name': 'django',
 'dag_id': 'NEW_prepare_forex_data',
 'tz_name': 'US/Eastern',
 'price_type_name': 'mid',
 'instrument_name': 'EUR/USD',
 'interval_name': 'Minute',
 'retries_pull_forex_data': 1,
 'retry_delay_minutes_pull_forex_data': 5,
 'directory_output': '/home/emily/Desktop/projects/test/badass-data-science/badassdatascience/forecasting/deep_learning/forex/output',
 'filename_candlesticks_query_results': 'candlesticks_query_results.parquet',
 'filename_timezone_added': 'candlesticks_timezone_added.parquet',
 'filename_offset': 'candlesticks_timezone_weekday_offset.parquet',
 'filename_weekday_shift_merged': 'candlesticks_weekday_offset_merged.parquet',
 'filename_shift_days_and_hours_as_needed': 'candlesticks_shifted_as_needed.parquet',
 'filename_finalized_pandas': 'candlesticks_finalized_pandas.parquet',
 'filename_conversion_to_spark': 'spark_converted.parquet',
 'filename_pivot_and_sort': 'spark_pivot_and_sort.parquet',
 'filename_timestamp_diff': 'spark_timestamp

## Load the finalized pandas DataFrame from the previous stage and convert it to Spark

In [3]:
from utilities.spark_session import get_spark_session
from utilities.spark_session import load_pandas_df_parquet_into_spark_df

# Start Spark, load the finalized pandas parquet as a Spark DataFrame, and persist it
# as Spark-native parquet for the downstream tasks.
# NOTE(review): the captured output shows uint8 columns that Arrow cannot convert,
# so Spark falls back to the slower non-Arrow conversion path.
spark = get_spark_session(config['spark_config'])

path_finalized_pandas = config['directory_output'] + '/' + config['filename_finalized_pandas']
sdf_arrays = load_pandas_df_parquet_into_spark_df(
    path_finalized_pandas,
    spark,
    truncate_to_row_number=None,  # set a small int (e.g. 10) for a quick trial run
    n_processors_to_coalesce=config['n_processors_to_coalesce'],
)

path_spark_converted = config['directory_output'] + '/' + config['filename_conversion_to_spark']
sdf_arrays.write.mode('overwrite').parquet(path_spark_converted)

25/03/10 08:21:33 WARN Utils: Your hostname, emily-MS-7B96 resolves to a loopback address: 127.0.1.1; using 192.168.1.82 instead (on interface wlp5s0)
25/03/10 08:21:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/10 08:21:34 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  [UNSUPPORTED_DATA_TYPE_FOR_ARROW_CONVERSION] uint8 is not supported in conversion to Arrow.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
25/03/10 08:27:21 WARN TaskSetManager: Stage 0 contains a task of very large size (18671 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [4]:
# Peek at the first rows of the converted frame, then release the session opened in the previous cell.
sdf_arrays.show(n=5)
spark.stop()

25/03/10 08:27:26 WARN TaskSetManager: Stage 1 contains a task of very large size (18671 KiB). The maximum recommended task size is 1000 KiB.
25/03/10 08:27:30 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 1 (TID 20): Attempting to kill Python Worker
                                                                                

+----------+-------+-------+-------+-------+------+-------------------+----------+-------+---------------+---------------------+--------------------+--------------------+------------------+
| timestamp|      o|      l|      h|      c|volume|        datetime_tz|weekday_tz|hour_tz|weekday_shifted|original_date_shifted|              Return|          Volatility|          lhc_mean|
+----------+-------+-------+-------+-------+------+-------------------+----------+-------+---------------+---------------------+--------------------+--------------------+------------------+
|1220599980|1.42861|1.42825|1.42876| 1.4284|    51|2008-09-05 00:33:00|         4|      3|              4|           2008-09-05|-2.10000000000043...|5.100000000000104E-4|           1.42847|
|1220600040|1.42845|1.42823|1.42863|1.42832|    57|2008-09-05 00:34:00|         4|      3|              4|           2008-09-05|-1.29999999999963...|3.999999999999559...|1.4283933333333334|
|1220600100|1.42837|1.42827|1.42857|1.42844|    26

In [5]:
from pyspark.sql.types import ArrayType, IntegerType, FloatType

In [6]:
def difference_an_array(the_array, seconds_divisor):
    """Return the gaps between consecutive elements, scaled by ``seconds_divisor``.

    Each gap ``the_array[i] - the_array[i - 1]`` is divided (true division) by
    ``seconds_divisor`` and truncated toward zero with ``int``. The result has
    one element fewer than ``the_array``. Inputs with fewer than two elements
    give an empty list.
    """
    gaps = []
    for idx in range(1, len(the_array)):
        gaps.append(int((the_array[idx] - the_array[idx - 1]) / seconds_divisor))
    return gaps

# Spark UDF wrapper: yields an ARRAY<INT> of consecutive-element gaps per row.
udf_difference_an_array = f.udf(difference_an_array, returnType=ArrayType(IntegerType()))

def argsort_an_array(the_array):
    """Return the indices that would sort ``the_array`` ascending.

    Uses ``numpy.argsort`` with its default algorithm. The indices are converted
    from NumPy integers to plain Python ``int`` values for the Spark UDF wrapper.
    """
    order = np.argsort(np.asarray(the_array))
    return list(map(int, order))

# Spark UDF wrapper: yields the ARRAY<INT> of sort indices for each row's array.
udf_argsort_an_array = f.udf(argsort_an_array, returnType=ArrayType(IntegerType()))

def apply_argsort_integer(the_array, argsort_array):
    """Reorder ``the_array`` by the index list ``argsort_array``; each element becomes ``int``."""
    reordered = np.asarray(the_array)[argsort_array]
    return list(map(int, reordered))

def apply_argsort_float(the_array, argsort_array):
    """Reorder ``the_array`` by the index list ``argsort_array``; each element becomes ``float``."""
    reordered = np.asarray(the_array)[argsort_array]
    return list(map(float, reordered))

# Spark UDF wrappers that reorder an array by a precomputed argsort.
# The integer variant yields ARRAY<INT>; the float variant yields ARRAY<FLOAT> (32-bit).
udf_apply_argsort_integer = f.udf(apply_argsort_integer, returnType=ArrayType(IntegerType()))
udf_apply_argsort_float = f.udf(apply_argsort_float, returnType=ArrayType(FloatType()))

In [7]:
def task_pivot_and_sort_arrays(**config):
    """Pivot per-candle rows into one row of timestamp-sorted arrays per shifted date.

    Reads ``<directory_output>/<filename_conversion_to_spark>`` and groups the rows
    by ``original_date_shifted`` (renamed to ``date_post_shift``). It then collects
    the ``timestamp``, ``Return``, ``Volatility``, ``lhc_mean`` and ``volume``
    columns into arrays and re-sorts every array by timestamp. The result is
    written to ``<directory_output>/<filename_pivot_and_sort>``, overwriting it.

    Keyword Args:
        spark_config: passed to ``get_spark_session``.
        directory_output: directory holding the input and output parquet files.
        filename_conversion_to_spark: input parquet name.
        filename_pivot_and_sort: output parquet name.
        Any other keys are accepted and ignored.

    The Spark session is stopped on exit, including when any step raises.
    """
    spark = get_spark_session(config['spark_config'])
    try:
        sdf_arrays = (

            #
            # load
            #
            spark.read.parquet(config['directory_output'] + '/' + config['filename_conversion_to_spark'])

            #
            # pivot
            #
            .select('original_date_shifted', 'timestamp', 'Return', 'Volatility', 'lhc_mean', 'volume')
            .withColumnRenamed('original_date_shifted', 'date_post_shift')
            # collect_list does not guarantee element order after the groupBy shuffle,
            # so this sort alone is not sufficient; the arrays are re-sorted below.
            .orderBy('timestamp')
            .groupBy('date_post_shift')
            .agg(
                f.collect_list('timestamp').alias('timestamp_array'),
                f.collect_list('Return').alias('return_array'),
                f.collect_list('Volatility').alias('volatility_array'),
                f.collect_list('lhc_mean').alias('lhc_mean_array'),
                f.collect_list('volume').alias('volume_array'),
            )

            #
            # ensure proper sorting: compute the timestamp argsort once and apply the
            # same permutation to every array so they remain index-aligned.
            # NOTE(review): collect_list skips nulls; a null in any collected column
            # would shorten that array and break alignment with timestamp_argsort.
            #
            .withColumn(
                'timestamp_argsort',
                udf_argsort_an_array(f.col('timestamp_array'))
            )
            .withColumn('sorted_timestamp_array', udf_apply_argsort_integer(f.col('timestamp_array'), f.col('timestamp_argsort')))
            .withColumn('sorted_return_array', udf_apply_argsort_float(f.col('return_array'), f.col('timestamp_argsort')))
            .withColumn('sorted_volatility_array', udf_apply_argsort_float(f.col('volatility_array'), f.col('timestamp_argsort')))
            .withColumn('sorted_lhc_mean_array', udf_apply_argsort_float(f.col('lhc_mean_array'), f.col('timestamp_argsort')))
            # volume goes through the float UDF, so sorted_volume_array holds floats
            .withColumn('sorted_volume_array', udf_apply_argsort_float(f.col('volume_array'), f.col('timestamp_argsort')))

            .drop('timestamp_array', 'return_array', 'volatility_array', 'lhc_mean_array', 'volume_array')
            .orderBy('date_post_shift')
        )
        sdf_arrays.write.mode('overwrite').parquet(config['directory_output'] + '/' + config['filename_pivot_and_sort'])
    finally:
        # Stop the session even if the read, transform, or write raises,
        # so it is not left running after a failed task.
        spark.stop()

def task_diff_the_timestamp_arrays(**config):
    """Add per-date gaps between consecutive sorted timestamps.

    Reads ``<directory_output>/<filename_pivot_and_sort>`` and adds the column
    ``diff_sorted_timestamp_array``. Each element of that column is
    ``int((t[i + 1] - t[i]) / seconds_divisor)``, taken over ``sorted_timestamp_array``.
    The result is written to ``<directory_output>/<filename_timestamp_diff>``,
    overwriting it.

    Keyword Args:
        spark_config: passed to ``get_spark_session``.
        directory_output: directory holding the input and output parquet files.
        filename_pivot_and_sort: input parquet name.
        filename_timestamp_diff: output parquet name.
        seconds_divisor: divisor applied to each timestamp gap.
        Any other keys are accepted and ignored.

    The Spark session is stopped on exit, including when any step raises.
    """
    spark = get_spark_session(config['spark_config'])
    try:
        sdf_arrays = (
            spark.read.parquet(config['directory_output'] + '/' + config['filename_pivot_and_sort'])
            # UDF arguments must be columns, so the divisor is attached as a literal
            # column for the call and dropped afterwards.
            .withColumn('seconds_divisor', f.lit(config['seconds_divisor']))
            .withColumn(
                'diff_sorted_timestamp_array',
                udf_difference_an_array(
                    f.col('sorted_timestamp_array'),
                    f.col('seconds_divisor'),
                )
            )
            .drop('seconds_divisor')
            .orderBy('date_post_shift')
        )
        sdf_arrays.write.mode('overwrite').parquet(config['directory_output'] + '/' + config['filename_timestamp_diff'])
    finally:
        # Stop the session even if the read, transform, or write raises,
        # so it is not left running after a failed task.
        spark.stop()

In [8]:
# Stage 1: pivot rows into per-date, timestamp-sorted arrays (writes filename_pivot_and_sort).
task_pivot_and_sort_arrays(**config)

                                                                                

In [9]:
# Stage 2: add per-date gaps between consecutive timestamps (writes filename_timestamp_diff).
task_diff_the_timestamp_arrays(**config)

                                                                                