# Value at risk - portfolio ETL

In this notebook, we show how to:
- ingest Yahoo Finance market data into Delta tables using the `yfinance` module
- compute daily log returns for each instrument and market factor using window functions (Spark) and pandas

This notebook requires the following dependency:
- `yfinance`


# `STEP0` Configuration

In [0]:
import yfinance as yf
import pandas as pd
import numpy as np
from io import StringIO
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf, PandasUDFType
from datetime import datetime, timedelta

In [0]:
# Delta table names shared by every cell below
portfolio_table, stock_table, stock_return_table = 'var_portfolio', 'var_stock', 'var_stock_return'
market_table, market_return_table = 'var_market', 'var_market_return'

# `STEP1` Create our portfolio

In [0]:
# Portfolio definition as inline CSV: one listed instrument per row with its
# country, company name, ticker and industry. Company names that contain
# commas are double-quoted so they stay in a single field.
portfolio = """
country,company,ticker,industry
CHILE,Banco de Chile,BCH,Banks
CHILE,Banco Santander-Chile,BSAC,Banks
CHILE,Compañía Cervecerías Unidas S.A.,CCU,Beverages
CHILE,Itaú CorpBanca,ITCB,Banks
CHILE,"Embotelladora Andina, S.A.",AKOA,Beverages
CHILE,"Embotelladora Andina, S.A.",AKOB,Beverages
CHILE,"Empresa Nacional de Electricidad, S.A. (Chile)",EOCC,Electricity
CHILE,"Enersis, S.A.",ENIA,Electricity
CHILE,Enersis Chile SA Sponsored ADR,ENIC,Electricity
CHILE,LAN Airlines S.A.,LFL,Travel & Leisure
CHILE,"SQM-Sociedad Química y Minera de Chile, S.A.",SQM,Chemicals
CHILE,"Viña Concha y Toro, S.A.",VCO,Beverages
COLOMBIA,Avianca Holdings S.A.,AVH,Travel & Leisure
COLOMBIA,BanColombia S.A.,CIB,Banks
COLOMBIA,Ecopetrol S.A.,EC,Oil & Gas Producers
COLOMBIA,Grupo Aval Acciones y Valores S.A,AVAL,Financial Services
MEXICO,"América Móvil, S.A.B. de C.V.",AMX,Mobile Telecommunications
MEXICO,América Móvil SAB de CV Sponsored ADR Class A,AMOV,Mobile Telecommunications
MEXICO,CEMEX S.A.B. de C.V. (CEMEX),CX,Construction & Materials
MEXICO,"Coca-Cola FEMSA, S.A.B. de C.V.",KOF,Beverages
MEXICO,"Controladora Vuela Compañía de Aviación, S.A.B. de C.V",VLRS,Travel & Leisure
MEXICO,"Fomento Económico Mexicano, S.A.B. de C.V. (FEMSA)",FMX,Beverages
MEXICO,"Grupo Aeroportuario del Pacífico, S.A.B. de C.V. (GAP)",PAC,Industrial Transportation
MEXICO,"Grupo Aeroportuario del Sureste, S.A. de C.V. (ASUR)",ASR,Industrial Transportation
MEXICO,"Grupo Financiero Santander México, S.A.B. de C.V",BSMX,Banks
MEXICO,"Grupo Simec, S.A. De CV. (ADS)",SIM,Industrial Metals & Mining
MEXICO,"Grupo Televisa, S.A.",TV,Media
MEXICO,"Industrias Bachoco, S.A.B. de C.V. (Bachoco)",IBA,Food Producers
PANAMA,"Banco Latinoamericano de Comercio Exterior, S.A.",BLX,Banks
PANAMA,"Copa Holdings, S.A.",CPA,Travel & Leisure
PERU,Cementos Pacasmayo S.A.A.,CPAC,Construction & Materials
PERU,Southern Copper Corporation,SCCO,Industrial Metals & Mining
PERU,Fortuna Silver Mines Inc.,FSM,Mining
PERU,Compañía de Minas Buenaventura S.A.,BVN,Mining
PERU,Graña y Montero S.A.A.,GRAM,Construction & Materials
PERU,Credicorp Ltd.,BAP,Banks
"""

# Parse the CSV text; the empty first line of the literal is skipped and the
# next line supplies the header.
portfolio_df = pd.read_csv(StringIO(portfolio), sep=',', header=0)

