package tsec.jws.signature
import java.time.Instant
import cats.effect.Sync
import cats.syntax.all._
import tsec.common._
import tsec.jws.{JWSJWT, JWSSerializer}
import tsec.jwt.JWTClaims
import tsec.jwt.algorithms.JWTSigAlgo
import tsec.signature._
import tsec.signature.jca._
/**
  * A signed JWT: a JWS header, a claims set and a signature, all tagged with the
  * signature algorithm type `A`.
  *
  * @param header    signed JWS header for algorithm `A`
  * @param body      the JWT claims set
  * @param signature signature value carried alongside the header and claims
  * @tparam A the signature algorithm tag shared by header and signature
  */
case class JWTSig[A](header: JWSSignedHeader[A], body: JWTClaims, signature: CryptoSignature[A])
    extends JWSJWT[A, CryptoSignature] {

  /**
    * Renders the token as three dot-separated parts: the header and the claims, each
    * passed through its `toB64URL` encoder, followed by `signature.toB64UrlString`.
    *
    * The header and claims are serialized again on every call rather than replayed from
    * the bytes that were originally signed or received.
    * NOTE(review): that only round-trips a received token if both serializers are
    * deterministic for the same value — confirm before verifying a re-encoded token.
    *
    * @param hs serializer used to encode the header
    * @return the `header.claims.signature` string
    */
  def toEncodedString(implicit hs: JWSSerializer[JWSSignedHeader[A]]): String = {
    val encodedHeader = hs.toB64URL(header)
    val encodedClaims = JWTClaims.toB64URL(body)
    s"$encodedHeader.$encodedClaims.${signature.toB64UrlString}"
  }
}
/**
  * Effect-polymorphic entry points for signing and verifying [[JWTSig]] tokens.
  *
  * Every method forwards to the implicit `JWSSigCV[F, A]`; this object only supplies a
  * default header and the current time. Keys and certificates are typed by the same
  * algorithm tag `A` as the token they are used with.
  * NOTE(review): whether the `alg` value inside a received header is also compared
  * against `A` is decided inside `JWSSigCV`, which is not visible here — confirm.
  */
object JWTSig {

  /** Signs `body` under a default-constructed `JWSSignedHeader[A]()` and returns the token. */
  def signAndBuild[F[_]: Sync, A: JWTSigAlgo](
      body: JWTClaims,
      sigPrivateKey: SigPrivateKey[A]
  )(implicit sigCV: JWSSigCV[F, A]): F[JWTSig[A]] =
    signAndBuild[F, A](JWSSignedHeader[A](), body, sigPrivateKey)

  /**
    * Signs `body` under the caller-supplied `header` and returns the token.
    *
    * @param header        header to place in the token
    * @param body          claims to sign
    * @param sigPrivateKey private signing key for algorithm `A`
    */
  def signAndBuild[F[_]: Sync, A: JWTSigAlgo](
      header: JWSSignedHeader[A],
      body: JWTClaims,
      sigPrivateKey: SigPrivateKey[A]
  )(implicit sigCV: JWSSigCV[F, A]): F[JWTSig[A]] = {
    val signed = sigCV.signAndBuild(header, body, sigPrivateKey)
    signed
  }

  /** Signs `body` under a default-constructed header and returns the result as a `String`. */
  def signToString[F[_]: Sync, A: JWTSigAlgo](
      body: JWTClaims,
      sigPrivateKey: SigPrivateKey[A]
  )(implicit sigCV: JWSSigCV[F, A]): F[String] =
    signToString[F, A](JWSSignedHeader[A](), body, sigPrivateKey)

  /** Signs `body` under the caller-supplied `header` and returns the result as a `String`. */
  def signToString[F[_]: Sync, A: JWTSigAlgo](
      header: JWSSignedHeader[A],
      body: JWTClaims,
      sigPrivateKey: SigPrivateKey[A]
  )(implicit sigCV: JWSSigCV[F, A]): F[String] = {
    val compact = sigCV.signToString(header, body, sigPrivateKey)
    compact
  }

  /**
    * Verifies an encoded token against a public key.
    *
    * The clock is read inside `F.delay`, so the timestamp is taken when the effect runs,
    * not when this method is called. It is handed to `sigCV.verify`.
    * NOTE(review): presumably used for time-based claim checks — confirm in `JWSSigCV`.
    *
    * @param jwt    the encoded token string
    * @param pubKey public key for algorithm `A`
    */
  def verifyK[F[_], A: JWTSigAlgo](
      jwt: String,
      pubKey: SigPublicKey[A]
  )(implicit F: Sync[F], sigCV: JWSSigCV[F, A]): F[JWTSig[A]] =
    F.flatMap(F.delay(Instant.now()))(now => sigCV.verify(jwt, pubKey, now))

  /**
    * Verifies an encoded token against a certificate; the clock is read inside `F.delay`
    * and passed to `sigCV.verifyCert`.
    *
    * @param jwt  the encoded token string
    * @param cert certificate for algorithm `A`
    */
  def verifyC[F[_], A: JWTSigAlgo](
      jwt: String,
      cert: SigCertificate[A]
  )(implicit F: Sync[F], sigCV: JWSSigCV[F, A]): F[JWTSig[A]] =
    F.flatMap(F.delay(Instant.now()))(now => sigCV.verifyCert(jwt, cert, now))

