In [None]:
import datetime as dt
import time
from math import copysign

import pandas as pd

from gs_quant.datetime.relative_date import RelativeDate
from gs_quant.markets.position_set import Position, PositionSet, PositionTag
from gs_quant.markets.portfolio import Portfolio
from gs_quant.markets.portfolio_manager import PortfolioManager
from gs_quant.markets.securities import Asset, AssetIdentifier
from gs_quant.markets.report import FactorRiskReport, ReturnFormat
from gs_quant.models.risk_model import FactorRiskModel
from gs_quant.session import GsSession
from gs_quant.target.common import PositionSetWeightingStrategy

from gs_quant.target.hedge import CorporateActionsTypes
from gs_quant.markets.optimizer import OptimizerStrategy, OptimizerUniverse, \
    AssetConstraint, FactorConstraint, SectorConstraint, OptimizerSettings, OptimizerConstraints, \
    OptimizerObjective, OptimizerType

# Widen pandas' text display so the wide position/exposure frames shown below are not wrapped.
pd.set_option('display.width', 1000)


In [None]:
# Initialize your session
# NOTE: 'client_id' / 'client_secret' are placeholders - substitute your own application credentials,
# and avoid saving real secrets in a notebook that will be shared or committed.
GsSession.use(client_id='client_id', client_secret='client_secret')

First, let's set up our starting point. We will create a small portfolio of 3 stocks.

In [None]:
# --- Backtest configuration ---------------------------------------------------
start_date = dt.date(2024, 1, 2)        # date of the initial position set
reference_notional = 100_000_000        # notional passed to the initial position set
hedge_notional_pct = 0.4                # hedge notional as a fraction of the portfolio notional
universe = ['SPX']                      # Bloomberg ids of the assets the hedge is drawn from
rebalance_freq = '3m'                   # relative-date rule separating two rebalances
risk_model_id = 'AXIOMA_AXUS4S'

# Core portfolio as identifier -> weight; every holding carries source='Portfolio'.
_core_weights = {
    'AAPL UW': 0.4,
    'MSFT UW': 0.25,
    'META UW': 0.25,
}
holdings = [
    {'identifier': bbid, 'weight': weight, 'source': 'Portfolio'}
    for bbid, weight in _core_weights.items()
]

# When True, the factor constraints are read as limits on the total (portfolio + hedge)
# and are converted into hedge-level limits before optimizing.
apply_factor_constraints_on_total = True

If you do not have historical identifiers of your holdings, you can use SecurityMaster to resolve today's identifiers to a past point in time. In this example, META is an identifier as of today, not as of 2020.  

In [None]:
from gs_quant.api.gs.secmaster import GsSecurityMasterApi

# Maps each identifier as supplied in the holdings to the identifier returned for the lookup date.
historical_identifiers = {}
 
def get_historical_id(identifier, date, listed=True, id_type='bbid'):
    """Look up the identifier a security carried on ``date``.

    The security is first located in Security Master by its current ``identifier``
    (primary listing only); its identifier history is then scanned in the order returned.

    :param identifier: identifier used to find the security
    :param date: ``datetime.date`` the returned identifier must be valid on
    :param listed: a history record is only returned while this flag is truthy
    :param id_type: value a record's ``type`` must have to be returned (default ``'bbid'``)
    :return: ``value`` of the first record of type ``id_type`` whose start/end window contains
        ``date``; the unchanged ``identifier`` when no record qualifies
    :raises ValueError: when Security Master returns no result for ``identifier``
    """
    open_ended = '9999-99-99'   # end-date marker that is mapped to today's date
    date_format = '%Y-%m-%d'

    lookup = GsSecurityMasterApi.get_many_securities(identifier=[identifier], isPrimary=True)
    found = bool(lookup) and 'results' in lookup and bool(lookup['results'])
    if not found:
        raise ValueError(f"No security found for identifier {identifier}")

    secmaster_id = lookup['results'][0]['id']
    for entry in GsSecurityMasterApi.get_identifiers(secmaster_id=secmaster_id):
        valid_from = dt.datetime.strptime(entry['startDate'], date_format).date()
        if entry['endDate'] == open_ended:
            valid_to = dt.date.today()
        else:
            valid_to = dt.datetime.strptime(entry['endDate'], date_format).date()

        if not (valid_from <= date <= valid_to):
            continue
        if listed and entry['type'] == id_type:
            return entry['value']

    # Nothing in the history qualified: hand back what we were given.
    return identifier
 