In [0]:
# Persist the portfolio as a Delta table (ticker as the leading column),
# replacing whatever version of the table existed before, then show it.
(
  spark.createDataFrame(portfolio_df)
    .select('ticker', 'company', 'country', 'industry')
    .write
    .saveAsTable(portfolio_table, format='delta', mode='overwrite')
)

display(spark.read.table(portfolio_table))

ticker,company,country,industry
VLRS,"Controladora Vuela Compañía de Aviación, S.A.B. de C.V",MEXICO,Travel & Leisure
FMX,"Fomento Económico Mexicano, S.A.B. de C.V. (FEMSA)",MEXICO,Beverages
PAC,"Grupo Aeroportuario del Pacífico, S.A.B. de C.V. (GAP)",MEXICO,Industrial Transportation
ASR,"Grupo Aeroportuario del Sureste, S.A. de C.V. (ASUR)",MEXICO,Industrial Transportation
BSMX,"Grupo Financiero Santander México, S.A.B. de C.V",MEXICO,Banks
AVAL,Grupo Aval Acciones y Valores S.A,COLOMBIA,Financial Services
AMX,"América Móvil, S.A.B. de C.V.",MEXICO,Mobile Telecommunications
AMOV,América Móvil SAB de CV Sponsored ADR Class A,MEXICO,Mobile Telecommunications
CX,CEMEX S.A.B. de C.V. (CEMEX),MEXICO,Construction & Materials
KOF,"Coca-Cola FEMSA, S.A.B. de C.V.",MEXICO,Beverages


# `STEP2` Download stock data

In [0]:
# Output schema of `fetch_tick`: ticker and date keys followed by the daily
# OHLCV values as doubles. Every field is nullable.
schema = StructType(
  [
    StructField('ticker', StringType(), True),
    StructField('date', DateType(), True),
  ]
  + [StructField(name, DoubleType(), True) for name in ('open', 'high', 'low', 'close', 'volume')]
)

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def fetch_tick(group, pdf):
  """Download about 2 years of daily OHLCV history for a single ticker.

  Applied by Spark once per `groupBy('ticker')` group (grouped-map pandas UDF).

  :param group: grouping key tuple; ``group[0]`` is the ticker symbol
  :param pdf: the group's portfolio rows (unused - only the key is needed)
  :return: pandas DataFrame with the columns of ``schema`` (ticker, date, open,
           high, low, close, volume), one row per business day between the
           first and last available quote, gaps filled with the last
           observation. An empty DataFrame with the same columns is returned
           when the download or processing fails, so one bad ticker does not
           fail the whole job.
  """
  tick = group[0]
  columns = ['ticker', 'date', 'open', 'high', 'low', 'close', 'volume']
  try:
    raw = yf.Ticker(tick).history(period="2y")[['Open', 'High', 'Low', 'Close', 'Volume']]
    # fill in missing business days
    idx = pd.date_range(raw.index.min(), raw.index.max(), freq='B')
    # use last observation carried forward for missing values
    output_df = raw.reindex(idx, method='pad').rename(columns=str.lower)
    # Spark drops the pandas index on conversion, so materialise the date as a
    # column; plain datetime.date values map directly onto the DateType field
    output_df['date'] = output_df.index.date
    output_df['ticker'] = tick
    # `schema` declares volume as DoubleType
    output_df['volume'] = output_df['volume'].astype(float)
    # explicit column order so the frame lines up with `schema` even when
    # Spark matches result columns by position rather than by name
    return output_df[columns].reset_index(drop=True)
  except Exception as err:
    # best effort: record the failure in the executor log and skip this ticker
    print("Unable to fetch {}: {}".format(tick, err))
    return pd.DataFrame(columns=columns)
  
# Run one Yahoo Finance download per portfolio ticker in parallel across the
# cluster, persist the combined history as a Delta table (replacing the
# previous snapshot), then show it.
(
  spark.read.table(portfolio_table)
    .groupBy("ticker")
    .apply(fetch_tick)
    .write
    .saveAsTable(stock_table, format="delta", mode="overwrite")
)

display(spark.read.table(stock_table))

ticker,date,open,high,low,close,volume
CIB,2018-05-14,45.04,45.51,44.93,45.35,335000.0
CIB,2018-05-15,45.01,45.46,44.58,45.25,1693700.0
CIB,2018-05-16,45.14,45.25,43.84,44.07,1145300.0
CIB,2018-05-17,43.93,44.03,42.71,42.72,634200.0
CIB,2018-05-18,42.67,42.93,42.36,42.47,334600.0
CIB,2018-05-21,42.65,42.81,42.17,42.66,539400.0
CIB,2018-05-22,42.84,43.72,42.84,43.31,640400.0
CIB,2018-05-23,42.94,43.36,42.63,42.95,519400.0
CIB,2018-05-24,42.82,42.85,42.22,42.57,806600.0
CIB,2018-05-25,42.34,43.37,42.02,43.16,272200.0


