## initialization

### imports

In [1]:
from pyspark import SparkConf, SparkContext
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pyarrow.parquet as pq
import os
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F
from functools import reduce
from pyspark.sql.window import Window

### configs

In [2]:
PATH_TRADE = '/home/user1/Data/'
PATH_PORTFOLIO = '/home/user1/Data/Portfolio/'
PRICE_PATH =  '/home/user1/Data/'
VALID_SYMBOLS_PATH = '/home/user1/Data/'

HOUR_SECONDS = 60 * 60
MINUTE_SECONDS = 60

MIN_ANALYSIS_DATE = 13980101
MAX_ANALYSIS_DATE = 13980331

N_QUANTILES = 10

### general functions

In [3]:
def display_df(df):
    df.persist()
    print(df.count())
    df.show(3, False)

def min_max(df):
    return df.agg(F.min('date').alias('min_date'), F.max('date').alias('max_date')).show()

def modify_time(x):
    hour = x // 10000
    minute = (x % 10000) // 100
    second = x % 100
    return HOUR_SECONDS * 3600 + MINUTE_SECONDS * 60 + second
modify_time_udf = F.udf(modify_time, T.IntegerType())

dropSpace = F.udf(lambda x: x.replace(' ', ''), T.StringType())

mappingDict = {
              'ما  ' : 'ما',
              'جم  ' : 'جم',
              'جمپیلن' : 'جم پیلن',
              'افقملت' : 'افق ملت',
              'آسپ' : 'آ س پ',
              'آپ  ' : 'آپ',
              'سپ  ' : 'سپ',
              'غپاذر' : 'غپآذر',
              'هدشت' : 'دهدشت',
              'نگان' : 'زنگان',
              'فبورس' : 'فرابورس',
              'شیری' : 'دشیری',
              'وتعان' : 'وتعاون',
              'آس پ' : 'آ س پ',
              'انرژی1': 'انرژی 1',
              'انرژی2' : 'انرژی 2',
              'انرژی3' : 'انرژی 3',
              'انرژیح1' : 'انرژیح 1',
              'انرژیح2' : 'انرژیح 2',
              'انرژیح3' : 'انرژیح 3',
              'فناوا' : 'فن آوا',
              'فنآوا' : 'فن آوا',
              'امینیکم' : 'امین یکم',
              'هایوب' : 'های وب',
              'کیبیسی' : 'کی بی سی',
              'کیبیسیح' : 'کی بی سیح',
              'واتوس' : 'وآتوس'
              }

def replace_arabic_characters_and_correct_symbol_names(data):
    mapping = {
        'ك': 'ک',
        'گ': 'گ',
        'دِ': 'د',
        'بِ': 'ب',
        'زِ': 'ز',
        'ذِ': 'ذ',
        'شِ': 'ش',
        'سِ': 'س',
        'ى': 'ی',
        'ي': 'ی',
    }
    for i in mapping:
        data = (
            data
            .withColumn('symbol', F.regexp_replace('symbol', i, mapping[i]))
        )
    data = (
        data
        .withColumn(
        'symbol',
        F.when((F.col('symbol').substr(1, 1) == 'ذ') & (F.col('symbol') != 'ذوب'), F.col('symbol').substr(2, 30)).otherwise(
            F.col('symbol'))
        )
        .withColumn(
        'symbol',
        F.when(F.col('symbol').substr(1, 2) == 'گژ', F.col('symbol').substr(3, 30)).otherwise(
            F.col('symbol'))
        )
        .withColumn(
        'symbol',
        F.when(F.col('symbol').substr(1, 1) == 'ژ', F.col('symbol').substr(2, 30)).otherwise(
            F.col('symbol'))
        )
        .replace(mappingDict,subset=['symbol'])
    )
    return data

spaceDeleteUDF1 = F.udf(lambda s: s.replace('\u200d', ''), T.StringType())
spaceDeleteUDF2 = F.udf(lambda s: s.replace('\u200c', ''), T.StringType())

### Spark instaniation

In [4]:
conf = SparkConf()
conf.set('spark.driver.memory', '130g').set('spark.shuffle.service.index.cache.size', '1g').setAppName('Practice') #.set('spark.executer.cores', '58')
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

22/02/23 08:57:57 WARN Utils: Your hostname, user1-ubuntu resolves to a loopback address: 127.0.1.1; using 172.16.32.107 instead (on interface eth0)
22/02/23 08:57:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/23 08:57:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## data inputs

### load daily trade data

In [5]:
raw_trade_df = spark.read.parquet(PATH_TRADE + "tradeData.parquet")

display_df(raw_trade_df)
# capital increas?
# make sure trade value and number of shares are not zero!

                                                                                

27464132
+--------+------+------+------------------------------------+------------------------------------+------------+----------+--------------------+
|date    |time  |symbol|buyerAccountId                      |sellerAccountId                     |nTradeShares|tradePrice|tradeSettlementValue|
+--------+------+------+------------------------------------+------------------------------------+------------+----------+--------------------+
|13980110|115203|ثامان |ECC77A89-C9AD-494F-8B49-4F178C2D7F3E|2AF253F2-7044-44EA-858B-09DA6A224E86|24928       |3390.0    |8.450592            |
|13980110|120816|ثاباد |9288482E-9715-4DE3-AAB6-D20D2FB157DE|02F7AA8E-29E7-4594-ACD4-FDAEE6BA957B|2000        |2320.0    |0.464               |
|13980110|122054|آسیا  |5D9B391F-C8F1-48E8-A9D0-2215FFCB9FCB|C7470FDD-F4DA-472D-895A-EBD1ED249BAD|7573        |1876.0    |1.4206948           |
+--------+------+------+------------------------------------+------------------------------------+------------+----------+-----

In [6]:
min_max(raw_trade_df)



+--------+--------+
|min_date|max_date|
+--------+--------+
|13980105|13980329|
+--------+--------+



                                                                                

### load portfolio data

In [7]:
raw_portfolio_df = spark.read.parquet(
    PATH_PORTFOLIO + "{}".format("raw_portfolio_df.parquet")
    )
display_df(raw_portfolio_df)



12109854
+------+--------+------------------------------------+------+
|SPSYMB|SPDATE  |SPACC#                              |SPTROH|
+------+--------+------------------------------------+------+
|خساپا |13980105|37D6BD7D-DCF6-4B34-9ACC-AAA8111E0243|32850 |
|خساپا |13980105|F5094FE0-46A8-4C7F-86CD-6EA2DC8D5B42|2237  |
|اخابر |13980105|47229110-AA3E-4972-985B-670E61BAC864|384   |
+------+--------+------------------------------------+------+
only showing top 3 rows



                                                                                

### load daily price and shrout data

In [8]:
price_df = (
    spark.read.parquet(PRICE_PATH.format('Cleaned_Stock_Prices_14001116.parquet'))
    .filter(F.col('jalaliDate').between(MIN_ANALYSIS_DATE, MAX_ANALYSIS_DATE))
    .select(
        F.col('jalaliDate').alias('date'),
        F.col('name').alias('symbol'),
        'close_price',
        'close_price_adjusted',
        'shrout',
        (F.col('MarketCap') / 10**7).alias('mktcap')
    )
    .dropDuplicates()
)

price_df = replace_arabic_characters_and_correct_symbol_names(price_df)

display_df(price_df)

22/02/23 08:58:48 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
                                                                                

57091


22/02/23 08:59:11 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB


+--------+------+-----------+--------------------+------+---------+
|date    |symbol|close_price|close_price_adjusted|shrout|mktcap   |
+--------+------+-----------+--------------------+------+---------+
|13980221|آ س پ |1366.0     |1249.0              |9.0E8 |122940.0 |
|13980125|آتیمس |30009.0    |30380.0             |1.0E9 |3000900.0|
|13980202|آتیمس |30880.0    |31262.0             |1.0E9 |3088000.0|
+--------+------+-----------+--------------------+------+---------+
only showing top 3 rows



In [9]:
added_price_df = spark.createDataFrame(pd.DataFrame({
                                                        'date' : [13980105],
                                                        'symbol' : ['ومشان'],
                                                        'close_price' : [561],
                                                        'close_price_adjusted' : [np.nan],
                                                        'shrout' : [20000000],
                                                        'mktcap' : [1122]
                                                    })
                                      )

price_df = price_df.union(added_price_df)

In [10]:
min_max(price_df)

22/02/23 08:59:12 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB

+--------+--------+
|min_date|max_date|
+--------+--------+
|13980105|13980329|
+--------+--------+



                                                                                

In [11]:
MIN_PRICE_DATE = price_df.agg(F.min('date')).collect()[0][0]
MAX_PRICE_DATE = price_df.agg(F.max('date')).collect()[0][0]

price_df.agg(F.countDistinct('symbol')).show()
price_df.filter(F.col('date') == MIN_PRICE_DATE).agg(F.countDistinct('symbol')).show()
price_df.filter(F.col('date') == MAX_PRICE_DATE).agg(F.countDistinct('symbol')).show()

22/02/23 08:59:17 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 08:59:19 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 08:59:22 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 08:59:30 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
                                                                                

+-------------+
|count(symbol)|
+-------------+
|         1031|
+-------------+



22/02/23 08:59:31 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 08:59:34 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
                                                                                

+-------------+
|count(symbol)|
+-------------+
|         1014|
+-------------+



22/02/23 08:59:35 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 08:59:37 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB


+-------------+
|count(symbol)|
+-------------+
|         1024|
+-------------+



                                                                                

### load valid symbols data

In [12]:
valid_symbols_df = (
    spark.read.parquet(VALID_SYMBOLS_PATH + '{}'.format('Symbols_14001116.parquet'))
    .select('Ticker')
    .withColumnRenamed('Ticker','symbol')
    .dropDuplicates()
)

