**We simulate *n* investment tries per ticker and evaluate the payment.**

Investment and payment dates are randomized to remove decision biases.

In [1]:
# NOTE(review): findspark.init() presumably makes the local Spark installation
# importable, which is why it runs before the `pyspark` imports in the next
# cell — confirm against the environment this notebook targets.
import findspark
findspark.init()

In [2]:
import random
import datetime
import itertools
import functools

import pandas as pd
import pyspark.sql.functions as F

from tqdm import tqdm_notebook as tqdm
from pyspark.sql import SparkSession, DataFrame

In [3]:
# Create (or reuse, if one already exists) the Spark session used by every
# cell below: master 'local[*]', application name 'simulations'.
session = (SparkSession
           .builder
           .master('local[*]')
           .appName('simulations')
           .getOrCreate())

# Datasets

In [4]:
# Per-ticker metadata (ticker, name, country, category). The CSV's first line
# is used as the header and column types are inferred by Spark.
extras = session.read.csv('../datasets/extras_eur.csv.gz', header=True, inferSchema=True)

extras.show()

+-------+--------------------+--------------+--------+
| ticker|                name|       country|category|
+-------+--------------------+--------------+--------+
| 0BN.BE|HABIT RESTAURANT....|       Germany|    null|
| 0O2W.L| GFT Technologies SE|United Kingdom|    null|
|  13J.F|Jinmao (China) Ho...|        France|    null|
| 1TT.BE|   FACTOR THERAPEUT.|       Germany|    null|
|  233.F|Springland Intern...|        France|    null|
|2CDA.BE|CONTR.VUEL.CO. AD...|       Germany|    null|
| 3NEN.F|Brisio Innovation...|        France|    null|
| 44C.SG|Civeo Corp (Canad...|       Germany|    null|
|48CA.DU|   CAIXABANK S.A. EO|       Germany|    null|
| 4FO.MU|INDS PENOLES S.A....|       Germany|    null|
| 4HP.DU|ISETAN MITSUKOSHI...|       Germany|    null|
|  51S.F|Cynata Therapeuti...|        France|    null|
| 5CF.SG|Calfrac Well Serv...|       Germany|    null|
| 5IX.BE|INTERXION HLDG NV...|       Germany|    null|
|5N91.BE|AGUIA RES LTD AD-,20|       Germany|    null|
|  6GX.F|N

In [5]:
# Closing quotes: one row per (ticker, unixtime) with the `close` price.
# Header taken from the first CSV line, column types inferred by Spark.
quotes = session.read.csv('../datasets/quotes_eur.csv.gz', header=True, inferSchema=True)

quotes.show()

+-------+----------+------------------+
| ticker|  unixtime|             close|
+-------+----------+------------------+
|  MT.AS|1409176800|22.260799407958984|
|BAYA.DE|1409176800|25.512500762939453|
| AZM.MI|1409176800|18.391599655151367|
| AGN.AS|1409176800|  5.96999979019165|
|BKIA.MC|1409176800| 5.863999843597412|
| SAB.MC|1409176800| 1.938789963722229|
|  PUM.F|1409176800|18.737199783325195|
| NEM.DE|1409176800| 6.581669807434082|
| MDF.MC|1409176800|25.820199966430664|
|  EZQ.F|1409176800|1.6920000314712524|
| DIC.DE|1409176800| 6.800000190734863|
| CPR.MI|1409176800|2.9049999713897705|
| COK.DE|1409176800| 16.63249969482422|
|BIO3.DE|1409176800|28.450000762939453|
| V3S.DE|1409176800|  2.34975004196167|
| UBK.HM|1409176800| 9.858050346374512|
|SRT3.DU|1409176800|22.165000915527344|
| NEM.DU|1409176800| 6.516670227050781|
|  MUM.F|1409176800| 5.814620018005371|
| MUM.DE|1409176800| 5.716060161590576|
+-------+----------+------------------+
only showing top 20 rows



In [6]:
# Stock splits: one row per (ticker, unixtime) with the split `ratio`.
# Header taken from the first CSV line, column types inferred by Spark.
splits = session.read.csv('../datasets/splits_eur.csv.gz', header=True, inferSchema=True)

splits.show()

+-------+----------+------------------+
| ticker|  unixtime|             ratio|
+-------+----------+------------------+
|  1WR.F|1409176800|               2.0|
| S7F1.F|1409608800|               5.0|
|UCA1.MU|1409781600|               0.1|
|UCA1.DU|1409781600|               0.1|
|UCA1.BE|1409781600|               0.1|
| UCA1.F|1409781600|               0.1|
|UCA1.HM|1409781600|               0.1|
| 3U6.BE|1410127200|               1.5|
|  3U6.F|1410127200|               1.5|
|  CH5.F|1410213600|               1.5|
|KE0A.BE|1410386400|               4.0|
|  VGR.F|1410386400|               1.0|
| FII.PA|1410472800|               5.0|
|KE0A.BE|1410472800|               4.0|
| HIN.BE|1410472800|             0.125|
|  LI3.F|1410472800|               5.0|
| LI3.SG|1410472800|               5.0|
| HIN.MU|1410472800|             0.125|
|  HIN.F|1410472800|             0.125|
|L1OA.DE|1411336800|0.3333333333333333|
+-------+----------+------------------+
only showing top 20 rows



In [7]:
# Dividend payments: one row per (ticker, unixtime) with the per-share `amount`.
# Header taken from the first CSV line, column types inferred by Spark.
dividends = session.read.csv('../datasets/dividends_eur.csv.gz', header=True, inferSchema=True)

dividends.show()

+-------+----------+------+
| ticker|  unixtime|amount|
+-------+----------+------+
|  ASF.F|1409176800|  0.19|
| ASF.BE|1409176800|  0.19|
| BL8.SG|1409176800|  0.13|
| BL8.DU|1409176800|  0.13|
| BL8.BE|1409176800|  0.13|
|  BL8.F|1409176800|  0.13|
| 62M.MU|1409176800|   0.3|
|CCDG.MU|1409176800|  0.08|
|CCDG.BE|1409176800|  0.08|
| HNC.MU|1409176800| 0.032|
| 62M.BE|1409176800|   0.3|
|  62M.F|1409176800|   0.3|
|  NKE.F|1409176800|  0.24|
| NKE.SG|1409176800|  0.24|
| NKE.DU|1409176800|  0.24|
| NKE.HA|1409176800|  0.24|
| NKE.MU|1409176800|  0.24|
| NKE.BE|1409176800|  0.24|
|GSC1.DE|1409263200|   2.2|
| WF3.SG|1409263200|   1.5|
+-------+----------+------+
only showing top 20 rows



In [8]:
# Merge dividends and splits into a single event table keyed by
# (ticker, unixtime). The outer join keeps events present on only one side,
# leaving `amount` or `ratio` null for them (see the output below).
# Cached because `results` filters this frame once per simulated period.
events = (dividends
          .join(splits, ['ticker', 'unixtime'], 'outer')
          .sort('unixtime')
          .cache())

events.show()

+-------+----------+------+-----+
| ticker|  unixtime|amount|ratio|
+-------+----------+------+-----+
| BL8.DU|1409176800|  0.13| null|
| NKE.HA|1409176800|  0.24| null|
|CCDG.MU|1409176800|  0.08| null|
| HNC.MU|1409176800| 0.032| null|
| 62M.BE|1409176800|   0.3| null|
|  NKE.F|1409176800|  0.24| null|
| NKE.SG|1409176800|  0.24| null|
| BL8.SG|1409176800|  0.13| null|
| 62M.MU|1409176800|   0.3| null|
|  ASF.F|1409176800|  0.19| null|
| BL8.BE|1409176800|  0.13| null|
|  1WR.F|1409176800|  null|  2.0|
|CCDG.BE|1409176800|  0.08| null|
| NKE.BE|1409176800|  0.24| null|
| NKE.DU|1409176800|  0.24| null|
| ASF.BE|1409176800|  0.19| null|
|  BL8.F|1409176800|  0.13| null|
|  62M.F|1409176800|   0.3| null|
| NKE.MU|1409176800|  0.24| null|
| SL3.SG|1409263200|  0.15| null|
+-------+----------+------+-----+
only showing top 20 rows



# Dates

In [9]:
# Every distinct quote timestamp, sorted ascending and collected to the driver
# as a plain Python list; `periods` draws its random start/end dates from it.
dates = [row['unixtime'] for row in
         (quotes
          .select('unixtime').distinct()
          .sort('unixtime')
          .collect())]

dates[:5]

[1409176800, 1409263200, 1409522400, 1409608800, 1409695200]

In [10]:
# First and last quote timestamps (dates is sorted ascending), displayed as
# calendar dates to show the time range covered by the dataset.
start, stop = dates[0], dates[-1]

datetime.date.fromtimestamp(start), datetime.date.fromtimestamp(stop)

(datetime.date(2014, 8, 28), datetime.date(2019, 8, 28))

In [11]:
def periods(dates, mindelta=365, maxdelta=float('inf'), seed=0):
    """Generate random (start, end) periods from the given list of dates.

    This is an endless generator based on rejection sampling: two timestamps
    are drawn (with replacement) from ``dates`` and the pair is yielded only
    when the first is strictly earlier than the second and the number of
    calendar days between them lies within ``[mindelta, maxdelta]``.

    Args:
        dates: sequence of unix timestamps (seconds) to draw from.
        mindelta: minimum period length in calendar days, inclusive.
        maxdelta: maximum period length in calendar days, inclusive.
        seed: seed of the private random generator, so a given seed always
            produces the same sequence of periods for the same ``dates``.

    Yields:
        ``(start_time, end_time)`` tuples of timestamps taken from ``dates``.

    Raises:
        ValueError: on the first ``next()`` when the constraints can never be
            met (``mindelta > maxdelta``, fewer than two distinct dates, or
            the widest span in ``dates`` is shorter than ``mindelta``).
            Without this check the sampling loop would spin forever.
            The check is a necessary condition only: a finite ``maxdelta``
            that excludes every pair is not detected.
    """
    rand = random.Random(seed)
    to_date = datetime.date.fromtimestamp

    if mindelta > maxdelta:
        raise ValueError(f'mindelta ({mindelta}) is greater than maxdelta ({maxdelta})')

    # An empty `dates` is left to rand.choices below, which raises as before.
    if dates:
        earliest, latest = min(dates), max(dates)
        widest = (to_date(latest) - to_date(earliest)).days

        if earliest == latest or widest < mindelta:
            raise ValueError(f'no two dates are at least {mindelta} days apart '
                             f'(widest span is {widest} days)')

    while True:
        start_time, end_time = rand.choices(dates, k=2)

        if start_time >= end_time:
            continue

        # Length is measured in calendar days, not in 86400-second units.
        period = (to_date(end_time) - to_date(start_time)).days

        if mindelta <= period <= maxdelta:
            yield start_time, end_time

In [12]:
# Preview the first five (start, end) unixtime pairs produced with the default
# seed and the default minimum length of 365 days.
list(itertools.islice(periods(dates), 5))

[(1453762800, 1528840800),
 (1458169200, 1524693600),
 (1484607600, 1545865200),
 (1450306800, 1536530400),
 (1409263200, 1487113200)]

# Results

In [13]:
def replay_stock_market_events(events):
    """Replay split and dividend events on top of an initial purchase.

    The events are ordered by ``'unixtime'`` and the earliest one is taken as
    the purchase: its ``'inshares'`` entry gives the number of shares bought.
    Each later event then multiplies the share count by its ``'ratio'``
    (truncated to a whole number of shares) and afterwards adds
    ``shares * 'amount'`` to the running dividend total. Missing or null
    ``'ratio'`` / ``'amount'`` values count as 1 and 0 respectively.

    Args:
        events: iterable of dict-like rows, each with a ``'unixtime'`` key and
            optionally ``'inshares'``, ``'ratio'`` and ``'amount'``.

    Returns:
        ``(shares, dividends)`` after all events have been applied. When the
        earliest event carries no shares (missing, ``None`` or 0) nothing is
        replayed and that falsy value is returned as-is with the starting
        dividends. For an empty ``events`` the result is ``(0, 0)``.
    """
    iter_events = iter(sorted(events, key=lambda e: e['unixtime']))

    # A default avoids leaking a bare StopIteration to the caller on empty input.
    buy_event = next(iter_events, None)
    if buy_event is None:
        return 0, 0

    shares = buy_event.get('inshares', 0)
    # `or 0` also covers a key that is present but null.
    dividends = buy_event.get('dividends') or 0

    if not shares:  # it should have shares
        return shares, dividends

    for event in iter_events:
        ratio = event.get('ratio') or 1
        amount = event.get('amount') or 0

        # The split is applied first, so a dividend in the same event is paid
        # on the post-split share count.
        shares = int(shares * ratio)
        dividends += shares * amount

    return shares, dividends


def results(dividends, quotes, periods, budget=10_000):
    """Simulate, for each period, investing `budget` in every ticker.

    For every ``(intime, outtime)`` pair the function buys as many whole
    shares as ``budget`` allows at the (rounded) close of ``intime``, replays
    the split/dividend events between both dates and values the position at
    the (rounded) close of ``outtime``.

    Args:
        dividends: not used by the body; the split/dividend data is read from
            the notebook-level ``events`` frame instead. Kept so existing
            calls keep working.
        quotes: Spark DataFrame with ``ticker``, ``unixtime`` and ``close``.
        periods: iterable of ``(intime, outtime)`` unix timestamps.
        budget: amount of money available per ticker and period.

    Yields:
        One lazy Spark DataFrame per period, sorted by ``total`` descending,
        with the purchase (``inclose``, ``inshares``, ``indate``,
        ``inprice``), the replay outcome (``outshares``, ``dividends``), the
        sale (``outclose``, ``outdate``, ``outprice``) and
        ``total = outprice + dividends - inprice``. Only tickers quoted on
        both dates appear (inner joins).

    Note:
        Depends on the notebook globals ``events`` and ``session``. The
        per-period frames are cached and never unpersisted here because the
        yielded result is still lazy when control returns to the caller.
    """
    for intime, outtime in periods:
        allevents = events.filter(f'unixtime >= {intime} AND '
                                  f'unixtime <= {outtime}').cache()
        allquotes = quotes.filter(f'unixtime == {intime} OR '
                                  f'unixtime == {outtime}').cache()
        outquotes = (allquotes
                     .filter(f'unixtime == {outtime}')
                     .withColumn('outclose', F.round(F.col('close'), 2))
                     .withColumn('outdate', F.from_unixtime('unixtime', 'yyyy-MM-dd')))
        inquotes = (allquotes
                    .filter(f'unixtime == {intime}')
                    # Has no effect on the quotes schema loaded in this
                    # notebook (there is no `shares` column); kept as-is.
                    .withColumnRenamed('shares', 'inshares')
                    .withColumn('inclose', F.round(F.col('close'), 2))
                    # Whole shares affordable with the budget. This used to be
                    # hard-coded to 10_000, silently ignoring `budget`.
                    .withColumn('inshares', F.floor(budget / F.col('inclose')))
                    .withColumn('indate', F.from_unixtime('unixtime', 'yyyy-MM-dd'))
                    .withColumn('inprice', F.round(F.col('inshares') * F.col('inclose'), 2))
                   )
        # The outer join adds the purchase row to each ticker's events so the
        # replay can pick it up as the earliest event of the period.
        revents = (allevents
                   .join(inquotes, ['ticker', 'unixtime'], 'outer')
                   .rdd.groupBy(lambda row: row['ticker'])
                   .mapValues(lambda rows: replay_stock_market_events(
                       row.asDict() for row in rows))
                   .map(lambda tickervals: (tickervals[0],) + tickervals[1]))
        devents = (session
                   .createDataFrame(revents, ['ticker', 'outshares', 'divs'])
                   .withColumn('dividends', F.round(F.col('divs'), 2))
                   .drop('divs'))
        result = (inquotes
                  .join(devents, 'ticker', 'inner')
                  .join(outquotes, 'ticker', 'inner')
                  .withColumn('outprice', F.round(F.col('outshares') * F.col('outclose'), 2))
                  .withColumn('total', F.round(F.col('outprice') + F.col('dividends') - F.col('inprice'), 2))
                  .drop('unixtime', 'close')
                  .sort('total', ascending=False))

        yield result

In [14]:
# Run one simulated period (the first one generated) and display its table.
next(results(dividends, quotes, periods(dates))).show()

+-------+-------+--------+----------+-------+---------+--------------+--------+----------+--------+--------------+
| ticker|inclose|inshares|    indate|inprice|outshares|     dividends|outclose|   outdate|outprice|         total|
+-------+-------+--------+----------+-------+---------+--------------+--------+----------+--------+--------------+
| TBA.SG|   0.05|  200000|2016-01-26|10000.0|  1000000|     4.33567E8|    0.23|2018-06-13|230000.0|     4.33787E8|
|  TBA.F|   0.05|  200000|2016-01-26|10000.0|  1000000|     4.33567E8|    0.23|2018-06-13|230000.0|     4.33787E8|
| TBA.MU|   0.06|  166666|2016-01-26|9999.96|   833330|3.6130438811E8|    0.23|2018-06-13|191665.9|3.6148605405E8|
|  HSZ.F|   0.25|   40000|2016-01-26|10000.0|  1000000|        3.04E8|     0.2|2018-06-13|200000.0|      3.0419E8|
| HSZ.MU|   0.25|   40000|2016-01-26|10000.0|  1000000|        3.04E8|     0.2|2018-06-13|200000.0|      3.0419E8|
|  D7V.F|   0.06|  166666|2016-01-26|9999.96|  4166650|     2.49999E8|    0.18|2

# Statistics

In [15]:
def statistics(results, row, col, n):
    """Aggregate the first `n` simulation results per `row` value.

    Takes `n` DataFrames from the `results` iterable, stacks them with
    `DataFrame.union` and, grouping by the `row` column, computes the mean,
    sample standard deviation, minimum, maximum and sum of the `col` column.

    Returns the grouped Spark DataFrame (lazy; nothing is computed here).
    """
    aggregates = []
    for aggregate in (F.mean, F.stddev, F.min, F.max, F.sum):
        aggregates.append(aggregate(F.col(col)))

    # Stack the per-period frames into a single frame, one union at a time.
    stacked = functools.reduce(DataFrame.union, itertools.islice(results, n))

    return stacked.groupBy(row).agg(*aggregates)

In [16]:
N = 25  # number of random periods simulated per ticker

nperiods = periods(dates)
nresults = tqdm(results(dividends, quotes, nperiods), total=N)
# Cache the aggregated frame: it feeds several actions below (show, the CSV
# export and the per-country table), and without caching each of them would
# recompute all N simulations from scratch.
simulats = (statistics(nresults, 'ticker', 'total', N)
            .join(extras, 'ticker', 'left')
            .cache())

simulats.show()

HBox(children=(IntProgress(value=0, max=25), HTML(value='')))

+-------+------------------+-------------------+----------+----------+------------------+--------------------+-------+--------------------+
| ticker|        avg(total)| stddev_samp(total)|min(total)|max(total)|        sum(total)|                name|country|            category|
+-------+------------------+-------------------+----------+----------+------------------+--------------------+-------+--------------------+
|  SE7.F|112753.53279999999|  62275.35537180181|  27307.28| 325571.84|        2818838.32|Seiko Epson Corpo...| France|                null|
|JUN3.MU|        26492.6212|  20606.23718657863|  -3089.76|  62637.52|         662315.53|JUNGHEINRICH AG O...|Germany|Diversified Machi...|
| RMC.BE|17301.448400000005| 11315.651084786534|    -241.8|  30998.76| 432536.2100000001|REMY COINTREAU EO...|Germany|                null|
|48CA.DU| 7068.737999999999|  6653.465496662873|  -3575.04|   21964.6|176718.44999999998|   CAIXABANK S.A. EO|Germany|                null|
| ALG.BE|20423.28739

In [17]:
# Persist the aggregated statistics next to the input datasets. toPandas()
# collects the whole Spark frame to the driver before writing.
simulats.toPandas().to_csv('../datasets/simulations.csv.gz', index=False)

In [18]:
# Raise the pandas row display limit so the 250-row table in the next cell is
# rendered in full.
pd.options.display.max_rows = 999

In [19]:
# Top 250 tickers listed under country "France", ranked by their average
# simulated total. The sort key must match the aggregate column name exactly
# ('avg(total)'); it had been corrupted by stray pasted text.
(simulats
 .filter('country == "France"')
 .sort('avg(total)', ascending=False)
 .limit(250)
 .toPandas())

Unnamed: 0,ticker,avg(total),stddev_samp(total),min(total),max(total),sum(total),name,country,category
0,HSZ.F,248677800.0,183317100.0,4308400.0,737748100.0,6216944000.0,PT Hanjaya Mandala Sampoerna Tbk,France,
1,TBA.F,155607700.0,137744200.0,17874375.0,433787000.0,3890193000.0,PT Bukit Asam (Persero) Tbk,France,
2,D7V.F,126267600.0,105449800.0,2076921.0,398071700.0,3156691000.0,PT Mayora Indah Tbk,France,
3,QF8.F,85896050.0,60678510.0,7663325.67,209550000.0,2147401000.0,Pt Mandala Multifinance Tbk,France,
4,BYRA.F,68018450.0,35324440.0,9956549.03,138290400.0,1700461000.0,PT Bank Rakyat Indonesia (Persero) Tbk,France,
5,OB8.F,36818750.0,29120430.0,12215000.0,77170000.0,147275000.0,PT Barito Pacific Tbk,France,
6,PQ9.F,27715570.0,16975830.0,6451296.58,62307040.0,692889200.0,PT Bank Mandiri (Persero) Tbk,France,
7,P5TA.F,27280870.0,12846400.0,5933327.4,59520000.0,627459900.0,PT Pelayaran Tempuran Emas Tbk,France,
8,QGI.F,23966740.0,23492750.0,6250000.0,82690000.0,503301500.0,PT. Mitra Adiperkasa Tbk,France,
9,FBLM.F,5174165.0,4764261.0,-98.52,12696650.0,98309130.0,"FIH GROUP PLC LS -,10",France,