In [0]:
# (Re)create the `stock` dropdown listing every portfolio ticker. Removing the
# old widget first lets this cell be re-run after the portfolio changes; on the
# very first run there is nothing to remove, which is expected and only logged.
try:
  dbutils.widgets.remove('stock')
except Exception:
  print("Unable to delete widget")

tickers = spark.read.table(portfolio_table).select('ticker').toPandas()['ticker']
# widget choices are passed as a plain list of strings rather than a pandas Series
dbutils.widgets.dropdown('stock', 'AVAL', tickers.tolist())

In [0]:
# Full daily history of the ticker currently selected in the `stock` widget, oldest first
display(
  spark.table(stock_table)
    .where(F.col('ticker') == dbutils.widgets.get('stock'))
    .orderBy('date')
)

ticker,date,open,high,low,close,volume
AVAL,2018-05-14,7.7383,7.7837,7.6565,7.7292,27400.0
AVAL,2018-05-15,7.6747,7.6747,7.5475,7.602,114700.0
AVAL,2018-05-16,7.6656,7.6838,7.5793,7.6656,217200.0
AVAL,2018-05-17,7.6475,7.6565,7.5021,7.602,131100.0
AVAL,2018-05-18,7.5657,7.6202,7.4476,7.6111,147100.0
AVAL,2018-05-21,7.6565,7.6565,7.4931,7.5748,167600.0
AVAL,2018-05-22,7.602,7.8654,7.5566,7.7928,171300.0
AVAL,2018-05-23,7.7383,7.7928,7.602,7.7383,331600.0
AVAL,2018-05-24,7.7201,7.82,7.6656,7.7746,91700.0
AVAL,2018-05-25,7.7383,7.8109,7.6202,7.7746,140600.0


# `STEP3` Download market factors

In [0]:
# Yahoo Finance symbol -> column name of each market factor
factors = {
  '^GSPC':'SP500',
  '^NYA':'NYSE',
  '^XOI':'OIL',
  '^TNX':'TREASURY',
  '^DJI':'DOWJONES'
}

# Collect ~2 years of daily close values per factor on a business-day calendar,
# filling missing days with the last observation carried forward.
factor_closes = {}
for tick, factor_name in factors.items():
    history = yf.Ticker(tick).history(period="2y")
    business_days = pd.date_range(history.index.min(), history.index.max(), freq='B')
    factor_closes[factor_name] = history.reindex(business_days, method='pad')['Close']

# One column per factor, aligned on the union of all factors' dates, so a factor
# with a different date range is neither truncated nor shifted against the others
# (dates outside a factor's range are NaN).
factors_df = pd.DataFrame(factor_closes)

# Pandas does not keep index (date) when converted into spark dataframe; take the
# dates from the frame's own index so they always match the rows
factors_df['Date'] = factors_df.index

# Overwrite delta table (bronze) with information to date
spark.createDataFrame(factors_df) \
  .write \
  .format("delta") \
  .mode("overwrite") \
  .saveAsTable(market_table)

# `STEP4` Compute daily log returns

In [0]:
# our market factors easily fit in memory, use pandas for convenience
df = spark.table(market_table).toPandas()

# add date column as pandas index for sliding window
df.index = df['Date']
df = df.drop(columns = ['Date'])

# compute daily log returns
df = np.log(df.shift(1)/df)

# add date columns
df['date'] = df.index

# overwrite log returns to market table (gold)
spark.createDataFrame(df) \
  .write \
  .format("delta") \
  .mode("overwrite") \
  .saveAsTable(market_return_table)

In [0]:
# `udf` is not imported on its own in this notebook, so use it through `F`
@F.udf("double")
def compute_return(first, close):
  """Log return ln(close / first) as a double.

  Returns None (null) when either price is missing or non-positive: the log
  return is undefined there, and a zero `first` would otherwise raise
  ZeroDivisionError inside the Spark task.
  """
  if first is None or close is None or first <= 0 or close <= 0:
    return None
  return float(np.log(close / first))

# Sliding two-row frame per instrument (previous row + current row), ordered by
# date. F.first over this frame gives the previous close, or the current close
# on an instrument's first row, which yields a 0.0 return for that row.
window = Window.partitionBy('ticker').orderBy('date').rowsBetween(-1, 0)