valid_symbols_df = replace_arabic_characters_and_correct_symbol_names(valid_symbols_df)

display_df(valid_symbols_df)

22/02/23 08:59:47 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 08:59:51 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB


701


22/02/23 08:59:51 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB


+------+
|symbol|
+------+
|دسبحان|
|فن آوا|
|دلقما |
+------+
only showing top 3 rows



In [13]:
ETFs = ['آتیمس',
 'آرمانی',
 'آساس',
 'آسام',
 'آسامید',
 'آوا',
 'آکورد',
 'آگاس',
 'ارزش',
 'اطلس',
 'اعتماد',
 'افران',
 'افق ملت',
 'الماس',
 'امین یکم',
 'انار',
 'اهرم',
 'اوج',
 'اوصتا',
 'بذر',
 'تاراز',
 'تصمیم',
 'ثبات',
 'ثروتم',
 'ثمین',
 'ثهام',
 'خاتم',
 'دارا',
 'دارا یکم',
 'داریوش',
 'داریک',
 'رماس',
 'رویش',
 'زر',
 'زرین',
 'زیتون',
 'سبز',
 'سحرخیز',
 'سخند',
 'سرو',
 'سپاس',
 'سپر',
 'سپیدما',
 'سیناد',
 'صایند',
 'صغرب',
 'صنم',
 'صنوین',
 'طلا',
 'عیار',
 'فراز',
 'فردا',
 'فیروزا',
 'مانی',
 'مثقال',
 'مدیر',
 'نارون',
 'نسیم',
 'نهال',
 'هامرز',
 'همای',
 'وبازار',
 'ویستا',
 'پادا',
 'پارند',
 'پالایش',
 'کارا',
 'کاردان',
 'کاریس',
 'کارین',
 'کامیاب',
 'کمند',
 'کهربا',
 'کیان',
 'گنبد',
 'گنجین',
 'گنجینه',
 'گوهر',
 'یارا',
 'یاقوت',
 'فیروزه',]
ETFs = [(i,) for i in ETFs]
ETFs = spark.createDataFrame(data= ETFs,  schema= valid_symbols_df.schema)
valid_symbols_df = valid_symbols_df.union(ETFs).dropDuplicates()
display_df(valid_symbols_df)


right_offers = ['آ س پح',
 'آرمانح',
 'آریانح',
 'آرینح',
 'آکنتورح',
 'اتکامح',
 'اتکایح',
 'اخابرح',
 'ارفعح',
 'اعتلاح',
 'افراح',
 'افقح',
 'البرزح',
 'امیدح',
 'امینح',
 'اوانح',
 'بالبرح',
 'بایکاح',
 'بترانسح',
 'بتکح',
 'بدکوح',
 'برکتح',
 'بزاگرسح',
 'بساماح',
 'بسویچح',
 'بشهابح',
 'بصباح',
 'بفجرح',
 'بموتوح',
 'بمیلاح',
 'بنوح',
 'بنیروح',
 'بهپاکح',
 'بپاسح',
 'بکابح',
 'بکامح',
 'بکهنوجح',
 'تاصیکوح',
 'تاپکیشح',
 'تاپیکوح',
 'تایراح',
 'تجلیح',
 'تشتادح',
 'تفیروح',
 'تلیسهح',
 'تماوندح',
 'تمحرکهح',
 'تملتح',
 'تنوینح',
 'توریلح',
 'تپمپیح',
 'تپکوح',
 'تکشاح',
 'تکمباح',
 'تکنارح',
 'تکنوح',
 'تیپیکوح',
 'ثابادح',
 'ثاختح',
 'ثاصفاح',
 'ثالوندح',
 'ثامانح',
 'ثاژنح',
 'ثباغح',
 'ثترانح',
 'ثرودح',
 'ثشاهدح',
 'ثشرقح',
 'ثعتماح',
 'ثعمراح',
 'ثغربح',
 'ثفارسح',
 'ثقزویح',
 'ثمسکنح',
 'ثنورح',
 'ثنوساح',
 'ثپردیسح',
 'جمح',
 'جهرمح',
 'حبندرح',
 'حتایدح',
 'حتوکاح',
 'حخزرح',
 'حسیناح',
 'حفاریح',
 'حپارساح',
 'حپتروح',
 'حکشتیح',
 'حکمتح',
 'خاذینح',
 'خاهنح',
 'خاورح',
 'خبهمنح',
 'ختراکح',
 'ختورح',
 'ختوقاح',
 'خدیزلح',
 'خریختح',
 'خرینگح',
 'خزامیاح',
 'خزرح',
 'خساپاح',
 'خشرقح',
 'خصدراح',
 'خفناورح',
 'خفنرح',
 'خفولاح',
 'خلنتح',
 'خمحرکهح',
 'خمحورح',
 'خمهرح',
 'خموتورح',
 'خنصیرح',
 'خودروح',
 'خوسازح',
 'خپارسح',
 'خپویشح',
 'خچرخشح',
 'خکارح',
 'خکاوهح',
 'خکمکح',
 'خگسترح',
 'دابورح',
 'دارابح',
 'داروح',
 'داسوهح',
 'دالبرح',
 'دامینح',
 'داناح',
 'دبالکح',
 'دتمادح',
 'دتهرانح',
 'دتوزیعح',
 'دتولیح',
 'دتولیدح',
 'دجابرح',
 'ددامح',
 'درازکح',
 'درهآورح',
 'دروزح',
 'دزهراویح',
 'دسانکوح',
 'دسبحاح',
 'دسبحانح',
 'دسیناح',
 'دشیریح',
 'دشیمیح',
 'دعبیدح',
 'دفاراح',
 'دفراح',
 'دقاضیح',
 'دلرح',
 'دلقماح',
 'دهدشتح',
 'دپارسح',
 'دکوثرح',
 'دکپسولح',
 'دکیمیح',
 'دیرانح',
 'رانفورح',
 'رتاپح',
 'رتکوح',
 'رمپناح',
 'رنیکح',
 'رپارسح',
 'رکیشح',
 'زفکاح',
 'زقیامح',
 'زملاردح',
 'زمگساح',
 'زنجانح',
 'زنگانح',
 'زگلدشتح',
 'ساذریح',
 'سارابح',
 'ساربیلح',
 'ساروجح',
 'سارومح',
 'سامانح',
 'سباقرح',
 'سبجنوح',
 'سبحانح',
 'سبهانح',
 'سترانح',
 'سجامح',
 'سخاشح',
 'سخزرح',
 'سخوافح',
 'سخوزح',
 'سدبیرح',
 'سدشتح',
 'سدورح',
 'سرودح',
 'سرچشمهح',
 'سشرقح',
 'سشمالح',
 'سصفهاح',
 'سصوفیح',
 'سغربح',
 'سفارح',
 'سفارسح',
 'سفارودح',
 'سفاسیتح',
 'سفانوح',
 'سقاینح',
 'سلارح',
 'سمازنح',
 'سمایهح',
 'سمگاح',
 'سنوینح',
 'سنیرح',
 'سهرمزح',
 'سهگمتح',
 'سپاهاح',
 'سپح',
 'سپرمیح',
 'سکارونح',
 'سکردح',
 'سکرماح',
 'سیدکوح',
 'سیلامح',
 'شاراکح',
 'شاملاح',
 'شبریزح',
 'شبهرنح',
 'شتهرانح',
 'شتولیح',
 'شتوکاح',
 'شجمح',
 'شخارکح',
 'شدوصح',
 'شرانلح',
 'شرنگیح',
 'شزنگح',
 'شسمح',
 'شسیناح',
 'شصدفح',
 'شصفهاح',
 'شفاراح',
 'شفارسح',
 'شفنح',
 'شلردح',
 'شلعابح',
 'شموادح',
 'شنفتح',
 'شپارسح',
 'شپاسح',
 'شپاکساح',
 'شپتروح',
 'شپدیسح',
 'شپلیح',
 'شپناح',
 'شکبیرح',
 'شکربنح',
 'شکفح',
 'شکلرح',
 'شگلح',
 'شیرازح',
 'شیرانح',
 'صباح',
 'غاذرح',
 'غالبرح',
 'غبشهرح',
 'غبهنوشح',
 'غبهپاکح',
 'غدامح',
 'غدشتح',
 'غسالمح',
 'غشاذرح',
 'غشانح',
 'غشصفاح',
 'غشهدابح',
 'غشهدح',
 'غشوکوح',
 'غصینوح',
 'غمارگح',
 'غمشهدح',
 'غمهراح',
 'غمینوح',
 'غنابح',
 'غنوشح',
 'غنیلیح',
 'غویتاح',
 'غپاکح',
 'غپینوح',
 'غچینح',
 'غگرجیح',
 'غگرگح',
 'غگلح',
 'فاذرح',
 'فاراکح',
 'فاسمینح',
 'فافزاح',
 'فالبرح',
 'فالومح',
 'فاماح',
 'فاهوازح',
 'فایراح',
 'فباهنرح',
 'فبیراح',
 'فجامح',
 'فجرح',
 'فجوشح',
 'فخاسح',
 'فخوزح',
 'فرآورح',
 'فرومح',
 'فزرینح',
 'فساح',
 'فسازانح',
 'فسدیدح',
 'فسربح',
 'فسپاح',
 'فلاتح',
 'فلامیح',
 'فلولهح',
 'فماکح',
 'فملیح',
 'فن آواح',
 'فنفتح',
 'فنوالح',
 'فنوردح',
 'فولادح',
 'فولاژح',
 'فولایح',
 'فوکاح',
 'فپنتاح',
 'قاسمح',
 'قجامح',
 'قرنح',
 'قزوینح',
 'قشرینح',
 'قشهدح',
 'قشکرح',
 'قشیرح',
 'قصفهاح',
 'قلرستح',
 'قنقشح',
 'قنیشاح',
 'قپارسح',
 'لابساح',
 'لازماح',
 'لبوتانح',
 'لخانهح',
 'لخزرح',
 'لراداح',
 'لسرماح',
 'لوتوسح',
 'لپیامح',
 'لکماح',
 'مادیراح',
 'مدارانح',
 'مرقامح',
 'معیارح',
 'ملتح',
 'میدکوح',
 'میهنح',
 'نبروجح',
 'نتوسح',
 'نشیراح',
 'نمرینوح',
 'نوینح',
 'نیروح',
 'هجرتح',
 'همراهح',
 'وآذرح',
 'وآرینح',
 'وآفریح',
 'وآیندح',
 'واتیح',
 'وارسح',
 'واعتبارح',
 'والبرح',
 'وامیدح',
 'وانصارح',
 'وبانکح',
 'وبشهرح',
 'وبملتح',
 'وبهمنح',
 'وبوعلیح',
 'وبیمهح',
 'وتجارتح',
 'وتعاونح',
 'وتوسح',
 'وتوسمح',
 'وتوسکاح',
 'وتوشهح',
 'وتوصاح',
 'وتوکاح',
 'وثوقح',
 'وحافظح',
 'وخارزمح',
 'وخاورح',
 'وداناح',
 'ودیح',
 'ورازیح',
 'ورناح',
 'وزمینح',
 'وساختح',
 'وساپاح',
 'وسدیدح',
 'وسرمدح',
 'وسناح',
 'وسپهح',
 'وسکابح',
 'وسیناح',
 'وسینح',
 'وشمالح',
 'وصناح',
 'وصندوقح',
 'وصنعتح',
 'وغدیرح',
 'وقوامح',
 'ولبهمنح',
 'ولتجارح',
 'ولرازح',
 'ولساپاح',
 'ولشرقح',
 'ولصنمح',
 'ولغدرح',
 'ولقمانح',
 'ولملتح',
 'ولیزح',
 'ومعادنح',
 'وملتح',
 'ومللح',
 'وملیح',
 'ونفتح',
 'ونوینح',
 'ونیروح',
 'ونیکیح',
 'وهامونح',
 'وهورح',
 'وپارسح',
 'وپاسارح',
 'وپتروح',
 'وپخشح',
 'وکادوح',
 'وکارح',
 'وکوثرح',
 'وگردشح',
 'پارتاح',
 'پارسانح',
 'پارسیانح',
 'پاساح',
 'پاکشوح',
 'پتایرح',
 'پترولح',
 'پخشح',
 'پدرخشح',
 'پرداختح',
 'پردیسح',
 'پسهندح',
 'پشاهنح',
 'پلاستح',
 'پلاسکح',
 'پلولهح',
 'پنکاح',
 'پکرمانح',
 'پکویرح',
 'پکیانح',
 'چافستح',
 'چفیبرح',
 'چکارلح',
 'چکارنح',
 'چکاوهح',
 'کابگنح',
 'کاذرح',
 'کازروح',
 'کاسپینح',
 'کالبرح',
 'کاماح',
 'کاوهح',
 'کایتاح',
 'کبافقح',
 'کترامح',
 'کتوکاح',
 'کحافظح',
 'کخاکح',
 'کدماح',
 'کرازیح',
 'کرماشاح',
 'کرویح',
 'کزغالح',
 'کساوهح',
 'کساپاح',
 'کسراح',
 'کسرامح',
 'کسعدیح',
 'کطبسح',
 'کفرآورح',
 'کفراح',
 'کفپارسح',
 'کقزویح',
 'کلوندح',
 'کماسهح',
 'کمرجانح',
 'کمنگنزح',
 'کمیناح',
 'کنورح',
 'کهرامح',
 'کهمداح',
 'کوثرح',
 'کورزح',
 'کویرح',
 'کپارسح',
 'کپاناح',
 'کپرورح',
 'کپشیرح',
 'کچادح',
 'کگازح',
 'کگلح',
 'کگهرح',
 'کی بی سیح',
 'کیسونح',
 'گوهرانح',
 'گکیشح']