  /**
    * Verifies an already-built [[JWTSig]] against a public key by re-encoding it with
    * `toEncodedString` and delegating to `verifyK`.
    *
    * The signature is therefore checked against the re-serialized header and claims, not
    * against any bytes the token originally arrived as.
    */
  def verifyKI[F[_], A: JWTSigAlgo](
      jwt: JWTSig[A],
      extract: SigPublicKey[A]
  )(implicit F: Sync[F], sigCV: JWSSigCV[F, A], hs: JWSSerializer[JWSSignedHeader[A]]): F[JWTSig[A]] = {
    val reEncoded = jwt.toEncodedString
    verifyK[F, A](reEncoded, extract)
  }

  /**
    * Verifies an already-built [[JWTSig]] against a certificate by re-encoding it with
    * `toEncodedString` and delegating to `verifyC`.
    */
  def verifyCI[F[_], A: JWTSigAlgo](
      jwt: JWTSig[A],
      extract: SigCertificate[A]
  )(implicit F: Sync[F], sigCV: JWSSigCV[F, A], hs: JWSSerializer[JWSSignedHeader[A]]): F[JWTSig[A]] = {
    val reEncoded = jwt.toEncodedString
    verifyC[F, A](reEncoded, extract)
  }
}
/**
  * Non-suspended counterparts of the [[JWTSig]] companion methods, fixed to the
  * `SigErrorM` result type.
  *
  * The verify methods call `Instant.now()` directly, so the clock is read as a side
  * effect at the moment the method is invoked.
  * NOTE(review): `SigErrorM` is declared elsewhere; how failures are represented in it
  * is not visible here — callers should confirm they inspect the result before trusting
  * the returned token.
  */
object JWTSigImpure {

  /** Signs `body` under a default-constructed `JWSSignedHeader[A]()` and returns the token. */
  def signAndBuild[A: JWTSigAlgo](body: JWTClaims, sigPrivateKey: SigPrivateKey[A])(
      implicit sigCV: JWSSigCV[SigErrorM, A]
  ): SigErrorM[JWTSig[A]] =
    signAndBuild[A](JWSSignedHeader[A](), body, sigPrivateKey)

  /** Signs `body` under the caller-supplied `header` and returns the token. */
  def signAndBuild[A: JWTSigAlgo](header: JWSSignedHeader[A], body: JWTClaims, sigPrivateKey: SigPrivateKey[A])(
      implicit sigCV: JWSSigCV[SigErrorM, A]
  ): SigErrorM[JWTSig[A]] = {
    val signed = sigCV.signAndBuild(header, body, sigPrivateKey)
    signed
  }

  /** Signs `body` under the caller-supplied `header` and returns the result as a `String`. */
  def signToString[A: JWTSigAlgo](header: JWSSignedHeader[A], body: JWTClaims, sigPrivateKey: SigPrivateKey[A])(
      implicit sigCV: JWSSigCV[SigErrorM, A]
  ): SigErrorM[String] = {
    val compact = sigCV.signToString(header, body, sigPrivateKey)
    compact
  }

  /** Signs `body` under a default-constructed header and returns the result as a `String`. */
  def signToString[A: JWTSigAlgo](body: JWTClaims, sigPrivateKey: SigPrivateKey[A])(
      implicit sigCV: JWSSigCV[SigErrorM, A]
  ): SigErrorM[String] =
    signToString[A](JWSSignedHeader[A](), body, sigPrivateKey)

  /**
    * Verifies an encoded token against a public key, passing the current time to
    * `sigCV.verify`.
    *
    * @param jwt    the encoded token string
    * @param pubKey public key for algorithm `A`
    */
  def verifyK[A: JWTSigAlgo](
      jwt: String,
      pubKey: SigPublicKey[A]
  )(implicit sigCV: JWSSigCV[SigErrorM, A]): SigErrorM[JWTSig[A]] = {
    val checked = sigCV.verify(jwt, pubKey, Instant.now())
    checked
  }

  /**
    * Verifies an encoded token against a certificate, passing the current time to
    * `sigCV.verifyCert`.
    *
    * @param jwt  the encoded token string
    * @param cert certificate for algorithm `A`
    */
  def verifyC[A: JWTSigAlgo](
      jwt: String,
      cert: SigCertificate[A]
  )(implicit sigCV: JWSSigCV[SigErrorM, A]): SigErrorM[JWTSig[A]] = {
    val checked = sigCV.verifyCert(jwt, cert, Instant.now())
    checked
  }

  /**
    * Verifies an already-built [[JWTSig]] against a public key by re-encoding it with
    * `toEncodedString` and delegating to `verifyK`; the signature is checked against the
    * re-serialized header and claims.
    */
  def verifyKI[A: JWTSigAlgo](
      jwt: JWTSig[A],
      extract: SigPublicKey[A]
  )(implicit sigCV: JWSSigCV[SigErrorM, A], hs: JWSSerializer[JWSSignedHeader[A]]): SigErrorM[JWTSig[A]] = {
    val reEncoded = jwt.toEncodedString
    verifyK[A](reEncoded, extract)
  }

  /**
    * Verifies an already-built [[JWTSig]] against a certificate by re-encoding it with
    * `toEncodedString` and verifying the result.
    *
    * NOTE(review): unlike `verifyKI`, this method declares no
    * `JWSSerializer[JWSSignedHeader[A]]` parameter, so `toEncodedString` uses whichever
    * serializer implicit resolution finds at this definition rather than one chosen by
    * the caller. If a caller signs with a custom serializer, the re-encoded header may
    * differ from what was signed — confirm this asymmetry is intended.
    */
  def verifyCI[A: JWTSigAlgo](
      jwt: JWTSig[A],
      cert: SigCertificate[A]
  )(implicit sigCV: JWSSigCV[SigErrorM, A]): SigErrorM[JWTSig[A]] =
    verifyC[A](jwt.toEncodedString, cert)
}