# Đánh giá rủi ro tài chính thông qua phép mô phỏng Monte Carlo
### Mục tiêu :
<ol>
  <li>Nắm vững kiến trức cơ bản về phân tích rủi ro</li>
  <li>Hiểu phép mô phỏng Monte Carlo</li>
  <li>Áp dụng phép mô phỏng Monte Carlo cho việc dự đoán rủi ro thông qua việc tính toán chỉ số rủi ro <mark>VaR</mark></li>
</ol> 

- INPUT: giá trị các cổ phiếu, giá dầu thô, giá trái phiếu chính phủ Hoa Kỳ
- OUTPUT: giá trị rủi ro của một khoản đầu tư 

### Thuật ngữ :
__Instrument__ : Tài sản có thể trao đổi được như trái phiếu, vốn đầu tư. Trong một thời điểm nhất định, nó được coi như là có <b>giá trị</b>. Đó là <b>giá trị</b> mà nó có thể được bán.<br>
__Market Factor__ : Giá trị cung cấp cho ta một cái nhìn vĩ mô về thị trường tài chính tại một thời điểm cụ thể( ví dụ như tỉ giá hối đoái giữa giá USD và euro)
__Return__ : Giá trị thay đổi của giá trị của một _Instrument_ trong một khoảng thời gian<br>
__Loss__ : Giá trị _Return_ âm <br>
__VaR__ : _Value at Risk_ - Chỉ số đo lường rủi ro đầu tư khi muốn đánh giá khả năng thiệt hại lớn nhất khi đầu tư trong một khoảng thời gian nhất định. Ví dụ : <br>
Với giá trị VaR 5% của 1 triệu USD, trong khoảng thời gian 2 tuần : ta có 5% nguy cơ bị mất nhiều hơn 1 triệu USD<br>

__CVaR__ : _Conditional Value at Risk_ <br>
Ví dụ : <br>
Với giá trị CVaR 5% của 5 triệu USD, trong khoảng thời gian 2 tuần : tổn thất trung bình trong 5% kết quả tồi tệ nhất là 5 triệu USD.<br>


### Phép mô phỏng Monte Carlo
  Monte Carlo là một kỹ thuật chuyển đổi các biến đầu vào của mô hình thành phân phối xác suất. Bằng cách chọn ngẫu nhiên các giá trị từ phân phối xác suất đó, nó sẽ tính toán lại mô hình mô phỏng nhiều lần, để xác định phân phối đầu ra. 
