# Streaming from Twitter to a Tinybird Data Source

Filter tweets using a list of tracking words. Stream the tweets to a Tinybird Data Source.



In [1]:
import json
import os
import sys
import time
from email.utils import parsedate_to_datetime
from io import StringIO
from threading import Lock, Timer

import requests
import tweepy
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

Get the API key, API key secret, access token and access token secret from your app on [Twitter's Developer Platform](https://developer.twitter.com), and the auth token from the [Tinybird](https://ui.tinybird.co/login) workspace where you want to create the Data Source of tweets.

In [8]:
# Credentials are read from environment variables of the same name so that
# secrets are never saved inside the notebook. Each one falls back to ''
# (the previous placeholder value) when its variable is not set, so export the
# variables before starting the kernel.
TWITTER_API_KEY = os.environ.get('TWITTER_API_KEY', '')
TWITTER_API_KEY_SECRET = os.environ.get('TWITTER_API_KEY_SECRET', '')
TWITTER_ACCESS_TOKEN = os.environ.get('TWITTER_ACCESS_TOKEN', '')
TWITTER_ACCESS_TOKEN_SECRET = os.environ.get('TWITTER_ACCESS_TOKEN_SECRET', '')
TB_TOKEN = os.environ.get('TB_TOKEN', '')

Set up your variables, including the words to look for in tweets. We use [tweepy.Stream](https://docs.tweepy.org/en/stable/stream.html?highlight=.Stream) to filter realtime tweets on the list of keywords.

In [3]:
# --- What to look for on Twitter ---------------------------------------
search_label = 'MAD'  # label stored in the "search_label" field of each row
track = [
    'Madrid', 'madrid',
    'Spain', 'spain',
    'España', 'españa',
    'Espana', 'espana',
]

# --- Where the rows go in Tinybird -------------------------------------
TB_HOST = 'https://api.tinybird.co/v0'
TB_API = 'events'
mode = 'append'
datasource = 'tweets'

endpoint = f'{TB_HOST}/{TB_API}'
# Query-string parameters sent along with every request to `endpoint`.
params = dict(mode=mode, name=datasource, token=TB_TOKEN)

# --- Twitter client -----------------------------------------------------
auth = tweepy.OAuthHandler(TWITTER_API_KEY, TWITTER_API_KEY_SECRET)
auth.set_access_token(TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET)
api = tweepy.API(auth, timeout=300)

# --- Run length and chunking thresholds ---------------------------------
time_limit = 60 * 60            # seconds to run the stream
max_wait_seconds = 60           # time between sending chunks to Tinybird
max_wait_records = 10_000       # record-count threshold for a chunk
max_wait_bytes = 1024 * 1024    # buffer-size threshold for a chunk

Send chunks of data to Tinybird

In [4]:
class TinybirdApiSink():
    """Buffer JSON records in memory and POST them to Tinybird in chunks.

    Records are added with :meth:`append`, which stores one JSON document per
    line (NDJSON), and are sent with :meth:`flush`.

    ``flush`` may run on a different thread than ``append`` (this notebook
    triggers it from a ``threading.Timer``), so every access to the buffer is
    guarded by a lock. Without it, a record written between reading the buffer
    and replacing it would be silently dropped.
    """

    def __init__(self, params, endpoint=endpoint, timeout=30):
        """Create the HTTP session and an empty buffer.

        Args:
            params: query-string parameters sent with every POST.
            endpoint: URL that receives the POST.
            timeout: value passed to ``requests`` as the request timeout
                (seconds). Without one, a stalled connection would block
                ``flush`` indefinitely.
        """
        super().__init__()
        self.params = params
        self.url = endpoint
        self.timeout = timeout
        # Same adapter for both schemes: up to 5 retries, backoff factor 0.2.
        retry = Retry(total=5, backoff_factor=0.2)
        adapter = HTTPAdapter(max_retries=retry)
        self._session = requests.Session()
        self._session.mount('http://', adapter)
        self._session.mount('https://', adapter)
        # Protects self.chunk; must exist before the first reset().
        self._lock = Lock()
        self.reset()
        # True only while a POST is in flight.
        self.wait = False

    def reset(self):
        """Replace the buffer with a fresh, empty one.

        Does not take the lock itself; ``flush`` calls it while already
        holding the lock.
        """
        self.chunk = StringIO()

    def append(self, value):
        """Serialize ``value`` as JSON and add it to the buffer as one line.

        Best effort: a value that cannot be serialized is reported with
        ``print`` and skipped instead of raising.
        """
        try:
            line = json.dumps(value) + '\n'
            with self._lock:
                self.chunk.write(line)
        except Exception as e:
            print(e)

    def tell(self):
        """Return the current position of the buffer (its size so far)."""
        with self._lock:
            return self.chunk.tell()

    def flush(self):
        """Send everything buffered so far and start a new buffer.

        The buffer is swapped out before the request is made, so records
        appended while the POST is running go into the next chunk. A chunk
        whose POST fails is not re-queued.

        Returns:
            True when there was nothing to send or the response status is
            below 400; False on an HTTP error status or any exception.
        """
        # Snapshot and replace atomically with respect to append().
        with self._lock:
            data = self.chunk.getvalue()
            self.reset()

        # Nothing buffered: skip the pointless empty request.
        if not data:
            return True

        self.wait = True
        try:
            response = self._session.post(self.url,
                                          params=self.params,
                                          # [:-1] drops the trailing newline
                                          files=dict(ndjson=data[:-1]),
                                          timeout=self.timeout)
            print('flush response', response.status_code, response.text)
            return response.status_code < 400
        except Exception as e:
            print(e)
            return False
        finally:
            # Single place that clears the in-flight flag on every path.
            self.wait = False

Listen for your tweets

In [5]:
class MyStreamListener(tweepy.StreamListener):
    """Turn incoming tweets into rows and send them to Tinybird in batches.

    Each tweet becomes a dict with ``search_label``, ``tweet`` and ``date``
    keys. Rows are buffered in a :class:`TinybirdApiSink` and flushed when the
    record-count or buffer-size threshold is reached, or ``max_wait_seconds``
    after the first row of a batch, whichever comes first.

    NOTE(review): ``tweepy.StreamListener`` is a tweepy 3.x class; confirm the
    installed tweepy version still provides it.
    """

    def __init__(self,
                 name,
                 api,
                 search_label,
                 time_limit = 60,
                 max_wait_seconds = 10,
                 max_wait_records = 10_000,
                 max_wait_bytes = 1024*1024):
        """
        Args:
            name: stored on the instance as ``self.name``.
            api: tweepy API object, stored as ``self.api``.
            search_label: value written to the ``search_label`` field of
                every row.
            time_limit: seconds after which ``on_data`` stops the stream.
            max_wait_seconds: delay of the timer that flushes a partial batch.
            max_wait_records: flush once this many rows are buffered.
            max_wait_bytes: flush once ``sink.tell()`` reaches this size.
        """
        self.start_time = time.monotonic()
        self.limit = time_limit
        self.name = name
        self.api = api
        self.search_label = search_label
        # The sink is configured from the module-level `params`, not from `name`.
        self.sink = TinybirdApiSink(params)
        self.max_wait_seconds = max_wait_seconds
        self.max_wait_records = max_wait_records
        self.max_wait_bytes = max_wait_bytes
        self.records = 0
        self.timer = None
        self.timer_start = None
        # Kept for compatibility; not referenced anywhere else in this notebook.
        self.tr_timer = None
        self.tr_timer_start = None

    @staticmethod
    def _status_text(status):
        """Return the fullest text available in a tweet-like dict.

        Uses ``extended_tweet.full_text`` when the status is flagged as
        truncated and that field is present; otherwise falls back to ``text``,
        and to '' when neither exists.
        """
        if status.get('truncated'):
            full_text = (status.get('extended_tweet') or {}).get('full_text')
            if full_text:
                return full_text
        return status.get('text') or ''

    def append(self, record):
        """Buffer one row and flush or arm the flush timer as needed."""
        self.sink.append(record)
        self.records += 1
        if self.records < self.max_wait_records and self.sink.tell() < self.max_wait_bytes:
            # Below both thresholds: make sure a timer will flush this batch.
            if not self.timer:
                self.timer_start = time.monotonic()
                self.timer = Timer(self.max_wait_seconds, self.flush)
                self.timer.start()
        else:
            self.flush()

    def flush(self):
        """Cancel any pending timer and send the buffered rows, if any."""
        if self.timer:
            self.timer.cancel()
            self.timer = None
            self.timer_start = None
        if not self.records:
            return
        # Reset the counter BEFORE the network call: the sink swaps its buffer
        # at the start of flush(), so rows appended while the request is in
        # flight belong to the next batch and must stay counted.
        self.records = 0
        self.sink.flush()

    def on_data(self, raw_data):
        """Handle one raw message from the stream.

        Returns:
            False once ``time_limit`` seconds have elapsed (which stops the
            stream), True otherwise.
        """
        if (time.monotonic() - self.start_time) > self.limit:
            print('Finished time period')
            # Send what is still buffered now instead of waiting for the timer.
            self.flush()
            return False

        # Let the base class run its own dispatch. Its return value is
        # deliberately not propagated, so only the time limit stops the stream.
        super().on_data(raw_data)

        tweet = json.loads(raw_data)
        # Messages without these keys are not tweets; ignore them.
        if 'created_at' not in tweet or 'id' not in tweet or 'text' not in tweet:
            return True

        try:
            date = parsedate_to_datetime(str(tweet['created_at'])).strftime("%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError) as e:
            # An unparsable date drops this one row rather than the whole stream.
            print(e)
            return True

        # The row text is the tweet's own text followed by the text of the
        # retweeted and quoted statuses, when present.
        text = self._status_text(tweet)
        for key in ('retweeted_status', 'quoted_status'):
            nested = tweet.get(key)
            if isinstance(nested, dict):
                text += self._status_text(nested)

        tw = {
            "search_label": self.search_label,
            "tweet": text,
            "date": date
        }
        self.append(tw)
        return True
        

In [6]:
def connect():
    """Open the filtered Twitter stream and forward matching tweets to Tinybird.

    Builds a ``MyStreamListener`` from the notebook's configuration values and
    filters the stream on ``track``. Any exception is printed rather than
    raised. Whatever is still buffered when the stream ends, for any reason,
    is flushed before returning.
    """
    myStreamListener = None
    try:
        myStreamListener = MyStreamListener(datasource,
                                            api,
                                            search_label = search_label,
                                            time_limit = time_limit,
                                            max_wait_seconds = max_wait_seconds,
                                            # Previously omitted, so the configured
                                            # thresholds were silently ignored.
                                            max_wait_records = max_wait_records,
                                            max_wait_bytes = max_wait_bytes)
        myStream = tweepy.Stream(auth = api.auth,
                                 listener = myStreamListener)
        myStream.filter(track = track)
    except Exception as e:
        print(e)
    finally:
        # The listener may not exist if its construction failed.
        if myStreamListener is not None:
            myStreamListener.flush()

In [7]:
# Evaluates print('connect') and then connect() as the two elements of a tuple.
# Both return None, which is why the cell's result is displayed as (None, None).
print('connect'), connect()

connect
flush response 202 {"successful_rows":520,"quarantined_rows":4}
flush response 202 {"successful_rows":464,"quarantined_rows":4}
flush response 202 {"successful_rows":471,"quarantined_rows":4}
flush response 202 {"successful_rows":498,"quarantined_rows":4}
flush response 202 {"successful_rows":502,"quarantined_rows":4}
flush response 202 {"successful_rows":545,"quarantined_rows":4}
flush response 202 {"successful_rows":495,"quarantined_rows":4}
flush response 202 {"successful_rows":472,"quarantined_rows":4}
flush response 202 {"successful_rows":490,"quarantined_rows":4}
flush response 202 {"successful_rows":483,"quarantined_rows":4}
flush response 202 {"successful_rows":454,"quarantined_rows":4}
flush response 202 {"successful_rows":457,"quarantined_rows":4}
flush response 202 {"successful_rows":462,"quarantined_rows":4}
flush response 202 {"successful_rows":484,"quarantined_rows":4}
flush response 202 {"successful_rows":524,"quarantined_rows":4}
flush response 202 {"successful_

(None, None)