right_offers = [(i,) for i in right_offers]
right_offers = spark.createDataFrame(data= right_offers,  schema= valid_symbols_df.schema)
valid_symbols_df = valid_symbols_df.union(right_offers).dropDuplicates()
display_df(valid_symbols_df)

handly_collected_valid_symbols = [
    'فسلير',
    'نگین',
    'نیرو',
    'غگز',
    'آینده'
]
handly_collected_valid_symbols = [(i,) for i in handly_collected_valid_symbols]
handly_collected_valid_symbols = spark.createDataFrame(data= handly_collected_valid_symbols,  schema= valid_symbols_df.schema)
valid_symbols_df = valid_symbols_df.union(handly_collected_valid_symbols).dropDuplicates()
display_df(valid_symbols_df)


invalid_symbols = [
                    'وکوثر',
                    'حکمت',
                    'ومهر',
                    'جوین',
                    'وقوام',
                    'ممسنی',
                    'غناب',
                    'کارا',
                    'کاوه',
                    'گنگین',
                    'وپویا',
                    'ولپارس',
                    'نوری',
                    'شجی',
                    'ثعمسا',
                    'بگیلان',
                    'بجهرم',
                    'آرمانح',
                    'شساخت',
                    'ثخوز',
                    'قاروم', # capital increase
                    'امید',
                    'وهنر',
                    'تنوین',
                    'وگردش',
                    'آریان', # two different stocks with the same symbol
                    'وآتوس', 
                    'همراه' # capital increase!
    
]

valid_symbols_df = valid_symbols_df.filter(~F.col('symbol').isin(invalid_symbols))
display_df(valid_symbols_df)

22/02/23 08:59:52 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 08:59:54 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
                                                                                

782


22/02/23 08:59:55 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 08:59:56 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB


+------+
|symbol|
+------+
|دسبحان|
|فن آوا|
|دلقما |
+------+
only showing top 3 rows



22/02/23 08:59:57 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 08:59:59 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
                                                                                

1299


22/02/23 09:00:00 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:00:01 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB


+------+
|symbol|
+------+
|دسبحان|
|فن آوا|
|بکابح |
+------+
only showing top 3 rows



22/02/23 09:00:02 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:00:04 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
                                                                                

1304


22/02/23 09:00:06 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:00:06 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB


+------+
|symbol|
+------+
|دسبحان|
|فن آوا|
|بکابح |
+------+
only showing top 3 rows



22/02/23 09:00:07 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
                                                                                

1282


22/02/23 09:00:09 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:00:10 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB


+------+
|symbol|
+------+
|دسبحان|
|فن آوا|
|بکابح |
+------+
only showing top 3 rows



## data preparation

### prepare trade data

In [14]:
trade_df = (
    raw_trade_df
    .withColumn('secondsWithinDay', modify_time_udf('time'))
    .join(valid_symbols_df, on = ['symbol'], how = 'inner')
    .select(
        'date',
        'time',
        'secondsWithinDay',
        'symbol',
        'nTradeShares',
        'tradePrice',
        'tradeSettlementValue',
        dropSpace(F.col('buyerAccountId')).alias('buyerAccountId'),
        dropSpace(F.col('sellerAccountId')).alias('sellerAccountId')
    )
)

display_df(trade_df)
# Note: 'time' columns is not reliable!

22/02/23 09:00:11 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:00:13 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
                                                                                

26223382


22/02/23 09:00:32 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB


+--------+------+----------------+------+------------+----------+--------------------+------------------------------------+------------------------------------+
|date    |time  |secondsWithinDay|symbol|nTradeShares|tradePrice|tradeSettlementValue|buyerAccountId                      |sellerAccountId                     |
+--------+------+----------------+------+------------+----------+--------------------+------------------------------------+------------------------------------+
|13980110|115203|12963603        |ثامان |24928       |3390.0    |8.450592            |ECC77A89-C9AD-494F-8B49-4F178C2D7F3E|2AF253F2-7044-44EA-858B-09DA6A224E86|
|13980110|120816|12963616        |ثاباد |2000        |2320.0    |0.464               |9288482E-9715-4DE3-AAB6-D20D2FB157DE|02F7AA8E-29E7-4594-ACD4-FDAEE6BA957B|
|13980110|122054|12963654        |آسیا  |7573        |1876.0    |1.4206948           |5D9B391F-C8F1-48E8-A9D0-2215FFCB9FCB|C7470FDD-F4DA-472D-895A-EBD1ED249BAD|
+--------+------+----------------+

In [15]:
print('missing nTradeShares: ', round(trade_df.filter(F.col('nTradeShares') == 0).count() / trade_df.count(), 5))
print('missing tradeSettlementValue: ', round(trade_df.filter(F.col('tradeSettlementValue') == 0).count() / trade_df.count(), 5))

22/02/23 09:00:33 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:00:34 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
                                                                                

missing nTradeShares:  0.0


22/02/23 09:00:35 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:00:37 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB

missing tradeSettlementValue:  0.0


                                                                                

### prepare initial portfolio data

In [16]:
raw_portfolio_df.columns

['SPSYMB', 'SPDATE', 'SPACC#', 'SPTROH']

In [17]:
mapping = (
    dict(
    zip(
        ['SPDATE', 'SPSYMB', 'SPACC#', 'SPTROH'],
        ['date', 'symbol', 'accountId', 'nHeldShares'],
    )
    )
)

portfolio_df = (
    raw_portfolio_df
    .select(
    [F.col(c).alias(mapping.get(c, c)) for c in raw_portfolio_df.columns]
    )
    .select(
        'date',
        'symbol',
        dropSpace(F.col('accountId')).alias('accountId'),
        'nHeldShares'
    )
)