<ol>
    <li> Định nghĩa mỗi quan hệ giữa điều kiện thị trường ( Factor ) và giá cổ phiếu thay đổi ( Stock's return ) : Tuyến tính</li>
    <li>Định nghĩa phân phối cho điều kiện thị trường từ các giá trị đã có</li>
    <li>Đặt ra các phép thử đối với điều kiện thị trường được lấy ngẫu nhiên từ phân phối đã định nghĩa </li>
    <li>Tính toán loss cho mỗi phép thử, sử dụng những giá trị đó để định nghĩa một phân phối thực nghiệm trên toàn losses</li>
</ol>

### Dữ liệu
Dữ liệu được thu thấp theo ngày, trừ những ngày nghỉ sẽ không có.<br>
__Instrument__ : Stock AAME ( Atlantic American Corporation), dữ liệu có từ 3/1/2000 đến 31/12/2013 <br>
__Factor__ : 
<ul>
    <li>GSPC</li>
- Đại diện cho chỉ số index S&P 500:  là một chỉ số cổ phiếu dựa trên cổ phiếu phổ thông của 500 công ty có vốn hóa thị trường lớn nhất niêm yết trên NYSE hoặc NASDAQ, Mỹ, nhằm cung cấp cho các nhà đầu tư thông tin về sự chuyển động tổng thể của thị trường.<br/>
- Tỉ lệ và loại cổ phiếu sử dụng để tính toán chỉ số S&P 500 được quyết định bởi hãng S&P Dow Jones Indices. Điều này khiến chỉ số S&P 500 khác với các chỉ số thị trường chứng khoán khác của Mĩ như chỉ số công nghiệp Dow Jones hay chỉ số Nasdaq Composite.<br>
- Chỉ số S&P 500 là một trong những chỉ số khách quan và được quan tâm nhất, rất nhiều nhà đầu tư coi đây là thước đo tốt nhất của thị trường chứng khoán Mĩ cũng như là một chỉ số chủ đạo của nền kinh tế
    <li>IXIC</li> 
- Ký hiệu cho chỉ số Nasdaq Composite: được xây dựng trên giá cổ phiếu của toàn bộ các công ty niêm yết trên sàn giao dịch chứng khoán Nasdaq<br>
    <li>crudeoil</li> 
- Giá dầu thô.
    <li>us30yeartreasurybonds</li>
- Giá trái phiếu chính phủ do Kho bạc Hoa Kỳ phát hành, đáo hạn trong vòng 30 năm.
</ul>

In [1]:
try:
    sc.stop()
except:
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc=SparkContext()
spark = SparkSession(sparkContext=sc)

In [2]:
data_dir = './data/'

# 1. Xử lý dữ liệu

## Chuẩn bị dữ liệu cho Stock

### Đọc dữ liệu

In [3]:
from pyspark.sql.functions import udf
import datetime

def readData(file) :
    stock = spark.read.csv(path=file,
                        sep=',',
                        encoding='UTF-8',
                        comment=None,
                        header=True, 
                        inferSchema=True)
    return stock

In [4]:
stock = readData(data_dir+'/stocks/AAME.csv')
stock.show(5)

+---------+----+----+----+-----+------+
|     Date|Open|High| Low|Close|Volume|
+---------+----+----+----+-----+------+
|31-Dec-13|4.04|4.13|3.96| 4.09| 30735|
|30-Dec-13|4.05|4.05|3.84|  3.9| 14646|
|27-Dec-13|4.02|4.05|3.99| 4.05|  5047|
|26-Dec-13|3.99|4.04|3.70| 4.01|  6309|
|24-Dec-13|3.90|3.97|3.84| 3.95| 13592|
+---------+----+----+----+-----+------+
only showing top 5 rows



In [5]:
stock.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



### Chuẩn hóa dữ liệu

#### Dữ liệu sẽ được cắt bớt chỉ sử dụng cột Date và Open( giá mở cửa)

In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.functions import unix_timestamp, to_timestamp

def convertTimeStock(stock) :
    # Chỉ lấy dữ liệu Date và Open
    stock = stock['Date','Open']
    # Chuyển Open thành kiểu float
    stock = stock.withColumn("Open",stock["Open"].cast('float'))
    # Chuyển Date thành kiểu timestamp
    def toDate(cols) :
        date = unix_timestamp(cols, 'dd-MMM-yy')                
        return date
    
    convert_time= stock.withColumn("Date",to_timestamp(toDate(stock.Date)))
    
    return convert_time

In [7]:
convert_time_stock = convertTimeStock(stock)
convert_time_stock.show(5)
convert_time_stock.printSchema()

+-------------------+----+
|               Date|Open|
+-------------------+----+
|2013-12-31 00:00:00|4.04|
|2013-12-30 00:00:00|4.05|
|2013-12-27 00:00:00|4.02|
|2013-12-26 00:00:00|3.99|
|2013-12-24 00:00:00| 3.9|
+-------------------+----+
only showing top 5 rows

root
 |-- Date: timestamp (nullable = true)
 |-- Open: float (nullable = true)



### Điền dữ liệu bị thiếu

In [8]:
convert_time_stock.rdd.collect()[879:884]

[Row(Date=datetime.datetime(2010, 6, 15, 0, 0), Open=1.399999976158142),
 Row(Date=datetime.datetime(2010, 6, 14, 0, 0), Open=None),
 Row(Date=datetime.datetime(2010, 6, 11, 0, 0), Open=None),
 Row(Date=datetime.datetime(2010, 6, 10, 0, 0), Open=1.4500000476837158),
 Row(Date=datetime.datetime(2010, 6, 9, 0, 0), Open=None)]

In [9]:
print('Số dữ liệu thiếu : ' + repr(convert_time_stock.rdd.filter(lambda x : x['Open'] == None).count()))

Số dữ liệu thiếu : 358


In [10]:
from pyspark.sql import Window
from pyspark.sql.functions import last
import sys

def fillData(data) :
    window = Window.rowsBetween(-sys.maxsize, 0)

    filled_column = last(data['Open'], ignorenulls=True).over(window)

    data_filled = data.withColumn('Open', filled_column)
    
    return data_filled

In [11]:
filledStock = fillData(convert_time_stock)
filledStock.rdd.collect()[879:884]

[Row(Date=datetime.datetime(2010, 6, 15, 0, 0), Open=1.399999976158142),
 Row(Date=datetime.datetime(2010, 6, 14, 0, 0), Open=1.399999976158142),
 Row(Date=datetime.datetime(2010, 6, 11, 0, 0), Open=1.399999976158142),
 Row(Date=datetime.datetime(2010, 6, 10, 0, 0), Open=1.4500000476837158),
 Row(Date=datetime.datetime(2010, 6, 9, 0, 0), Open=1.4500000476837158)]

In [12]:
print ' Số dữ liệu thiếu : ',filledStock.rdd.filter(lambda x : x['Open'] == None).count()

 Số dữ liệu thiếu :  0


### Sắp xếp dữ liệu theo Date

In [13]:
sortedStock = filledStock.sort('Date', acsending = True)

In [14]:
sortedStock.show(5)

+-------------------+----+
|               Date|Open|
+-------------------+----+
|2000-01-03 00:00:00|2.38|
|2000-01-04 00:00:00|2.38|
|2000-01-05 00:00:00|2.31|
|2000-01-06 00:00:00|2.31|
|2000-01-07 00:00:00|2.31|
+-------------------+----+
only showing top 5 rows



### Đánh chỉ số cho dữ liệu

In [15]:
header = ["index"] + convert_time_stock.columns

def zipWithIndex(convert_time_data) :
    dataWithIndex = convert_time_data.rdd.zipWithIndex().map(lambda x: [x[1]] + list(x[0])).toDF(header)
    return dataWithIndex

In [16]:
stocksWithIndex = zipWithIndex(sortedStock)

In [17]:
stocksWithIndex.show(5)

+-----+-------------------+-----------------+
|index|               Date|             Open|
+-----+-------------------+-----------------+
|    0|2000-01-03 00:00:00|2.380000114440918|
|    1|2000-01-04 00:00:00|2.380000114440918|
|    2|2000-01-05 00:00:00|2.309999942779541|
|    3|2000-01-06 00:00:00|2.309999942779541|
|    4|2000-01-07 00:00:00|2.309999942779541|
+-----+-------------------+-----------------+
only showing top 5 rows



### Tính toán chêch lệch giá trong vòng 2 tuần
![title](data/image/r.png)

In [18]:
def computeReturns(data) :
    baseRDD = data.rdd.map(lambda x : ( x['index'], x ))
    slideRDD = data.rdd.map(lambda x : ( x['index'] - 10, x))
    unionRDD = baseRDD.union(slideRDD)
    reduceRDD = unionRDD.map(lambda x: (x[0], (x[1]['Date'],x[1]['Open']))).groupByKey().mapValues(list)
    filterReduceRDD = reduceRDD.filter(lambda x: len(x[1])==2)
    dataReturn = filterReduceRDD.map(lambda x : (x[1][0][0], x[1][1][1] - x[1][0][1]))
    return dataReturn
# baseRDD = stocksWithIndex.rdd.map(lambda x : ( x['index'], x ))
# baseRDD.collect()

#### Gán key

In [19]:
baseRDD = stocksWithIndex.rdd.map(lambda x : ( x['index'], x ))
baseRDD.take(5)

[(0,
  Row(index=0, Date=datetime.datetime(2000, 1, 3, 0, 0), Open=2.380000114440918)),
 (1,
  Row(index=1, Date=datetime.datetime(2000, 1, 4, 0, 0), Open=2.380000114440918)),
 (2,
  Row(index=2, Date=datetime.datetime(2000, 1, 5, 0, 0), Open=2.309999942779541)),
 (3,
  Row(index=3, Date=datetime.datetime(2000, 1, 6, 0, 0), Open=2.309999942779541)),
 (4,
  Row(index=4, Date=datetime.datetime(2000, 1, 7, 0, 0), Open=2.309999942779541))]

#### Tạo một RDD trung gian với chỉ số key lệch 10 ( 2 tuần làm việc )

In [20]:
slideRDD = stocksWithIndex.rdd.map(lambda x : ( x['index'] - 10, x))
slideRDD.take(5)

[(-10,
  Row(index=0, Date=datetime.datetime(2000, 1, 3, 0, 0), Open=2.380000114440918)),
 (-9,
  Row(index=1, Date=datetime.datetime(2000, 1, 4, 0, 0), Open=2.380000114440918)),
 (-8,
  Row(index=2, Date=datetime.datetime(2000, 1, 5, 0, 0), Open=2.309999942779541)),
 (-7,
  Row(index=3, Date=datetime.datetime(2000, 1, 6, 0, 0), Open=2.309999942779541)),
 (-6,
  Row(index=4, Date=datetime.datetime(2000, 1, 7, 0, 0), Open=2.309999942779541))]

#### Hợp hai RDD
Hai RDD chứa dữ liệu của bảng gốc và bảng trung gian được gộp lại để tiến hành ghép theo key(ngày hiện tại-ngày của 2 tuần sau)

In [21]:
unionRDD = baseRDD.union(slideRDD)
unionRDD.count()

7012

#### Nhóm các phần tử theo key

In [22]:
reduceRDD = unionRDD.map(lambda x: (x[0], (x[1]['Date'],x[1]['Open']))).groupByKey().mapValues(list)
reduceRDD.take(5)

[(0,
  [(datetime.datetime(2000, 1, 3, 0, 0), 2.380000114440918),
   (datetime.datetime(2000, 1, 18, 0, 0), 2.440000057220459)]),
 (2,
  [(datetime.datetime(2000, 1, 5, 0, 0), 2.309999942779541),
   (datetime.datetime(2000, 1, 20, 0, 0), 2.5)]),
 (4,
  [(datetime.datetime(2000, 1, 7, 0, 0), 2.309999942779541),
   (datetime.datetime(2000, 1, 24, 0, 0), 2.380000114440918)]),
 (6,
  [(datetime.datetime(2000, 1, 11, 0, 0), 2.309999942779541),
   (datetime.datetime(2000, 1, 26, 0, 0), 2.809999942779541)]),
 (8,
  [(datetime.datetime(2000, 1, 13, 0, 0), 2.440000057220459),
   (datetime.datetime(2000, 1, 28, 0, 0), 2.559999942779541)])]

In [23]:
reduceRDD.count()

3516

#### Loại 10 phần tử đầu của RDD gốc và 10 phần tử cuối của RDD trung gian

In [24]:
filterReduceRDD = reduceRDD.filter(lambda x: len(x[1])==2)
filterReduceRDD.take(5)

[(0,
  [(datetime.datetime(2000, 1, 3, 0, 0), 2.380000114440918),
   (datetime.datetime(2000, 1, 18, 0, 0), 2.440000057220459)]),
 (2,
  [(datetime.datetime(2000, 1, 5, 0, 0), 2.309999942779541),
   (datetime.datetime(2000, 1, 20, 0, 0), 2.5)]),
 (4,
  [(datetime.datetime(2000, 1, 7, 0, 0), 2.309999942779541),
   (datetime.datetime(2000, 1, 24, 0, 0), 2.380000114440918)]),
 (6,
  [(datetime.datetime(2000, 1, 11, 0, 0), 2.309999942779541),
   (datetime.datetime(2000, 1, 26, 0, 0), 2.809999942779541)]),
 (8,
  [(datetime.datetime(2000, 1, 13, 0, 0), 2.440000057220459),
   (datetime.datetime(2000, 1, 28, 0, 0), 2.559999942779541)])]

In [25]:
filterReduceRDD.count()

3496

#### Tính return

In [26]:
stocksReturn = filterReduceRDD.map(lambda x : (x[1][0][0], x[1][1][1] - x[1][0][1]))
stocksReturn.take(5)

[(datetime.datetime(2000, 1, 3, 0, 0), 0.059999942779541016),
 (datetime.datetime(2000, 1, 5, 0, 0), 0.19000005722045898),
 (datetime.datetime(2000, 1, 7, 0, 0), 0.07000017166137695),
 (datetime.datetime(2000, 1, 11, 0, 0), 0.5),
 (datetime.datetime(2000, 1, 13, 0, 0), 0.11999988555908203)]

#### Chuyển thành dataframe

In [27]:
df_stocksReturn = stocksReturn.toDF(['Date','Open']).sort('Date')

In [28]:
df_stocksReturn.show(5)

+-------------------+--------------------+
|               Date|                Open|
+-------------------+--------------------+
|2000-01-03 00:00:00|0.059999942779541016|
|2000-01-04 00:00:00| 0.06999993324279785|
|2000-01-05 00:00:00| 0.19000005722045898|
|2000-01-06 00:00:00| 0.13000011444091797|
|2000-01-07 00:00:00| 0.07000017166137695|
+-------------------+--------------------+
only showing top 5 rows



In [29]:
sortedStock.show(5) # giá trị gốc

+-------------------+----+
|               Date|Open|
+-------------------+----+
|2000-01-03 00:00:00|2.38|
|2000-01-04 00:00:00|2.38|
|2000-01-05 00:00:00|2.31|
|2000-01-06 00:00:00|2.31|
|2000-01-07 00:00:00|2.31|
+-------------------+----+
only showing top 5 rows



<mark>Note:</mark>RDD được cache vào bộ nhớ để sử dụng nhiều lần sau này, giảm thời gian xử lý.

In [30]:
df_stocksReturn.cache()

DataFrame[Date: timestamp, Open: double]

## Chuẩn bị dữ liệu cho Factors

### Đọc dữ liệu 

In [31]:
factor_names = ['IXIC.csv', 'GSPC.csv', 'crudeoil.csv','us30yeartreasurybonds.csv']

factors = [readData(data_dir+'factors/' + factor_name) for factor_name in factor_names]

In [32]:
factors[0].printSchema()
factors[0].show(5)

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)

