In [1]:
import os
import pyspark
from pyspark.sql import SparkSession, types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/11 13:47:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read \
    .option("header", "true") \
    .parquet('data/price_n_volume/2021/AAPL.parquet')\
    .drop('__index_level_0__')

In [4]:
df.printSchema()

root
 |-- date: timestamp (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- adjusted_close: double (nullable = true)
 |-- volume: long (nullable = true)
 |-- dividend_amount: double (nullable = true)
 |-- split_coefficient: double (nullable = true)
 |-- close_percent_change: double (nullable = true)
 |-- company: string (nullable = true)



In [5]:
from pyspark.sql.functions import col

df.select(\
          col("company")
    ).groupBy('company').count().show()

+-------+-----+
|company|count|
+-------+-----+
|   AAPL|  252|
+-------+-----+



In [6]:
# source: https://stackoverflow.com/questions/71038595/pyspark-cumulative-sum-within-partition-for-moving-last-2-n-rows
from pyspark.sql import functions as F, Window

w = Window.partitionBy('company').orderBy('date').rowsBetween(Window.currentRow, 2)

df = df.withColumn(
    'close_percent_change_new', 
    F.avg('close_percent_change').over(w))

In [7]:
df.show()

+-------------------+------+--------+-------+------+----------------+---------+---------------+-----------------+--------------------+-------+------------------------+
|               date|  open|    high|    low| close|  adjusted_close|   volume|dividend_amount|split_coefficient|close_percent_change|company|close_percent_change_new|
+-------------------+------+--------+-------+------+----------------+---------+---------------+-----------------+--------------------+-------+------------------------+
|2021-01-04 00:00:00|133.52|133.6116| 126.76|129.41|127.519519957544|143301887|            0.0|              1.0|             -2.4719|   AAPL|                 -1.5339|
|2021-01-05 00:00:00|128.89|  131.74| 128.43|131.01|129.096146431016| 97664898|            0.0|              1.0|              1.2364|   AAPL|     0.42749999999999994|
|2021-01-06 00:00:00|127.72|131.0499|126.382| 126.6|124.750569713508|155087970|            0.0|              1.0|             -3.3662|   AAPL|     0.30306666666

In [72]:
df.toPandas().head()

Unnamed: 0,date,open,high,low,close,adjusted_close,volume,dividend_amount,split_coefficient,close_percent_change,company,close_percent_change_new
0,2021-01-04,133.52,133.6116,126.76,129.41,127.51952,143301887,0.0,1.0,-2.4719,AAPL,-1.5339
1,2021-01-05,128.89,131.74,128.43,131.01,129.096146,97664898,0.0,1.0,1.2364,AAPL,0.4275
2,2021-01-06,127.72,131.0499,126.382,126.6,124.75057,155087970,0.0,1.0,-3.3662,AAPL,0.303067
3,2021-01-07,128.36,131.63,127.86,130.92,129.007461,109578157,0.0,1.0,3.4123,AAPL,0.650167
4,2021-01-08,132.43,132.63,130.23,132.05,130.120954,105158245,0.0,1.0,0.8631,AAPL,-0.5338


In [69]:
-3.3662 + 1.2364 -2.4719 

-4.601700000000001

In [10]:
# source: https://stackoverflow.com/questions/71038595/pyspark-cumulative-sum-within-partition-for-moving-last-2-n-rows
from pyspark.sql import functions as F, Window


def window_all():
    path = 'data/price_n_volume/2021/*.parquet'
    df = spark.read \
        .option("header", "true") \
        .parquet(path)\
        .drop('__index_level_0__')
    w = Window.partitionBy('company').orderBy('date').rowsBetween(Window.currentRow, 2)

    df = df.withColumn(
        'close_percent_change_new', 
        F.avg('close_percent_change').over(w))
    
    return df


df_all = window_all()


In [11]:
df_all.select(\
          col("company")
    ).groupBy('company').count().show()

+-------+-----+
|company|count|
+-------+-----+
|   AAPL|  252|
|   AMGN|  252|
|   AMZN|  252|
|    ADI|  252|
|    IBM|  252|
|    AMD|  252|
|   ABNB|  252|
|    AEP|  252|
|   ADBE|  252|
|   ANSS|  252|
|   ALGN|  252|
+-------+-----+



In [18]:
df_all.createOrReplaceTempView("all_stocks")
df_appl = spark.sql("SELECT * FROM all_stocks WHERE company = 'AAPL'")

In [23]:
# Test to check if df does not take into consideration other companies average 

False in (df_appl.toPandas().close_percent_change_new == df.toPandas().close_percent_change_new)

False

In [68]:
from pyspark.sql.functions import udf

def udf_if_outlier(val, avg, stddev):
    if ( val > (avg + stddev)) or ( val < (avg - stddev)):
        return True
    return False

def udf_if_outlier_high_only(val, avg, stddev):
    if ( val > (avg + stddev)):
        return True
    return False


def window_create(df, on_column ,number_of_days, both=1):
    w = Window.partitionBy('company').orderBy('date').rowsBetween(Window.currentRow+1, number_of_days)
    
    avg_column = f'avg_{on_column}_{number_of_days}'
    stddev_column = f'stddev_{on_column}_{number_of_days}'
    outlier_column = f'if_outlier_{on_column}_{number_of_days}'

    df = df.withColumn(
        avg_column, 
        F.avg(f'{on_column}').over(w))
    
    df = df.withColumn(
        stddev_column, 
        F.stddev(f'{on_column}').over(w))
    
    if both:
        udfValueToOutlier = udf(udf_if_outlier, types.BooleanType())
    else:
        udfValueToOutlier = udf(udf_if_outlier_high_only, types.BooleanType())
     
    # here does not work
    # df = df.withColumn(
    #        outlier_column, 
    #        udfValueToOutlier(df[f'{on_column}'], df[f'{avg_column}'], df[f'{stddev_column}']))

    
    return df


path = 'data/price_n_volume/2021/*.parquet'
df = spark.read \
        .option("header", "true") \
        .parquet(path)\
        .drop('__index_level_0__')

df = window_create(df, 'close_percent_change', 15)
df.toPandas().head()
    

Unnamed: 0,date,open,high,low,close,adjusted_close,volume,dividend_amount,split_coefficient,close_percent_change,company,avg_close_percent_change_15,stddev_close_percent_change_15
0,2021-01-04,133.52,133.6116,126.76,129.41,127.51952,143301887,0.0,1.0,-2.4719,AAPL,0.697053,2.154657
1,2021-01-05,128.89,131.74,128.43,131.01,129.096146,97664898,0.0,1.0,1.2364,AAPL,0.5634,2.180832
2,2021-01-06,127.72,131.0499,126.382,126.6,124.75057,155087970,0.0,1.0,-3.3662,AAPL,0.55458,2.198059
3,2021-01-07,128.36,131.63,127.86,130.92,129.007461,109578157,0.0,1.0,3.4123,AAPL,0.07762,2.307179
4,2021-01-08,132.43,132.63,130.23,132.05,130.120954,105158245,0.0,1.0,0.8631,AAPL,0.130213,2.335185


In [70]:
on_column = 'close_percent_change'
number_of_days = 15
avg_column = f'avg_{on_column}_{number_of_days}'
stddev_column = f'stddev_{on_column}_{number_of_days}'
outlier_column = f'if_outlier_{on_column}_{number_of_days}'

df = df.withColumn(
            outlier_column, 
            udfValueToOutlier(df[f'{on_column}'], df[f'{avg_column}'], df[f'{stddev_column}']))

df.show()

+-------------------+------+--------+-------+------+----------------+---------+---------------+-----------------+--------------------+-------+---------------------------+------------------------------+----------------------------------+
|               date|  open|    high|    low| close|  adjusted_close|   volume|dividend_amount|split_coefficient|close_percent_change|company|avg_close_percent_change_15|stddev_close_percent_change_15|if_outlier_close_percent_change_15|
+-------------------+------+--------+-------+------+----------------+---------+---------------+-----------------+--------------------+-------+---------------------------+------------------------------+----------------------------------+
|2021-01-04 00:00:00|133.52|133.6116| 126.76|129.41|127.519519957544|143301887|            0.0|              1.0|             -2.4719|   AAPL|         0.6970533333333333|             2.154657454335563|                              true|
|2021-01-05 00:00:00|128.89|  131.74| 128.43|131.01|

In [53]:

on_column = 'close_percent_change'
number_of_days = 15
avg_column = f'avg_{on_column}_{number_of_days}'
stddev_column = f'stddev_{on_column}_{number_of_days}'
outlier_column = f'if_outlier_{on_column}_{number_of_days}'


.show()

+-------------------+------+--------+-------+------+----------------+---------+---------------+-----------------+--------------------+-------+---------------------------+------------------------------+----------------------------------+
|               date|  open|    high|    low| close|  adjusted_close|   volume|dividend_amount|split_coefficient|close_percent_change|company|avg_close_percent_change_15|stddev_close_percent_change_15|if_outlier_close_percent_change_15|
+-------------------+------+--------+-------+------+----------------+---------+---------------+-----------------+--------------------+-------+---------------------------+------------------------------+----------------------------------+
|2021-01-04 00:00:00|133.52|133.6116| 126.76|129.41|127.519519957544|143301887|            0.0|              1.0|             -2.4719|   AAPL|         0.6970533333333333|             2.154657454335563|                              true|
|2021-01-05 00:00:00|128.89|  131.74| 128.43|131.01|

In [58]:
df.select(\
          col("company"),
        col("if_outlier_close_percent_change_15") 
    ).where("if_outlier_close_percent_change_15 != None")\
    .groupBy("company", "if_outlier_close_percent_change_15").count().show()

AttributeError: 'NoneType' object has no attribute 'select'

![Outlier](https://upload.wikimedia.org/wikipedia/commons/thumb/8/8c/Standard_deviation_diagram.svg/1920px-Standard_deviation_diagram.svg.png)

In [35]:
def udf_if_outlier(val, avg, stddev):
    if ( val > (avg + stddev)) or ( val < (avg - stddev)):
        return 1
    return 0

print(udf_if_outlier(0.8, 1, 0.1),udf_if_outlier(1.2, 1, 0.1) )

1 1


1