ci: add SARIF fingerprints before upload #67
Conversation
PR Summary: Medium Risk. Overview: Refactors the upload script to reuse a shared […] Reviewed by Cursor Bugbot for commit 90564ce. Bugbot is set up for automated code reviews on this repo. Configure here.
8f42a65 to
90564ce
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix prepared a fix for the issue found in the latest run.
- ✅ Fixed: Polling errors caught by upload's except handler
- Moved SARIF status polling outside the upload HTTPError handler so polling failures are no longer misclassified as upload errors.
Preview (c09b149363)
diff --git a/scripts/upload-sarif-to-code-scanning.py b/scripts/upload-sarif-to-code-scanning.py
--- a/scripts/upload-sarif-to-code-scanning.py
+++ b/scripts/upload-sarif-to-code-scanning.py
@@ -6,10 +6,12 @@
import argparse
import base64
import gzip
+import hashlib
import json
import os
from pathlib import Path
import sys
+import time
import urllib.error
import urllib.request
@@ -36,11 +38,112 @@
return parser.parse_args()
def request_headers() -> dict[str, str]:
    """Return the HTTP headers sent with each GitHub REST API request.

    The bearer token is read from the ``GITHUB_TOKEN`` environment variable
    on every call, so a missing variable surfaces as ``KeyError``.
    """
    token = os.environ["GITHUB_TOKEN"]
    headers: dict[str, str] = {}
    headers["Accept"] = "application/vnd.github+json"
    headers["Authorization"] = f"Bearer {token}"
    headers["Content-Type"] = "application/json"
    headers["X-GitHub-Api-Version"] = "2022-11-28"
    return headers
+
+
def artifact_uri(run: dict[str, object], artifact_location: object) -> object:
    """Resolve the URI for a SARIF ``artifactLocation`` value.

    Args:
        run: The run that owns the result. Its ``artifacts`` list is
            consulted when the location carries only an ``index``.
        artifact_location: The ``artifactLocation`` value taken from a
            physical location. Anything that is not a dict is treated as
            missing.

    Returns:
        The location's own ``uri`` when it is truthy; otherwise the ``uri``
        found at ``run["artifacts"][index]["location"]``; ``None`` when
        neither can be resolved.
    """
    if not isinstance(artifact_location, dict):
        return None
    uri = artifact_location.get("uri")
    if uri:
        return uri

    index = artifact_location.get("index")
    # bool is a subclass of int, so a stray JSON true/false would otherwise
    # pass the int check and silently select artifacts[1] / artifacts[0].
    if isinstance(index, bool) or not isinstance(index, int):
        return None
    artifacts = run.get("artifacts")
    if not isinstance(artifacts, list):
        return None
    # Negative indexes are rejected explicitly so Python's wrap-around
    # indexing never picks an artifact from the end of the list.
    if not 0 <= index < len(artifacts):
        return None

    artifact = artifacts[index]
    if not isinstance(artifact, dict):
        return None
    location = artifact.get("location")
    if not isinstance(location, dict):
        return None
    return location.get("uri")
+
+
def result_location_key(result: dict[str, object], run: dict[str, object]) -> dict[str, object]:
    """Summarise the first location of a SARIF result as a plain dict.

    Returns an empty dict when the result has no usable first
    ``physicalLocation``. Otherwise returns the resolved artifact ``uri``
    together with the four region coordinates; coordinates that are absent
    (or a region that is not a dict) yield ``None`` values.
    """
    locations = result.get("locations")
    primary = locations[0] if isinstance(locations, list) and locations else None
    physical = primary.get("physicalLocation") if isinstance(primary, dict) else None
    if not isinstance(physical, dict):
        return {}

    raw_region = physical.get("region")
    region = raw_region if isinstance(raw_region, dict) else {}

    key: dict[str, object] = {
        "uri": artifact_uri(run, physical.get("artifactLocation")),
    }
    for name in ("startLine", "startColumn", "endLine", "endColumn"):
        key[name] = region.get(name)
    return key
+
+
def result_fingerprint(result: dict[str, object], run: dict[str, object]) -> str:
    """Return a SHA-256 hex digest identifying a SARIF result.

    The digest covers the result's ``ruleId``, its ``message`` and the
    location key of its first location, serialised as compact JSON with
    sorted keys so the same inputs always hash to the same value.
    """
    identity = {
        "location": result_location_key(result, run),
        "message": result.get("message"),
        "ruleId": result.get("ruleId"),
    }
    # sort_keys makes the serialisation independent of dict insertion order.
    canonical = json.dumps(identity, separators=(",", ":"), sort_keys=True)
    digest = hashlib.sha256(canonical.encode("utf-8"))
    return digest.hexdigest()