+-------------------+-----------+-----------+-----------+-----------+-----------+----------+
|               Date|       Open|       High|        Low|      Close|  Adj Close|    Volume|
+-------------------+-----------+-----------+-----------+-----------+-----------+----------+
|2000-01-03 00:00:00|4186.189941|4192.189941|3989.709961|4131.149902|4131.149902|1510070000|
|2000-01-04 00:00:00|     4020.0|    4073.25| 3898.22998|3901.689941|3901.689941|1511840000|
|2000-01-05 00:00:00|3854.350098|3924.209961|3734.870117|3877.540039|3877.540039|1735670000|
|2000-01-06 00:00:00|3834.439941| 3868.76001|3715.620117|3727.129883|3727.129883|1598320000|
|2000-01-07 00:00:00|3711.090088|3882.669922|3711.090088|3882.620117|3882.620117|16

In [33]:
print 'Số điểm dữ liệu: ' + repr(factors[0].count())

Số điểm dữ liệu: 3520


In [34]:
factors[1].printSchema()
factors[1].show(5)

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)

+-------------------+-----------+-----------+-----------+-----------+-----------+----------+
|               Date|       Open|       High|        Low|      Close|  Adj Close|    Volume|
+-------------------+-----------+-----------+-----------+-----------+-----------+----------+
|2000-01-03 00:00:00|    1469.25|     1478.0|1438.359985|1455.219971|1455.219971| 931800000|
|2000-01-04 00:00:00|1455.219971|1455.219971|1397.430054|1399.420044|1399.420044|1009000000|
|2000-01-05 00:00:00|1399.420044| 1413.27002|1377.680054|1402.109985|1402.109985|1085500000|
|2000-01-06 00:00:00|1402.109985|1411.900024|1392.099976|1403.449951|1403.449951|1092300000|
|2000-01-07 00:00:00|1403.449951|1441.469971| 1400.72998|1441.469971|1441.469971|12

