Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tpm: fix tpm get_quote() #55

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/python/cc_quote_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,28 @@ def main():
help="Output format: raw/human. Default raw.",
type=out_format_validator
)
parser.add_argument(
"--pcr-selection",
type=str,
default="sha256:1,2,10",
help="PCR Selection to generate quote",
dest="pcr_selection"
)
parser.add_argument("--ak-context", type=str, help="Path to ak context", dest="ak_context")
args = parser.parse_args()

nonce = make_nounce()
LOG.info("demo random number in base64: %s", nonce.decode("utf-8"))
userdata = make_userdata()
LOG.info("demo user data in base64: %s", userdata.decode("utf-8"))
extra_args = {}
extra_args["pcr_selection"] = args.pcr_selection
extra_args["ak_context"] = args.ak_context

quote = CCTrustedVmSdk.inst().get_cc_report(nonce, userdata)
if ConfidentialVM.detect_cc_type() == CCTrustedApi.TYPE_CC_TPM:
quote = CCTrustedVmSdk.inst().get_cc_report(nonce, userdata, extra_args)
else:
quote = CCTrustedVmSdk.inst().get_cc_report(nonce, userdata)
if quote is not None:
quote.dump(args.out_format == OUT_FORMAT_RAW)
else:
Expand Down
98 changes: 93 additions & 5 deletions src/python/cctrusted_vm/cvm.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
from cctrusted_base.tdx.common import TDX_VERSION_1_0, TDX_VERSION_1_5
from cctrusted_base.tdx.rtmr import TdxRTMR
from cctrusted_base.tdx.quote import TdxQuoteReq10, TdxQuoteReq15, TdxQuote, TdxQuoteReq
from cctrusted_base.tpm.pcr import TpmPCR
from cctrusted_base.tpm.quote import Tpm2Quote
from cctrusted_base.tdx.report import TdxReportReq10, TdxReportReq15
from tpm2_pytss import ESAPI
from tpm2_pytss.types import TPML_PCR_SELECTION, TPMS_CONTEXT, TPM2B_DATA


LOG = logging.getLogger(__name__)

Expand Down Expand Up @@ -144,6 +149,10 @@ def get_cc_report(self, nonce: bytearray, data: bytearray, extraArgs) -> CcRepor
Returns:
The ``CcReport`` object.
"""
# In tpm case, skip get report through configfs-tsm
if self.cc_type == CCTrustedApi.TYPE_CC_TPM:
return None

if not os.path.exists(self.tsm_prefix):
return None

Expand Down Expand Up @@ -248,9 +257,6 @@ def inst():
LOG.error("Fail to initialize the confidential VM.")
return ConfidentialVM._inst

from tpm2_pytss import ESAPI
from cctrusted_base.tpm.pcr import TpmPCR

class TpmVM(ConfidentialVM):

DEFAULT_TPM_DEVICE_NODE="/dev/tpm0"
Expand All @@ -265,26 +271,108 @@ def __init__(self, dev_node=DEFAULT_TPM_DEVICE_NODE):
def default_algo_id(self):
return TcgAlgorithmRegistry.TPM_ALG_SHA256

@property
def version(self):
return "TPM2"

def process_cc_report(self, report_data=None) -> bool:
"""
For TPM, we do not need to get integrited measurement register
"""
for index in range(24):
_, _, digests = self._esapi.pcr_read("sha256:%d" % index)
_, _, digests = self._esapi.pcr_read(f"sha256:{index}")
assert digests.count == 1
self._imrs[index] = TpmPCR(index, bytes.fromhex(str(digests.digests[0])))
return True

def process_eventlog(self) -> bool:
"""
Process TPM event logs from /sys/kernel/security/tpm0/binary_bios_measurements
"""
try:
with open(TpmVM.BIOS_MEAUSREMENT, "rb") as f:
self._boot_time_event_log = f.read()
assert len(self._boot_time_event_log) > 0
except (PermissionError, OSError):
LOG.error("Need root permission to open file %s", TdxVM.BIOS_MEAUSREMENT)
LOG.error("Need root permission to open file %s", TpmVM.BIOS_MEAUSREMENT)
return False
return True

def get_cc_report(self, nonce: bytearray, data: bytearray, extraArgs) -> CcReport:
"""
Fetch TPM quote with user provided attestation key
"""
pcr_selection = None
ak_context_path = None
ak_handle = None
input_data = None

# Get user defined params including attestation key context and PCRSelections
if extraArgs is not None and "pcr_selection" in extraArgs.keys() \
and "ak_context" in extraArgs.keys():
try:
pcr_selection = TPML_PCR_SELECTION.parse(extraArgs["pcr_selection"])
except ValueError:
LOG.error("Invalid PCRSelection provided for quote generation")
return None
ak_context_path = extraArgs["ak_context"]
else:
LOG.error(
"Required params(pcr_selection or ak_context) not provided for quote generation.")
return None

# Prepare user defined data which could include nonce
if nonce is not None:
nonce = base64.b64decode(nonce, validate=True)
if data is not None:
data = base64.b64decode(data, validate=True)

# the algorithm to concatenate nonce and user data will depend on
# algorithm defined in the pcr_selection
algo = extraArgs["pcr_selection"].split(":")[0]
if algo == "sha1":
hash_algo = hashlib.sha1()
elif algo == "sha256":
hash_algo = hashlib.sha256()
elif algo == "sha384":
hash_algo = hashlib.sha384()
elif algo == "sha512":
hash_algo = hashlib.sha512()
else:
LOG.error("Unsupported hash algorithm.")
return None

LOG.info("Calculate report data by nonce and user data")
if nonce is not None:
hash_algo.update(bytes(nonce))
if data is not None:
hash_algo.update(bytes(data))
input_data = hash_algo.digest()

# Open attestation key context from file
if os.path.exists(ak_context_path):
try:
with open(ak_context_path, 'rb') as ak_file:
ak_context = ak_file.read()
ak_handle = self._esapi.context_load(TPMS_CONTEXT.from_tools(ak_context))
except(PermissionError, OSError):
LOG.error("Lack of permission to open file %s for ak context.",
ak_context_path)
else:
LOG.error("ak_context not found under path %s", ak_context_path)

# Generate quote and signature
quote, signature = self._esapi.quote(ak_handle, pcr_selection, TPM2B_DATA(input_data))

# Flush the loaded context after use
self._esapi.flush_context(ak_handle)

# Save the tpm quote
structured_quote = Tpm2Quote(None, CCTrustedApi.TYPE_CC_TPM)
structured_quote.set_quoted_data(quote)
structured_quote.set_sig(signature)
return structured_quote

class TdxVM(ConfidentialVM):

DEVICE_NODE_PATH = {
Expand Down
Loading