In [18]:
import yfinance as yf
from functools import reduce 
import datetime
import time


'''
Suppose we want all the minute-by-minute data for Google stocks for the past 3 full weeks
We want to normalize the data and get a rolling average on the scale of 10, 30, 60 minutes
'''



rawDataT1 = time.time()
fri_to_mon = datetime.timedelta(days=4)
preivous_week = datetime.timedelta(days=7)
reference = datetime.datetime(day=19,month=6,year=2020)

ranges = []
for i in reversed(range(0,3)):
    start = reference-fri_to_mon-i*preivous_week
    end = reference-i*preivous_week
    ranges.append((start.strftime("%Y-%m-%d"),end.strftime("%Y-%m-%d")))
    
dfs = [
        yf.download(
            tickers="GOOG",
            start=_start,
            end=_end,
            period="1wk",
            interval="1m")
        for (_start,_end) in
        ranges
      ]

allData = reduce(lambda a,b: a.add(b,fill_value=0), dfs)

allData.reset_index(level=0,inplace=True)
allData.drop(allData.tail(1).index,inplace=True)

rawDataT2 = time.time()

dataGatherTime = rawDataT2-rawDataT1
print(dataGatherTime)

allData.to_csv(r'./GOOG.stock.csv')
print(allData.info(verbose=True))
allData

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
1.5332601070404053
<class 'pandas.core.frame.DataFrame'>
Int64Index: 4589 entries, 0 to 4588
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype                           
---  ------     --------------  -----                           
 0   Datetime   4589 non-null   datetime64[ns, America/New_York]
 1   Open       4589 non-null   float64                         
 2   High       4589 non-null   float64                         
 3   Low        4589 non-null   float64                         
 4   Close      4589 non-null   float64                         
 5   Adj Close  4589 non-null   float64                         
 6   Volume     4589 non-null   float64                         
dtypes: datetime64[ns, America/New_York](1), float64(6)
memory usage: 286.8 KB
None

Unnamed: 0,Datetime,Open,High,Low,Close,Adj Close,Volume
0,2020-06-01 09:30:00-04:00,1418.390015,1421.680054,1418.000000,1420.318970,1420.318970,49542.0
1,2020-06-01 09:31:00-04:00,1421.449951,1422.500000,1421.199951,1421.479980,1421.479980,1687.0
2,2020-06-01 09:32:00-04:00,1423.594971,1425.699951,1423.329956,1424.989990,1424.989990,16166.0
3,2020-06-01 09:33:00-04:00,1424.494019,1424.813965,1423.540039,1424.813965,1424.813965,4088.0
4,2020-06-01 09:34:00-04:00,1423.750000,1428.062500,1423.040039,1428.057495,1428.057495,5017.0
...,...,...,...,...,...,...,...
4584,2020-06-18 15:55:00-04:00,1434.959961,1435.650024,1434.770020,1435.010010,1435.010010,18386.0
4585,2020-06-18 15:56:00-04:00,1435.030029,1435.030029,1432.569946,1432.979980,1432.979980,13427.0
4586,2020-06-18 15:57:00-04:00,1433.199951,1433.750000,1432.739990,1433.562988,1433.562988,13254.0
4587,2020-06-18 15:58:00-04:00,1433.459961,1434.719971,1433.459961,1433.760010,1433.760010,17072.0


In [31]:
'''
If yfinance is broken, import the CSV and 
massage/convert data as needed.
'''
import pandas as pd
import datetime
allData = pd.read_csv(r'./GOOG.stock.csv')
allData[['Datetime']] = pd.to_datetime(allData['Datetime'],infer_datetime_format=True)
allData.pop(allData.columns[0])

allData.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4589 entries, 0 to 4588
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype                                 
---  ------     --------------  -----                                 
 0   Datetime   4589 non-null   datetime64[ns, pytz.FixedOffset(-240)]
 1   Open       4589 non-null   float64                               
 2   High       4589 non-null   float64                               
 3   Low        4589 non-null   float64                               
 4   Close      4589 non-null   float64                               
 5   Adj Close  4589 non-null   float64                               
 6   Volume     4589 non-null   float64                               
dtypes: datetime64[ns, pytz.FixedOffset(-240)](1), float64(6)
memory usage: 251.1 KB


In [14]:
allData.info(verbose=True)

<class 'pandas.core.frame.DataFrame'>
Int64Index: 4589 entries, 0 to 4588
Data columns (total 7 columns):
 #   Column     Non-Null Count  Dtype                           
---  ------     --------------  -----                           
 0   Datetime   4589 non-null   datetime64[ns, America/New_York]
 1   Open       4589 non-null   float64                         
 2   High       4589 non-null   float64                         
 3   Low        4589 non-null   float64                         
 4   Close      4589 non-null   float64                         
 5   Adj Close  4589 non-null   float64                         
 6   Volume     4589 non-null   float64                         
dtypes: datetime64[ns, America/New_York](1), float64(6)
memory usage: 286.8 KB


In [36]:
'''
"Manual" data massaging
On this small trial, pandas is more simpler and quickly on a single system.
'''

import time
import pandas as pd
from sklearn import preprocessing

def massageData(row):
    dt = row.Datetime
    return (dt.weekofyear,dt.dayofweek,dt.hour*60+dt.minute-570,row.Open,row.High,row.Low,row.Close)


t1 = time.time()
arr = []

for r in allData.iloc:
    arr.append(massageData(r))
new_data = pd.DataFrame(arr,columns=['weekofyear','weekday','minute','open','high','low','close'])
scaler = preprocessing.MinMaxScaler()
new_data[['open','high','low','close']] = scaler.fit_transform(new_data[['open','high','low','close']])




