In [1]:
import os
import requests
import pandas as pd
import asyncio
import time
import datetime as dt

from dateutil import parser
from dotenv import load_dotenv
from json.decoder import JSONDecodeError
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

In [14]:
# Build paths inside the project like this: BASE_DIR / 'subdir'.
# use this only in development
env_dir = Path('./').resolve()
load_dotenv(os.path.join(env_dir, '.env'))

True

In [15]:
POLYGON_API_KEY = os.environ.get('POLYGON_API_KEY')

# Extraction

#### Specify start and end date of stock records you want to retrieve 

In [None]:
def reformat_date(date: dt.datetime | str):
    datetime_obj = parser.parse(date)
    year = datetime_obj.year
    month = datetime_obj.month
    day = datetime_obj.day
    combined_date = f'{day} {month} {year}'

    # this is the format we need to use if we are going to make
    # api requests to polygon api
    reformed_date = dt.datetime.strptime(combined_date, '%d %m %Y').strftime('%Y-%m-%d')

    return reformed_date

# Stocks

#### Main url parts

In [5]:
start_date = 'january 1 2024'
end_date = 'january 1 2025'

In [6]:
stocks_ticker = "AAPL"
multiplier = 1
timespan = "day"
start_date_reformed = reformat_date(start_date)
end_date_reformed = reformat_date(end_date)

#### Parameters

In [7]:
params = {
    "adjusted": True,
    "sort": "asc",
    # "limit": 1,
}    

In [8]:
headers = {
    "Authorization": f"Bearer {POLYGON_API_KEY}",
    "Content-Type": "application/json"
}

In [9]:
url = f"https://api.polygon.io/v2/aggs/ticker/{stocks_ticker}/range/{multiplier}/{timespan}/{start_date_reformed}/{end_date_reformed}"
response = requests.get(url, params=params, headers=headers)
data = response.json()
data

