In [1]:
import os
import pyspark
import pandas as pd
from pathlib import Path
from pyspark.sql import SparkSession

# Project root: the directory one level above the notebook's working directory.
PROJECT_PATH = Path.cwd().parents[0]

In [2]:
# Location of the asset CSV under the project root. Built with pathlib's `/`
# operator so no path separator is hard-coded, then converted to `str` so
# `data_path` remains a plain string, exactly as before.
data_path = str(PROJECT_PATH / "data" / "asset.csv")

In [3]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('spark_app')
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/31 13:13:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Evaluate the session object as the cell's last expression so the notebook displays it.
spark

In [5]:
# Load the asset CSV into a Spark DataFrame: the first line supplies the
# column names and column types are inferred from the data.
df = spark.read.csv(data_path, header=True, inferSchema=True)

In [6]:
# Peek at the first five records; head(n) returns them as a list of Row objects.
df.head(5)

[Row(name='AMDd_EQ', asset_description='Advanced Micro Devices', tag_list='Sample Tag_5578,hey', value=14.56, profit=3.87, price=203.45, recent_high_30d=207.72, recent_low_30d=199.12, pct_drawdown=-0.020556518390140624, volatility_30d=0.00725840181043737, price_vs_ma_50=-0.00769747771282199, ma_30=203.83766666666668, ma_50=205.0282, dca_bias=0.014083090461242845, trend='Bearish', data_date=datetime.datetime(2026, 1, 30, 18, 21, 27, 146654)),
 Row(name='CBUKd_EQ', asset_description='iShares MSCI China Tech (Acc)', tag_list='Sample Tag_1055,Sample Tag_7162,akin', value=136.49, profit=-2.31, price=4.5705, recent_high_30d=4.6001, recent_low_30d=4.5627, pct_drawdown=-0.006434642725158212, volatility_30d=0.0009408665818506059, price_vs_ma_50=-0.00199404668322698, ma_30=4.578996666666667, ma_50=4.579632, dca_bias=0.0041090266940549585, trend='Bearish', data_date=datetime.datetime(2026, 1, 30, 18, 21, 27, 146654)),
 Row(name='CHVd_EQ', asset_description='Chevron', tag_list='Sample Tag_7162', v

In [7]:
# Print the frame as a text table (top 20 rows; long cell values are shortened with "...").
df.show()

+-----------+--------------------+--------------------+------+------+-------+---------------+--------------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+-------+--------------------+
|       name|   asset_description|            tag_list| value|profit|  price|recent_high_30d|recent_low_30d|        pct_drawdown|      volatility_30d|      price_vs_ma_50|             ma_30|             ma_50|            dca_bias|  trend|           data_date|
+-----------+--------------------+--------------------+------+------+-------+---------------+--------------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+-------+--------------------+
|    AMDd_EQ|Advanced Micro De...| Sample Tag_5578,hey| 14.56|  3.87| 203.45|         207.72|        199.12|-0.02055651839014...| 0.00725840181043737|-0.00769747771282199|203.83766666666668|          205.0282|0.014083090

In [8]:
# Confirm the reader produced a PySpark DataFrame.
type(df)

pyspark.sql.classic.dataframe.DataFrame

In [9]:
# Print the inferred schema as a tree: column name, type and nullability.
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- asset_description: string (nullable = true)
 |-- tag_list: string (nullable = true)
 |-- value: double (nullable = true)
 |-- profit: double (nullable = true)
 |-- price: double (nullable = true)
 |-- recent_high_30d: double (nullable = true)
 |-- recent_low_30d: double (nullable = true)
 |-- pct_drawdown: double (nullable = true)
 |-- volatility_30d: double (nullable = true)
 |-- price_vs_ma_50: double (nullable = true)
 |-- ma_30: double (nullable = true)
 |-- ma_50: double (nullable = true)
 |-- dca_bias: double (nullable = true)
 |-- trend: string (nullable = true)
 |-- data_date: timestamp (nullable = true)



In [10]:
# List the column names in order.
df.columns

['name',
 'asset_description',
 'tag_list',
 'value',
 'profit',
 'price',
 'recent_high_30d',
 'recent_low_30d',
 'pct_drawdown',
 'volatility_30d',
 'price_vs_ma_50',
 'ma_30',
 'ma_50',
 'dca_bias',
 'trend',
 'data_date']

In [11]:
# Fetch the last three records; tail(n) returns them as a list of Row objects.
df.tail(3)

[Row(name='HCHSl_EQ', asset_description='HSBC MSCI CHINA (Acc)', tag_list=None, value=174.3, profit=3.27, price=7.016, recent_high_30d=7.065, recent_low_30d=7.01, pct_drawdown=-0.006935598018400619, volatility_30d=0.0012101639798961666, price_vs_ma_50=-0.0022185562318675777, ma_30=7.0308, ma_50=7.0316, dca_bias=0.0044762378999369575, trend='Bearish', data_date=datetime.datetime(2026, 1, 30, 18, 21, 27, 146654)),
 Row(name='EMIDl_EQ', asset_description='iShares MSCI Europe Mid Cap (Dist)', tag_list=None, value=122.48, profit=6.01, price=7.676, recent_high_30d=7.692, recent_low_30d=7.667, pct_drawdown=-0.002080083203328135, volatility_30d=0.0008109418352990216, price_vs_ma_50=-0.00033339584505426745, ma_30=7.6796, ma_50=7.67856, dca_bias=0.0012544941232156767, trend='Bullish', data_date=datetime.datetime(2026, 1, 30, 18, 21, 27, 146654)),
 Row(name='NKE_US_EQ', asset_description='Nike', tag_list=None, value=23.88, profit=-4.2, price=61.09, recent_high_30d=62.52, recent_low_30d=61.09, pct

In [12]:
# Project a single column and print it.
df.select('name').show()

+-----------+
|       name|
+-----------+
|    AMDd_EQ|
|   CBUKd_EQ|
|    CHVd_EQ|
|   ISFEl_EQ|
| SNII_US_EQ|
|   ANRJl_EQ|
|    IUS3_EQ|
| ADBE_US_EQ|
|  PFE_US_EQ|
|   EUNYd_EQ|
| SMCI_US_EQ|
|   VJPBl_EQ|
|   VERGl_EQ|
|   SHELl_EQ|
|   SEMIl_EQ|
|   IUSZl_EQ|
|   EXH1d_EQ|
|   IDVYa_EQ|
|GOOGL_US_EQ|
|   ISLNl_EQ|
+-----------+
only showing top 20 rows


In [13]:
# Project several columns at once by passing a list of column names.
df.select(['name', 'asset_description']).show()

+-----------+--------------------+
|       name|   asset_description|
+-----------+--------------------+
|    AMDd_EQ|Advanced Micro De...|
|   CBUKd_EQ|iShares MSCI Chin...|
|    CHVd_EQ|             Chevron|
|   ISFEl_EQ|iShares MSCI AC F...|
| SNII_US_EQ|   Rigetti Computing|
|   ANRJl_EQ|Amundi Global Hyd...|
|    IUS3_EQ|iShares S&P Small...|
| ADBE_US_EQ|               Adobe|
|  PFE_US_EQ|              Pfizer|
|   EUNYd_EQ|iShares EM Divide...|
| SMCI_US_EQ|Super Micro Computer|
|   VJPBl_EQ|Vanguard FTSE Jap...|
|   VERGl_EQ|Vanguard FTSE Dev...|
|   SHELl_EQ|               Shell|
|   SEMIl_EQ|iShares MSCI Glob...|
|   IUSZl_EQ|iShares MSCI USA ...|
|   EXH1d_EQ|iShares STOXX Eur...|
|   IDVYa_EQ|iShares Euro Divi...|
|GOOGL_US_EQ|  Alphabet (Class A)|
|   ISLNl_EQ|iShares Physical ...|
+-----------+--------------------+
only showing top 20 rows


In [14]:
# Column names paired with their Spark type names, as a list of tuples.
df.dtypes

[('name', 'string'),
 ('asset_description', 'string'),
 ('tag_list', 'string'),
 ('value', 'double'),
 ('profit', 'double'),
 ('price', 'double'),
 ('recent_high_30d', 'double'),
 ('recent_low_30d', 'double'),
 ('pct_drawdown', 'double'),
 ('volatility_30d', 'double'),
 ('price_vs_ma_50', 'double'),
 ('ma_30', 'double'),
 ('ma_50', 'double'),
 ('dca_bias', 'double'),
 ('trend', 'string'),
 ('data_date', 'timestamp')]

In [15]:
# Summary statistics per column. The `count` row counts non-null values only
# (e.g. `tag_list` reports 17 while the other columns report 59), and the
# timestamp column `data_date` does not appear in the output.
df.describe().show()

26/01/31 13:13:40 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 7:>                                                          (0 + 1) / 1]

+-------+----------+--------------------+--------+-----------------+------------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+------------------+-----------------+--------------------+-------+
|summary|      name|   asset_description|tag_list|            value|            profit|             price|   recent_high_30d|    recent_low_30d|        pct_drawdown|      volatility_30d|      price_vs_ma_50|             ma_30|            ma_50|            dca_bias|  trend|
+-------+----------+--------------------+--------+-----------------+------------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+------------------+-----------------+--------------------+-------+
|  count|        59|                  59|      17|               59|                59|                59|                59|                59|                  59|             

                                                                                

In [18]:
# Add a derived column holding twice the value of `ma_30`.
# withColumn returns a new DataFrame, so the result is re-bound to `df`.
df = df.withColumn(
    'New Col',
    df['ma_30'] * 2,
)

In [19]:
# Check that `New Col` now appears as the last column.
df.show()

+-----------+--------------------+--------------------+------+------+-------+---------------+--------------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+-------+--------------------+------------------+
|       name|   asset_description|            tag_list| value|profit|  price|recent_high_30d|recent_low_30d|        pct_drawdown|      volatility_30d|      price_vs_ma_50|             ma_30|             ma_50|            dca_bias|  trend|           data_date|           New Col|
+-----------+--------------------+--------------------+------+------+-------+---------------+--------------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+-------+--------------------+------------------+
|    AMDd_EQ|Advanced Micro De...| Sample Tag_5578,hey| 14.56|  3.87| 203.45|         207.72|        199.12|-0.02055651839014...| 0.00725840181043737|-0.0076974777

In [21]:
# Remove the derived column again; drop returns a new DataFrame, so re-bind `df`.
df = df.drop('New Col')

In [22]:
# Check that the frame no longer contains `New Col`.
df.show()

+-----------+--------------------+--------------------+------+------+-------+---------------+--------------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+-------+--------------------+
|       name|   asset_description|            tag_list| value|profit|  price|recent_high_30d|recent_low_30d|        pct_drawdown|      volatility_30d|      price_vs_ma_50|             ma_30|             ma_50|            dca_bias|  trend|           data_date|
+-----------+--------------------+--------------------+------+------+-------+---------------+--------------+--------------------+--------------------+--------------------+------------------+------------------+--------------------+-------+--------------------+
|    AMDd_EQ|Advanced Micro De...| Sample Tag_5578,hey| 14.56|  3.87| 203.45|         207.72|        199.12|-0.02055651839014...| 0.00725840181043737|-0.00769747771282199|203.83766666666668|          205.0282|0.014083090