In [35]:
print 'Số điểm dữ liệu: ', factors[1].count()

Số điểm dữ liệu:  3520


In [36]:
factors[2].printSchema()
factors[2].show(5)

root
 |-- Date: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Vol.: string (nullable = true)
 |-- Change %: string (nullable = true)

+-----------+------+------+------+-----+-------+--------+
|       Date| Price|  Open|  High|  Low|   Vol.|Change %|
+-----------+------+------+------+-----+-------+--------+
|Dec 31-2013| 98.42| 99.25| 99.39|98.15|113.91K|  -0.88%|
|Dec 30-2013| 99.29|100.15|100.42|99.13|118.36K|  -1.03%|
|Dec 27-2013|100.32| 99.63|100.75|99.37|127.20K|   0.77%|
|Dec 26-2013| 99.55|  99.2|  99.7|99.05| 65.67K|   0.33%|
|Dec 24-2013| 99.22| 98.62|  99.3|98.53| 51.88K|   0.31%|
+-----------+------+------+------+-----+-------+--------+
only showing top 5 rows



In [37]:
print 'Số điểm dữ liệu: ', factors[2].count()

Số điểm dữ liệu:  3514


In [38]:
factors[3].printSchema()
factors[3].show(5)

root
 |-- Date: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Change %: string (nullable = true)

+-----------+-----+-----+-----+-----+--------+
|       Date|Price| Open| High|  Low|Change %|
+-----------+-----+-----+-----+-----+--------+
|Dec 31-2013|3.964|3.908|3.974|3.898|   1.56%|
|Dec 30-2013|3.903|4.001|4.001|3.895|  -2.52%|
|Dec 29-2013|4.004| 3.94|4.004| 3.94|   1.62%|
|Dec 27-2013| 3.94|3.925| 3.95|3.908|   0.36%|
|Dec 26-2013|3.926|3.912|3.931| 3.89|   0.40%|
+-----------+-----+-----+-----+-----+--------+
only showing top 5 rows



In [39]:
print 'Số điểm dữ liệu: ', factors[3].count()

Số điểm dữ liệu:  3628


### Chuyển hóa dữ liệu

In [40]:
def convertTimeFactor(factor, isConvert) :
    factor = factor['Date','Open']
    factor = factor.withColumn("Open",factor["Open"].cast('float'))
    def toDate(cols) :
        date = unix_timestamp(cols, 'MMM dd-yyyy')                  
        return date
    
    if isConvert :
        convert_time= factor.withColumn("Date",to_timestamp(toDate(factor.Date)))
        return convert_time
    else :
        return factor

In [41]:
convert_time_factor = convertTimeFactor(factors[2], True)
convert_time_factor.show(5)
convert_time_factor.printSchema()

+-------------------+------+
|               Date|  Open|
+-------------------+------+
|2013-12-31 00:00:00| 99.25|
|2013-12-30 00:00:00|100.15|
|2013-12-27 00:00:00| 99.63|
|2013-12-26 00:00:00|  99.2|
|2013-12-24 00:00:00| 98.62|
+-------------------+------+
only showing top 5 rows

root
 |-- Date: timestamp (nullable = true)
 |-- Open: float (nullable = true)



In [42]:
convertTimeFactors = []
convertTimeFactors.append(convertTimeFactor(factors[0],False))
convertTimeFactors.append(convertTimeFactor(factors[1],False))
convertTimeFactors.append(convertTimeFactor(factors[2],True).orderBy('Date', ascending = True))
convertTimeFactors.append(convertTimeFactor(factors[3],True).orderBy('Date', ascending = True))

In [43]:
convertTimeFactors[2].show(5)
convertTimeFactors[2].printSchema()

+-------------------+-----+
|               Date| Open|
+-------------------+-----+
|2000-01-04 00:00:00| 25.2|
|2000-01-05 00:00:00| 25.5|
|2000-01-06 00:00:00| 24.8|
|2000-01-07 00:00:00|24.65|
|2000-01-10 00:00:00|24.22|
+-------------------+-----+
only showing top 5 rows

root
 |-- Date: timestamp (nullable = true)
 |-- Open: float (nullable = true)



### Điền dữ liệu bị thiếu 

In [44]:
convertTimeFactors[0].rdd.filter(lambda x : x['Open'] == None).count()

0

In [45]:
# stocka.rdd.filter(lambda x : not isinstance(x[1], float)).collect()
# factors = [fillData(factor) for factor in factors]

### Đánh chỉ số cho dữ liệu 

In [46]:
factorsZipIndex = [zipWithIndex(factor) for factor in convertTimeFactors]

In [47]:
factorsZipIndex[3].show(5)

+-----+-------------------+------------------+
|index|               Date|              Open|
+-----+-------------------+------------------+
|    0|2000-01-03 00:00:00| 6.620999813079834|
|    1|2000-01-04 00:00:00|6.5370001792907715|
|    2|2000-01-05 00:00:00| 6.620999813079834|
|    3|2000-01-06 00:00:00| 6.554999828338623|
|    4|2000-01-07 00:00:00| 6.539999961853027|
+-----+-------------------+------------------+
only showing top 5 rows



### Tính chênh lệch 

In [48]:
factorsReturn = [computeReturns(factor).toDF(['Date','Open_'+str(i)]) for i,factor in enumerate(factorsZipIndex)]

In [49]:
sortedFactorsReturn = []
sortedFactorsReturn.append(factorsReturn[0])
sortedFactorsReturn.append(factorsReturn[1])
sortedFactorsReturn.append(factorsReturn[2].orderBy('Date', ascending = True))
sortedFactorsReturn.append(factorsReturn[3].orderBy('Date', ascending = True))

