In [1]:
import numpy as np
import pandas as pd
import os
import tensorflow as tf
from datetime import datetime
from datetime import timedelta
import re
from sklearn.preprocessing import MinMaxScaler
from sklearn.impute import SimpleImputer
import matplotlib.pyplot as plt
import support.ts_class as ts_class
import support.load_and_process_data as lpdata
import sys

In [2]:
import pyspark.sql.functions as F
from pyspark.sql import Window

In [3]:
import warnings
# Suppress every Python warning for the rest of the kernel session.
# NOTE(review): this also hides deprecation warnings from pandas/pyspark.
warnings.filterwarnings('ignore')

In [4]:
from importlib import reload
# Re-import the local helper modules so that edits made on disk are picked up
# without restarting the kernel; reload() hands back the refreshed module.
ts_class, lpdata = reload(ts_class), reload(lpdata)

In [5]:
import pyspark
from delta import *

# Session builder with the Delta Lake SQL extension and catalog configured.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

# Let the delta helper finish configuring the builder, then create the session
# (or reuse one that is already running).
spark = configure_spark_with_delta_pip(builder).getOrCreate()



:: loading settings :: url = jar:file:/opt/anaconda3/envs/traiding_spark_delta/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/esak/.ivy2/cache
The jars for the packages stored in: /Users/esak/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9d9813c8-4557-45b4-a054-158b5427617e;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.2.1 in central
	found io.delta#delta-storage;1.2.1 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in central
:: resolution report :: resolve 245ms :: artifacts dl 10ms
	:: modules in use:
	io.delta#delta-core_2.12;1.2.1 from central in [default]
	io.delta#delta-storage;1.2.1 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evic

In [6]:
from delta.tables import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import pyspark.pandas as ps



In [7]:
# Root folders for the source Delta datasets and for the prepared training
# tables. Each can be overridden with an environment variable so the notebook is
# not tied to one machine; the defaults are the original locations.
# os.path.join(<path>, "") guarantees a trailing separator, because the rest of
# the notebook appends table names to these values by string concatenation.
datasets_path = os.path.join(
    os.environ.get(
        "FORECAST_DATASETS_PATH",
        "/Users/esak/Projects/stock_traiding/forecast_indices/datasets/",
    ),
    "",
)
train_data_path = os.path.join(
    os.environ.get(
        "FORECAST_TRAIN_DATA_PATH",
        "/Users/esak/Projects/stock_traiding/forecast_indices/train_data/",
    ),
    "",
)

In [8]:
# Time span covered by the minute grid: a fixed start and the last second of
# the current day (evaluated when this cell runs).
START_DATE = '2009-01-01 00:00:00'
TODAY = datetime.today().date().isoformat()
END_DATE = f'{TODAY} 23:59:59'

In [9]:
# One row per minute from START_DATE to END_DATE, held in a single "Datetime"
# column with a default integer index.
date_range = pd.date_range(
    START_DATE, END_DATE, freq="min", name="Datetime"
).to_frame(index=False)

In [10]:
# Replace the "Datetime" column with the values returned by
# .dt.to_pydatetime(), stored in an object-dtype Series (the dtypes output
# below reports `object`).
# NOTE(review): presumably done to control how the column is converted when
# the frame is handed to Spark — confirm. Also re-check this line after a
# pandas upgrade; the return type of Series.dt.to_pydatetime() may differ
# between releases.
date_range["Datetime"] = pd.Series(date_range["Datetime"].dt.to_pydatetime(), dtype=object)

In [11]:
# Preview the first rows of the minute grid.
date_range.head()

Unnamed: 0,Datetime
0,2009-01-01 00:00:00
1,2009-01-01 00:01:00
2,2009-01-01 00:02:00
3,2009-01-01 00:03:00
4,2009-01-01 00:04:00


In [12]:
# Show the column dtypes of the minute grid.
date_range.dtypes

Datetime    object
dtype: object

In [13]:
# Convert the pandas minute grid into a Spark DataFrame (via pandas-on-Spark).
# The name `date_range` is rebound from a pandas frame to a Spark frame, so the
# guard keeps this cell re-runnable: once converted, running it again is a
# no-op instead of failing inside from_pandas on a non-pandas input.
if isinstance(date_range, pd.DataFrame):
    date_range = ps.from_pandas(date_range).to_spark()

