In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,sum as _sum,when
from pyspark.sql.window import Window

In [2]:
spark = SparkSession.builder.appName("TradeSurveillance").getOrCreate()

In [3]:
spark

In [4]:
market_data = spark.read.csv(r"D:\python_env\pyspark\market_data_analysis\market_data\prices.csv",header=True,inferSchema=True)

In [5]:
market_data

DataFrame[date: string, symbol: string, open: double, close: double, low: double, high: double, volume: int]

In [7]:
market_data.show()

+----------------+------+----------+----------+----------+----------+-------+
|            date|symbol|      open|     close|       low|      high| volume|
+----------------+------+----------+----------+----------+----------+-------+
|05-01-2024 00:00|  WLTW|    123.43|125.839996|122.309998|    126.25|2163600|
|06-01-2024 00:00|  WLTW|125.239998|119.980003|119.940002|125.540001|2386400|
|07-01-2024 00:00|  WLTW|116.379997|114.949997|    114.93|119.739998|2489500|
|08-01-2024 00:00|  WLTW|115.480003|116.620003|     113.5|117.440002|2006300|
|11-01-2024 00:00|  WLTW|117.010002|114.970001|114.089996|117.330002|1408600|
|12-01-2024 00:00|  WLTW|115.510002|115.550003|     114.5|116.059998|1098000|
|13-01-2024 00:00|  WLTW|116.459999|112.849998|112.589996|    117.07| 949600|
|14-01-2024 00:00|  WLTW|113.510002|114.379997|110.050003|115.029999| 785300|
|15-01-2024 00:00|  WLTW|113.330002|112.529999|111.919998|114.879997|1093700|
|19-01-2024 00:00|  WLTW|113.660004|110.379997|109.870003|115.87

In [8]:
market_data.printSchema()

root
 |-- date: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- open: double (nullable = true)
 |-- close: double (nullable = true)
 |-- low: double (nullable = true)
 |-- high: double (nullable = true)
 |-- volume: integer (nullable = true)



In [11]:
#datatype is checked to confirm if data conversion require for important field : open,close,low,high,volume
# here all these values are already in in double datatype , no conversion is needed

In [None]:
#there are not null values present in this dataset, therefore skipping null check for time being

In [None]:
#lets calculate vwap value for each day
# for this I am referring source: https://www.investopedia.com/terms/v/vwap.asp
# vwap = (cumulative typical price * volume)/cumulative volume
# typical price = (high price+low price+ closing price)/3
#cumulative = totale trades since the trading session opened


In [9]:
market_data = market_data.withColumn("cum_pric_vol", ((col("high")+col("low")+col("close"))/3)*col("volume") )

In [10]:
windowSpac = Window.partitionBy("symbol").orderBy("date").rowsBetween(-9,0)

In [12]:
market_data = market_data.withColumn("VWAP",_sum("cum_pric_vol").over(windowSpac) / _sum("volume").over(windowSpac))

In [13]:
market_data.show()

+----------------+------+---------+---------+---------+---------+-------+--------------------+------------------+
|            date|symbol|     open|    close|      low|     high| volume|        cum_pric_vol|              VWAP|
+----------------+------+---------+---------+---------+---------+-------+--------------------+------------------+
|01-02-2010 00:00|   AIV|    15.43|    15.74|    15.34|    15.74|1289900|2.0131039333333336E7| 15.60666666666667|
|01-02-2011 00:00|   AIV|    25.74|    25.75|    25.25|    25.85| 950400|2.4346079999999996E7|19.853197934800395|
|01-02-2012 00:00|   AIV|    24.73|    24.83|24.559999|    24.83|1552400|3.8406375482533336E7|21.853427588753835|
|01-02-2013 00:00|   AIV|27.360001|27.389999|    27.34|27.879999|2689400|     7.40571095404E7| 24.21138278586672|
|01-02-2024 00:00|   AIV|38.869999|39.610001|    38.68|40.049999|1659600|         6.5465688E7|27.316935327544208|
|01-03-2010 00:00|   AIV|    16.76|    16.75|    16.59|    16.83|1053200|1.7613014666666