{'ticker': 'AAPL',
 'queryCount': 252,
 'resultsCount': 252,
 'adjusted': True,
 'results': [{'v': 81964874.0,
   'vw': 185.9465,
   'o': 187.15,
   'c': 185.64,
   'h': 188.44,
   'l': 183.885,
   't': 1704171600000,
   'n': 1008871},
  {'v': 58414460.0,
   'vw': 184.3226,
   'o': 184.22,
   'c': 184.25,
   'h': 185.88,
   'l': 183.43,
   't': 1704258000000,
   'n': 656853},
  {'v': 71878670.0,
   'vw': 182.0183,
   'o': 182.15,
   'c': 181.91,
   'h': 183.0872,
   'l': 180.88,
   't': 1704344400000,
   'n': 712692},
  {'v': 62371161.0,
   'vw': 181.474,
   'o': 181.99,
   'c': 181.18,
   'h': 182.76,
   'l': 180.17,
   't': 1704430800000,
   'n': 682334},
  {'v': 59144470.0,
   'vw': 184.3702,
   'o': 182.085,
   'c': 185.56,
   'h': 185.6,
   'l': 181.5,
   't': 1704690000000,
   'n': 669173},
  {'v': 42841809.0,
   'vw': 184.3706,
   'o': 183.92,
   'c': 185.14,
   'h': 185.15,
   'l': 182.73,
   't': 1704776400000,
   'n': 538180},
  {'v': 46192908.0,
   'vw': 185.2509,
   'o': 18

#### 'volume', 'weighted_volume', 'opening_price', 'closing_price', 'highest_price', 'lowest_price', 'timestamp', 'transactions'

In [10]:
aapl_stocks = pd.DataFrame(data['results'])
aapl_stocks

Unnamed: 0,v,vw,o,c,h,l,t,n
0,81964874.0,185.9465,187.150,185.64,188.4400,183.885,1704171600000,1008871
1,58414460.0,184.3226,184.220,184.25,185.8800,183.430,1704258000000,656853
2,71878670.0,182.0183,182.150,181.91,183.0872,180.880,1704344400000,712692
3,62371161.0,181.4740,181.990,181.18,182.7600,180.170,1704430800000,682334
4,59144470.0,184.3702,182.085,185.56,185.6000,181.500,1704690000000,669173
...,...,...,...,...,...,...,...,...
247,22420512.0,257.4366,255.490,258.20,258.2100,255.290,1735016400000,245915
248,26226173.0,258.9280,258.190,259.02,260.1000,257.630,1735189200000,362309
249,40542367.0,255.2263,257.830,255.59,258.7000,253.060,1735275600000,474853
250,33166048.0,252.2717,252.230,252.20,253.5000,250.750,1735534800000,421590


In [11]:
aapl_stocks.dtypes

v     float64
vw    float64
o     float64
c     float64
h     float64
l     float64
t       int64
n       int64
dtype: object

In [12]:
aapl_stocks['t']

0      1704171600000
1      1704258000000
2      1704344400000
3      1704430800000
4      1704690000000
           ...      
247    1735016400000
248    1735189200000
249    1735275600000
250    1735534800000
251    1735621200000
Name: t, Length: 252, dtype: int64

#### Note the timestamp 1735534800000 alone will give `OSError: [Errno 22] Invalid argument` error however if we divide by a 1000 first then it will not only run without error but will give us now the correct timestamp and can now be converted to a datetime object

In [13]:
dt.datetime.fromtimestamp(1735534800000 / 1000)

datetime.datetime(2024, 12, 30, 13, 0)

In [14]:
aapl_stocks['t'].apply(lambda timestamp: dt.datetime.fromtimestamp(timestamp / 1000))

0     2024-01-02 13:00:00
1     2024-01-03 13:00:00
2     2024-01-04 13:00:00
3     2024-01-05 13:00:00
4     2024-01-08 13:00:00
              ...        
247   2024-12-24 13:00:00
248   2024-12-26 13:00:00
249   2024-12-27 13:00:00
250   2024-12-30 13:00:00
251   2024-12-31 13:00:00
Name: t, Length: 252, dtype: datetime64[ns]

In [15]:
aapl_stocks.to_csv(f'./data/aapl_stocks_{multiplier}{timespan}.csv')

# Forex

In [16]:
start_date = 'january 1 2024'
end_date = 'january 1 2025'

In [17]:
forex_ticker = "C:USDPHP"
multiplier = 4
timespan = "hour"
start_date_reformed = reformat_date(start_date)
end_date_reformed = reformat_date(end_date)
start_date_reformed

'2024-01-01'

In [18]:
end_date_reformed

'2025-01-01'

In [19]:
params = {
    "adjusted": True,
    "sort": "asc",
    # "limit": 1,   
}    

In [20]:
headers = {
    "Authorization": f"Bearer {POLYGON_API_KEY}",
    "Content-Type": "application/json"
}

#### Polygon.io caveat on using free tier
because we are using a free tier of polygon API we can only have 5 API requests per minute, and after that the `response.json()` will return `{'status': 'ERROR', 'request_id': 'd8b699aad576ab7d765401d5ad767c15', 'error': "You've exceeded the maximum requests per minute, please wait or upgrade your subscription to continue. https://polygon.io/pricing"}`

In [21]:
url = f"https://api.polygon.io/v2/aggs/ticker/{forex_ticker}/range/{multiplier}/{timespan}/{start_date_reformed}/{end_date_reformed}"
data_batches = []
interval = 5
start = 0
while True:
    response = requests.get(url, params=params, headers=headers)
    data_batch = response.json()
    print(data_batch)
    if not "next_url" in data_batch:
        break

    df = pd.DataFrame(data_batch['results'])
    data_batches.append(df)
    url = data_batch['next_url']

    # sleep for 1 minute to avoid rate limiting
    if (start + 1) % interval == 0:
        time.sleep(60)

    # increment after 60 seconds
    start += 1

{'ticker': 'C:USDPHP', 'queryCount': 5000, 'resultsCount': 44, 'adjusted': True, 'results': [{'v': 1, 'vw': 55.388, 'o': 55.388, 'c': 55.388, 'h': 55.388, 'l': 55.388, 't': 1704067200000, 'n': 1}, {'v': 1, 'vw': 55.388, 'o': 55.388, 'c': 55.388, 'h': 55.388, 'l': 55.388, 't': 1704139200000, 'n': 1}, {'v': 533, 'vw': 55.5054, 'o': 55.405, 'c': 55.573, 'h': 55.586, 'l': 55.242, 't': 1704153600000, 'n': 533}, {'v': 473, 'vw': 55.5836, 'o': 55.58, 'c': 55.672, 'h': 55.685, 'l': 55.445, 't': 1704168000000, 'n': 473}, {'v': 942, 'vw': 55.6157, 'o': 55.679, 'c': 55.618, 'h': 55.683, 'l': 55.505, 't': 1704182400000, 'n': 942}, {'v': 662, 'vw': 55.6114, 'o': 55.619, 'c': 55.592, 'h': 55.662, 'l': 55.48, 't': 1704196800000, 'n': 662}, {'v': 155, 'vw': 55.5755, 'o': 55.589, 'c': 55.583, 'h': 55.6, 'l': 55.48, 't': 1704211200000, 'n': 155}, {'v': 52, 'vw': 55.5663, 'o': 55.588, 'c': 55.563, 'h': 55.6, 'l': 55.48, 't': 1704225600000, 'n': 52}, {'v': 504, 'vw': 55.7318, 'o': 55.564, 'c': 55.681, 'h'

In [22]:
len(data_batches)

38

In [23]:
data_batches

[       v       vw       o       c       h       l              t     n
 0      1  55.3880  55.388  55.388  55.388  55.388  1704067200000     1
 1      1  55.3880  55.388  55.388  55.388  55.388  1704139200000     1
 2    533  55.5054  55.405  55.573  55.586  55.242  1704153600000   533
 3    473  55.5836  55.580  55.672  55.685  55.445  1704168000000   473
 4    942  55.6157  55.679  55.618  55.683  55.505  1704182400000   942
 5    662  55.6114  55.619  55.592  55.662  55.480  1704196800000   662
 6    155  55.5755  55.589  55.583  55.600  55.480  1704211200000   155
 7     52  55.5663  55.588  55.563  55.600  55.480  1704225600000    52
 8    504  55.7318  55.564  55.681  55.805  55.550  1704240000000   504
 9    798  55.6446  55.679  55.583  55.739  55.480  1704254400000   798
 10  1012  55.6643  55.558  55.751  55.825  54.920  1704268800000  1012
 11   684  55.7647  55.754  55.802  55.845  55.593  1704283200000   684
 12   547  55.7301  55.815  55.738  55.838  55.600  170429760000

In [24]:
usd_php_forex = pd.concat(data_batches, ignore_index=True, axis=0)
usd_php_forex

Unnamed: 0,v,vw,o,c,h,l,t,n
0,1,55.3880,55.388,55.388,55.388,55.388,1704067200000,1
1,1,55.3880,55.388,55.388,55.388,55.388,1704139200000,1
2,533,55.5054,55.405,55.573,55.586,55.242,1704153600000,533
3,473,55.5836,55.580,55.672,55.685,55.445,1704168000000,473
4,942,55.6157,55.679,55.618,55.683,55.505,1704182400000,942
...,...,...,...,...,...,...,...,...
1559,805,59.0058,59.047,58.965,59.176,58.844,1734624000000,805
1560,153,58.8926,58.966,58.834,59.040,58.744,1734638400000,153
1561,524,58.7982,58.840,58.897,59.076,56.597,1734652800000,524
1562,739,58.8458,58.906,58.802,58.958,58.665,1734667200000,739


In [25]:
usd_php_forex.to_csv(f'./data/usd_php_forex_{multiplier}{timespan}.csv')

# Transformation

#### Apache Spark (PySpark) over Pandas for data manipulation/transformation 

In [159]:
from pyspark.sql import SparkSession

In [160]:
spark = SparkSession.builder.appName('feature-engineering').getOrCreate()
spark

if `PySparkRuntimeError: Java gateway process exited before sending its port number windows` error occurs one possible reason is JAVA_HOME is not set because java is not installed.

if `Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext. : java.lang.UnsupportedOperationException: getSubject is supported only if a security manager is allowed` error occurs one possible reason is because you may be using a version of the java compiler than that which is currently used by the latest version of apache spark. For instance you may have jdk-23 installed but apache spark latest release uses jdk-17 so delete jdk-23 and switch to jdk-17. Current Spark release documentation as of january 2025 says spark runs on jdk-17.

In [161]:
# turn inferSchema to True so as to allow pyspark to imitate the same data types in the csv
# as opposed to turning all the columns to string data types
# DataFrame[_c0: string, v: string, vw: string, o: string, c: string, h: string, l: string, t: string, n: string]
usd_php_forex_4h_spark_df = spark.read.csv(f'./include/data/usd_php_forex_{multiplier}{timespan}.csv', header=True, inferSchema=True)
usd_php_forex_4h_spark_df

DataFrame[_c0: int, v: int, vw: double, o: double, c: double, h: double, l: double, t: bigint, n: int]

In [162]:
usd_php_forex_4h_spark_df.show()

+---+----+-------+------+------+------+------+-------------+----+
|_c0|   v|     vw|     o|     c|     h|     l|            t|   n|
+---+----+-------+------+------+------+------+-------------+----+
|  0|   1| 55.388|55.388|55.388|55.388|55.388|1704067200000|   1|
|  1|   1| 55.388|55.388|55.388|55.388|55.388|1704139200000|   1|
|  2| 533|55.5054|55.405|55.573|55.586|55.242|1704153600000| 533|
|  3| 473|55.5836| 55.58|55.672|55.685|55.445|1704168000000| 473|
|  4| 942|55.6157|55.679|55.618|55.683|55.505|1704182400000| 942|
|  5| 662|55.6114|55.619|55.592|55.662| 55.48|1704196800000| 662|
|  6| 155|55.5755|55.589|55.583|  55.6| 55.48|1704211200000| 155|
|  7|  52|55.5663|55.588|55.563|  55.6| 55.48|1704225600000|  52|
|  8| 504|55.7318|55.564|55.681|55.805| 55.55|1704240000000| 504|
|  9| 798|55.6446|55.679|55.583|55.739| 55.48|1704254400000| 798|
| 10|1012|55.6643|55.558|55.751|55.825| 54.92|1704268800000|1012|
| 11| 684|55.7647|55.754|55.802|55.845|55.593|1704283200000| 684|
| 12| 547|

In [163]:
usd_php_forex_4h_spark_df.createOrReplaceTempView("usd_php_forex")

In [164]:
result = spark.sql("""
    SELECT v AS volume, o AS open, c AS close, h AS high, l AS low, t AS timestamp, n AS transactions FROM usd_php_forex;
""")

In [165]:
result.show()

+------+------+------+------+------+-------------+------------+
|volume|  open| close|  high|   low|    timestamp|transactions|
+------+------+------+------+------+-------------+------------+
|     1|55.388|55.388|55.388|55.388|1704067200000|           1|
|     1|55.388|55.388|55.388|55.388|1704139200000|           1|
|   533|55.405|55.573|55.586|55.242|1704153600000|         533|
|   473| 55.58|55.672|55.685|55.445|1704168000000|         473|
|   942|55.679|55.618|55.683|55.505|1704182400000|         942|
|   662|55.619|55.592|55.662| 55.48|1704196800000|         662|
|   155|55.589|55.583|  55.6| 55.48|1704211200000|         155|
|    52|55.588|55.563|  55.6| 55.48|1704225600000|          52|
|   504|55.564|55.681|55.805| 55.55|1704240000000|         504|
|   798|55.679|55.583|55.739| 55.48|1704254400000|         798|
|  1012|55.558|55.751|55.825| 54.92|1704268800000|        1012|
|   684|55.754|55.802|55.845|55.593|1704283200000|         684|
|   547|55.815|55.738|55.838|  55.6|1704

#### because the column t is a timestamp in unix format we need to convert it to a datetime type instead

In [166]:
result = spark.sql("""
    SELECT CAST(FROM_UNIXTIME(t / 1000) AS TIMESTAMP) AS new_datetime FROM usd_php_forex;
""")

In [167]:
result.show()

+-------------------+
|       new_datetime|
+-------------------+
|2024-01-01 08:00:00|
|2024-01-02 04:00:00|
|2024-01-02 08:00:00|
|2024-01-02 12:00:00|
|2024-01-02 16:00:00|
|2024-01-02 20:00:00|
|2024-01-03 00:00:00|
|2024-01-03 04:00:00|
|2024-01-03 08:00:00|
|2024-01-03 12:00:00|
|2024-01-03 16:00:00|
|2024-01-03 20:00:00|
|2024-01-04 00:00:00|
|2024-01-04 04:00:00|
|2024-01-04 08:00:00|
|2024-01-04 12:00:00|
|2024-01-04 16:00:00|
|2024-01-04 20:00:00|
|2024-01-05 00:00:00|
|2024-01-05 04:00:00|
+-------------------+
only showing top 20 rows



#### Shows the last 5 rows of the query

In [168]:
result.tail(5)

[Row(new_datetime=datetime.datetime(2024, 12, 20, 0, 0)),
 Row(new_datetime=datetime.datetime(2024, 12, 20, 4, 0)),
 Row(new_datetime=datetime.datetime(2024, 12, 20, 8, 0)),
 Row(new_datetime=datetime.datetime(2024, 12, 20, 12, 0)),
 Row(new_datetime=datetime.datetime(2024, 12, 20, 16, 0))]

In [169]:
new_df = spark.sql("""
    WITH new_df AS (SELECT 
        v AS volume, 
        vw AS volume_weighted, 
        o AS opening_price,
        c AS closing_price,
        h AS highest_price,
        l AS lowest_price,
        t AS timestamp,
        n AS transactions,
        CAST(FROM_UNIXTIME(t / 1000) AS TIMESTAMP) AS new_datetime 
    FROM usd_php_forex)
                   
    SELECT * FROM new_df;
""")

In [170]:
new_df.show()

+------+---------------+-------------+-------------+-------------+------------+-------------+------------+-------------------+
|volume|volume_weighted|opening_price|closing_price|highest_price|lowest_price|    timestamp|transactions|       new_datetime|
+------+---------------+-------------+-------------+-------------+------------+-------------+------------+-------------------+
|     1|         55.388|       55.388|       55.388|       55.388|      55.388|1704067200000|           1|2024-01-01 08:00:00|
|     1|         55.388|       55.388|       55.388|       55.388|      55.388|1704139200000|           1|2024-01-02 04:00:00|
|   533|        55.5054|       55.405|       55.573|       55.586|      55.242|1704153600000|         533|2024-01-02 08:00:00|
|   473|        55.5836|        55.58|       55.672|       55.685|      55.445|1704168000000|         473|2024-01-02 12:00:00|
|   942|        55.6157|       55.679|       55.618|       55.683|      55.505|1704182400000|         942|2024-

In [171]:
new_df.tail(5)

[Row(volume=805, volume_weighted=59.0058, opening_price=59.047, closing_price=58.965, highest_price=59.176, lowest_price=58.844, timestamp=1734624000000, transactions=805, new_datetime=datetime.datetime(2024, 12, 20, 0, 0)),
 Row(volume=153, volume_weighted=58.8926, opening_price=58.966, closing_price=58.834, highest_price=59.04, lowest_price=58.744, timestamp=1734638400000, transactions=153, new_datetime=datetime.datetime(2024, 12, 20, 4, 0)),
 Row(volume=524, volume_weighted=58.7982, opening_price=58.84, closing_price=58.897, highest_price=59.076, lowest_price=56.597, timestamp=1734652800000, transactions=524, new_datetime=datetime.datetime(2024, 12, 20, 8, 0)),
 Row(volume=739, volume_weighted=58.8458, opening_price=58.906, closing_price=58.802, highest_price=58.958, lowest_price=58.665, timestamp=1734667200000, transactions=739, new_datetime=datetime.datetime(2024, 12, 20, 12, 0)),
 Row(volume=1162, volume_weighted=58.7964, opening_price=58.804, closing_price=58.769, highest_price=

#### features to engineer
* 1st derivative of signals
* 2nd derivative of signals
* <s>max</s>
* <s>min</s>
* <s>mean</s>
* <s>median</s>
* <s>standard deviation</s>
* <s>range</s>
* shannon entropy
* <s>skewness</s>
* <s>kurtosis</s>

1. calculate median: https://subhralina.medium.com/5-ways-to-calculate-median-in-sql-cffba38aa945
2. moving median: https://www.essentialsql.com/calculate-moving-median-in-sql/
3. moving median using user defined functions: https://stackoverflow.com/questions/46767807/how-to-calculate-rolling-median-in-pyspark-using-window
4. https://stackoverflow.com/questions/76760672/median-over-window-function-is-not-supported

In [172]:
# calculating moving average or mean of a window of 24 hours, 
# because each row is 4 hours and 24 hours is basically 6 
# samples of the data, we want to calculate the moving average
# of 6 rows as it is 24 hours 
result = spark.sql("""
    WITH trans_1 AS (SELECT 
        v AS volume, 
        vw AS volume_weighted, 
        o AS opening_price,
        c AS closing_price,
        h AS highest_price,
        l AS lowest_price,
        t AS timestamp,
        n AS transactions,
        CAST(FROM_UNIXTIME(t / 1000) AS TIMESTAMP) AS new_datetime 
    FROM usd_php_forex),

    trans_2 AS (SELECT
        *,
        MAX(closing_price) OVER(ORDER BY new_datetime ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS moving_max_close,
        MIN(closing_price) OVER(ORDER BY new_datetime ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS moving_min_close,
        AVG(closing_price) OVER(ORDER BY new_datetime ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS moving_avg_close
    FROM trans_1),
                   
    trans_3 AS (SELECT
        *,
        (moving_max_close - moving_min_close) AS range_close
    FROM trans_2),
                   
    trans_4 AS (SELECT
        *,
        PERCENTILE(closing_price, 0.5) OVER(ORDER BY new_datetime ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS moving_median_close   
    FROM trans_3)
                   
    SELECT * FROM trans_4;
""")
result.show()

+------+---------------+-------------+-------------+-------------+------------+-------------+------------+-------------------+----------------+----------------+------------------+-------------------+-------------------+
|volume|volume_weighted|opening_price|closing_price|highest_price|lowest_price|    timestamp|transactions|       new_datetime|moving_max_close|moving_min_close|  moving_avg_close|        range_close|moving_median_close|
+------+---------------+-------------+-------------+-------------+------------+-------------+------------+-------------------+----------------+----------------+------------------+-------------------+-------------------+
|     1|         55.388|       55.388|       55.388|       55.388|      55.388|1704067200000|           1|2024-01-01 08:00:00|          55.388|          55.388|            55.388|                0.0|             55.388|
|     1|         55.388|       55.388|       55.388|       55.388|      55.388|1704139200000|           1|2024-01-02 04:

if we were to calculate a 7 day moving average we use `AVG(closing_price) OVER(ORDER BY new_datetime ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS moving_avg_close`
```
|-----|
|  1  | 1st row
|  2  | 2nd row
|  3  | 3rd row
|  4  | 4th row
|  5  | 5th row
|  6  | 6th row
|  7  | current row
|  8  |
|-----|
```
rows 1 to 6 are the preceding rows and current row is the 7th, and what we basically say 

#### However other features like median, standard deviation, shannon entropy, kurtosis, skewness involve more complex calculations

In [152]:
# calculating moving average or mean of a window of 24 hours, 
# because each row is 4 hours and 24 hours is basically 6 
# samples of the data, we want to calculate the moving average
# of 6 rows as it is 24 hours 
result = spark.sql("""
    
                   
    WITH trans_1 AS (SELECT 
        v AS volume, 
        vw AS volume_weighted, 
        o AS opening_price,
        c AS closing_price,
        h AS highest_price,
        l AS lowest_price,
        t AS timestamp,
        n AS transactions,
        CAST(FROM_UNIXTIME(t / 1000) AS TIMESTAMP) AS new_datetime 
    FROM usd_php_forex),

           
    trans_2 AS (SELECT
        *,
        PERCENTILE(closing_price, 0.5) OVER(ORDER BY new_datetime ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS moving_median_close
    FROM trans_1)
                   
    SELECT * FROM trans_2;
""")
result.show()

+------+---------------+-------------+-------------+-------------+------------+-------------+------------+-------------------+-------------------+
|volume|volume_weighted|opening_price|closing_price|highest_price|lowest_price|    timestamp|transactions|       new_datetime|moving_median_close|
+------+---------------+-------------+-------------+-------------+------------+-------------+------------+-------------------+-------------------+
|     1|         55.388|       55.388|       55.388|       55.388|      55.388|1704067200000|           1|2024-01-01 08:00:00|             55.388|
|     1|         55.388|       55.388|       55.388|       55.388|      55.388|1704139200000|           1|2024-01-02 04:00:00|             55.388|
|   533|        55.5054|       55.405|       55.573|       55.586|      55.242|1704153600000|         533|2024-01-02 08:00:00|             55.388|
|   473|        55.5836|        55.58|       55.672|       55.685|      55.445|1704168000000|         473|2024-01-02 1

In [117]:
result.tail(5)

[Row(volume=981, volume_weighted=58.981, opening_price=59.012, closing_price=59.034, highest_price=59.161, lowest_price=58.377, timestamp=1734595200000, transactions=981, new_datetime=datetime.datetime(2024, 12, 19, 16, 0), quartile=4),
 Row(volume=1550, volume_weighted=59.0116, opening_price=59.034, closing_price=59.043, highest_price=59.165, lowest_price=57.266, timestamp=1734609600000, transactions=1550, new_datetime=datetime.datetime(2024, 12, 19, 20, 0), quartile=4),
 Row(volume=488, volume_weighted=58.94, opening_price=58.97, closing_price=59.059, highest_price=59.07, lowest_price=56.332, timestamp=1734494400000, transactions=488, new_datetime=datetime.datetime(2024, 12, 18, 12, 0), quartile=4),
 Row(volume=1519, volume_weighted=58.9584, opening_price=58.776, closing_price=59.178, highest_price=59.199, lowest_price=58.072, timestamp=1734537600000, transactions=1519, new_datetime=datetime.datetime(2024, 12, 19, 0, 0), quartile=4),
 Row(volume=270, volume_weighted=59.1372, opening_

In [134]:
# calculating moving average or mean of a window of 24 hours, 
# because each row is 4 hours and 24 hours is basically 6 
# samples of the data, we want to calculate the moving average
# of 6 rows as it is 24 hours 
result = spark.sql("""
    WITH trans_1 AS (SELECT 
        v AS volume, 
        vw AS volume_weighted, 
        o AS opening_price,
        c AS closing_price,
        h AS highest_price,
        l AS lowest_price,
        t AS timestamp,
        n AS transactions,
        CAST(FROM_UNIXTIME(t / 1000) AS TIMESTAMP) AS new_datetime 
    FROM usd_php_forex),

           
    trans_2 AS (SELECT
        *
    FROM (
        SELECT *, NTILE(4) OVER(ORDER BY closing_price) AS quartile FROM trans_1
    ) WHERE quartile = 2)
                   
    SELECT MAX(closing_price) AS median FROM trans_2;
""")
result.show()

+------+
|median|
+------+
| 57.31|
+------+



calculating skewness over a window: https://stackoverflow.com/questions/64967300/pyspark-data-skewness-with-window-functions
```
# w = Window.partitionBy('id').orderBy('timestamp')

# # ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
# window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
# window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
```

In [247]:
from pyspark.sql.window import Window
from pyspark.sql.functions import skewness, kurtosis, stddev, udf, collect_list
from pyspark.sql.types import FloatType, IntegerType
from scipy.stats import entropy
import numpy as np

In [230]:
# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400 

# ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW which would be a 7 day moving standard deviation
window = Window.orderBy("new_datetime").rowsBetween(-5, Window.currentRow)

In [231]:
result.select(skewness("closing_price").over(window=window).alias('skew_close')).show()

+--------------------+
|          skew_close|
+--------------------+
|                NULL|
|                NULL|
|  0.7071067811865472|
| 0.23521492085871912|
| -0.1977323257964816|
|-0.46000779253145485|
| -1.2178312565589355|
|   1.032679651212858|
|  0.3335380952059178|
|  1.1448642131401856|
|  0.9211764524023685|
| 0.37625073100996637|
| -0.2529556276146163|
| -0.3885879155725848|
|-0.18915249339063764|
|-0.34690675260692705|
|  0.3085279053443559|
|  0.6501101031946914|
|  0.9497373075280433|
| 0.16514833368884574|
+--------------------+
only showing top 20 rows



In [232]:
result.select(stddev(result.closing_price).over(window=window).alias('std_close')).show()

+--------------------+
|           std_close|
+--------------------+
|                NULL|
|                 0.0|
| 0.10680979980008208|
| 0.14129254521500176|
| 0.13234500368355517|
| 0.12123984493556637|
| 0.09642406338668762|
| 0.03989695059358191|
|0.048602126153764386|
| 0.04202221634643605|
| 0.07412354551692849|
|  0.1001513854122846|
|  0.0960701132853828|
| 0.07628542892759109|
| 0.09495981606272627|
| 0.10937900468859148|
| 0.11503158986411453|
|  0.0836795474812494|
| 0.05274466797696325|
| 0.02884441020370986|
+--------------------+
only showing top 20 rows



In [225]:
result.show()

+------+---------------+-------------+-------------+-------------+------------+-------------+------------+-------------------+----------------+----------------+------------------+-------------------+-------------------+
|volume|volume_weighted|opening_price|closing_price|highest_price|lowest_price|    timestamp|transactions|       new_datetime|moving_max_close|moving_min_close|  moving_avg_close|        range_close|moving_median_close|
+------+---------------+-------------+-------------+-------------+------------+-------------+------------+-------------------+----------------+----------------+------------------+-------------------+-------------------+
|     1|         55.388|       55.388|       55.388|       55.388|      55.388|1704067200000|           1|2024-01-01 08:00:00|          55.388|          55.388|            55.388|                0.0|             55.388|
|     1|         55.388|       55.388|       55.388|       55.388|      55.388|1704139200000|           1|2024-01-02 04:

In [226]:
test_stats = result \
.withColumn("moving_std_close", stddev("closing_price").over(window)) \
.withColumn("moving_skew_close", skewness("closing_price").over(window)) \
.withColumn("moving_kurt_close", kurtosis("closing_price").over(window))
test_stats.show()

+------+---------------+-------------+-------------+-------------+------------+-------------+------------+-------------------+----------------+----------------+------------------+-------------------+-------------------+--------------------+--------------------+--------------------+
|volume|volume_weighted|opening_price|closing_price|highest_price|lowest_price|    timestamp|transactions|       new_datetime|moving_max_close|moving_min_close|  moving_avg_close|        range_close|moving_median_close|    moving_std_close|   moving_skew_close|   moving_kurt_close|
+------+---------------+-------------+-------------+-------------+------------+-------------+------------+-------------------+----------------+----------------+------------------+-------------------+-------------------+--------------------+--------------------+--------------------+
|     1|         55.388|       55.388|       55.388|       55.388|      55.388|1704067200000|           1|2024-01-01 08:00:00|          55.388|        

# Loading

# Next steps

* setup airflow such that it will have part of its workflow automatically send requests to scrape the stock and forex data
* setup airflow to store the scraped stock and forex data to s3 bucket
* setup airflow to retrieve stored data in s3 bucket and do data transformations on it using apache spark (pyspark), you will now do your sql transformations here
* setup airflow to store the transformed data in new s3 bucket
* setup airflow to load the transformed data in s3 bucket in a data warehouse like snowflake or databricks or bigquery

# Resources