+
+
def sarif_upload_bytes(path: Path) -> bytes:
    """Load a SARIF file and return it as compact UTF-8 JSON for upload.

    Every result whose ``partialFingerprints`` is missing or empty receives
    a ``primaryLocationLineHash`` computed by ``result_fingerprint``.
    Results that already carry fingerprints are left untouched.

    Parts of the document that do not have the expected shape (a top level
    that is not an object, ``runs`` or ``results`` that are ``null`` or not
    arrays, entries that are not objects) are skipped and re-serialised
    unchanged instead of raising.

    Args:
        path: Location of the SARIF file, read as UTF-8.

    Returns:
        The possibly augmented document encoded as UTF-8 JSON bytes.
    """
    sarif = json.loads(path.read_text(encoding="utf-8"))
    # dict.get(key, []) only covers a *missing* key; an explicit JSON null
    # would come back as None and fail on iteration, so check the type.
    runs = sarif.get("runs") if isinstance(sarif, dict) else None
    for run in runs if isinstance(runs, list) else ():
        if not isinstance(run, dict):
            continue
        results = run.get("results")
        if not isinstance(results, list):
            continue
        for result in results:
            if not isinstance(result, dict):
                continue
            if result.get("partialFingerprints"):
                continue
            result["partialFingerprints"] = {
                "primaryLocationLineHash": result_fingerprint(result, run)
            }
    return json.dumps(sarif, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
+
+
def wait_for_sarif_processing(
    sarif_id: str,
    timeout_seconds: float = 120,
    poll_interval_seconds: float = 5,
    request_timeout_seconds: float = 30,
) -> None:
    """Poll the Code Scanning API until an uploaded SARIF file is processed.

    The status endpoint is built from the ``GITHUB_API_URL`` and
    ``GITHUB_REPOSITORY`` environment variables. The final status body is
    printed as JSON once processing completes.

    Args:
        sarif_id: Identifier returned by the SARIF upload request.
        timeout_seconds: Overall time budget for processing to finish.
        poll_interval_seconds: Pause between two status requests.
        request_timeout_seconds: Socket timeout for one status request.

    Raises:
        RuntimeError: The API reported ``processing_status == "failed"``.
        TimeoutError: The time budget ran out before processing finished.
            A single request that exceeds ``request_timeout_seconds`` is
            raised by ``urlopen`` itself (``urllib.error.URLError`` or a
            socket timeout).
    """
    deadline = time.monotonic() + timeout_seconds
    status_url = (
        f"{os.environ['GITHUB_API_URL']}/repos/{os.environ['GITHUB_REPOSITORY']}"
        f"/code-scanning/sarifs/{sarif_id}"
    )
    while True:
        request = urllib.request.Request(status_url, headers=request_headers())
        # Without an explicit timeout a stalled connection blocks forever
        # and the deadline below is never evaluated.
        with urllib.request.urlopen(request, timeout=request_timeout_seconds) as response:
            status_body = json.loads(response.read().decode("utf-8"))

        processing_status = status_body.get("processing_status")
        if processing_status == "complete":
            print(json.dumps(status_body))
            return
        if processing_status == "failed":
            raise RuntimeError(f"SARIF processing failed: {json.dumps(status_body)}")

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"Timed out waiting for SARIF processing: {json.dumps(status_body)}")
        print(f"SARIF processing status is {processing_status}; waiting...")
        # Never sleep past the deadline; the last poll happens right at it.
        time.sleep(min(poll_interval_seconds, remaining))
+
+
def main() -> int:
args = parse_args()
- sarif_payload = base64.b64encode(gzip.compress(args.sarif_file.read_bytes())).decode(
- "ascii"
- )
+ sarif_payload = base64.b64encode(
+ gzip.compress(sarif_upload_bytes(args.sarif_file))
+ ).decode("ascii")
body = {
"commit_sha": os.environ["GITHUB_SHA"],
@@ -55,16 +158,12 @@
f"{os.environ['GITHUB_API_URL']}/repos/{os.environ['GITHUB_REPOSITORY']}/code-scanning/sarifs",
data=json.dumps(body).encode("utf-8"),
method="POST",
- headers={
- "Accept": "application/vnd.github+json",
- "Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}",
- "Content-Type": "application/json",
- "X-GitHub-Api-Version": "2022-11-28",
- },
+ headers=request_headers(),
)
try:
with urllib.request.urlopen(request) as response:
- print(response.read().decode("utf-8"))
+ response_body = json.loads(response.read().decode("utf-8"))
+ print(json.dumps(response_body))
except urllib.error.HTTPError as error:
response_body = error.read().decode("utf-8")
response_body_lower = response_body.lower()
@@ -77,6 +176,9 @@
return 0
sys.stderr.write(response_body)
raise
+ sarif_id = response_body.get("id")
+ if sarif_id:
+ wait_for_sarif_processing(str(sarif_id))
return 0
You can send follow-ups to the cloud agent here.
Reviewed by Cursor Bugbot for commit 90564ce. Configure here.

Summary
- Add `partialFingerprints` to SARIF results that lack them before direct Code Scanning API upload

Verification
- `python3 -m py_compile scripts/upload-sarif-to-code-scanning.py`
- `actionlint .github/workflows/ci.yml`
- `git diff --check`

Follow-up to #66 and evalops/.github#32.