portfolio_df = replace_arabic_characters_and_correct_symbol_names(portfolio_df)
display_df(portfolio_df)


replaceChar = F.udf(lambda s: s[:-1], T.StringType())

def agg(x):
    t = ''
    for i in x.split(' '):
        t += i
    return t

def cleaning(data):
    data = (
        data
        .withColumn(
        'symbol',
        F.when(F.col('symbol').endswith('ج'),
         replaceChar(F.col('symbol'))).otherwise(
            F.col('symbol')
        )
        )
    )
    for i in ['اوج', 'بکهنوج', 'ساروج', 'نبروج', 'وسخراج']:
            data = (
                data
                .withColumn(
                    'symbol', F.when(F.col('symbol') == i[:-1], i).otherwise(F.col('symbol'))
                )
            )
    data = data.dropDuplicates()
    return data

portfolio_df = cleaning(portfolio_df)

                                                                                

12109854
+--------+------+------------------------------------+-----------+
|date    |symbol|accountId                           |nHeldShares|
+--------+------+------------------------------------+-----------+
|13980105|خساپا |37D6BD7D-DCF6-4B34-9ACC-AAA8111E0243|32850      |
|13980105|خساپا |F5094FE0-46A8-4C7F-86CD-6EA2DC8D5B42|2237       |
|13980105|اخابر |47229110-AA3E-4972-985B-670E61BAC864|384        |
+--------+------+------------------------------------+-----------+
only showing top 3 rows



In [18]:
portfolio_df = (
    portfolio_df
    .join(valid_symbols_df, on = ['symbol'], how = 'inner')
    .groupBy(['accountId','date','symbol'])
    .agg(
        F.sum('nHeldShares').alias('nHeldShares')
    )
)

display_df(portfolio_df)

22/02/23 09:00:50 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:01:01 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:01:11 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
                                                                                

8769334


22/02/23 09:01:15 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB


+------------------------------------+--------+------+-----------+
|accountId                           |date    |symbol|nHeldShares|
+------------------------------------+--------+------+-----------+
|A196287E-81E5-4C65-B155-B9BEBC3BE905|13980105|پتایر |230        |
|7176A068-09E1-4A19-9644-53529D392E4F|13980105|فلامی |1462       |
|023156CF-DFA6-43E6-9897-79511AA14397|13980105|وسکاب |1936       |
+------------------------------------+--------+------+-----------+
only showing top 3 rows



In [19]:
(
    portfolio_df
    .filter(F.col('nHeldShares') < 0)
    .count()
)

22/02/23 09:01:17 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
                                                                                

0

#### check symbols

In [20]:
price_symbols = price_df.select('symbol').distinct().withColumn('price', F.lit(1))
trade_symbols = trade_df.select('symbol').distinct().withColumn('trade', F.lit(1))
portfolio_symbols = portfolio_df.select('symbol').distinct().withColumn('portfolio', F.lit(1))

symbols_df = (
    trade_symbols
    .join(portfolio_symbols, on = ['symbol'], how = 'outer')
    .join(price_symbols, on = ['symbol'], how = 'outer')
)

print(symbols_df.filter(F.col('price').isNull()).select('symbol').rdd.flatMap(lambda x: x).collect())

22/02/23 09:01:19 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:01:20 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:01:21 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:01:32 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:01:37 WARN DAGScheduler: Broadcasting large task binary with size 35.9 MiB
[Stage 190:>                                                        (0 + 2) / 2]

[]


                                                                                

In [21]:
w = Window().partitionBy('symbol').orderBy('date')
w2 = Window().partitionBy('symbol').orderBy(price_df.date.desc())

price_return_df = (price_df.select('symbol', 'mktcap',
   ((
       F.first('close_price',True).over(w2) - F.first('close_price',True).over(w)
       ) 
       / F.first('close_price',True).over(w)
       ).alias('price_return'),
       )
   .dropDuplicates(['symbol'])
)
tempt = price_df.na.drop(
    how = 'any',
)
large_small_stocks =( 
    tempt[tempt.date == 13980328]
    .withColumn('sizeDecile', F.ntile(10).over(Window.partitionBy().orderBy('mktcap')))
) 
price_return_df = price_return_df.join(
    large_small_stocks.select(
        F.col('symbol'),
    F.col('sizeDecile'),
    ) , on =['symbol']
).select(
    F.col('symbol'),
    F.col('mktcap'),
    F.col('price_return'),
    F.col('sizeDecile'),
)

In [22]:
(
    price_return_df
    .groupBy('sizeDecile')
    .agg(
        F.round(F.expr('percentile(mktcap, array(0.5))')[0], 3).alias('medianmktcap'),
        F.round(F.mean('price_return'), 2).alias('meanReturn'),
        F.round(F.expr('percentile(price_return, array(0.5))')[0], 3).alias('medianReturn')
    )
    .orderBy('sizeDecile')
    .show()
)

22/02/23 09:01:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/23 09:01:39 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:01:39 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:01:45 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/23 09:01:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/23 09:01:47 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:01:48 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/23 09:01:49 WARN DAGScheduler: Broadcasting large task binary with size 1

+----------+------------+----------+------------+
|sizeDecile|medianmktcap|meanReturn|medianReturn|
+----------+------------+----------+------------+
|         1|    33021.78|      0.52|       0.044|
|         2|     69850.0|      0.49|       0.389|
|         3|    111675.0|       0.6|       0.493|
|         4|    147260.0|      0.61|       0.514|
|         5|    203036.0|      0.62|       0.562|
|         6|    296800.0|      0.57|       0.519|
|         7|    435278.0|      0.69|       0.545|
|         8|    703566.0|      0.78|       0.558|
|         9|   1570800.0|      0.62|       0.383|
|        10|   8119500.0|      0.25|       0.205|
+----------+------------+----------+------------+



[Stage 199:>                                                        (0 + 1) / 1]                                                                                

### general insights

#### check compatibility of the two datasets

In [23]:
common_investors_df = (
    trade_df
    .select(F.col('buyerAccountId').alias('accountId'))
    .union(trade_df.select(F.col('sellerAccountId').alias('accountId')))
    .dropDuplicates()
    .withColumn('trade', F.lit(1))
    .join(portfolio_df.select('accountId', F.lit(1).alias('portfolio')).dropDuplicates(), on = ['accountId'], how = 'outer')
    .fillna(0, subset = ['trade', 'portfolio'])
)

display_df(common_investors_df)

22/02/23 09:01:53 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:01:53 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:02:03 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:02:13 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:02:40 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB
                                                                                

4371963


22/02/23 09:02:46 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB


+------------------------------------+-----+---------+
|accountId                           |trade|portfolio|
+------------------------------------+-----+---------+
|00019AFD-C89B-4B63-98BB-18BF5A112C6F|0    |1        |
|000217BA-3C48-4458-8CAA-691CA19C7187|0    |1        |
|000249D5-649B-4C99-AE9E-82AB979E80C9|0    |1        |
+------------------------------------+-----+---------+
only showing top 3 rows



In [24]:
trade_only = common_investors_df.filter( (F.col('trade') == 1) & (F.col('portfolio') == 0)).count()
all_trade = common_investors_df.filter(F.col('trade') == 1).count()

print('share of missing portfolio accounts among traders:', round(100 * trade_only / all_trade, 2), '%')
# It seems reasonable to attribute this missing portion to the new entrants!

22/02/23 09:02:48 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB
22/02/23 09:02:51 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB

share of missing portfolio accounts among traders: 21.13 %


                                                                                

In [25]:
portfolio_only = common_investors_df.filter( (F.col('trade') == 0) & (F.col('portfolio') == 1)).count()
all_portfolio = common_investors_df.filter(F.col('portfolio') == 1).count()

print('share of missing trades among investors who have nitial portfolio:', round(100 * portfolio_only / all_portfolio, 2), '%')
# It seems reasonable to attribute this missing portion to the new entrants!

22/02/23 09:02:55 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB
22/02/23 09:02:59 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB

share of missing trades among investors who have nitial portfolio: 86.21 %


                                                                                

#### number of unique investors

In [26]:
(
    trade_df
    .select(F.col('buyerAccountId').alias('accountId'))
    .union(trade_df.select(F.col('sellerAccountId').alias('accountId')))
    .dropDuplicates()
    .count()
)

22/02/23 09:03:02 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:03:09 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:03:11 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:03:24 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
                                                                                

737297

#### number of stocks within investors' initial portfolios

In [27]:
(
    portfolio_df
    .groupBy('accountId')
    .count()
    .agg(
        F.expr('percentile(count, array(0.25))')[0].alias('25%'),
        F.expr('percentile(count, array(0.50))')[0].alias('50%'),
        F.round(F.mean('count'), 4).alias('mean'),
        F.expr('percentile(count, array(0.75))')[0].alias('75%'),
        F.expr('percentile(count, array(0.9))')[0].alias('90%'),
        F.expr('percentile(count, array(0.99))')[0].alias('99%'),
    )
    .show()
)

22/02/23 09:03:27 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:03:35 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:03:38 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB


+---+---+------+---+---+----+
|25%|50%|  mean|75%|90%| 99%|
+---+---+------+---+---+----+
|1.0|1.0|2.0799|2.0|4.0|15.0|
+---+---+------+---+---+----+



                                                                                

#### compare trade value of new entrants with other investors

In [28]:
(
    common_investors_df
    .select(F.col('accountId').alias('buyerAccountId'), F.col('portfolio').alias('hasPortfolio'))
    .join(trade_df, on = ['buyerAccountId'], how = 'right')
    .groupBy('hasPortfolio')
    .agg(
        F.round(F.expr('percentile(tradeSettlementValue, array(0.5))')[0], 2).alias('median_buyValue'),
        F.round(F.mean('tradeSettlementValue'), 2).alias('mean_buyValue')
    )
    .show()
)