# Demonstration: 'META UW' is today's identifier; look up what the same security was called in 2020.
# A dedicated name is used on purpose - rebinding `holdings` / `start_date` here would silently
# replace the portfolio and start date configured above, which the position set below is built from.
example_date = dt.date(2020, 1, 1)
try:
    print(f"'META UW' as of {example_date}: {get_historical_id('META UW', example_date)}")
except ValueError as e:
    print(e)

# Resolve every configured holding as of the backtest start date.
for holding in holdings:
    try:
        historical_identifiers[holding['identifier']] = get_historical_id(holding['identifier'], start_date)
    except ValueError as e:
        # Keep going: an unresolved holding simply keeps its original identifier.
        print(e)

historical_identifiers

You can then use this information to update your holdings

In [None]:
def update_identifiers(holdings, historical_identifiers):
    """Swap each holding's identifier for its historical counterpart, where one is known.

    The holding dicts are modified in place, and the very same list is returned.

    :param holdings: list of dicts, each with an ``'identifier'`` key
    :param historical_identifiers: mapping of current identifier -> replacement identifier
    :return: ``holdings`` (same object), with identifiers replaced where a mapping exists
    """
    for position in holdings:
        stale_id = position['identifier']
        if stale_id not in historical_identifiers:
            continue
        position['identifier'] = historical_identifiers[stale_id]
    return holdings
 
# Update the portfolio positions.
# update_identifiers edits the dicts inside `holdings` in place and returns that same list,
# so `holdings` and `updated_holdings` refer to the same, updated data.
updated_holdings = update_identifiers(holdings, historical_identifiers)
 
# Display the updated portfolio positions
updated_holdings

Once the holdings are finalised, we convert them to a PositionSet object, and resolve them to Marquee Identifiers. We also price our positions to filter out any assets that might be missing prices.

In [None]:
# Build the core portfolio's position set from the holding dicts, dated at the backtest start.
# NOTE(review): add_tags=True presumably turns the extra 'source' key into a position tag - confirm.
position_set = PositionSet.from_dicts(
    holdings, 
    date=start_date, 
    add_tags=True, 
    reference_notional=reference_notional
)
# Resolve the supplied identifiers to Marquee assets.
position_set.resolve()

# Show the resolved positions together with their tags.
position_set.to_frame(add_tags=True)

In [None]:
# Work on a copy so that `position_set` itself is left as it was.
combined_pset = position_set.clone()
# Price the weighted positions using unadjusted close prices.
combined_pset.price(use_unadjusted_close_price=True,
                   weighting_strategy=PositionSetWeightingStrategy.Weight,
                   fallbackDate='5d')
# Add a negative USD position sized at the hedge notional and tagged source='Optimization'.
# NOTE(review): presumably a stand-in for the hedge leg until the optimizer has produced one - confirm.
combined_pset.positions.append(Position(identifier='USD', quantity=hedge_notional_pct*reference_notional*-1, tags=[PositionTag(name='source', value='Optimization')]))
# Resolve and price again now that the extra position has been appended.
combined_pset.resolve()
combined_pset.price(use_unadjusted_close_price=True,
                    fallbackDate='5d',
                    handle_long_short=True)

combined_pset.to_frame(add_tags=True)

In [None]:
# Create the Marquee portfolio that will hold the strategy, and keep its id for later cells.
portfolio = Portfolio(name='My New Strategy')
portfolio.save()
port_id = portfolio.id
print('Created portfolio with id: {0}'.format(port_id))

In [None]:
port_manager = PortfolioManager(port_id)
# share the portfolio with your account, as your app is different from yourself 
port_manager.share(emails=['user.1@yourcompany.com'], admin=True)
port_manager.share(emails=['user.2@yourcompany.com'], admin=False)
# Upload the initial positions and declare 'source' as the tag hierarchy, so that
# later cells can request reports for the {'source': 'Portfolio'} slice.
port_manager.update_positions([combined_pset])
port_manager.set_tag_name_hierarchy(['source'])

