Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

akka 기반으로 서버 수정 #6

Merged
merged 3 commits into from
Jan 5, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
name := "daewonHttp"

version := "1.0"

scalaVersion := "2.9.1"

name := "la-scala http"

version := "0.1"

scalaVersion := "2.10.0"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.1.0"
340 changes: 209 additions & 131 deletions src/main/scala/common/main.scala
Original file line number Diff line number Diff line change
@@ -1,171 +1,249 @@
/**
* la-scala static http server
* 참고: http://twitter.github.com/scala_school/concurrency.html
* la-scala http server
* source from http://doc.akka.io/docs/akka/2.1.0/scala/io.html
*/

/*
* This software is licensed under the Apache 2 license, quoted below.
*
* Copyright 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package daewon.http

import java.net._
import java.util.concurrent._
import java.util.Date
import scala.io._
import java.io._
package laScala.http

import akka.actor._
import akka.util.{ ByteString, ByteStringBuilder }
import java.net.InetSocketAddress

/**
* 서버 메인
* Excutors 스레드 풀 사용
*/
class HttpServer(docRoot: String, port: Int) {
val serverSocket = new ServerSocket(port)
val pool: ExecutorService = Executors.newCachedThreadPool

def run() {
try {
while (true) {
val socket = serverSocket.accept() // block here
pool.execute(new Handler(docRoot, socket))
}
} finally {
pool.shutdown()
}
}
}

