Skip to content

Commit

Permalink
First pass at the code.
Browse files Browse the repository at this point in the history
  • Loading branch information
ieure committed Jan 4, 2010
1 parent d676f59 commit a46d7dd
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 33 deletions.
17 changes: 14 additions & 3 deletions project.clj
@@ -1,5 +1,16 @@
(defproject twidoop "0.0.1"
;; -*- coding: utf-8 -*-
;;
;; © 2009 Digg, Inc. All rights reserved.
;; Author: Ian Eure <ian@digg.com>
;;

(defproject twidoop "0.8.0"
:description "Read from the firehose, write to Hadoop"
:dependencies [[org.clojure/clojure "1.1.0-ALPHA-SNAPSHOT"]
:dependencies [[org.clojure/clojure "1.1.0-master-SNAPSHOT"]
[org.clojure/clojure-contrib "1.0-SNAPSHOT"]
[org.clojure/clojure-http-client "1.0.0-SNAPSHOT"]])
[clojure-http-client "1.0.0-SNAPSHOT"]
[org.apache.mahout.hadoop/hadoop-core "0.20.1"]
[commons-logging "1.1.1"]
[commons-cli "1.2"]]
:dev-dependencies [[org.clojure/swank-clojure "1.0"]]
:main twidoop)
30 changes: 0 additions & 30 deletions src/digg/core/twidoop.clj

This file was deleted.

89 changes: 89 additions & 0 deletions src/twidoop.clj
@@ -0,0 +1,89 @@
;; -*- coding: utf-8 -*-
;;
;; © 2009 Digg, Inc. All rights reserved.
;; Author: Ian Eure <ian@digg.com>
;;

(ns twidoop
(:require [clojure.http.client :as http])
(:use [clojure.contrib.command-line :only (with-command-line)])
(:import (org.apache.hadoop.fs FileSystem LocalFileSystem Path)
(org.apache.hadoop.conf Configuration)
(java.util Date)
(java.text SimpleDateFormat))
(:gen-class))

(def #^{:tag FileSystem
:doc "The HDFS FileSystem instance to write to."}
*filesystem* nil)

(def #^{:doc "Replica count for the output."}
*replicas* 1)

(def #^{:doc "The block size of the output."}
*block-size* (* 1024 1024 16))

(defn stream-url
"Return the Twitter stream URL to read statuses from.
Type can be `sample' or `firehose'."
([user pass]
(stream-url user pass "sample"))
([user pass type]
(format "http://%s:%s@stream.twitter.com/1/statuses/%s.json"
user pass type)))

(defn create-file
"Create a new file on HDFS, and return a new FSDataOutputStream."
([name]
(create-file name true 512 *replicas* (* 1024 1024 *block-size*)))

([name overwrite buffer replicas block-size]
(.create *filesystem*
(or (and (instance? Path name) name) (Path. name))
overwrite buffer replicas block-size)))

(defn today []
"Return today's date in YYYY-MM-DD format."
(. (SimpleDateFormat. "yyyy-MM-dd") format (Date.)))

(defn get-output
"Return a hash-map of the date and file we're writing to."
[path]
{:date (today)
:path path
:file (create-file (format "%s/statuses-%s.json" path (today)))})

(defn stale? [{date :date}]
"Is the current output stale?"
(not (= (today) date)))

(defn save-statuses [url out-path]
"Save Twitter statuses into HDFS."
(let [output (atom (get-output out-path))]
(doseq [status (:body-seq (http/request url))]
(do (.writeBytes (:file @output) (str status "\0"))
(print ".")
(flush))

(when (stale? @output)
(do (print (format "\n%s -> " (:date @output)))
(.close (:file @output))
(reset! output (get-output (:path @output)))
(println (:date @output)))))))

(defn -main [& args]
(with-command-line args
"twidoop -- Stream the Twitter firehose into Hadoop."
[[output o "Output here on HDFS" "/firehose"]
[hdfs "HDFS to connect to" "hdfs://localhost:9000"]
[block-size b "HDFS block size (in megabytes)" 16]
[replicas r "HDFS replica count" 1]
[type t "Type of stream to read from: sample or firehose" "sample"]
[user u "Twitter username"]
[pass p "Twitter password"]]
(binding [*filesystem* (FileSystem/get (doto (Configuration.)
(.set "fs.default.name" hdfs)))
*block-size* (Integer. block-size)
*replicas* (Integer. replicas)]
(save-statuses (stream-url user pass type) output))))
4 changes: 4 additions & 0 deletions twidoop
@@ -0,0 +1,4 @@
#! /bin/sh

LIBS=`find lib -type f -name \*.jar -print0 | tr '\0' :`
exec java -cp ${LIBS}twidoop.jar twidoop $*

0 comments on commit a46d7dd

Please sign in to comment.