# Streaming tweets to a channel

## Imports

In [None]:
import (
    "encoding/json"
    "net"
    "net/http"
    "net/url"
    "strconv"
    "strings"
    "sync"
    "time"
    "fmt"
    "os"
    "context"
    
    "github.com/garyburd/go-oauth/oauth"
)

## Previously discussed types, values and functions

Twitter related types:

In [None]:
// Tweet is a single tweet.
type Tweet struct {
    Text string
    Terms []string
}

// TweetReader includes the info we need to access Twitter.
type TweetReader struct {
    ConsumerKey, ConsumerSecret, AccessToken, AccessSecret string
}

// NewTweetReader creates a new TweetReader with the given credentials.
func NewTweetReader(consumerKey, consumerSecret, accessToken, accessSecret string) *TweetReader {
    return &TweetReader{
        ConsumerKey:    consumerKey,
        ConsumerSecret: consumerSecret,
        AccessToken:    accessToken,
        AccessSecret:   accessSecret,
    }
}

HTTP client:

In [None]:
// Create a new HTTP client.
var connLock sync.Mutex
var conn net.Conn
client := &http.Client{
    Transport: &http.Transport{
        Dial: func(netw, addr string) (net.Conn, error) {
            connLock.Lock()
            defer connLock.Unlock()
            if conn != nil {
                conn.Close()
                conn = nil
            }
            netc, err := net.DialTimeout(netw, addr, 5*time.Second)
            if err != nil {
                return nil, err
            }
            conn = netc
            return netc, nil
        },
    },
}

Credentials:

In [None]:
// Create a new Tweet Reader.
consumerKey := "kgbOdbtngw1BJUL41QO7uwSKO"
consumerSecret := "iw0fqWGscqsUuuxgUBQsqYwteFxivfiUXX5qqN2ITqBXgloFL0"
accessToken := "104241306-PNR3msoakEhdyl7nweX9K2iSV5vfR00zJU6Mg4hE"
accessSecret := "A2Yvtl5sgpVdxwLyU91iEisamDRU7TFDwXKDM87peGFSQ"
r := NewTweetReader(consumerKey, consumerSecret, accessToken, accessSecret)

// Create oauth Credentials.
creds := &oauth.Credentials{
    Token:  r.AccessToken,
    Secret: r.AccessSecret,
}

// Create an oauth Client.
authClient := &oauth.Client{
    Credentials: oauth.Credentials{
        Token:  r.ConsumerKey,
        Secret: r.ConsumerSecret,
    },
}

## Streaming tweets

In a previous notebook, we manually performed a loop over a hardcoded number of tweets, printing out the collected tweets. However, really what we want is a process (i.e., a goroutine) that continually gathers the tracked tweets and communicates them to us. 

As we know from yesterday, we can communicate data between goroutines via channels. So, let's create a channel that will let us communicate collected tweets:

In [None]:
tweets := make(chan Tweet)

Let's define the terms that we are searching for again:

In [None]:
// Define the terms for our search.
terms := []string{"Trump", "Russia"}

And then we are ready to start:

1. A 1st goroutine that will handle the gathering of these tweets and the communication of the tweets to the channel, and
2. A 2nd goroutine that will read the tweets from that channel and print them out. 

*Note* - We will also create a context value that will allow us to stop the goroutine:

In [None]:
ctx, _ := context.WithTimeout(context.Background(),  2*time.Second)

In [None]:
fmt.Println("Start 1st goroutine to collect tweets...")
go func() {
    
    // Prepare the query.
    form := url.Values{"track": terms}
    formEnc := form.Encode()
    u, err := url.Parse("https://stream.twitter.com/1.1/statuses/filter.json")
    if err != nil {
        fmt.Println("Error parsing URL:", err)
    }
            
    // Prepare the request.
    req, err := http.NewRequest("POST", u.String(), strings.NewReader(formEnc))
    if err != nil {
        fmt.Println("creating filter request failed:", err)
        continue
    }
    req.Header.Set("Authorization", authClient.AuthorizationHeader(creds, "POST", u, form))
    req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    req.Header.Set("Content-Length", strconv.Itoa(len(formEnc)))
            
    // Execute the request.
    resp, err := client.Do(req)
    if err != nil {
        fmt.Println("Error getting response:", err)
        continue
    }
    if resp.StatusCode != http.StatusOK {
        fmt.Println("Unexpected HTTP status code:", resp.StatusCode)
        continue
    }
            
    // Decode the results.
    decoder := json.NewDecoder(resp.Body)
    for {
        var t Tweet
        if err := decoder.Decode(&t); err != nil {
            break
        }
        tweets <- t
    }
    resp.Body.Close()
}()

fmt.Println("Start a 2nd goroutine that prints the collected tweets...")
go func() {
    for {
        select {
            
        // Stop the goroutine.
        case <-ctx.Done():
            return
            
        // Print the tweets.
        case t := <-tweets:
            fmt.Println(t.Text)
        }
    }
}()

time.Sleep(3*time.Second)