In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [4]:
spark=SparkSession.builder.config("spark.driver.memory","12g").config("spark.memory.offHeap.enabled","true") .config("spark.memory.offHeap.size","10g").appName('stocks_etf_ai_task2').getOrCreate()

23/04/29 13:15:01 WARN Utils: Your hostname, Adedayos-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 192.168.2.27 instead (on interface en0)
23/04/29 13:15:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/29 13:15:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
spark

In [30]:
from pyspark.sql.functions import  input_file_name
data_file_path = './transformed_stocks_etf_data.parquet/*.parquet'
pyspark_df = spark.read.parquet(data_file_path, header=True, inferSchema=True)
pyspark_df.show(3)

+------+--------------------+----------+-------------------+-------------------+-------------------+-------------------+--------------------+---------+
|Symbol|       Security Name|      Date|               Open|               High|                Low|              Close|           Adj Close|   Volume|
+------+--------------------+----------+-------------------+-------------------+-------------------+-------------------+--------------------+---------+
|   HPQ|HP Inc. Common Stock|1962-01-02|0.13127270340919495|0.13127270340919495|0.12417688220739365|0.12417688220739365| 0.00688728503882885|2480300.0|
|   HPQ|HP Inc. Common Stock|1962-01-03|0.12417688220739365|0.12417688220739365|0.12151595205068588|0.12284641712903976|0.006813489831984043| 507300.0|
|   HPQ|HP Inc. Common Stock|1962-01-04|0.12284641712903976|  0.126837819814682|0.11796803772449493|  0.120185486972332|0.006665901280939579| 845500.0|
+------+--------------------+----------+-------------------+-------------------+--------

In [31]:
pyspark_df=pyspark_df.withColumn('Date', F.to_date('Date'))#format date if string
new = (pyspark_df.groupby('Symbol').agg(F.expr('max(Date)').alias('max_date'),F.expr('min(Date)').alias('min_date'))#Compute max and min date for use in generating date range
.withColumn('Date',F.expr("explode(sequence(min_date,max_date,interval 1 day))"))#Use sequence to compute range
       .drop('max_date','min_date')#drop unwanted columns
      )
#Join new df back to df
pyspark_df = pyspark_df.join(new, how='right', on=['Symbol', 'Date']) 

In [32]:
pyspark_df.printSchema()

root
 |-- Symbol: string (nullable = true)
 |-- Date: date (nullable = false)
 |-- Security Name: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: double (nullable = true)



In [33]:
pyspark_df.filter('Symbol="HPQ"').orderBy('Date').show(100)

+------+----------+--------------------+-------------------+-------------------+-------------------+-------------------+--------------------+---------+
|Symbol|      Date|       Security Name|               Open|               High|                Low|              Close|           Adj Close|   Volume|
+------+----------+--------------------+-------------------+-------------------+-------------------+-------------------+--------------------+---------+
|   HPQ|1962-01-02|HP Inc. Common Stock|0.13127270340919495|0.13127270340919495|0.12417688220739365|0.12417688220739365| 0.00688728503882885|2480300.0|
|   HPQ|1962-01-03|HP Inc. Common Stock|0.12417688220739365|0.12417688220739365|0.12151595205068588|0.12284641712903976|0.006813489831984043| 507300.0|
|   HPQ|1962-01-04|HP Inc. Common Stock|0.12284641712903976|  0.126837819814682|0.11796803772449493|  0.120185486972332|0.006665901280939579| 845500.0|
|   HPQ|1962-01-05|HP Inc. Common Stock| 0.1197419986128807| 0.1197419986128807|0.117524

In [34]:
from pyspark.sql.window import Window 
pyspark_df = pyspark_df.withColumn("Open_New", F.last('Open', True).over(Window.partitionBy('Symbol').orderBy('Date')))
pyspark_df = pyspark_df.withColumn("High_New", F.last('High', True).over(Window.partitionBy('Symbol').orderBy('Date')))
pyspark_df = pyspark_df.withColumn("Low_New", F.last('Low', True).over(Window.partitionBy('Symbol').orderBy('Date')))
pyspark_df = pyspark_df.withColumn("Close_New", F.last('Close', True).over(Window.partitionBy('Symbol').orderBy('Date')))
pyspark_df = pyspark_df.withColumn("Adj Close New", F.last('Adj Close', True).over(Window.partitionBy('Symbol').orderBy('Date')))
pyspark_df = pyspark_df.withColumn("Volume_New", F.last('Volume', True).over(Window.partitionBy('Symbol').orderBy('Date')))
pyspark_df.printSchema()