In [None]:
# calculate volatility

In [14]:
market_data = market_data.withColumn('volatility',(col('high')-col('low'))/col('open'))
market_data.show()                                    

+----------------+------+---------+---------+---------+---------+-------+--------------------+------------------+--------------------+
|            date|symbol|     open|    close|      low|     high| volume|        cum_pric_vol|              VWAP|          volatility|
+----------------+------+---------+---------+---------+---------+-------+--------------------+------------------+--------------------+
|01-02-2010 00:00|   AIV|    15.43|    15.74|    15.34|    15.74|1289900|2.0131039333333336E7| 15.60666666666667|0.025923525599481554|
|01-02-2011 00:00|   AIV|    25.74|    25.75|    25.25|    25.85| 950400|2.4346079999999996E7|19.853197934800395|0.023310023310023367|
|01-02-2012 00:00|   AIV|    24.73|    24.83|24.559999|    24.83|1552400|3.8406375482533336E7|21.853427588753835|0.010917953902143026|
|01-02-2013 00:00|   AIV|27.360001|27.389999|    27.34|27.879999|2689400|     7.40571095404E7| 24.21138278586672|0.019736804834181172|
|01-02-2024 00:00|   AIV|38.869999|39.610001|    38.68|

In [17]:
market_data = market_data.withColumn("manipulation_flag", when(col("volatility")>0.1 , 1).otherwise(0))
#here I am keeping volatility 10%

In [15]:
market_data.show()

+----------------+------+---------+---------+---------+---------+-------+--------------------+------------------+--------------------+
|            date|symbol|     open|    close|      low|     high| volume|        cum_pric_vol|              VWAP|          volatility|
+----------------+------+---------+---------+---------+---------+-------+--------------------+------------------+--------------------+
|01-02-2010 00:00|   AIV|    15.43|    15.74|    15.34|    15.74|1289900|2.0131039333333336E7| 15.60666666666667|0.025923525599481554|
|01-02-2011 00:00|   AIV|    25.74|    25.75|    25.25|    25.85| 950400|2.4346079999999996E7|19.853197934800395|0.023310023310023367|
|01-02-2012 00:00|   AIV|    24.73|    24.83|24.559999|    24.83|1552400|3.8406375482533336E7|21.853427588753835|0.010917953902143026|
|01-02-2013 00:00|   AIV|27.360001|27.389999|    27.34|27.879999|2689400|     7.40571095404E7| 24.21138278586672|0.019736804834181172|
|01-02-2024 00:00|   AIV|38.869999|39.610001|    38.68|

In [18]:
market_data.filter('manipulation_flag=1').count()

2007

In [19]:
market_data.filter('manipulation_flag=1').show()

+----------------+------+----------+----------+----------+----------+--------+--------------------+------------------+-------------------+-----------------+
|            date|symbol|      open|     close|       low|      high|  volume|        cum_pric_vol|              VWAP|         volatility|manipulation_flag|
+----------------+------+----------+----------+----------+----------+--------+--------------------+------------------+-------------------+-----------------+
|04-10-2011 00:00|   AIV|     20.35|     22.16|     20.08|     22.23| 3200000|            6.8768E7| 28.39589438287778|0.10565110565110575|                1|
|06-05-2010 00:00|   AIV|     23.17| 22.110001| 20.280001|     23.52| 4310600| 9.470388487373333E7| 29.05007137423339|0.13983595166163146|                1|
|28-10-2011 00:00|   AIV|     26.92| 25.030001| 24.559999|     27.26| 4310100|1.1041039499999999E8|28.125903231905422|0.10029721396731056|                1|
|      03-02-2024|  ALXN|     132.5|145.970001|130.509995|