# compute daily log returns per instrument, then keep only dates that also
# exist in the market factor returns table (inner join on date)
sdf = spark.table(stock_table) \
  .filter(F.col('close').isNotNull()) \
  .withColumn("first", F.first('close').over(window)) \
  .withColumn("return", compute_return('first', 'close')) \
  .select('date', 'ticker', 'return') \
  .join(spark.table(market_return_table), 'date') \
  .select('date', 'ticker', 'return')

# overwrite log returns to stock return table (gold)
sdf.write \
  .format("delta") \
  .mode("overwrite") \
  .saveAsTable(stock_return_table)

In [0]:
# Daily log returns of the ticker currently selected in the `stock` widget
display(
  spark.read.table(stock_return_table)
    .where(F.col('ticker') == dbutils.widgets.get('stock'))
)

date,ticker,return
2018-05-14,AVAL,0.0
2018-05-15,AVAL,-0.0165939937916512
2018-05-16,AVAL,0.0083314165796637
2018-05-17,AVAL,-0.0083314165796636
2018-05-18,AVAL,0.0011963375098231
2018-05-21,AVAL,-0.0047807596702671
2018-05-22,AVAL,0.0283732820679958
2018-05-23,AVAL,-0.0070182052398784
2018-05-24,AVAL,0.0046799845617904
2018-05-25,AVAL,0.0


# `STEP5` Ensure data consistency

In [0]:
%sql
-- List every recorded version of the market returns Delta table
-- (operation, timestamp, user, metrics), e.g. to pick a version for time travel
DESCRIBE HISTORY var_market_return

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics
0,2020-05-12T00:31:01.000+0000,3658755248564160,antoine.amend@databricks.com,WRITE,"Map(mode -> Overwrite, partitionBy -> [])",,List(4019741983524224),0512-002306-kited766,,WriteSerializable,False,"Map(numFiles -> 8, numOutputBytes -> 36571, numOutputRows -> 521, numParts -> 0)"


In [0]:
%sql
-- Delta time travel: query the market returns table as it was at version 0
SELECT * FROM var_market_return VERSION AS OF 0

SP500,NYSE,OIL,TREASURY,DOWJONES,date
-0.0163211363646534,0.0039683418925462,0.083110747753074,0.4087970562136414,-0.010504636440647,2019-08-19T00:00:00.000+0000
0.0079462520811492,0.0069995840865511,0.0100409204998842,0.023426205810788,0.0066547611980478,2019-08-20T00:00:00.000+0000
-0.0082130062994996,-0.0077165452106995,-0.0099138045487799,-0.0101976664476063,-0.009212725638007,2019-08-21T00:00:00.000+0000
0.0005062096294763818,0.0006736137107804072,0.008570885251418,-0.0207098710154703,-0.0018877149182345,2019-08-22T00:00:00.000+0000
0.0262889355580666,0.0216707126234008,0.0347454955442376,0.0522744882520428,0.0240306977302634,2019-08-23T00:00:00.000+0000
-0.0109231911739489,-0.0082748074328565,-0.0036837009363418,-0.01106421960538,-0.0104771729399331,2019-08-26T00:00:00.000+0000
0.0032083318806849,0.0036465273403995,0.0075764643459135,0.0362477903923409,0.0046802582417934,2019-08-27T00:00:00.000+0000
-0.0065241411868015,-0.0068053669950561,-0.0168971451231083,0.016238516492908,-0.00996650084037,2019-08-28T00:00:00.000+0000
-0.012607435438741,-0.0114634125346536,-0.016427331807751,-0.0335376837557202,-0.0124490260540188,2019-08-29T00:00:00.000+0000
-0.0006426208211522577,-0.0025824562294826,-0.0017515244453785,0.006618157842478,-0.0015551822665124,2019-08-30T00:00:00.000+0000


# `HOMEWORK` Unifying streaming and batch

Can you read new market data as a stream using `.readStream` and `Trigger.Once`, so that only the new data (the delta since the last run) is processed?

```scala
import org.apache.spark.sql.streaming.Trigger

val inputStream = spark
  .readStream
  .format("delta")
  .table("SILVER_TABLE")

val outputStream = inputStream.doSomething()

outputStream
  .writeStream
  .trigger(Trigger.Once)
  .option("checkpointLocation", "/my/checkpoint/dir")
  .format("delta")
  .toTable("GOLD_TABLE")
```