22/02/23 09:03:40 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB
22/02/23 09:03:40 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:03:47 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB
22/02/23 09:03:52 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB
22/02/23 09:04:34 WARN DAGScheduler: Broadcasting large task binary with size 18.1 MiB
[Stage 394:>                                                        (0 + 1) / 1]

+------------+---------------+-------------+
|hasPortfolio|median_buyValue|mean_buyValue|
+------------+---------------+-------------+
|           1|            1.0|         3.44|
|           0|           0.78|         2.57|
+------------+---------------+-------------+



                                                                                

#### stocks whose shares were given to the mass general public

In [29]:
mass_public_stocks_df = (
    portfolio_df
    .groupBy('symbol', 'nHeldShares')
    .agg(
        F.countDistinct('accountId').alias('nHolders')
    )
    .withColumn('nAllHolders', F.sum('nHolders').over(Window.partitionBy('symbol')))
    .withColumn('rank', F.row_number().over(Window.partitionBy('symbol').orderBy(F.desc('nHolders'))))
    .filter(F.col('rank') == 1)
    .drop('rank')
    .orderBy(F.desc('nHolders'))
    .withColumn('shareOfHolders', F.round(F.col('nHolders') / F.col('nAllHolders'), 3))
#     .join(price_df.filter(F.col('date') == 13980105).select('symbol', 'shrout'), on = 'symbol', how = 'left')
#     .withColumn('shareOfShares', F.round(F.col('nHeldShares')*F.col('nHolders') / F.col('shrout'), 3))
)

display_df(mass_public_stocks_df)

22/02/23 09:04:45 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:04:58 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:05:06 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:05:14 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:05:17 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:05:18 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
                                                                                

663


22/02/23 09:05:22 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB


+------+-----------+--------+-----------+--------------+
|symbol|nHeldShares|nHolders|nAllHolders|shareOfHolders|
+------+-----------+--------+-----------+--------------+
|زماهان|120        |266735  |332130     |0.803         |
|ومعلم |229        |209919  |261378     |0.803         |
|سمایه |300        |185411  |302045     |0.614         |
+------+-----------+--------+-----------+--------------+
only showing top 3 rows



In [30]:
mass_public_stocks_df.write.mode('overwrite').parquet('/home/user1/Data/Esmaeil/mass_public_stocks.parquet')

22/02/23 09:05:24 WARN DAGScheduler: Broadcasting large task binary with size 18.2 MiB
                                                                                

## make daily portfolios

### flatten trade data

In [31]:
raw_flat_trade_df = spark.read.parquet(PATH_TRADE + "{}".format("raw_flat_trade_df.parquet"))

display_df(raw_flat_trade_df)



15170298
+--------+------+------------------------------------+------------+---------+------+
|date    |symbol|accountId                           |nTradeShares|cashOut  |cashIn|
+--------+------+------------------------------------+------------+---------+------+
|13980204|ومعلم |D41B1B0F-FB40-4ACE-BF3B-4AA4E6700EA2|-229        |0.074654 |0.0   |
|13980204|پارس  |D41B1B0F-FB40-4ACE-BF3B-4AA4E6700EA2|-2          |0.0076702|0.0   |
|13980204|بزاگرس|D41B1B0F-FB40-4ACE-BF3B-4AA4E6700EA2|-449        |0.145476 |0.0   |
+--------+------+------------------------------------+------------+---------+------+
only showing top 3 rows



                                                                                

In [32]:
print(round(100*raw_flat_trade_df.filter(
    (F.col('cashIn') != 0)&
    (F.col('cashOut') != 0) ).count() / raw_flat_trade_df.count(), 2),'%')



3.42 %


In [33]:
print(raw_flat_trade_df.filter(F.col('nTradeShares') == 0).count())
print(trade_df.filter(F.col('tradeSettlementValue') == 0).count())

187070


22/02/23 09:05:32 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB

0


                                                                                

In [34]:
print(raw_flat_trade_df.filter(F.col('cashIn') > 0 ).count())
print(raw_flat_trade_df.filter(F.col('cashOut') < 0 ).count())

0
0


### make daily portfolios

In [35]:
def make_daily_portfolio():
    window = (
        Window.partitionBy('accountId', 'symbol')
        .orderBy('date')
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )
    return (F.sum('nHeldShares').over(window), F.sum('cashOut').over(window), F.sum('cashIn').over(window))

raw_daily_portfolio_df = (
    portfolio_df
    .select('date',
            'symbol', 
            'accountId', 
            'nHeldShares', 
            F.lit(0).alias('cashOut'),
            F.lit(0).alias('cashIn')
           )
    .union(
        raw_flat_trade_df
        .withColumnRenamed('nTradeShares', 'nHeldShares')
    )
    .groupBy('date', 'symbol', 'accountId')
    .agg(
        F.sum('nHeldShares').alias('nHeldShares'),
        F.sum('cashOut').alias('cashOut'),
        F.sum('cashIn').alias('cashIn')
    )
    .orderBy('accountId', 'date')
    .withColumn('heldShares', make_daily_portfolio()[0])
    .withColumn('netCashOut', make_daily_portfolio()[1])
    .withColumn('netCashIn', make_daily_portfolio()[2])
    .drop('nHeldShares', 'settlementValue', 'cashIn', 'cashOut')
)

display_df(raw_daily_portfolio_df)

22/02/23 09:05:35 WARN DAGScheduler: Broadcasting large task binary with size 17.9 MiB
22/02/23 09:06:14 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:06:28 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:06:32 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:06:41 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:06:51 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
                                                                                

23765673


22/02/23 09:06:59 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB


+--------+------+------------------------------------+----------+----------+---------+
|date    |symbol|accountId                           |heldShares|netCashOut|netCashIn|
+--------+------+------------------------------------+----------+----------+---------+
|13980105|چکاپا |00026661-733B-49E0-AC93-ED5812290A9B|1740      |0.0       |0.0      |
|13980221|چکاپا |00026661-733B-49E0-AC93-ED5812290A9B|0         |0.83346   |0.0      |
|13980105|ارفع  |00029878-6EF9-49A7-B231-61DBED05BDE7|32796     |0.0       |0.0      |
+--------+------+------------------------------------+----------+----------+---------+
only showing top 3 rows



#### invalid holdings

In [36]:
invalid_holdings_df = (
    raw_daily_portfolio_df
    .filter(F.col('heldShares') < 0)
    .select('accountId', 'symbol')
    .dropDuplicates()
    .withColumn('invalidHolding', F.lit(1))
)
display_df(invalid_holdings_df)


flat_trade_df = (
    raw_flat_trade_df
    .join(invalid_holdings_df, on = ['accountId', 'symbol'], how = 'left')
    .filter(F.col('invalidHolding').isNull())
    .drop('invalidHolding')
)
display_df(flat_trade_df)


daily_portfolio_df = (
    raw_daily_portfolio_df
    .join(invalid_holdings_df, on = ['accountId', 'symbol'], how = 'left')
    .filter(F.col('invalidHolding').isNull())
    .drop('invalidHolding')
)
display_df(daily_portfolio_df)

22/02/23 09:07:01 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
                                                                                

171376


22/02/23 09:07:04 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB


+------------------------------------+-------+--------------+
|accountId                           |symbol |invalidHolding|
+------------------------------------+-------+--------------+
|006BE8C1-A950-44B8-B2B5-FD293FAFD7B6|وساپا  |1             |
|0122EC9B-B0A1-4510-8D41-55731733AE20|ومعلم  |1             |
|01ED3611-405C-432F-B2A0-E0CC337FA36F|فرابورس|1             |
+------------------------------------+-------+--------------+
only showing top 3 rows



22/02/23 09:07:06 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
                                                                                

14601858
+------------------------------------+------+--------+------------+---------+------+
|accountId                           |symbol|date    |nTradeShares|cashOut  |cashIn|
+------------------------------------+------+--------+------------+---------+------+
|D41B1B0F-FB40-4ACE-BF3B-4AA4E6700EA2|ومعلم |13980204|-229        |0.074654 |0.0   |
|D41B1B0F-FB40-4ACE-BF3B-4AA4E6700EA2|پارس  |13980204|-2          |0.0076702|0.0   |
|D41B1B0F-FB40-4ACE-BF3B-4AA4E6700EA2|بزاگرس|13980204|-449        |0.145476 |0.0   |
+------------------------------------+------+--------+------------+---------+------+
only showing top 3 rows



22/02/23 09:07:15 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:07:16 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
                                                                                

23160625


22/02/23 09:07:23 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB


+------------------------------------+------+--------+----------+----------+---------+
|accountId                           |symbol|date    |heldShares|netCashOut|netCashIn|
+------------------------------------+------+--------+----------+----------+---------+
|00026661-733B-49E0-AC93-ED5812290A9B|چکاپا |13980105|1740      |0.0       |0.0      |
|00026661-733B-49E0-AC93-ED5812290A9B|چکاپا |13980221|0         |0.83346   |0.0      |
|00029878-6EF9-49A7-B231-61DBED05BDE7|ارفع  |13980105|32796     |0.0       |0.0      |
+------------------------------------+------+--------+----------+----------+---------+
only showing top 3 rows



#### new entrants df

In [37]:
new_entrant_account_ids_df = (
    flat_trade_df
    .groupBy('accountId')
    .agg(
        F.min('date').alias('firstDate')
    )
    .join(portfolio_df.select('accountId').distinct(), on = 'accountId', how = 'left_anti')
)

display_df(new_entrant_account_ids_df)

22/02/23 09:07:26 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
22/02/23 09:07:36 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB
                                                                                

146812


22/02/23 09:07:41 WARN DAGScheduler: Broadcasting large task binary with size 18.0 MiB


