/
main.go
127 lines (94 loc) · 2.35 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main
import (
"encoding/json"
"log"
"net"
"net/http"
"net/url"
"os"
"time"
"github.com/unrolled/render"
"github.com/dahernan/goreddit/api"
"github.com/dahernan/goreddit/sse"
"github.com/dahernan/goreddit/streaming"
)
// Long-lived objects shared by the handlers; they are assigned in init.
var (
	client *http.Client
	r      *render.Render
	stream streaming.RedditStreamer
)

// Settings populated from environment variables in init.
var (
	consumerKey string
	secretKey   string
	port        string
	agent       string
)
func init() {
client = NewHttpClientWithTimeout(2 * time.Second)
r = render.New(render.Options{
Directory: "templates", // Specify what path to load the templates from.
Extensions: []string{".tmpl", ".html"}, // Specify extensions to load for templates.
})
consumerKey = os.Getenv("CONSUMER_KEY")
secretKey = os.Getenv("SECRET_KEY")
port = os.Getenv("PORT")
agent = os.Getenv("AGENT")
reddit := api.NewReddit(client, agent, consumerKey, secretKey)
stream = Stream(reddit)
}
// Index renders the "index" template with the items currently held by the
// stream. The first item is passed to the template as "First" and up to four
// of the following items as "Items".
//
// When the stream has no items yet, the handler replies with
// 503 Service Unavailable instead of panicking on an out-of-range index.
func Index(w http.ResponseWriter, req *http.Request) {
	items := stream.Items()
	if len(items) == 0 {
		http.Error(w, "no items available yet", http.StatusServiceUnavailable)
		return
	}

	// Clamp the upper bound: slicing [1:5] panics when fewer than five
	// items are available.
	end := 5
	if len(items) < end {
		end = len(items)
	}

	data := map[string]interface{}{
		"First": items[0],
		"Items": items[1:end],
	}
	r.HTML(w, http.StatusOK, "index", data)
}
// main registers the Index handler on "/" and the SSE broker on "/events",
// starts the goroutine that feeds the broker, and serves HTTP on the port
// held in the package-level port variable.
func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", Index)

	broker := sse.NewBroker()
	mux.Handle("/events", broker)
	go StreamingBroker(broker)

	// ListenAndServe only returns with a non-nil error (e.g. the port is
	// already in use); log it and exit non-zero instead of silently
	// falling off the end of main.
	log.Fatal(http.ListenAndServe(":"+port, mux))
}
// Stream creates a streamer by handing reddit.New, the name "videos" and the
// query values limit=40 to streaming.NewRedditStream.
func Stream(reddit api.RedditListing) streaming.RedditStreamer {
	params := url.Values{"limit": []string{"40"}}
	return streaming.NewRedditStream(reddit.New, "videos", params)
}
// StreamingBroker forwards every item received from the stream to the broker
// as a JSON string. It never returns and is meant to run in its own goroutine.
//
// Items that cannot be marshalled are logged and skipped rather than being
// sent as an empty payload. If the item channel is closed, the loop stops
// reading from it but keeps sending keep-alive messages.
func StreamingBroker(broker *sse.Broker) {
	items := stream.Stream()
	for {
		select {
		case it, ok := <-items:
			if !ok {
				// A closed channel is always ready and yields zero
				// values; a nil channel blocks forever, so only the
				// keep-alive case below stays active.
				log.Println("Item stream closed; sending keep-alives only")
				items = nil
				continue
			}
			data, err := ItemToJson(it)
			if err != nil {
				log.Println("Error marshal item:", it, err)
				continue
			}
			broker.Send(data)
		// The Heroku proxy times out if the connection has no activity,
		// so send an empty JSON object after 5 idle seconds to prevent it.
		case <-time.After(5 * time.Second):
			broker.Send("{}")
		}
	}
}
// ItemToJson marshals it with encoding/json and returns the result as a
// string. On a marshalling error it returns the empty string and the error.
func ItemToJson(it api.Item) (string, error) {
	encoded, err := json.Marshal(it)
	if err != nil {
		return "", err
	}
	return string(encoded), nil
}
// NewHttpClientWithTimeout returns an HTTP client whose transport opens
// connections through net.DialTimeout with the given timeout.
//
// Only the dial is bounded: the client's own Timeout field is left at zero,
// so there is no overall per-request deadline.
func NewHttpClientWithTimeout(timeout time.Duration) *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			Dial: func(network, addr string) (net.Conn, error) {
				return net.DialTimeout(network, addr, timeout)
			},
		},
	}
}