/**
* Http 상수 모음
* HTTP 상수 모음
*/
object HttpConst {
private val htmlExt = Set("html", "htm")
val SP = " "
val CRLF = "\r\n"
val BODYDELIMITER = CRLF + CRLF

object Method {
val GET = "GET"
val POST = "POST"
val DELETE = "DELETE"
val PUT = "PUT"
}

def isHTMLPage(path: String) = htmlExt(path.toLowerCase().split("\\.").last)
object HttpConstants {
val SP = ByteString(" ")
val HT = ByteString("\t")
val CRLF = ByteString("\r\n")
val COLON = ByteString(":")
val PERCENT = ByteString("%")
val PATH = ByteString("/")
val QUERY = ByteString("?")
}

/**
* 응답 핸들러
* 200 OK, 404 NotFound 에 대한 처리를 담당
* http request 객체 및
* request Header 객체
*/
class Handler(docRoot: String, socket: Socket) extends Runnable {
val iterator = io.Source.fromInputStream(socket.getInputStream)(Codec.UTF8).getLines()
case class Request(meth: String, path: List[String], query: Option[String], httpver: String, headers: List[Header], body: Option[ByteString])
case class Header(name: String, value: String)

def run() {
try {
if (iterator.isEmpty){
log("cannot read input stream"); return
}
// 01. input stream으로부터 첫줄 읽어내서 메소드, 경로 등 파싱
val Array(method, path, version) = iterator.next().split(HttpConst.SP)
/**
* iteratee: http://www.haskell.org/haskellwiki/Iteratee_I/O
* 파서
*/
object HttpIteratees {
import HttpConstants._

def readRequest = {
for {
requestLine <- readRequestLine
(meth, (path, query), httpver) = requestLine
headers <- readHeaders
body <- readBody(headers)
} yield Request(meth, path, query, httpver, headers, body)
}

log(method, path, version)
def ascii(bytes: ByteString): String = bytes.decodeString("US-ASCII").trim

def readRequestLine = {
for {
meth <- IO takeUntil SP
uri <- readRequestURI
_ <- IO takeUntil SP // ignore the rest
httpver <- IO takeUntil CRLF
} yield (ascii(meth), uri, ascii(httpver))
}

val file = new File(docRoot + path)
val os = socket.getOutputStream
def readRequestURI = IO peek 1 flatMap {
case PATH => {
for {
path <- readPath
query <- readQuery
} yield (path, query)
}

method match {
case HttpConst.Method.GET => {
if (!file.exists()) NotFound(os)
else {
if (HttpConst.isHTMLPage(path)) OK(file, os)
else OKBinary(file, os)
}
}
case _ => throw new Error("unsupported method ")
}
} finally {
socket.close
case _ => {
sys.error("Not Implemented")
}
}
}

/*
* 응답 모음
* 응답 헤더 모음 쪽으로 리팩터링 필요
*/
abstract class Response {
val BUF_SIZE = 1024 * 1024

/**
* 날자는 매 응답마다 변경되어야 한다.
*/
def date = "Date: " + new Date().toString + HttpConst.CRLF

/**
* 파일을 메모리에 들지 않기 위해 사용
*/
def writeFileAsStream(file: File, os: OutputStream){
val fs = new FileInputStream(file)
val buf: Array[Byte] = new Array[Byte](BUF_SIZE)

try {
Stream.continually(fs.read(buf, 0, BUF_SIZE)).takeWhile(_ != -1).foreach( cnt => os.write(buf, 0, cnt))
} catch{
case e: IOException => log("can't write outputstream")
} finally{
fs.close
def readBody(headers: List[Header]) =
if (headers.exists(header => header.name == "Content-Length" || header.name == "Transfer-Encoding"))
IO.takeAll map (Some(_))
else
IO Done None

def readPath = {
def step(segments: List[String]): IO.Iteratee[List[String]] = IO peek 1 flatMap {
case PATH => IO drop 1 flatMap (_ => readUriPart(pathchar) flatMap (segment => step(segment :: segments)))
case _ => segments match {
case "" :: rest => IO Done rest.reverse
case _ => IO Done segments.reverse
}
}
step(Nil)
}
}

/**
* 404 에러 반환
*/
case class NotFound(os: OutputStream) extends Response {
val head = """HTTP/1.1 404 Not Found
Connection: Close""" + HttpConst.BODYDELIMITER
def readQuery: IO.Iteratee[Option[String]] = IO peek 1 flatMap {
case QUERY => IO drop 1 flatMap (_ => readUriPart(querychar) map (Some(_)))
case _ => IO Done None
}

log(head)
val alpha = Set.empty ++ ('a' to 'z') ++ ('A' to 'Z') map (_.toByte)
val digit = Set.empty ++ ('0' to '9') map (_.toByte)
val hexdigit = digit ++ (Set.empty ++ ('a' to 'f') ++ ('A' to 'F') map (_.toByte))
val subdelim = Set('!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=') map (_.toByte)
val pathchar = alpha ++ digit ++ subdelim ++ (Set(':', '@') map (_.toByte))
val querychar = pathchar ++ (Set('/', '?') map (_.toByte))

def readUriPart(allowed: Set[Byte]): IO.Iteratee[String] = for {
str <- IO takeWhile allowed map ascii
pchar <- IO peek 1 map (_ == PERCENT)
all <- if (pchar) readPChar flatMap (ch => readUriPart(allowed) map (str + ch + _)) else IO Done str
} yield all

def readPChar = IO take 3 map {
case Seq('%', rest @ _*) if rest forall hexdigit =>
java.lang.Integer.parseInt(rest map (_.toChar) mkString, 16).toChar
}

os.write( (head + date) getBytes )
def readHeaders = {
def step(found: List[Header]): IO.Iteratee[List[Header]] = {
IO peek 2 flatMap {
case CRLF => IO takeUntil CRLF flatMap (_ => IO Done found)
case _ => readHeader flatMap (header => step(header :: found))
}
}
step(Nil)
}

def readHeader = {
for {
name <- IO takeUntil COLON
value <- IO takeUntil CRLF flatMap readMultiLineValue
} yield Header(ascii(name), ascii(value))
}

def readMultiLineValue(initial: ByteString): IO.Iteratee[ByteString] = {
IO peek 1 flatMap {
case SP => IO takeUntil CRLF flatMap (bytes => readMultiLineValue(initial ++ bytes))
}
}
}

/**
* 200 OK with HTML 페이지
*/
case class OK(file: File, os: OutputStream) extends Response {
val head = """HTTP/1.1 200 OK
Connection: Close
Content-Type: text/html; charset=utf-8
""" + "Content-Length: " + file.length + HttpConst.BODYDELIMITER

log(head)

os.write(head getBytes)
this.writeFileAsStream(file, os)
* ok 응답
*/
object OKResponse {
import HttpConstants.CRLF

val okStatus = ByteString("HTTP/1.1 200 OK")
val contentType = ByteString("Content-Type: text/html; charset=utf-8")
val cacheControl = ByteString("Cache-Control: no-cache")
val date = ByteString("Date: ")
val server = ByteString("Server: Akka")
val contentLength = ByteString("Content-Length: ")
val connection = ByteString("Connection: ")
val keepAlive = ByteString("Keep-Alive")
val close = ByteString("Close")

def bytes(rsp: OKResponse) = {
new ByteStringBuilder ++=
okStatus ++= CRLF ++=
contentType ++= CRLF ++=
cacheControl ++= CRLF ++=
date ++= ByteString(new java.util.Date().toString) ++= CRLF ++=
server ++= CRLF ++=
contentLength ++= ByteString(rsp.body.length.toString) ++= CRLF ++=
connection ++= (if (rsp.keepAlive) keepAlive else close) ++= CRLF ++=
CRLF ++= rsp.body result
}
}

/**
* 200 OK 바이너리 (파일 다운로드)
*/
case class OKBinary(file: File, os: OutputStream) extends Response {
val head = """HTTP/1.1 200 OK
Connection: Close
""" + "Content-type : application/octet-stream; " + file.getName + HttpConst.CRLF + "Content-Length: " + file.length + HttpConst.CRLF + "Content-Disposition: attachment; " + file.getName + HttpConst.BODYDELIMITER

log(head)
case class OKResponse(body: ByteString, keepAlive: Boolean)

os.write(head getBytes)
this.writeFileAsStream(file, os)
/**
* http server companion object
*/
object HttpServer {
import HttpIteratees._

def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] = {
IO repeat {
for {
request <- readRequest
} yield {
val rsp = request match {
case Request("GET", "ping" :: Nil, _, _, headers, _) => {
OKResponse(ByteString("<p>pong</p>"), request.headers.exists {
case Header(n, v) => n.toLowerCase == "connection" && v.toLowerCase == "keep-alive" })
}
case req => {
OKResponse(ByteString("<p>" + req.toString + "</p>"), request.headers.exists {
case Header(n, v) => n.toLowerCase == "connection" && v.toLowerCase == "keep-alive" })
}
}
socket write OKResponse.bytes(rsp).compact
if (!rsp.keepAlive) socket.close()
}
}
}
}

/**
* 추후 로그 변경을 위해서
* http server main
*/
case class log(msg: String*) {
println( msg.mkString(", ") )
class HttpServer(port: Int) extends Actor {
val state = IO.IterateeRef.Map.async[IO.Handle]()(context.dispatcher)

override def preStart {
IOManager(context.system) listen new InetSocketAddress(port)
}

def receive = {
case IO.NewClient(server) => {
val socket = server.accept()
state(socket) flatMap (_ => HttpServer.processRequest(socket))
}

case IO.Read(socket, bytes) => {
state(socket)(IO Chunk bytes)
}

case IO.Closed(socket, cause) => {
state(socket)(IO EOF)
state -= socket
}
}
}


/**
* Main
*/
object Main extends App {
val docRoot = "."
(new HttpServer(docRoot, 8080)).run
val port = Option(System.getenv("PORT")) map (_.toInt) getOrElse 8080
val system = ActorSystem()
val server = system.actorOf(Props(new HttpServer(port)))
}
Loading