-
Notifications
You must be signed in to change notification settings - Fork 787
/
EntityEncoder.scala
247 lines (210 loc) · 8.97 KB
/
EntityEncoder.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
/*
* Copyright 2013 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.http4s
import cats.Contravariant
import cats.Show
import cats.data.NonEmptyList
import cats.effect.Sync
import cats.syntax.all._
import fs2.Chunk
import fs2.Stream
import fs2.io.file.Files
import fs2.io.readInputStream
import org.http4s.Charset.`UTF-8`
import org.http4s.headers._
import org.http4s.multipart.Multipart
import org.http4s.multipart.MultipartEncoder
import scodec.bits.ByteVector
import java.io._
import java.nio.CharBuffer
import java.nio.file.Path
import scala.annotation.implicitNotFound
@implicitNotFound(
"Cannot convert from ${A} to an Entity, because no EntityEncoder[${F}, ${A}] instance could be found."
)
trait EntityEncoder[F[_], A] { self =>
/** Convert the type `A` to an [[Entity]] in the effect type `F` */
def toEntity(a: A): Entity[F]
/** Headers that may be added to a [[Message]]
*
* Examples of such headers would be Content-Type.
* __NOTE:__ The Content-Length header will be generated from the resulting Entity and thus should not be added.
*/
def headers: Headers
/** Make a new [[EntityEncoder]] using this type as a foundation */
def contramap[B](f: B => A): EntityEncoder[F, B] =
new EntityEncoder[F, B] {
override def toEntity(a: B): Entity[F] = self.toEntity(f(a))
override def headers: Headers = self.headers
}
/** Get the [[org.http4s.headers.`Content-Type`]] of the body encoded by this [[EntityEncoder]],
* if defined the headers
*/
def contentType: Option[`Content-Type`] = headers.get[`Content-Type`]
/** Get the [[Charset]] of the body encoded by this [[EntityEncoder]], if defined the headers */
def charset: Option[Charset] = headers.get[`Content-Type`].flatMap(_.charset)
/** Generate a new EntityEncoder that will contain the `Content-Type` header */
def withContentType(tpe: `Content-Type`): EntityEncoder[F, A] =
new EntityEncoder[F, A] {
override def toEntity(a: A): Entity[F] = self.toEntity(a)
override val headers: Headers = self.headers.put(tpe)
}
}
object EntityEncoder {
private val DefaultChunkSize = 4096
/** summon an implicit [[EntityEncoder]] */
def apply[F[_], A](implicit ev: EntityEncoder[F, A]): EntityEncoder[F, A] = ev
/** Create a new [[EntityEncoder]] */
def encodeBy[F[_], A](hs: Headers)(f: A => Entity[F]): EntityEncoder[F, A] =
new EntityEncoder[F, A] {
override def toEntity(a: A): Entity[F] = f(a)
override def headers: Headers = hs
}
/** Create a new [[EntityEncoder]] */
def encodeBy[F[_], A](hs: Header.ToRaw*)(f: A => Entity[F]): EntityEncoder[F, A] = {
val hdrs = if (hs.nonEmpty) Headers(hs: _*) else Headers.empty
encodeBy(hdrs)(f)
}
/** Create a new [[EntityEncoder]]
*
* This constructor is a helper for types that can be serialized synchronously, for example a String.
*/
def simple[F[_], A](hs: Header.ToRaw*)(toChunk: A => Chunk[Byte]): EntityEncoder[F, A] =
encodeBy(hs: _*) { a =>
val c = toChunk(a)
Entity[F](Stream.chunk(c), Some(c.size.toLong))
}
/** Encodes a value from its Show instance. Too broad to be implicit, too useful to not exist. */
def showEncoder[F[_], A](implicit
charset: Charset = `UTF-8`,
show: Show[A],
): EntityEncoder[F, A] = {
val hdr = `Content-Type`(MediaType.text.plain).withCharset(charset)
simple[F, A](hdr)(a => Chunk.array(show.show(a).getBytes(charset.nioCharset)))
}
def emptyEncoder[F[_], A]: EntityEncoder[F, A] =
new EntityEncoder[F, A] {
def toEntity(a: A): Entity[F] = Entity.empty
def headers: Headers = Headers.empty
}
/** A stream encoder is intended for streaming, and does not calculate its
* bodies in advance. As such, it does not calculate the Content-Length in
* advance. This is for use with chunked transfer encoding.
*/
implicit def streamEncoder[F[_], A](implicit
W: EntityEncoder[F, A]
): EntityEncoder[F, Stream[F, A]] =
new EntityEncoder[F, Stream[F, A]] {
override def toEntity(a: Stream[F, A]): Entity[F] =
Entity(a.flatMap(W.toEntity(_).body))
override def headers: Headers =
W.headers.get[`Transfer-Encoding`] match {
case Some(transferCoding) if transferCoding.hasChunked =>
W.headers
case _ =>
W.headers.add(`Transfer-Encoding`(TransferCoding.chunked.pure[NonEmptyList]))
}
}
implicit def unitEncoder[F[_]]: EntityEncoder[F, Unit] =
emptyEncoder[F, Unit]
implicit def stringEncoder[F[_]](implicit
charset: Charset = `UTF-8`
): EntityEncoder[F, String] = {
val hdr = `Content-Type`(MediaType.text.plain).withCharset(charset)
simple(hdr)(s => Chunk.array(s.getBytes(charset.nioCharset)))
}
implicit def charArrayEncoder[F[_]](implicit
charset: Charset = `UTF-8`
): EntityEncoder[F, Array[Char]] =
stringEncoder[F].contramap(new String(_))
implicit def chunkEncoder[F[_]]: EntityEncoder[F, Chunk[Byte]] =
simple(`Content-Type`(MediaType.application.`octet-stream`))(identity)
implicit def byteArrayEncoder[F[_]]: EntityEncoder[F, Array[Byte]] =
chunkEncoder[F].contramap(Chunk.array[Byte])
implicit def byteVectorEncoder[F[_]]: EntityEncoder[F, ByteVector] =
chunkEncoder[F].contramap(Chunk.byteVector)
/** Encodes an entity body. Chunking of the stream is preserved. A
* `Transfer-Encoding: chunked` header is set, as we cannot know
* the content length without running the stream.
*/
implicit def entityBodyEncoder[F[_]]: EntityEncoder[F, EntityBody[F]] =
encodeBy(`Transfer-Encoding`(TransferCoding.chunked.pure[NonEmptyList])) { body =>
Entity(body, None)
}
// TODO if Header moves to Entity, can add a Content-Disposition with the filename
@deprecated("Use pathEncoder with fs2.io.file.Path", "0.23.5")
implicit def fileEncoder[F[_]: Files]: EntityEncoder[F, File] =
pathEncoder.contramap(f => fs2.io.file.Path.fromNioPath(f.toPath()))
// TODO if Header moves to Entity, can add a Content-Disposition with the filename
@deprecated("Use pathEncoder with fs2.io.file.Path", "0.23.5")
implicit def filePathEncoder[F[_]: Files]: EntityEncoder[F, Path] =
pathEncoder.contramap(p => fs2.io.file.Path.fromNioPath(p))
// TODO if Header moves to Entity, can add a Content-Disposition with the filename
implicit def pathEncoder[F[_]: Files]: EntityEncoder[F, fs2.io.file.Path] =
encodeBy[F, fs2.io.file.Path](`Transfer-Encoding`(TransferCoding.chunked)) { p =>
Entity(Files[F].readAll(p))
}
implicit def inputStreamEncoder[F[_]: Sync, IS <: InputStream]: EntityEncoder[F, F[IS]] =
entityBodyEncoder[F].contramap { (in: F[IS]) =>
readInputStream[F](in.widen[InputStream], DefaultChunkSize)
}
// TODO parameterize chunk size
implicit def readerEncoder[F[_], R <: Reader](implicit
F: Sync[F],
charset: Charset = `UTF-8`,
): EntityEncoder[F, F[R]] =
entityBodyEncoder[F].contramap { (fr: F[R]) =>
// Shared buffer
val charBuffer = CharBuffer.allocate(DefaultChunkSize)
def readToBytes(r: Reader): F[Option[Chunk[Byte]]] =
for {
// Read into the buffer
readChars <- F.blocking(r.read(charBuffer))
} yield {
// Flip to read
charBuffer.flip()
if (readChars < 0) None
else if (readChars == 0) Some(Chunk.empty)
else {
// Encode to bytes according to the charset
val bb = charset.nioCharset.encode(charBuffer)
// Read into a Chunk
val b = new Array[Byte](bb.remaining())
bb.get(b)
Some(Chunk.array(b))
}
}
def useReader(r: Reader) =
Stream
.eval(readToBytes(r))
.repeat
.unNoneTerminate
.flatMap(Stream.chunk[F, Byte])
// The reader is closed at the end like InputStream
Stream.bracket(fr)(r => F.delay(r.close())).flatMap(useReader)
}
implicit def multipartEncoder[F[_]]: EntityEncoder[F, Multipart[F]] =
new MultipartEncoder[F]
implicit def entityEncoderContravariant[F[_]]: Contravariant[EntityEncoder[F, *]] =
new Contravariant[EntityEncoder[F, *]] {
override def contramap[A, B](r: EntityEncoder[F, A])(f: (B) => A): EntityEncoder[F, B] =
r.contramap(f)
}
implicit def serverSentEventEncoder[F[_]]: EntityEncoder[F, EventStream[F]] =
entityBodyEncoder[F]
.contramap[EventStream[F]](_.through(ServerSentEvent.encoder))
.withContentType(`Content-Type`(MediaType.`text/event-stream`))
}