columns_for_rolling = ['open','high','low','close']

new_data[["ten_ra_"+i for i in columns_for_rolling]] = new_data[columns_for_rolling].rolling(window=10).mean()
new_data[["thirty_ra_"+i for i in columns_for_rolling]] = new_data[columns_for_rolling].rolling(window=30).mean()
new_data[["sixty_ra_"+i for i in columns_for_rolling]] = new_data[columns_for_rolling].rolling(window=60).mean()

new_data = new_data[60 < new_data.minute]
new_data = new_data[new_data.minute < 389-60]

t2 = time.time()
print("Time to process {0:f}".format(t2-t1))

new_data

Time to process 1.018100


Unnamed: 0,weekofyear,weekday,minute,open,high,low,close,ten_ra_open,ten_ra_high,ten_ra_low,ten_ra_close,thirty_ra_open,thirty_ra_high,thirty_ra_low,thirty_ra_close,sixty_ra_open,sixty_ra_high,sixty_ra_low,sixty_ra_close
61,23,0,61,0.532083,0.528962,0.536862,0.545826,0.522950,0.516768,0.525483,0.530190,0.537165,0.531970,0.534707,0.541870,0.517814,0.514551,0.514961,0.524231
62,23,0,62,0.542214,0.540878,0.544701,0.548862,0.524941,0.520045,0.528752,0.534070,0.537200,0.532419,0.535413,0.542699,0.519967,0.516543,0.517126,0.526177
63,23,0,63,0.549203,0.536398,0.546913,0.548840,0.528409,0.521873,0.531444,0.535434,0.538038,0.533201,0.536532,0.543411,0.522063,0.518637,0.519288,0.528157
64,23,0,64,0.530918,0.527890,0.536253,0.544775,0.528747,0.523314,0.531760,0.536847,0.538299,0.533518,0.536940,0.544107,0.523998,0.519943,0.521369,0.529438
65,23,0,65,0.537988,0.524161,0.543258,0.540782,0.530030,0.523704,0.533017,0.537399,0.538631,0.533251,0.537263,0.543834,0.525282,0.521090,0.522959,0.530593
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4523,25,3,324,0.498312,0.489282,0.503690,0.504962,0.506497,0.496323,0.507968,0.507764,0.519540,0.509737,0.520602,0.521514,0.533846,0.523926,0.535176,0.536093
4524,25,3,325,0.498428,0.483889,0.499941,0.501664,0.504669,0.494473,0.506726,0.506932,0.518631,0.508676,0.519606,0.520657,0.532198,0.522243,0.533572,0.534531
4525,25,3,326,0.498370,0.487613,0.503748,0.502284,0.503859,0.493412,0.506000,0.506297,0.517705,0.507861,0.518694,0.519777,0.530623,0.520698,0.532199,0.533035
4526,25,3,327,0.499402,0.484678,0.504216,0.501868,0.502804,0.492237,0.505362,0.505660,0.516857,0.506810,0.517822,0.518729,0.529032,0.519070,0.530607,0.531448


In [80]:
'''
Pyspark data massaging
Pyspark is more closer to enterprise level software.
It shows that this is more akin to Java and C#.
Pandas is better for quick projects, but Pyspark likely scales better.
'''

import time
import pandas as pd
import pyspark
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.ml.feature import VectorAssembler,MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.sql.window import Window
import pyspark.sql.functions as F



setup_t1 = time.time()
conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)
setup_t2 = time.time()

def massageData(row):
    dt = row.Datetime
    weekyear = dt.isocalendar()[1]
    return (weekyear,dt.weekday(),dt.hour*60+dt.minute,row.Open,row.High,row.Low,row.Close)

t1 = time.time()
spark = pyspark.sql.SparkSession.builder.getOrCreate()
rdd = spark.createDataFrame(allData).collect()
spark_df = sc.parallelize(rdd,4).map(massageData).toDF(schema=['weekofyear','weekday','minute','open','high','low','close'])

minMinute = spark_df.agg({'minute': 'min'}).collect()[0][0]

spark_df = spark_df.withColumn('minute', spark_df.minute-minMinute)




#Normalization code.
def normalizeColumn(df,col):
    return Pipeline(stages=[
            VectorAssembler(inputCols=[col],outputCol=col+"_vect"),
            MinMaxScaler(inputCol=col+"_vect",outputCol=col+"_scaled")
        ]).fit(df).transform(df).drop(col).drop(col+"_vect")


spark_df = normalizeColumn(spark_df,'open')
spark_df = normalizeColumn(spark_df,'high')
spark_df = normalizeColumn(spark_df,'low')
spark_df = normalizeColumn(spark_df,'close')



'''
#Rolling average code. Its a relative mess to even do just one column.
select_cols = ['weekofyear','weekday','minute','open','ten_ra_open']
w = Window.partitionBy(['weekofyear','weekday']).orderBy('minute').rangeBetween(-10,0)
spark_df_with_rolling_average = spark_df.withColumn('ten_ra_open', F.avg('open').over(w))
spark_df_with_rolling_average.select(select_cols).orderBy('weekofyear','weekday','minute').show()
'''

t2 = time.time()

print("Time to initialize {0:f}".format(setup_t2-setup_t1))
print("Time to process {0:f}".format(t2-t1))

'''
Times to just massage data without rolling averages is already around 1.6 seconds. 50% longer than pandas.
Again, pandas does not have the ability to scale.
'''

Time to initialize 0.001728
Time to process 1.683202