In [50]:
for fac in sortedFactorsReturn:
    fac.show()

+-------------------+----------------+
|               Date|          Open_0|
+-------------------+----------------+
|2000-01-03 00:00:00| -126.5400390625|
|2000-01-05 00:00:00|  350.7099609375|
|2000-01-07 00:00:00|579.289794921875|
|2000-01-11 00:00:00| 143.34033203125|
|2000-01-13 00:00:00|            95.0|
|2000-01-18 00:00:00|-98.579833984375|
|2000-01-20 00:00:00|    -70.41015625|
|2000-01-24 00:00:00| -16.35009765625|
|2000-01-26 00:00:00|  285.4599609375|
|2000-01-28 00:00:00|479.190185546875|
|2000-02-01 00:00:00|454.449951171875|
|2000-02-03 00:00:00| 348.35986328125|
|2000-02-07 00:00:00| 158.80029296875|
|2000-02-09 00:00:00|  123.7099609375|
|2000-02-11 00:00:00|  85.73974609375|
|2000-02-15 00:00:00|  317.2998046875|
|2000-02-17 00:00:00|           363.0|
|2000-02-22 00:00:00| 559.14013671875|
|2000-02-24 00:00:00| 329.18994140625|
|2000-02-28 00:00:00|  303.9599609375|
+-------------------+----------------+
only showing top 20 rows

+-------------------+-----------------

In [51]:
sortedFactorsReturn[1].printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open_1: double (nullable = true)



### Dùng phép join với 4 cột chứa dữ liệu của 4 factor để đồng nhất ngày giữa các factor.

In [52]:
def joinFactors(factors) :
    f =factors[0]
    for factor in factors[1:] :
        f = f.join(factor, 'Date', 'inner')
    return f

# header = 
joinFactorsReturn = joinFactors(factorsReturn)

In [53]:
joinFactorsReturn.cache()

DataFrame[Date: timestamp, Open_0: double, Open_1: double, Open_2: double, Open_3: double]

In [54]:
joinFactorsReturn.show(5)

+-------------------+-----------------+------------------+-------------------+--------------------+
|               Date|           Open_0|            Open_1|             Open_2|              Open_3|
+-------------------+-----------------+------------------+-------------------+--------------------+
|2000-06-27 00:00:00| 120.190185546875|  25.5699462890625| -1.260000228881836|-0.05299997329711914|
|2000-07-06 00:00:00| 233.719970703125|    35.72998046875| 0.5399990081787109|-0.09800004959106445|
|2001-01-03 00:00:00| 442.179931640625|  46.6199951171875| 2.3800010681152344|-0.01799964904785...|
|2001-03-07 00:00:00|-378.489990234375|-111.1800537109375|-2.1799983978271484|-0.02899980545043...|
|2001-04-26 00:00:00|  109.47998046875|     26.7900390625| 0.9300003051757812| 0.05200004577636719|
+-------------------+-----------------+------------------+-------------------+--------------------+
only showing top 5 rows



In [55]:
joinFactorsReturn.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open_0: double (nullable = true)
 |-- Open_1: double (nullable = true)
 |-- Open_2: double (nullable = true)
 |-- Open_3: double (nullable = true)



In [56]:
sortedJoinFactorsReturn = joinFactorsReturn.orderBy('Date')

In [57]:
sortedJoinFactorsReturn.show(5)

+-------------------+----------------+----------------+------------------+--------------------+
|               Date|          Open_0|          Open_1|            Open_2|              Open_3|
+-------------------+----------------+----------------+------------------+--------------------+
|2000-01-04 00:00:00|  96.27001953125|-0.0799560546875|3.5999984741210938| 0.18199968338012695|
|2000-01-05 00:00:00|  350.7099609375|  56.47998046875|3.8999996185302734| 0.11899995803833008|
|2000-01-06 00:00:00|  402.2099609375|   43.4599609375|3.0500011444091797| 0.14600038528442383|
|2000-01-07 00:00:00|579.289794921875|37.9100341796875|3.5799999237060547| 0.10900020599365234|
|2000-01-10 00:00:00| 122.52001953125| -39.93994140625|3.5600013732910156|0.053999900817871094|
+-------------------+----------------+----------------+------------------+--------------------+
only showing top 5 rows



In [58]:
sortedJoinFactorsReturn.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open_0: double (nullable = true)
 |-- Open_1: double (nullable = true)
 |-- Open_2: double (nullable = true)
 |-- Open_3: double (nullable = true)



In [59]:
print 'Số lượng dữ liệu sau khi đồng nhất ngày: ' + repr(sortedJoinFactorsReturn.count())

Số lượng dữ liệu sau khi đồng nhất ngày: 3479


In [60]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline, Transformer
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
import math

# hàm udf hỗ trợ việc định nghĩa ra các hàm xử lý cho cột được chọn của bảng
# tính bình phương cho giá trị, dấu lấy theo dấu của giá trị gốc
udf_featurize_sqr = udf(lambda x: math.copysign(1, x)*(x)**2, FloatType())

# Tính căn bậc 2 cho giá trị, dấu lấy theo dấu của giá trị gốc.
udf_featurize_sqrt = udf(lambda x: math.copysign(1, x)* math.sqrt(abs(x)), FloatType())

# CUSTOM TRANSFORMER ----------------------------------------------------------------