# Create a factor risk report for the chosen risk model and attach it to the portfolio.
risk_report = FactorRiskReport(risk_model_id=risk_model_id)
risk_report.set_position_source(port_id)
risk_report.save()

port_manager.update_portfolio_tree()
# Schedule the reports from the position date through five business days later ("5b").
port_manager.schedule_reports(start_date=position_set.date, 
                              end_date=RelativeDate("5b", position_set.date).apply_rule())

In [None]:
print('Waiting for risk calculations to complete...')
# Fetch the factor risk report of the 'Portfolio' slice and block until its latest job is done.
risk_report = port_manager.get_factor_risk_report(risk_model_id=risk_model_id, tags={'source': 'Portfolio'})
risk_report.get_most_recent_job().wait_for_completion()

# Factor exposures on the position date only; the first record is kept as a plain dict.
# NOTE(review): presumably keyed by factor name (prepare_factor_constraints looks names up in it) - confirm.
factor_exposure_data = risk_report.get_factor_exposure(start_date=position_set.date, end_date=position_set.date)
factor_exposure_map = factor_exposure_data.to_dict(orient='records')[0]

In [None]:
def prepare_factor_constraints(factor_constraints, port_factor_exposure_map):
    """Translate total-portfolio factor constraints into constraints for the hedge.

    For every constraint, the new limit is the core portfolio's exposure to that factor
    (0 when the factor is absent from the map) minus the constraint's ``max_exposure``.
    Each change is printed.

    :param factor_constraints: iterable of objects exposing ``factor`` (with a ``name``) and ``max_exposure``
    :param port_factor_exposure_map: mapping of factor name -> exposure of the core portfolio
    :return: list of new ``FactorConstraint`` objects, in the same order as the input
    """
    hedge_constraints = []
    for constraint in factor_constraints:
        factor = constraint.factor
        total_limit = constraint.max_exposure
        hedge_limit = port_factor_exposure_map.get(factor.name, 0) - total_limit
        print('Changing factor constraint for ', factor.name, 'from ', total_limit, 'to ', hedge_limit)
        hedge_constraints.append(FactorConstraint(factor, hedge_limit))
    return hedge_constraints

Now that you have a position set, you can get a hedge according to your liking. We have put in some sample settings below. 

In [None]:
# Universe the hedge is chosen from: the configured identifiers, resolved by Bloomberg id.
# NOTE(review): explode_composites=True presumably expands an index such as SPX into its
# constituents - confirm against the OptimizerUniverse documentation.
hedge_universe = OptimizerUniverse(
    assets=[Asset.get(a, AssetIdentifier.BLOOMBERG_ID) for a in universe],
    explode_composites=True,
    exclude_corporate_actions_types=[CorporateActionsTypes.Mergers]
)

risk_model = FactorRiskModel.get(risk_model_id)

# Per-asset bounds (0 and 5). NOTE(review): units look like percent of the hedge - confirm.
asset_constraints = [
    AssetConstraint(Asset.get('MSFT UW', AssetIdentifier.BLOOMBERG_ID), 0, 5),
    AssetConstraint(Asset.get('AAPL UW', AssetIdentifier.BLOOMBERG_ID), 0, 5)
]

# Here, we have specified the constraints on factor exposure of the Total Optimized portfolio
factor_constraints = [
    FactorConstraint(risk_model.get_factor('Size'), 0),
    FactorConstraint(risk_model.get_factor('Market Sensitivity'), 0)
]

# When the constraints describe the total portfolio, convert them into hedge-level
# constraints using the core portfolio's current factor exposures.
if apply_factor_constraints_on_total:
    hedge_factor_constraints = prepare_factor_constraints(factor_constraints, factor_exposure_map)
else:
    hedge_factor_constraints = factor_constraints
 
# Per-sector bounds (0 and 30). NOTE(review): units look like percent of the hedge - confirm.
sector_constraints = [
    SectorConstraint('Energy', 0, 30),
    SectorConstraint('Health Care', 0, 30)
]
settings = OptimizerSettings(notional=hedge_notional_pct * position_set.reference_notional, # 40% of your original portfolio 
                             allow_long_short=False)