root
 |-- Symbol: string (nullable = true)
 |-- Date: date (nullable = false)
 |-- Security Name: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Open_New: double (nullable = true)
 |-- High_New: double (nullable = true)
 |-- Low_New: double (nullable = true)
 |-- Close_New: double (nullable = true)
 |-- Adj Close New: double (nullable = true)
 |-- Volume_New: double (nullable = true)



In [35]:
pyspark_df.filter('Symbol="HPQ"').orderBy('Date').select('Volume','Volume_New').show(100)

+---------+----------+
|   Volume|Volume_New|
+---------+----------+
|2480300.0| 2480300.0|
| 507300.0|  507300.0|
| 845500.0|  845500.0|
| 338200.0|  338200.0|
|     null|  338200.0|
|     null|  338200.0|
| 873700.0|  873700.0|
| 930100.0|  930100.0|
| 450900.0|  450900.0|
| 422700.0|  422700.0|
| 535500.0|  535500.0|
|     null|  535500.0|
|     null|  535500.0|
| 310000.0|  310000.0|
| 197200.0|  197200.0|
| 422700.0|  422700.0|
| 479100.0|  479100.0|
|1409200.0| 1409200.0|
|     null| 1409200.0|
|     null| 1409200.0|
| 563700.0|  563700.0|
| 817300.0|  817300.0|
| 338200.0|  338200.0|
| 817300.0|  817300.0|
|1381000.0| 1381000.0|
|     null| 1381000.0|
|     null| 1381000.0|
| 197200.0|  197200.0|
| 197200.0|  197200.0|
| 563700.0|  563700.0|
| 535500.0|  535500.0|
| 140900.0|  140900.0|
|     null|  140900.0|
|     null|  140900.0|
| 676400.0|  676400.0|
| 563700.0|  563700.0|
| 535500.0|  535500.0|
| 732800.0|  732800.0|
| 366400.0|  366400.0|
|     null|  366400.0|
|     null|

In [36]:
from pyspark.sql.window import Window
days = lambda i: i * 86400
pyspark_df = pyspark_df.withColumn('vol_moving_avg', 
                                   F.avg("Volume_New").over(Window.partitionBy('Symbol').orderBy(F.col("Date").cast("timestamp").cast('long')).rangeBetween(-days(30), -1))
                                  ) 
pyspark_df.select('Symbol','Date','Volume','Volume_New', 'vol_moving_avg' ).where(pyspark_df['Symbol']=='HPQ').orderBy('Date').show(35)

+------+----------+---------+----------+-----------------+
|Symbol|      Date|   Volume|Volume_New|   vol_moving_avg|
+------+----------+---------+----------+-----------------+
|   HPQ|1962-01-02|2480300.0| 2480300.0|             null|
|   HPQ|1962-01-03| 507300.0|  507300.0|        2480300.0|
|   HPQ|1962-01-04| 845500.0|  845500.0|        1493800.0|
|   HPQ|1962-01-05| 338200.0|  338200.0|        1277700.0|
|   HPQ|1962-01-06|     null|  338200.0|        1042825.0|
|   HPQ|1962-01-07|     null|  338200.0|         901900.0|
|   HPQ|1962-01-08| 873700.0|  873700.0|         807950.0|
|   HPQ|1962-01-09| 930100.0|  930100.0|817342.8571428572|
|   HPQ|1962-01-10| 450900.0|  450900.0|         831437.5|
|   HPQ|1962-01-11| 422700.0|  422700.0|789155.5555555555|
|   HPQ|1962-01-12| 535500.0|  535500.0|         752510.0|
|   HPQ|1962-01-13|     null|  535500.0|732781.8181818182|
|   HPQ|1962-01-14|     null|  535500.0|716341.6666666666|
|   HPQ|1962-01-15| 310000.0|  310000.0|702430.769230769

In [38]:
pyspark_df = pyspark_df.withColumnRenamed('Adj Close New', 'Adj_Close_New')

In [46]:
pyspark_df = pyspark_df.withColumn(
    "adj_close_rolling_med",
    F.percentile_approx('Adj_Close_New',0.5).over(
        Window.partitionBy("Symbol")
        .orderBy(F.col("Date").cast("timestamp").cast('long'))
        .rangeBetween(-days(30), -1) 
    ),
)

pyspark_df.select('Symbol','Date','Volume','Volume_New', 'vol_moving_avg', 'Adj_Close_New', 'adj_close_rolling_med' ).where(pyspark_df['Symbol']=='HPQ').orderBy('Date').show(35)

[Stage 155:>                                                        (0 + 1) / 1]