+------------------------------------+---------+
|accountId                           |firstDate|
+------------------------------------+---------+
|000E126E-9959-4796-A329-C9839A3C0FED|13980231 |
|00B67D2A-565A-4D31-9C1F-9E329D76B018|13980209 |
|00E62831-4844-4CF1-9611-68FB9FB76B80|13980209 |
+------------------------------------+---------+
only showing top 3 rows



In [38]:
new_entrant_account_ids_df.write.mode('overwrite').parquet('/home/user1/Data/Esmaeil/new_entrant_account_ids.parquet')

22/02/23 09:08:00 WARN DAGScheduler: Broadcasting large task binary with size 18.2 MiB
                                                                                

#### time series of new entrants

In [None]:
new_entrantd_time_series_df = (
    flat_trade_df
    .select('date', 'accountId')
    .dropDuplicates()
    .join(new_entrant_account_ids_df, on = 'accountId', how = 'inner')
    .withColumn('rank', F.row_number().over(Window.partitionBy('accountId').orderBy('date')))
    .filter(F.col('rank') == 1)
    .drop('rank')
    .groupBy('date')
    .count()
    .orderBy('date')
)

display_df(new_entrantd_time_series_df)

In [None]:
new_entrantd_time_series_df.write.mode('overwrite').parquet('/home/user1/Data/Esmaeil/new_entrantd_time_series_df.parquet')

### calculate gain from trade

In [None]:
gain_from_trade_df = (
    flat_trade_df
    .groupBy('accountId')
    .agg(
        F.sum('cashOut').alias('netCashOut'),
        F.sum('cashIn').alias('netCashIn'),
    )
)

display_df(gain_from_trade_df)

### calculate value of the initial portfolio

In [None]:
initial_portfolio_value_df = (
    portfolio_df
    .join(price_df.select('date', 'symbol', 'close_price'), on = ['date', 'symbol'], how = 'left')
    .dropna(subset = ['close_price'])
    .join(invalid_holdings_df, on = ['accountId', 'symbol'], how = 'left')
    .filter(F.col('invalidHolding').isNull())
    .withColumn('value', F.col('nHeldShares') * F.col('close_price'))
    .groupBy('accountId')
    .agg(
        (F.sum('value') / 10**7).alias('initialPortfolioValue')
    )
)

display_df(initial_portfolio_value_df)
# count after join?

In [None]:
print(initial_portfolio_value_df.filter(F.col('initialPortfolioValue').isNull()).count())
print(initial_portfolio_value_df.filter(F.col('initialPortfolioValue') <= 0).count())

In [None]:
# (
#     initial_portfolio_value_df
#     .agg(
#         F.round(F.min('initialPortfolioValue'), 2).alias('min'),
#         F.round(F.expr('percentile(initialPortfolioValue, array(0.01))')[0], 2).alias('1%'),
#         F.round(F.expr('percentile(initialPortfolioValue, array(0.25))')[0], 2).alias('25%'),
#         F.round(F.expr('percentile(initialPortfolioValue, array(0.5))')[0], 2).alias('50%'),
#         F.round(F.mean('initialPortfolioValue'), 2).alias('mean'),
#         F.round(F.expr('percentile(initialPortfolioValue, array(0.75))')[0], 2).alias('75%'),
#         F.round(F.expr('percentile(initialPortfolioValue, array(0.9))')[0], 2).alias('90%'),
#         F.round(F.expr('percentile(initialPortfolioValue, array(0.99))')[0], 2).alias('99%'),
#         F.round(F.expr('percentile(initialPortfolioValue, array(0.999))')[0], 2).alias('99.9%'),
#     )
#     .show()
# )

In [None]:
# initial_portfolio_value_df.write.mode('overwrite').parquet('/home/user1/Data/initial_portfolio_value_df.parquet')

### calculate value of the final portfolio

In [None]:
final_portfolio_value_df = (
    daily_portfolio_df
    .withColumn('rowNumber', F.row_number().over(Window.partitionBy('accountId', 'symbol').orderBy('date')))
    .withColumn('maxRowNumber', F.max('rowNumber').over(Window.partitionBy('accountId', 'symbol')))
    .filter(F.col('rowNumber') == F.col('maxRowNumber'))
    .filter(F.col('heldShares') > 0)
    .withColumn('date', F.lit(MAX_PRICE_DATE))
    .join(price_df.select('date', 'symbol', 'close_price'), on = ['date', 'symbol'], how = 'left')
    .dropna(subset = ['close_price'])
    .withColumn('value', F.col('heldShares') * F.col('close_price'))
    .groupBy('accountId')
    .agg(
        (F.sum('value') / 10**7).alias('finalPortfolioValue')
    )   
)

display_df(final_portfolio_value_df)
# count after join?

In [None]:
print(final_portfolio_value_df.filter(F.col('finalPortfolioValue').isNull()).count())
print(final_portfolio_value_df.filter(F.col('finalPortfolioValue') <= 0).count())

In [None]:
# (
#     final_portfolio_value_df
#     .agg(
#         F.round(F.min('finalPortfolioValue'), 2).alias('min'),
#         F.round(F.expr('percentile(finalPortfolioValue, array(0.1))')[0], 2).alias('10%'),
#         F.round(F.expr('percentile(finalPortfolioValue, array(0.25))')[0], 2).alias('25%'),
#         F.round(F.expr('percentile(finalPortfolioValue, array(0.5))')[0], 2).alias('50%'),
#         F.round(F.mean('finalPortfolioValue'), 2).alias('mean'),
#         F.round(F.expr('percentile(finalPortfolioValue, array(0.75))')[0], 2).alias('75%'),
#         F.round(F.expr('percentile(finalPortfolioValue, array(0.9))')[0], 2).alias('90%'),
#         F.round(F.expr('percentile(finalPortfolioValue, array(0.99))')[0], 2).alias('99%'),
#         F.round(F.expr('percentile(finalPortfolioValue, array(0.999))')[0], 2).alias('99.9%'),
#     )
#     .show()
# )

In [None]:
# final_portfolio_value_df.write.mode('overwrite').parquet('/home/user1/Data/final_portfolio_value_df.parquet')

### time series of the net cash in

In [None]:
max_portfolio_value_df = (
    final_portfolio_value_df
    .join(initial_portfolio_value_df, on = ['accountId'], how = 'outer')
    .fillna(0)
    .withColumn('maxPortfolioValue', F.greatest(F.col('initialPortfolioValue'), F.col('finalPortfolioValue')))
    .withColumn('type', F.when(F.col('maxPortfolioValue') < 10, 'lessThan10MT')
                         .when(F.col('maxPortfolioValue').between(10, 20), 'between10MTand20MT')
                         .when(F.col('maxPortfolioValue').between(20, 50), 'between20MTand50MT')
                         .otherwise('greaterThan50MT')
               )
    .select('accountId', 'type')
    .dropDuplicates()
)

display_df(max_portfolio_value_df)

In [None]:
dates_list = price_df.select('date').distinct().orderBy('date').rdd.flatMap(lambda x: x).collect()

cash_time_series_df = (
    flat_trade_df
    .withColumn('netCash', F.col('cashIn') + F.col('cashOut'))
    .join(max_portfolio_value_df, on = 'accountId', how = 'inner')
    .groupBy('type', 'date')
    .agg(
        F.round((-F.sum('netCash'))).alias('netCash'),
        F.countDistinct('accountId').alias('nAccounts')
    )
    .orderBy('date', 'type')
)

cash_time_series_df.show(4)

In [None]:
cash_time_series_df.write.mode('overwrite').parquet('/home/user1/Data/cash_time_series.parquet')

## calculate returns