constraints = OptimizerConstraints(
    asset_constraints=asset_constraints,
    factor_constraints=hedge_factor_constraints,
    sector_constraints=sector_constraints,
)

# Hedge the core position set against the universe, minimizing factor risk.
strategy = OptimizerStrategy(
    initial_position_set=position_set,
    constraints=constraints,
    settings=settings,
    universe=hedge_universe,
    risk_model=risk_model,
    objective=OptimizerObjective.MINIMIZE_FACTOR_RISK
)

strategy.run(optimizer_type=OptimizerType.AXIOMA_PORTFOLIO_OPTIMIZER)

optimization = strategy.get_optimization(by_weight=True) # Returns just the optimization results as a PositionSet object
optimization.to_frame()

The Optimizer uses adjusted prices, while the portfolio analytics use unadjusted prices to offer users more fine-grained control over historical positions. We need to convert the output of the hedger to unadjusted prices to be able to use it in the portfolio analytics.

In [None]:
# Re-price both legs by weight using unadjusted close prices.
position_set.price(use_unadjusted_close_price=True,
                   weighting_strategy=PositionSetWeightingStrategy.Weight,
                   fallbackDate='5d')

optimization.price(use_unadjusted_close_price=True,
                   weighting_strategy=PositionSetWeightingStrategy.Weight,
                   fallbackDate='5d')

# Flip the sign of every hedge quantity and tag the position as coming from the optimization.
for p in optimization.positions:
    p.quantity *= -1
    p.add_tag('source', 'Optimization')

# Core portfolio + hedge in one position set, carrying identifiers, quantities and tags only.
combined_pset = PositionSet(
    date=position_set.date,
    # take only quantity of the newly priced positions
    positions=[Position(identifier=p.identifier, asset_id=p.asset_id, quantity=p.quantity, tags=p.tags) for p in position_set.positions+optimization.positions]
)
combined_pset.to_frame()

In [None]:
# Replace the placeholder positions with the core portfolio plus the optimized hedge.
port_manager.update_positions([combined_pset])
# Schedule reports up to the first rebalance date. The end date is computed with the NYSE
# calendar so that it matches the first rebalance date used by the rebalancing loop;
# without it the two dates can differ when the rule lands on an exchange holiday.
port_manager.schedule_reports(start_date=combined_pset.date,
                              end_date=RelativeDate(rebalance_freq, combined_pset.date).apply_rule(exchanges=['NYSE']))

With our initial setup done and settings configured, we are now ready to launch a flow that will continuously optimize our portfolio at our desired frequency.

We will take the positions from the previous rebalance, use performance analytics to roll them forward to the latest positions, and then optimize the portfolio again. We will then update the portfolio with the new positions and schedule reports up to the next rebalance date.

This operation of moving your portfolio forward using performance analytics relies solely on availability of the underlying assets. 

In [None]:
port_manager = PortfolioManager(port_id)
start = position_set.date
# Never roll past the previous NYSE business day.
max_end = RelativeDate("-1b", dt.date.today()).apply_rule(exchanges=['NYSE'])
start_time = time.time()
rebal = RelativeDate(rebalance_freq, start).apply_rule(exchanges=['NYSE'])