+------+----------+---------+----------+-----------------+--------------------+---------------------+
|Symbol|      Date|   Volume|Volume_New|   vol_moving_avg|       Adj_Close_New|adj_close_rolling_med|
+------+----------+---------+----------+-----------------+--------------------+---------------------+
|   HPQ|1962-01-02|2480300.0| 2480300.0|             null| 0.00688728503882885|                 null|
|   HPQ|1962-01-03| 507300.0|  507300.0|        2480300.0|0.006813489831984043|  0.00688728503882885|
|   HPQ|1962-01-04| 845500.0|  845500.0|        1493800.0|0.006665901280939579| 0.006813489831984043|
|   HPQ|1962-01-05| 338200.0|  338200.0|        1277700.0|0.006518316920846701| 0.006813489831984043|
|   HPQ|1962-01-06|     null|  338200.0|        1042825.0|0.006518316920846701| 0.006665901280939579|
|   HPQ|1962-01-07|     null|  338200.0|         901900.0|0.006518316920846701| 0.006665901280939579|
|   HPQ|1962-01-08| 873700.0|  873700.0|         807950.0|0.006616706028580666| 0.

                                                                                

In [49]:
%pip install numpy

Collecting numpy
  Downloading numpy-1.24.3-cp311-cp311-macosx_11_0_arm64.whl (13.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m56.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: numpy
Successfully installed numpy-1.24.3
Note: you may need to restart the kernel to use updated packages.


In [54]:
import numpy as np 
from pyspark.sql.types import FloatType

w = Window.partitionBy("Symbol").orderBy(F.col("Date").cast("timestamp").cast('long')).rangeBetween(-days(30), -1)
median_udf = F.udf(lambda x: float(np.median(x)), FloatType())

pyspark_df= pyspark_df.withColumn("list", F.collect_list('Adj_Close_New').over(w)).withColumn("adj_close_rolling_med", median_udf("list"))

In [55]:
pyspark_df.select('Symbol','Date','Volume','Volume_New', 'vol_moving_avg', 'Adj_Close_New', 'adj_close_rolling_med' ).where(pyspark_df['Symbol']=='HPQ').orderBy('Date').show(35)

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


+------+----------+---------+----------+-----------------+--------------------+---------------------+
|Symbol|      Date|   Volume|Volume_New|   vol_moving_avg|       Adj_Close_New|adj_close_rolling_med|
+------+----------+---------+----------+-----------------+--------------------+---------------------+
|   HPQ|1962-01-02|2480300.0| 2480300.0|             null| 0.00688728503882885|                  NaN|
|   HPQ|1962-01-03| 507300.0|  507300.0|        2480300.0|0.006813489831984043|          0.006887285|
|   HPQ|1962-01-04| 845500.0|  845500.0|        1493800.0|0.006665901280939579|         0.0068503874|
|   HPQ|1962-01-05| 338200.0|  338200.0|        1277700.0|0.006518316920846701|           0.00681349|
|   HPQ|1962-01-06|     null|  338200.0|        1042825.0|0.006518316920846701|         0.0067396956|
|   HPQ|1962-01-07|     null|  338200.0|         901900.0|0.006518316920846701|         0.0066659013|
|   HPQ|1962-01-08| 873700.0|  873700.0|         807950.0|0.006616706028580666|   

In [57]:
pyspark_df.printSchema()

root
 |-- Symbol: string (nullable = true)
 |-- Date: date (nullable = false)
 |-- Security Name: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Open_New: double (nullable = true)
 |-- High_New: double (nullable = true)
 |-- Low_New: double (nullable = true)
 |-- Close_New: double (nullable = true)
 |-- Adj_Close_New: double (nullable = true)
 |-- Volume_New: double (nullable = true)
 |-- vol_moving_avg: double (nullable = true)
 |-- adj_close_rolling_med: float (nullable = true)
 |-- list: array (nullable = false)
 |    |-- element: double (containsNull = false)





In [58]:
pyspark_df=pyspark_df.drop('list')

In [62]:
pyspark_df = pyspark_df.select('Symbol','Volume_New', 'vol_moving_avg', 'adj_close_rolling_med')
pyspark_df.show(3)

[Stage 258:>                                                        (0 + 1) / 1]

+------+----------+--------------+---------------------+
|Symbol|Volume_New|vol_moving_avg|adj_close_rolling_med|
+------+----------+--------------+---------------------+
|    AA|   55900.0|          null|                  NaN|
|    AA|   74500.0|       55900.0|            1.5366576|
|    AA|   80500.0|       65200.0|            1.5484347|
+------+----------+--------------+---------------------+
only showing top 3 rows



                                                                                

In [63]:
pyspark_df.write.mode('overwrite').parquet("ml_input_data.parquet")

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
  return _methods._mean(a, axis=

In [64]:
pyspark_df.count()

                                                                                

41211538