In [22]:
# Running window ordered by time: each row sees every row from the start of
# the frame up to and including itself. No partition column is given, so Spark
# evaluates it in a single partition (see the WindowExec warnings in the logs).
w_forward = (
    Window.partitionBy()
    .orderBy('Datetime')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [28]:
def prepare_stock_data(data_name, source_data="EMPTY"):
    """Build a gap-free per-minute Close/Volume table and save it as Delta.

    Reads ``Datetime``, ``Close`` and ``Volume`` from the Delta table at
    ``datasets_path + source_data``, left-joins them onto the minute grid in
    ``date_range``, carries the last non-null value forward over ``w_forward``,
    fills the nulls that remain with 0 and overwrites the Delta table at
    ``train_data_path + data_name``.

    Args:
        data_name: Name of the output table under ``train_data_path``.
        source_data: Name of the input table under ``datasets_path``. The
            default sentinel ``"EMPTY"`` means "use ``data_name``".

    Returns:
        None. The result is written to storage.
    """
    if source_data == "EMPTY":
        source_data = data_name

    quotes = (
        spark.read.format("delta")
        .load(datasets_path + source_data)
        .select(F.col("Datetime"), F.col("Close"), F.col("Volume"))
    )

    # Left join keeps every minute of the grid, including those without a quote.
    filled = date_range.join(quotes, on="Datetime", how="left")

    # Forward-fill: replace each value with the latest non-null one seen so far.
    for column in ("Close", "Volume"):
        filled = filled.withColumn(
            column, F.last(column, ignorenulls=True).over(w_forward)
        )

    # Nulls left after the forward fill (nothing earlier to carry) become 0.
    filled = filled.fillna(0)

    # Spark DataFrames are immutable: coalesce() returns a new frame, so the
    # result must be kept for the single-partition write to take effect.
    filled = filled.coalesce(1)

    filled.write.format("delta").mode("overwrite").save(train_data_path + data_name)

In [24]:
# Instruments to prepare training tables for.
tickers = [
    'SP500',
    '30Y_BOND',
    '10Y_NOTE',
    'GOLD',
]

In [26]:
# Prepare one training table per ticker. No source_data is passed, so each
# ticker is read from the dataset that carries the same name.
for ticker in tickers:
    prepare_stock_data(ticker)

22/11/01 10:22:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/01 10:22:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/01 10:22:18 WARN TaskSetManager: Stage 353 contains a task of very large size (14108 KiB). The maximum recommended task size is 1000 KiB.
22/11/01 10:22:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/01 10:22:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/01 10:22:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/01 10:22:40 WARN TaskMemo

In [29]:
# MSFT is read from the "stocks_per_minute" dataset and saved under its own name.
prepare_stock_data(data_name="MSFT", source_data="stocks_per_minute")

22/11/01 10:29:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/01 10:29:32 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/01 10:29:32 WARN TaskSetManager: Stage 435 contains a task of very large size (14108 KiB). The maximum recommended task size is 1000 KiB.
22/11/01 10:29:35 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/01 10:29:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/01 10:29:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/11/01 10:29:39 WARN TaskMemo

In [14]:
# Economic-calendar events stored as a Delta table under datasets_path.
econ_data = (
    spark.read
    .format("delta")
    .load(datasets_path + "economic_data")
)

                                                                                

In [None]:
# Destructive: removes every row whose Event contains "Speaks" from the
# economic_data Delta table.
# The statement is assembled from adjacent literals with explicit spaces; the
# earlier backslash continuations inside one literal left no whitespace
# between the table identifier and WHERE.
query = (
    "DELETE "
    "FROM delta.`{}` "
    "WHERE Event LIKE '%Speaks%'"
).format(datasets_path + "economic_data")

In [None]:
# Execute whatever statement is currently bound to `query` and print the result.
# NOTE(review): `query` is assigned by two different cells in this notebook (a
# DELETE and a SELECT DISTINCT), so what runs here depends on which of them was
# executed last — check before running.
spark.sql(query).show()

In [15]:
# SQL that lists each distinct Event name in the economic_data Delta table.
query = (
    "SELECT DISTINCT Event "
    "FROM delta.`{}`"
).format(datasets_path + "economic_data")

In [16]:
# Run the statement currently bound to `query` and keep the resulting frame.
# NOTE(review): `query` is also assigned by a DELETE cell earlier in the
# notebook; this line expects the SELECT DISTINCT version to be the live one.
distinct_events = spark.sql(query)

In [20]:
# Collect the distinct event rows to the driver and build one training table
# per event.
# NOTE(review): prepare_econ_train_data is defined in a cell located below this
# one, so on a fresh kernel a top-to-bottom run fails here with NameError —
# the defining cell has to be executed first.
prepare_econ_train_data(distinct_events.collect())

'Distillate Fuel Production'

In [30]:
def prepare_econ_train_data(events):
    """Write one gap-free per-minute ``Actual`` table per economic event.

    For every entry in ``events`` the rows of ``econ_data`` with that event
    name are left-joined onto the minute grid in ``date_range``, the last
    non-null ``Actual`` is carried forward over ``w_forward``, the nulls that
    remain are filled with 0, and the result overwrites the Delta table at
    ``train_data_path + <sanitised event name>``.

    Args:
        events: Iterable of row-like objects whose first element is the event
            name (for example the result of ``distinct_events.collect()``).

    Returns:
        None. Progress is written to stdout and the tables to storage.
    """
    for count, row in enumerate(events):
        event = row[0]
        if event is None:
            # A null name cannot be concatenated into the progress line or the
            # output path, and an equality filter on null matches no rows.
            continue

        # Carriage return rewrites the same console line as a progress display.
        sys.stdout.write('\r'+str(count)+' '+event)

        event_data = (
            econ_data
            .filter(F.col("Event") == event)
            .select(F.col("Datetime"), F.col("Actual"))
        )
        # Left join keeps every minute of the grid, with or without a release.
        event_data = date_range.join(event_data, on="Datetime", how="left")
        # Forward-fill: each minute takes the latest non-null Actual so far.
        event_data = event_data.withColumn(
            'Actual', F.last('Actual', ignorenulls=True).over(w_forward)
        )
        event_data = event_data.fillna(0)

        # Spark DataFrames are immutable: coalesce() returns a new frame, so
        # the result must be kept for the single-partition write to apply.
        event_data = event_data.coalesce(1)

        # Strip the characters "(", ")", "&" and "/" from the event name, then
        # trim surrounding whitespace, before using it as the table folder name.
        table_name = event
        for char in "()&/":
            table_name = table_name.replace(char, "")
        table_name = table_name.strip()

        event_data.write.format("delta").mode("overwrite").save(train_data_path + table_name)