while rebal < max_end:
    print(f'Moving to rebalance date {rebal}')

    # 1. Wait for performance analytics, then read the core portfolio as of the rebalance date.
    port_perf_report = port_manager.get_performance_report({'source': 'Portfolio'})
    perf_report_job = port_perf_report.get_most_recent_job()
    print(f'Waiting for performance calculations till date {perf_report_job.end_date} to complete (job id {perf_report_job.job_id})')
    perf_report_job.wait_for_completion()

    latest_port_pos = port_perf_report.get_portfolio_constituents(
        start_date=rebal,
        end_date=rebal,
        fields=['quantity', 'grossWeight'],
        prefer_rebalance_positions=True,
        return_format=ReturnFormat.JSON
    )
    # .iloc[0] takes the first row by position, whatever the frame's index is.
    latest_port_exp = port_perf_report.get_gross_exposure(start_date=rebal, end_date=rebal)['grossExposure'].iloc[0]
    latest_position_set = PositionSet(
        date=rebal,
        reference_notional=latest_port_exp,
        positions=[Position(
            asset_id=p['assetId'],
            identifier=p['assetId'],
            # We recommend using gross weight to find your reference weight, like below:
            # the magnitude comes from grossWeight, the sign from quantity.
            weight=copysign(p.get('grossWeight', 0), p.get('quantity', 0)),
            # Same tag type as used for the positions created earlier in the notebook.
            tags=[PositionTag(name='source', value='Portfolio')]
        ) for p in latest_port_pos]
    )
    print('Latest Portfolio Position set:')
    print(latest_position_set.to_frame(add_tags=True))

    # 2. Size the hedge as hedge_notional_pct of the latest portfolio notional.
    settings = OptimizerSettings(notional=hedge_notional_pct * latest_position_set.reference_notional,
                                 allow_long_short=False)

    # 3. Convert total-portfolio factor constraints into hedge constraints, if requested.
    if factor_constraints and apply_factor_constraints_on_total:
        port_risk_report = port_manager.get_factor_risk_report(risk_model_id=risk_model_id, tags={'source': 'Portfolio'})
        risk_report_job = port_risk_report.get_most_recent_job()
        print(f'Waiting for risk calculations till date {risk_report_job.end_date} to complete (job id {risk_report_job.job_id})')
        risk_report_job.wait_for_completion()

        # Read exposures from the report whose job was just awaited, not from a variable
        # left over from an earlier cell.
        factor_exposure_data = port_risk_report.get_factor_exposure(start_date=latest_position_set.date, end_date=latest_position_set.date)
        factor_exposure_map = factor_exposure_data.to_dict(orient='records')[0]
        hedge_factor_constraints = prepare_factor_constraints(factor_constraints, factor_exposure_map)
    else:
        hedge_factor_constraints = factor_constraints

    # 4. Optimize the hedge for the rolled-forward portfolio.
    constraints = OptimizerConstraints(
        asset_constraints=asset_constraints,
        factor_constraints=hedge_factor_constraints,
        sector_constraints=sector_constraints,
    )
    strategy = OptimizerStrategy(
        initial_position_set=latest_position_set,
        constraints=constraints,
        settings=settings,
        universe=hedge_universe,
        risk_model=risk_model,
        objective=OptimizerObjective.MINIMIZE_FACTOR_RISK
    )
    print('Optimizing...')
    strategy.run(optimizer_type=OptimizerType.AXIOMA_PORTFOLIO_OPTIMIZER)
    print('Optimization complete')
    print('Optimized Position Set quantities:')
    print(strategy.get_optimized_position_set().to_frame())
    optimization = strategy.get_optimization(by_weight=True)
    print('Hedge position set:')
    print(optimization.to_frame(add_tags=True))

    # 5. Re-price both legs with unadjusted closes, flip the hedge's sign and tag it.
    optimization.price(
        use_unadjusted_close_price=True,
        weighting_strategy=PositionSetWeightingStrategy.Weight,
        fallbackDate='5d'
    )
    for p in optimization.positions:
        p.quantity *= -1
        p.add_tag('source', 'Optimization')
    latest_position_set.price(
        use_unadjusted_close_price=True,
        weighting_strategy=PositionSetWeightingStrategy.Weight,
        fallbackDate='5d'
    )
    combined_pset = PositionSet(
        date=optimization.date,
        positions=[Position(identifier=p.identifier, asset_id=p.asset_id, quantity=p.quantity, tags=p.tags) for p in latest_position_set.positions+optimization.positions]
    )
    print('Combined position set:')
    print(combined_pset.to_frame(add_tags=True))

    # 6. Upload the new positions and schedule reports up to the next rebalance date.
    port_manager.update_positions([combined_pset])
    start = rebal
    # Same NYSE calendar as the first rebalance date; capped at max_end, which ends the loop.
    rebal = min(max_end, RelativeDate(rebalance_freq, start).apply_rule(exchanges=['NYSE']))
    print(f'Scheduling reports to calculate performance till {rebal}...')
    port_manager.schedule_reports(start_date=start,
                                  end_date=rebal)
    time.sleep(2)

print(f'Done! Processing completed in {time.time() - start_time} seconds')

And that's it! You have successfully completed a basic run of our Quant Backtesting Workflow. 

For questions, please reach out to [Marquee Sales](mailto:gs-marquee-sales@gs.com)!