class CreateNewFeature(Transformer):
    """
    Lớp CreateNewFeature được kế thừa từ lớp transformer nhằm mục đích thực hiện tính để lấy feature mới 
    từ các feature gốc(lấy từ các factors). Các phép tính bao gồm lấy bình phương giá trị hoặc căn bậc 2,
    dấu lấy theo dấu của giá trị gốc.
    
    """

    def __init__(self, inputCol, outputCol, opt='square'):
        super(CreateNewFeature, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.opt = opt

    def _transform(self, df):

        if self.opt == 'square':
            return df.withColumn(self.outputCol, udf_featurize_sqr(self.inputCol))
        else: 
            return df.withColumn(self.outputCol, udf_featurize_sqrt(self.inputCol))
            


all_stages = [CreateNewFeature(inputCol='Open_'+str(i), outputCol='F_sqr_'+str(i), opt='square') for i in range(4)]+\
            [CreateNewFeature(inputCol='Open_'+str(i), outputCol='F_sqrt_'+str(i), opt='square_root') for i in range(4)]

# Tạo một pipline để xử lý liên tiếp với các cột để tạo ra giá trị mới
df_with_featurized = Pipeline(stages=all_stages).fit(sortedJoinFactorsReturn).transform(sortedJoinFactorsReturn)



In [61]:
df_with_featurized.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open_0: double (nullable = true)
 |-- Open_1: double (nullable = true)
 |-- Open_2: double (nullable = true)
 |-- Open_3: double (nullable = true)
 |-- F_sqr_0: float (nullable = true)
 |-- F_sqr_1: float (nullable = true)
 |-- F_sqr_2: float (nullable = true)
 |-- F_sqr_3: float (nullable = true)
 |-- F_sqrt_0: float (nullable = true)
 |-- F_sqrt_1: float (nullable = true)
 |-- F_sqrt_2: float (nullable = true)
 |-- F_sqrt_3: float (nullable = true)



In [62]:
df_with_featurized['Open_0', 'F_sqr_0', 'F_sqrt_0'].show()

+----------------+----------+----------+
|          Open_0|   F_sqr_0|  F_sqrt_0|
+----------------+----------+----------+
|  96.27001953125|  9267.917| 9.8117285|
|  350.7099609375| 122997.48| 18.727251|
|  402.2099609375| 161772.86| 20.055174|
|579.289794921875| 335576.66|  24.06844|
| 122.52001953125| 15011.155| 11.068876|
| 143.34033203125| 20546.451| 11.972483|
|169.550048828125| 28747.219| 13.021138|
|            95.0|    9025.0|  9.746795|
| -171.8798828125|-29542.693|-13.110297|
|-98.579833984375| -9717.983| -9.928738|
|-57.239990234375|-3276.4165|-7.5657115|
|    -70.41015625|-4957.5903| -8.391076|
|             5.5|     30.25|  2.345208|
| -16.35009765625|-267.32568| -4.043525|
|   245.259765625|  60152.35| 15.660771|
|  285.4599609375|  81487.39| 16.895561|
|    269.66015625|   72716.6| 16.421331|
|479.190185546875| 229623.23| 21.890413|
|560.539794921875| 314204.88| 23.675722|
|454.449951171875| 206524.77| 21.317831|
+----------------+----------+----------+
only showing top

### Ghép features
Các features vừa được sinh ra cùng với các features gốc của mỗi factor sẽ được ghép lại với nhau thành một vector( merge_feature) dùng để train cho model linear regression<br>
Ta sẽ thu được một cột mới với mỗi dòng là một vector của 9 features: 3 features gốc, 3 features cho bình phương, 3 features cho căn bậc 2

In [63]:
df_assembledFactors = VectorAssembler(inputCols=['Open_'+str(k) for k in range(4)]+\
                               ['F_sqr_'+str(i) for i in range(4)] +\
                               ['F_sqrt_'+str(i) for i in range(4)], outputCol='merge_feature')\
    .transform(df_with_featurized)\
    .drop('F_sqr_0','F_sqr_1','F_sqr_2','F_sqr_3')\
    .drop('F_sqrt_0','F_sqrt_1','F_sqrt_2','F_sqrt_3',)\

df_assembledFactors.show(truncate=True)

+-------------------+----------------+-----------------+--------------------+--------------------+--------------------+
|               Date|          Open_0|           Open_1|              Open_2|              Open_3|       merge_feature|
+-------------------+----------------+-----------------+--------------------+--------------------+--------------------+
|2000-01-04 00:00:00|  96.27001953125| -0.0799560546875|  3.5999984741210938| 0.18199968338012695|[96.27001953125,-...|
|2000-01-05 00:00:00|  350.7099609375|   56.47998046875|  3.8999996185302734| 0.11899995803833008|[350.7099609375,5...|
|2000-01-06 00:00:00|  402.2099609375|    43.4599609375|  3.0500011444091797| 0.14600038528442383|[402.2099609375,4...|
|2000-01-07 00:00:00|579.289794921875| 37.9100341796875|  3.5799999237060547| 0.10900020599365234|[579.289794921875...|
|2000-01-10 00:00:00| 122.52001953125|  -39.93994140625|  3.5600013732910156|0.053999900817871094|[122.52001953125,...|
|2000-01-11 00:00:00| 143.34033203125|-4

In [64]:
FinalFactorsReturn = df_assembledFactors['Date', 'merge_feature'] 
# Lấy ra 2 cột Date, Merge_feature để join với bảng Stock

In [65]:
print 'Features: ', FinalFactorsReturn.select('merge_feature').rdd.take(1)[0]['merge_feature']

Features:  [96.27001953125,-0.0799560546875,3.5999984741210938,0.18199968338012695,9267.9169921875,-0.006392970681190491,12.959988594055176,0.03312388435006142,9.811728477478027,-0.2827650308609009,1.8973661661148071,0.4266141951084137]


# 2. Xây dựng mô hình 
![title](data/image/m.png)

### Join giữa bảng stock( chứa giá trị thay đổi giữa 2 tuần) và bảng factors( chứa các features vừa được sinh ra)

In [66]:
DataStockFactors = df_stocksReturn.join(FinalFactorsReturn, 'Date','inner')

In [67]:
DataStockFactors.select('Open').describe().show()

+-------+--------------------+
|summary|                Open|
+-------+--------------------+
|  count|                3464|
|   mean|0.004592956299732244|
| stddev|  0.2649215603349682|
|    min|  -2.809999942779541|
|    max|  2.7799999713897705|
+-------+--------------------+



In [68]:
print 'số dữ liệu sau khi ghép bảng: ',DataStockFactors.count()

số dữ liệu sau khi ghép bảng:  3464


In [69]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'merge_feature', labelCol = 'Open', solver='l-bfgs')

### Fit dữ liệu vào model để train

In [70]:
lr_model = lr.fit(DataStockFactors)

In [71]:
pred = lr_model.transform(DataStockFactors)
pred.show(500)

+-------------------+--------------------+--------------------+--------------------+
|               Date|                Open|       merge_feature|          prediction|
+-------------------+--------------------+--------------------+--------------------+
|2000-01-04 00:00:00| 0.06999993324279785|[96.27001953125,-...|-0.06149214276085421|
|2000-01-05 00:00:00| 0.19000005722045898|[350.7099609375,5...|0.011120189824706198|
|2000-01-06 00:00:00| 0.13000011444091797|[402.2099609375,4...|-0.02629229158335...|
|2000-01-07 00:00:00| 0.07000017166137695|[579.289794921875...|-0.03674202198616387|
|2000-01-10 00:00:00|                 0.0|[122.52001953125,...|-0.13851882595937398|
|2000-01-11 00:00:00|                 0.5|[143.34033203125,...|-0.15902514182624705|
|2000-01-12 00:00:00|                0.25|[169.550048828125...|-0.12607552924096396|
|2000-01-13 00:00:00| 0.11999988555908203|[95.0,-33.6899414...|-0.09918550476127679|
|2000-01-14 00:00:00|   0.440000057220459|[-171.8798828125,...|-0

In [72]:
print 'Features: ', pred.select('merge_feature').rdd.take(1)

Features:  [Row(merge_feature=DenseVector([96.27, -0.08, 3.6, 0.182, 9267.917, -0.0064, 12.96, 0.0331, 9.8117, -0.2828, 1.8974, 0.4266]))]


### Thông số của model khi huấn luyện

In [73]:
trainingSummary = lr_model.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2) # % biến thiên trong model có thể xấp xỉ khi sử dụng model
# print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))

numIterations: 53
RMSE: 0.258479
r2: 0.047771


In [74]:
pred.describe().show()

+-------+--------------------+--------------------+
|summary|                Open|          prediction|
+-------+--------------------+--------------------+
|  count|                3464|                3464|
|   mean|0.004592956299732244|0.004592956299732246|
| stddev| 0.26492156033496794|0.057903904840036956|
|    min|  -2.809999942779541| -0.3893831011249541|
|    max|  2.7799999713897705|  0.3394912399767216|
+-------+--------------------+--------------------+



In [75]:
# from pyspark.ml.evaluation import RegressionEvaluator 
# evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='Open')
# evaluator.setMetricName('r2').evaluate(pred)

## 3. Thực hiện mô phỏng và tính giá trị VaR

### Xác định dạng phân phối của các điều kiện thị trường
Vì chúng ta không thể xác định trực tiếp các phân phối của các yếu tố thị trường nên ta cần phải xấp xỉ các phân phối này. Để làm được điều đó, ta sẽ xem xét đồ thị của các phân phối.

In [76]:
from pyspark.mllib.stat import KernelDensity
import matplotlib.pyplot as plt

def plotDistribution(samples, plot=True, numSamples=100):
    vmin = min(samples)
    vmax = max(samples)
    stddev = np.std(samples)
    
    domain = [float(x) for x in np.arange(vmin, vmax, (vmax-vmin)/numSamples)]
    
    # a simple heuristic to select bandwidth
    bandwidth = 1.06 * stddev * pow(len(samples), -.2)
    
    # estimate densityplotDistribution
#     kde = KDEUnivariate(samples)
#     kde.fit(bw=bandwidth)
#     density = kde.evaluate(domain)
    
    kd = KernelDensity()
    kd.setSample(sc.parallelize(samples))
    kd.setBandwidth(bandwidth)
    density = kd.estimate(domain)
    
    # plot
    # We do a little change because later we will use the data but plot it after
    if(plot):
        plt.plot(domain, density)
        plt.show()
    else:
        return domain,density

for i, f_return in enumerate(factorsReturn):
    plotDistribution(f_return.select("Open_" + str(i)).toPandas()["Open_" + str(i)])
    
# plotDistribution(factorsReturn[0])
# plotDistribution(factorsReturn[1])
# plotDistribution(factorsReturn[2])
# plotDistribution(factorsReturn[3])

NameError: global name 'np' is not defined

In [None]:
# pred_s = plotDistribution(pred.select('prediction').toPandas()['prediction'], plot=False)
# true_s = plotDistribution(pred.select('Open').toPandas()['Open'],plot=False)

# plt.plot(pred_s[0],pred_s[1])
# plt.plot(true_s[0], true_s[1])
# plt.show()

Để đơn giản, ta coi các phân phối trên có thể được biểu diễn gần đúng bằng một phân phối chuẩn. Do đó, để lấy mẫu các yếu tố lợi nhuận của thị trường, ta có thể sử dụng phân phối chuẩn cho từng yếu tố và lấy mẫu từ các phân phối này một cách độc lập. Tuy nhiên, cách tiếp cận này sẽ bỏ qua thực tế là các yếu tố thị trường thường có mối tương quan. Ví dụ, khi giá dầu thô giảm, giá trái phiếu kho bạc cũng giảm theo.

Phân phối chuẩn nhiều chiều (multivariate normal distribution) có thể giải quyết vấn đề này bằng cách lấy thông tin tương quan giữa các yếu tố. Mỗi mẫu từ một phân phối chuẩn nhiều chiều có thể được coi là một vectơ. Nếu xem xét theo từng chiều, phân phối các giá trị dọc theo chiều đó có dạng phân phối chuẩn. Tuy nhiên trong phân phối chung nhiều chiều, các biến sẽ không còn độc lập với nhau.

Cụ thể, các yếu tố thị trường theo phân phối chuẩn nhiều chiều được tính bởi công thức sau:
$$\left(\begin{array}{c}f_{1}\\f_{2}\\f_{3}\\f_{4} \end{array}\right)
\sim N 
\left[
  \left(
    \begin{array}{c}
      \mu_1\\ \mu_2 \\ \mu_3 \\ \mu_4 
    \end{array}
  \right), 
  \left(
    \begin{array}{cccc}
      \sigma^2_1 & \rho_{12} \sigma_1\sigma_2 & \rho_{13} \sigma_1\sigma_3 & \rho_{14} \sigma_1\sigma_4 \\ 
      \rho_{12}\sigma_2\sigma_1 & \sigma^2_2 & \rho_{23} \sigma_2\sigma_3 & \rho_{24} \sigma_2\sigma_4\\
      \rho_{13} \sigma_3\sigma_1 & \rho_{23} \sigma_3\sigma_2 & \sigma^2_3 & \rho_{34} \sigma_3\sigma_4 \\ 
      \rho_{14} \sigma_4\sigma_1 & \rho_{24} \sigma_4\sigma_2 & \rho_{34} \sigma_3\sigma_4 & \sigma_4^2 \\ 
    \end{array}
  \right)
\right]$$
hay:
$$f_t \sim N(\mu, \sum)$$

Trong đó $f_1$, $f_2$, $f_3$ và $f_4$ là các yếu tố thị trường, $\sigma_i$ là độ lệch chuẩn của yếu tố $i$, $\mu$ là vectơ kỳ vọng của sự biến động của các yếu tố và $\sigma$ là ma trận hiệp phương sai của sự biến động của các yếu tố.

Phân phối chuẩn nhiều chiều được tham số hóa với giá trị kỳ vọng dọc theo mỗi chiều và ma trận mô tả hiệp phương sai giữa mỗi cặp của các chiều.  Khi ma trận hiệp phương sai là ma trận chéo, phân phối sẽ lấy mẫu theo từng chiều một cách độc lập. Ngược lại, khi các giá trị ngoài các đường chéo khác không, phân phối sẽ nắm bắt mối quan hệ giữa các biến.


Tiếp theo, chúng ta sẽ tính toán vectơ kỳ vọng và ma trận hiệp phương sai của phân phối chuẩn theo nhiều chiều từ dữ liệu lịch sử.



### Tính vectơ kỳ vọng
Vectơ kỳ vọng được tính bằng cách sử dụng dữ liệu lịch sử

In [None]:
from pyspark.mllib.stat import Statistics

features = joinFactorsReturn.rdd.map(lambda row: row[1:])
cStats = Statistics.colStats(features)
means = cStats.mean()

In [None]:
print(means)

### Tính ma trận hiệp phương sai
Tính ma trận hiệp phương sai theo dữ liệu lịch sử

In [None]:
from pyspark.mllib.linalg.distributed import RowMatrix

rm = RowMatrix(joinFactorsReturn.rdd.map(lambda x: list(x[1:])))
#Tính ma trận hiệp phương sai corr_mat
corr_mat = rm.computeCovariance().toArray().tolist()

In [None]:
print(corr_mat)

### Tính phân phối theo nhiều chiều

In [None]:
import numpy as np

sample = np.random.multivariate_normal(means, corr_mat)
print(sample)

Để kiểm tra độ chính xác, ta so sánh đồ thị của các phân phối chuẩn xấp xỉ với phân phối thực tế.

In [None]:
numSamples = 50000 # to plot normal distributions
f, axarr = plt.subplots(2, 2)
f.set_figwidth(20)
f.set_figheight(10)
for idx in range(0,4):
    factorReturn = [x[idx] for x in features.collect()]
    i, j = divmod(idx, 2)
    ax = axarr[i, j]
    normalEstimates = [float(np.random.multivariate_normal(means, corr_mat)[idx]) for k in range(numSamples)]
    domainEstimates, densityEstimates = plotDistribution(normalEstimates, plot=False)
    domainFactor, densityFactor = plotDistribution(factorReturn, plot=False)
    ax.plot(domainEstimates, densityEstimates, lw=2)
    ax.plot(domainFactor, densityFactor, lw=2)
    ax.set_title("Variations over")
    ax.set_xlabel("2 weeks window") 
    ax.set_ylabel("variation over window") 
    ax.legend(["estimate", "real"], fontsize="xx-large")
f.subplots_adjust(hspace=0.3)

### Thực hiện mô phỏng và tính giá trị VaR

Trong phần này, chúng ta sẽ chạy mô phỏng Monte Carlo với 10.000 lần chạy, sử dụng Spark để song song hóa.

In [None]:
def sign(number):
    if number<0:
        return -1
    else:
        return 1

def featurize(factorReturns):
    factorReturns = list(factorReturns)
    squaredReturns = [sign(element)*(element)**2 for element in factorReturns]
    squareRootedReturns = [sign(element)*abs(element)**0.5 for element in factorReturns]
    # concat new features
    return squaredReturns + squareRootedReturns + factorReturns

def simulateTrialReturns(numTrials, factorMeans, factorCov, weights, testFeature = None):
    trialReturns = []
    for i in range(0, numTrials):
        # generate sample of factors' returns
        if testFeature is None: 
            trialFactorReturns = np.random.multivariate_normal(factorMeans, factorCov)
        
            # featurize the factors' returns
            trialFeatures = featurize(trialFactorReturns)
        else: 
            trialFeatures = testFeature
        
        # insert weight for intercept term
        trialFeatures.insert(0,1)
        
        # calculate the return of each instrument
        # then calulate the total of return for this trial features
        instrumentReturn =  sum(weights * trialFeatures)
        print(instrumentReturn)
        
        trialReturns.append(instrumentReturn)
    return trialReturns


parallelism = 4
numTrials = 10000
trial_indexes = list(range(0, parallelism))
seedRDD = sc.parallelize(trial_indexes, parallelism)

weights = np.append(lr_model.intercept, lr_model.coefficients)
bFactorWeights = sc.broadcast(weights)
feat = [96.27001953125,-0.0799560546875,3.5999984741210938,0.18199968338012695,9267.9169921875,
        -0.006392970681190491,12.959988594055176,0.03312388435006142,9.811728477478027,
        -0.2827650308609009,1.8973661661148071,0.4266141951084137]
print simulateTrialReturns(1, means, corr_mat, bFactorWeights.value, feat)

trials = seedRDD.flatMap(lambda idx: \
                simulateTrialReturns(
                    max(int(numTrials/parallelism), 1), 
                    means, corr_mat,
                    bFactorWeights.value,
                ))
trials.collect()
trials.cache()

Ta nhận được đồ thị phân phối lợi nhuận ngẫu nhiên như sau:

In [None]:
plotDistribution(trials.map(lambda x: float(x)).collect())

Để tính giá trị 5% VaR, chúng ta cần ra giá trị sao cho chỉ có 5% khả năng thua lỗ có giá trị lớn hơn nó. Với phân phối đã tính ở bước trên, điều này được thực hiện đơn giản bằng cách tìm ra giá trị mà có 5% giá trị thử nghiệm nhỏ hơn và 95% giá trị thử nghiệm lớn hơn. Đầu tiên ta sử dụng phương thức TakeOrdered để lấy ra 5% thử nghiệm tồi nhất và sau đó VaR được tính bằng giá trị lớn nhất trong tập con này.

Ta có tính CVaR bằng cách tương tự. Thay vì lấy giá trị thử nghiệm tốt nhất từ 5% thử nghiệm tồi tệ nhất, chúng ta lấy giá trị trung bình từ nhóm thử nghiệm đó:

In [None]:
def fivePercentVaR(trials):
    numTrials = trials.count()
    topLosses = trials.takeOrdered(int(max(round(numTrials/20.0), 1)))
    return topLosses[-1]

# an extension of VaR
def fivePercentCVaR(trials):
    numTrials = trials.count()
    topLosses = trials.takeOrdered(int(max(round(numTrials/20.0), 1)))
    return sum(topLosses)/len(topLosses)

In [None]:
valueAtRisk = fivePercentVaR(trials)
conditionalValueAtRisk = fivePercentCVaR(trials)

print "Value at Risk(VaR) 5%:", valueAtRisk
print "Conditional Value at Risk(CVaR) 5%:", conditionalValueAtRisk

In [None]:
trials.collect()

In [None]:
lr_model.coefficients