In [None]:
return_df = (
    gain_from_trade_df
    .join(initial_portfolio_value_df, on = 'accountId', how = 'outer')
    .join(final_portfolio_value_df, on = 'accountId', how = 'outer')
    .fillna(0, subset = ['netCashIn', 'netCashOut', 'initialPortfolioValue', 'finalPortfolioValue'])
    .withColumn('return', 
                ((F.col('finalPortfolioValue') + F.col('netCashOut')) / (F.col('initialPortfolioValue') + (-F.col('netCashIn')))) - 1)
    .filter(F.col('return').isNotNull())
    .withColumn('returnDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy('return')))
)

display_df(return_df)
# null returns?

In [None]:
(
    return_df
    .filter(F.col('return') == 0)
    .count()
)


In [None]:
(
    return_df
    .groupBy('returnDecile')
    .agg(
        F.round(F.expr('percentile(return, array(0.5))')[0], 3).alias('medianReturn')
    )
    .show()
)

In [None]:
(
    return_df
    .agg(
       F.round(F.min('return'), 2).alias('min'),
        F.round(F.expr('percentile(return, array(0.01))')[0], 2).alias('1%'),
        F.round(F.expr('percentile(return, array(0.1))')[0], 2).alias('10%'),
        F.round(F.expr('percentile(return, array(0.25))')[0], 2).alias('25%'),
        F.round(F.expr('percentile(return, array(0.5))')[0], 2).alias('50%'),
        F.round(F.mean('return'), 2).alias('mean'),
        F.round(F.expr('percentile(return, array(0.75))')[0], 2).alias('75%'),
        F.round(F.expr('percentile(return, array(0.9))')[0], 2).alias('90%'),
        F.round(F.expr('percentile(return, array(0.99))')[0], 2).alias('99%'),
        F.round(F.expr('percentile(return, array(0.999))')[0], 2).alias('99.9%'),
    )
    .show()
)

In [None]:
return_df.write.mode('overwrite').parquet('/home/user1/Data/Esmaeil/return_output.parquet')

### final portfolio value output

In [None]:
output_final_portfolio_value = (
    final_portfolio_value_df
    .join(return_df.select('accountId', 'return'), on = 'accountId')
    .withColumn('finalPortfolioValueDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy('finalPortfolioValue')))
)

display_df(output_final_portfolio_value)

In [None]:
(
    output_final_portfolio_value
    .groupBy('finalPortfolioValueDecile')
    .agg(
        F.round(F.expr('percentile(finalPortfolioValue, array(0.5))')[0], 3).alias('medianFinalPortfolioValue'),
        F.round(F.mean('return'), 2).alias('meanReturn'),
        F.round(F.expr('percentile(return, array(0.5))')[0], 3).alias('medianReturn')
    )
    .orderBy('finalPortfolioValueDecile')
    .show()
)

In [None]:
output_final_portfolio_value.write.mode('overwrite').parquet('/home/user1/Data/Esmaeil/final_portfolio_output.parquet')

### initial portfolio value output

In [None]:
output_initial_portfolio_value = (
    initial_portfolio_value_df
    .join(return_df.select('accountId', 'return'), on = 'accountId')
    .withColumn('initialPortfolioValueDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy('initialPortfolioValue')))
)

display_df(output_initial_portfolio_value)

In [None]:
(
    output_initial_portfolio_value
    .groupBy('initialPortfolioValueDecile')
    .agg(
        F.round(F.expr('percentile(initialPortfolioValue, array(0.5))')[0], 3).alias('medianInitialPortfolioValue'),
        F.round(F.mean('return'), 2).alias('meanReturn'),
        F.round(F.expr('percentile(return, array(0.5))')[0], 5).alias('medianReturn')
    )
    .orderBy('initialPortfolioValueDecile')
    .show()
)

In [None]:
output_initial_portfolio_value.write.mode('overwrite').parquet('/home/user1/Data/Esmaeil/inital_portfolio_output.parquet')

### calculate frequency of trades and active days

In [None]:
active_days_df = (
    raw_flat_trade_df
    .groupBy('accountId', 'date')
    .agg(
        F.sum('cashIn').alias('netCashIn'),
        F.sum('cashOut').alias('netCashOut')
    )
    .withColumn('netCash', F.col('netCashIn') + F.col('netCashOut'))
    .groupBy('accountId')
    .agg(
        F.count(F.when(F.col('netCash') < 0, F.lit(1))).alias('nBuyDays'),
        F.count(F.when(F.col('netCash') > 0, F.lit(1))).alias('nSellDays')
    )
    .fillna(0, subset = ['nBuyDays', 'nSellDays'])
)

display_df(active_days_df)

In [None]:
buy_trade_df = (
    trade_df
        .select(
        'date',
        'symbol',
        F.col('buyerAccountId').alias('accountId'),
        'nTradeShares',
        (-F.col('tradeSettlementValue')).alias('settlementValue'),
        )
)

sell_trade_df = (
    trade_df
        .select(
            'date',
            'symbol',
            F.col('sellerAccountId').alias('accountId'),
            (-F.col('nTradeShares')).alias('nTradeShares'),
            F.col('tradeSettlementValue').alias('settlementValue')
        )
)

In [None]:
trade_kpi_df = (
    buy_trade_df
    .union(sell_trade_df)
    .groupBy('accountId')
    .agg(
        F.count(F.lit(1)).alias('tradeFrequency'),
        F.mean(F.abs('settlementValue')).alias('meanTradeValue'),
        F.sum('settlementValue').alias('netSumTradeValue'),
        F.sum(F.abs('settlementValue')).alias('absSumTradeValue'),
        F.countDistinct('date').alias('activeDays'),
    )
    .join(active_days_df, on = 'accountId')
)

display_df(trade_kpi_df)

In [None]:
(
    trade_kpi_df
    .agg(
        F.round(F.expr('percentile(tradeFrequency, array(0.25))')[0], 2).alias('25% percentile'),
        F.round(F.expr('percentile(tradeFrequency, array(0.5))')[0], 2).alias('50% percentile'),
        F.round(F.mean('tradeFrequency'), 2).alias('mean'),
        F.round(F.expr('percentile(tradeFrequency, array(0.75))')[0], 2).alias('75% percentile'),
        F.round(F.expr('percentile(tradeFrequency, array(0.9))')[0], 2).alias('90% percentile'),
        F.round(F.expr('percentile(tradeFrequency, array(0.99))')[0], 2).alias('99% percentile'),
        F.round(F.expr('percentile(tradeFrequency, array(0.999))')[0], 2).alias('99.9% percentile'),
    )
    .show()
)

In [None]:
# (
#     trade_kpi_df
#     .agg(
#         F.round(F.expr('percentile(meanTradeValue, array(0.25))')[0], 2).alias('25% percentile'),
#         F.round(F.expr('percentile(meanTradeValue, array(0.5))')[0], 2).alias('50% percentile'),
#         F.round(F.mean('meanTradeValue'), 2).alias('mean'),
#         F.round(F.expr('percentile(meanTradeValue, array(0.75))')[0], 2).alias('75% percentile'),
#         F.round(F.expr('percentile(meanTradeValue, array(0.9))')[0], 2).alias('90% percentile'),
#         F.round(F.expr('percentile(meanTradeValue, array(0.99))')[0], 2).alias('99% percentile'),
#         F.round(F.expr('percentile(meanTradeValue, array(0.999))')[0], 2).alias('99.9% percentile'),
#     )
#     .show()
# )

In [None]:
# (
#     trade_kpi_df
#     .agg(
#         F.round(F.expr('percentile(netSumTradeValue, array(0.25))')[0], 2).alias('25% percentile'),
#         F.round(F.expr('percentile(netSumTradeValue, array(0.5))')[0], 2).alias('50% percentile'),
#         F.round(F.mean('netSumTradeValue'), 2).alias('mean'),
#         F.round(F.expr('percentile(netSumTradeValue, array(0.75))')[0], 2).alias('75% percentile'),
#         F.round(F.expr('percentile(netSumTradeValue, array(0.9))')[0], 2).alias('90% percentile'),
#         F.round(F.expr('percentile(netSumTradeValue, array(0.99))')[0], 2).alias('99% percentile'),
#         F.round(F.expr('percentile(netSumTradeValue, array(0.999))')[0], 2).alias('99.9% percentile'),
#     )
#     .show()
# )

In [None]:
# (
#     trade_kpi_df
#     .agg(
#         F.round(F.expr('percentile(absSumTradeValue, array(0.25))')[0], 2).alias('25% percentile'),
#         F.round(F.expr('percentile(absSumTradeValue, array(0.5))')[0], 2).alias('50% percentile'),
#         F.round(F.mean('absSumTradeValue'), 2).alias('mean'),
#         F.round(F.expr('percentile(absSumTradeValue, array(0.75))')[0], 2).alias('75% percentile'),
#         F.round(F.expr('percentile(absSumTradeValue, array(0.9))')[0], 2).alias('90% percentile'),
#         F.round(F.expr('percentile(absSumTradeValue, array(0.99))')[0], 2).alias('99% percentile'),
#         F.round(F.expr('percentile(absSumTradeValue, array(0.999))')[0], 2).alias('99.9% percentile'),
#     )
#     .show()
# )

In [None]:
(
    trade_kpi_df
    .agg(
        F.round(F.expr('percentile(activeDays, array(0.25))')[0], 2).alias('25% percentile'),
        F.round(F.expr('percentile(activeDays, array(0.5))')[0], 2).alias('50% percentile'),
        F.round(F.mean('activeDays'), 2).alias('mean'),
        F.round(F.expr('percentile(activeDays, array(0.75))')[0], 2).alias('75% percentile'),
        F.round(F.expr('percentile(activeDays, array(0.9))')[0], 2).alias('90% percentile'),
        F.round(F.expr('percentile(activeDays, array(0.99))')[0], 2).alias('99% percentile'),
        F.round(F.expr('percentile(activeDays, array(0.999))')[0], 2).alias('99.9% percentile'),
    )
    .show()
)

In [None]:
(
    trade_kpi_df
    .agg(
        F.round(F.expr('percentile(nBuyDays, array(0.25))')[0], 2).alias('25% percentile'),
        F.round(F.expr('percentile(nBuyDays, array(0.5))')[0], 2).alias('50% percentile'),
        F.round(F.mean('nBuyDays'), 2).alias('mean'),
        F.round(F.expr('percentile(nBuyDays, array(0.75))')[0], 2).alias('75% percentile'),
        F.round(F.expr('percentile(nBuyDays, array(0.9))')[0], 2).alias('90% percentile'),
        F.round(F.expr('percentile(nBuyDays, array(0.99))')[0], 2).alias('99% percentile'),
        F.round(F.expr('percentile(nBuyDays, array(0.999))')[0], 2).alias('99.9% percentile'),
    )
    .show()
)

In [None]:
(
    trade_kpi_df
    .agg(
        F.round(F.expr('percentile(nSellDays, array(0.25))')[0], 2).alias('25% percentile'),
        F.round(F.expr('percentile(nSellDays, array(0.5))')[0], 2).alias('50% percentile'),
        F.round(F.mean('nSellDays'), 2).alias('mean'),
        F.round(F.expr('percentile(nSellDays, array(0.75))')[0], 2).alias('75% percentile'),
        F.round(F.expr('percentile(nSellDays, array(0.9))')[0], 2).alias('90% percentile'),
        F.round(F.expr('percentile(nSellDays, array(0.99))')[0], 2).alias('99% percentile'),
        F.round(F.expr('percentile(nSellDays, array(0.999))')[0], 2).alias('99.9% percentile'),
    )
    .show()
)

In [None]:
print(trade_kpi_df.count() - trade_kpi_df.dropna().count())

In [None]:
trade_output_df = (
    trade_kpi_df
    .join(return_df.select('accountId', 'return').dropDuplicates(), on = ['accountId'])
    .dropna()
    .withColumn('tradeFrequencyDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy('tradeFrequency')))
    .withColumn('meanTradeValueDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy('meanTradeValue')))
    .withColumn('netSumTradeValueDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy('netSumTradeValue')))
    .withColumn('absSumTradeValueDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy('absSumTradeValue')))
    .withColumn('activeDaysDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy('activeDays')))
    .withColumn('nBuyDaysDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy('nBuyDays')))
    .withColumn('nSellDaysDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy('nSellDays')))
)

display_df(trade_output_df)

In [None]:
print(round(trade_output_df.filter(F.col('return') >= 0.35).count() / trade_output_df.count() ,2))

In [None]:
(
    trade_output_df
    .agg(
        F.round(F.expr('percentile(return, array(0.5))')[0], 3).alias('medianReturn')
    )
    .show()
)

In [None]:
(
    trade_output_df
    .groupBy('tradeFrequencyDecile')
    .agg(
        F.round(F.expr('percentile(return, array(0.5))')[0], 3).alias('medianReturn')
    )
    .orderBy('tradeFrequencyDecile')
    .show()
)

In [None]:
trade_output_df.write.mode('overwrite').parquet('/home/user1/Data/Esmaeil/trade_output.parquet')

### identify block holders

In [None]:
bh_df = (
    daily_portfolio_df
    .select('date', 'symbol', 'accountId', 'heldShares')
    .join(price_df.select('date', 'symbol', 'shrout'), on = ['date', 'symbol'])
    .withColumn('ownership', F.col('heldShares') / F.col('shrout'))
    .filter( (F.col('ownership') >= 0.01) & F.col('ownership').isNotNull() )
    .select('accountId')
    .distinct()
    .withColumn('isBH', F.lit(1))
)

display_df(bh_df)

In [None]:
bh_output_df = (
    return_df
    .select('accountId', 'return')
    .dropna()
    .join(bh_df, on = 'accountId', how = 'left')
    .fillna(0, 'isBH')
)

display_df(bh_output_df)

In [None]:
(
    bh_output_df
    .groupBy('isBH')
    .agg(
        F.round(F.expr('percentile(return, array(0.5))')[0], 3).alias('medianTradeFrequency')
    )
    .show()
)

In [None]:
bh_output_df.write.mode('overwrite').parquet('/home/user1/Data/Esmaeil/bhOutput.parquet')

### number of stocks within initial portfolio

In [None]:
n_stocks_within_initial_portfolio_df = (
    portfolio_df
    .groupBy('accountId')
    .agg(
        F.count(F.lit(1)).alias('nStocksWithinInitialPortfolio')
    )
    .dropna()
)

display_df(n_stocks_within_initial_portfolio_df)

In [None]:
(
    n_stocks_within_initial_portfolio_df
    .agg(
        F.expr('percentile(nStocksWithinInitialPortfolio, array(0.25))')[0].alias('25%'),
        F.expr('percentile(nStocksWithinInitialPortfolio, array(0.50))')[0].alias('50%'),
        F.round(F.mean('nStocksWithinInitialPortfolio'), 4).alias('mean'),
        F.expr('percentile(nStocksWithinInitialPortfolio, array(0.75))')[0].alias('75%'),
        F.expr('percentile(nStocksWithinInitialPortfolio, array(0.9))')[0].alias('90%'),
        F.expr('percentile(nStocksWithinInitialPortfolio, array(0.99))')[0].alias('99%'),
        F.expr('percentile(nStocksWithinInitialPortfolio, array(0.999))')[0].alias('99.9%'),
    )
    .show()
)

In [None]:
n_stocks_within_initial_portfolio_output_df = (
    return_df
    .select('accountId', 'return')
    .dropna()
    .join(n_stocks_within_initial_portfolio_df, on = 'accountId', how = 'inner')
    .withColumn('nStocksWithinInitialPortfolioDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy('nStocksWithinInitialPortfolio')))
)

display_df(n_stocks_within_initial_portfolio_output_df)

In [None]:
(
    n_stocks_within_initial_portfolio_output_df
    .groupBy('nStocksWithinInitialPortfolioDecile')
    .agg(
        F.round(F.expr('percentile(return, array(0.5))')[0], 3).alias('medianReturn')
    )
    .orderBy('nStocksWithinInitialPortfolioDecile')
    .show()
)

In [None]:
n_stocks_within_initial_portfolio_output_df.write.mode('overwrite').parquet('/home/user1/Data/Esmaeil/n_initial_portfolio.parquet')

### number of stocks within final portfolio

In [None]:
n_stocks_within_final_portfolio_df = (
    daily_portfolio_df
    .withColumn('rowNumber', F.row_number().over(Window.partitionBy('accountId', 'symbol').orderBy('date')))
    .withColumn('maxRowNumber', F.max('rowNumber').over(Window.partitionBy('accountId', 'symbol')))
    .filter(F.col('rowNumber') == F.col('maxRowNumber'))
    .filter(F.col('heldShares') > 0)
    .withColumn('date', F.lit(MAX_PRICE_DATE))
    .join(price_df.select('date', 'symbol', 'close_price'), on = ['date', 'symbol'], how = 'left')
    .dropna(subset = ['close_price'])
    .groupBy('accountId')
    .agg(
        F.countDistinct('symbol').alias('nStocksWithinFinalPortfolio')
    )   
)

display_df(n_stocks_within_final_portfolio_df)

In [None]:
n_stocks_within_final_portfolio_output_df = (
    return_df
    .select('accountId', 'return')
    .dropna()
    .join(n_stocks_within_final_portfolio_df, on = 'accountId', how = 'inner')
    .withColumn('nStocksWithinFinalPortfolioDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy('nStocksWithinFinalPortfolio')))
)

display_df(n_stocks_within_final_portfolio_output_df)

In [None]:
(
    n_stocks_within_final_portfolio_output_df
    .groupBy('nStocksWithinFinalPortfolioDecile')
    .agg(
        F.round(F.expr('percentile(return, array(0.5))')[0], 3).alias('medianReturn')
    )
    .orderBy('nStocksWithinFinalPortfolioDecile')
    .show()
)

In [None]:
n_stocks_within_final_portfolio_output_df.write.mode('overwrite').parquet('/home/user1/Data/Esmaeil/n_final_portfolio.parquet')

### turnover

In [None]:
turnover_df = (
    trade_kpi_df
    .join(final_portfolio_value_df, on =['accountId'], how = 'left')
    .withColumn('turnover', F.col('absSumTradeValue') / F.col('finalPortfolioValue'))
    .join(return_df.select('accountId', 'return'), on = 'accountId')
    .withColumn('turnoverDecile', F.ntile(N_QUANTILES).over(Window.partitionBy().orderBy(F.col('turnover'))))
    .select(
        'accountId',
        'turnover',
        'turnoverDecile',
        'return'
    )
)

display_df(turnover_df)

In [None]:
(
    turnover_df
    .groupBy('turnoverDecile')
    .agg(
        F.round(F.expr('percentile(return, array(0.5))')[0], 3).alias('medianReturn')
    )
    .orderBy('turnoverDecile')
    .show()
)

In [None]:
turnover_df.write.mode('overwrite').parquet('/home/user1/Data/Esmaeil/turnover.parquet')

### time series of the number of stocks within portfolio

In [None]:
dates_list = (
    price_df
    .select('date')
    .distinct()
    .orderBy('date')
    .rdd.flatMap(lambda x: x).collect()
)

print(dates_list)

In [None]:
nStocksWithinPortfolioOfAllInvestors = []
nInvestors = []

for date in dates_list:
    print(date)
    result = (
        daily_portfolio_df
        .filter(F.col('date') <= date)
        .withColumn('rowNumber', F.row_number().over(Window.partitionBy('accountId', 'symbol').orderBy('date')))
        .withColumn('maxRowNumber', F.max('rowNumber').over(Window.partitionBy('accountId', 'symbol')))
        .filter(F.col('rowNumber') == F.col('maxRowNumber'))
        .filter( (F.col('heldShares') > 0) & (F.col('heldShares').isNotNull()) )
        .groupBy('accountId')
        .agg(
            F.count(F.lit(1)).alias('nStocksWithinPortfolioOfAllInvestors'),
        )
        .agg(
            F.round(F.mean('nStocksWithinPortfolioOfAllInvestors'), 3).alias('nStocksWithinPortfolioOfAllInvestors'),
            F.count(F.lit(1)).alias('nInvestors')
        )
    )
    nStocksWithinPortfolioOfAllInvestors.append(result.collect()[0][0])
    nInvestors.append(result.collect()[0][1])

In [None]:
n_stocks_df = spark.createDataFrame(
    pd.DataFrame({
        'date' : dates_list,
        'nStocksWithinPortfolioOfAllInvestors' : nStocksWithinPortfolioOfAllInvestors,
        'nInvestors' : nInvestors
    })
)

display_df(n_stocks_df)

In [None]:
n_stocks_df.write.mode('overwrite').parquet('/home/user1/Data/Esmaeil/mean_number_of_stocks_within_portfolio.parquet')

In [None]:
initial_ids =  [row['accountId'] for row in portfolio_df.select('accountId').distinct().collect()]
initial_ids = set(initial_ids)

In [None]:
unique_id_trade = flat_trade_df.dropDuplicates(subset=['accountId','date'])
unique_id_trade.count()
result = {}
for date in dates_list[:3]:
    print(len(initial_ids))
    tempt = unique_id_trade.filter(F.col('date') == date).select('accountId').distinct().collect()
    teades_ids = set([row['accountId'] for row in tempt])
    result[date] = len(teades_ids -initial_ids )
    
    initial_ids =  set.union(initial_ids, teades_ids)