# Stream Tweets and Users and write them into a PostgreSQL DB


For BdF users, CNTLM must be running.

The script also requires PostgreSQL to be installed and running.

----
## 0. Packages and functions
### Packages

In [1]:
import os
import pickle 
import datetime
import time
import random
import json
import re
import random
import urllib3

import tweepy
import tqdm
from dotenv import load_dotenv
import pandas as pd

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, Boolean, JSON, BigInteger
from sqlalchemy.orm import sessionmaker

#from faker import Faker

#import matplotlib.pyplot as plt
#import seaborn as sns

#### Proxy config

In [2]:
# Silence urllib3's InsecureRequestWarning so it does not flood the notebook output.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# NOTE(review): an empty CURL_CA_BUNDLE presumably turns off certificate
# verification in the HTTP stack used by tweepy — confirm this is acceptable.
os.environ["CURL_CA_BUNDLE"] = ""
# Route HTTPS traffic through the local proxy on port 3128
# (presumably the CNTLM instance mentioned at the top of the notebook — confirm).
os.environ["HTTPS_PROXY"] = "localhost:3128"

### Functions

In [3]:
def get_user_timeline(user, count=None):
    """Fetch a user's timeline through the module-level ``api`` client.

    Args:
        user: Screen name or user id, forwarded to ``api.user_timeline``.
        count: Maximum number of statuses to fetch; a falsy value (``None``
            or ``0``) fetches everything the cursor yields.

    Returns:
        A list of statuses, or ``None`` when a ``tweepy.TweepError`` is raised
        while iterating (the error is printed, not re-raised).
    """
    # count=200 is the per-request page size handed to api.user_timeline.
    cursor = tweepy.Cursor(api.user_timeline, user, tweet_mode="extended", count=200)
    if count:
        statuses = cursor.items(count)
    else:
        statuses = cursor.items()

    try:
        return list(statuses)
    except tweepy.TweepError as err:
        print(err)
        return None
    
    
def get_full_text_if_present(status):
    """Return the most complete text available on a status object.

    The attributes are tried in this order:

    1. ``status.extended_tweet["full_text"]``
    2. ``status.full_text`` (present on statuses fetched with
       ``tweet_mode="extended"``, which then carry no ``text`` attribute)
    3. ``status.text``

    Args:
        status: Any object exposing at least one of the attributes above.

    Returns:
        The text of the status as a string.

    Raises:
        AttributeError: If the status carries none of the three attributes.
    """
    try:
        return status.extended_tweet["full_text"]
    except AttributeError:
        pass

    try:
        return status.full_text
    except AttributeError:
        return status.text

    
def bad_tweet(text_tweet, bad_keywords=("cricket", "bowled", "footwork")):
    """Tell whether a tweet text contains one of the unwanted keywords.

    Matching is a case-insensitive substring test.

    Args:
        text_tweet: Text of the tweet.
        bad_keywords: Iterable of keywords that mark a tweet as unwanted.
            Defaults to the cricket-related words used to filter "ecb" tweets.

    Returns:
        ``True`` if at least one keyword occurs in the text, else ``False``.
    """
    # Lower-case the text once instead of once per keyword.
    lowered = text_tweet.lower()
    return any(keyword.lower() in lowered for keyword in bad_keywords)


def get_info_from_tweet(status):
    """Extract the fields stored in the ``tweets`` table from a status.

    The text is returned as-is: rows are written through the SQLAlchemy ORM,
    which binds values as parameters, so quotes must not be escaped by hand
    (doubling them would store altered text).

    Args:
        status: Status object exposing ``id``, ``created_at`` (a datetime),
            ``lang``, ``retweet_count``, ``retweeted`` and ``entities``, plus
            the text attributes read by ``get_full_text_if_present``.

    Returns:
        A dict with the keys ``id``, ``created_at`` (ISO 8601 string),
        ``text``, ``lang``, ``retweet_count``, ``retweeted`` and ``entities``.
    """
    return {
        "id": status.id,
        "created_at": status.created_at.isoformat(),
        "text": get_full_text_if_present(status),
        "lang": status.lang,
        "retweet_count": status.retweet_count,
        "retweeted": status.retweeted,
        "entities": status.entities,
    }


def get_user_from_status(status):
    """Pick the author fields of interest out of a status.

    Args:
        status: Status object whose ``user._json`` is a dict-like payload.

    Returns:
        A dict holding the selected user fields, in the order listed below;
        a field absent from the payload maps to ``None``.
    """
    wanted_fields = (
        "id",
        "screen_name",
        "name",
        "description",
        "statuses_count",
        "protected",
        "verified",
        "followers_count",
        "friends_count",
        "created_at",
    )
    selected = {}
    for field in wanted_fields:
        selected[field] = status.user._json.get(field)
    return selected


### 1. OAUTH to Twitter API
#### Load API keys and secrets as environment variables

In [6]:
# Fail early, with an actionable message, when the credentials file is missing.
if not os.path.isfile('.env'):
    raise FileNotFoundError(
        "You should have a .env file with the following keys: " +
        ', '.join(["consumer_key", "consumer_secret", "key", "secret"]))

# Load the key/value pairs of .env as environment variables.
load_dotenv()

# Application (consumer) credentials.
consumer_info = dict(
    consumer_key=os.getenv("consumer_key"),
    consumer_secret=os.getenv("consumer_secret"),
)

# User access token and secret.
access_info = dict(
    key=os.getenv("key"),
    secret=os.getenv("secret"),
)

oauth = tweepy.OAuthHandler(**consumer_info)
oauth.set_access_token(**access_info)
# The client goes through the local proxy and waits (with a notification)
# whenever the rate limit is reached instead of failing.
api = tweepy.API(oauth, proxy="localhost:3128", wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

#tl = get_user_timeline("banquedefrance", count=10)
#tweet = tl[0]

In [7]:
# Fetch a known account; presumably a smoke test that authentication and the
# proxy work (the cell output below shows the returned User object).
api.get_user("banquedefrance")

User(_api=<tweepy.api.API object at 0x0000027173A5B3C8>, _json={'id': 223406334, 'id_str': '223406334', 'name': 'Banque de France', 'screen_name': 'banquedefrance', 'location': 'Paris, France', 'profile_location': None, 'description': 'Institution indépendante, la Banque de France a trois grandes missions : la stratégie monétaire, la stabilité financière, les services à l’économie.', 'url': 'http://t.co/cqSL5ZCbI1', 'entities': {'url': {'urls': [{'url': 'http://t.co/cqSL5ZCbI1', 'expanded_url': 'http://www.banque-france.fr', 'display_url': 'banque-france.fr', 'indices': [0, 22]}]}, 'description': {'urls': []}}, 'protected': False, 'followers_count': 35721, 'friends_count': 131, 'listed_count': 870, 'created_at': 'Mon Dec 06 09:09:58 +0000 2010', 'favourites_count': 261, 'utc_offset': None, 'time_zone': None, 'geo_enabled': True, 'verified': True, 'statuses_count': 17371, 'lang': None, 'status': {'created_at': 'Mon Jul 13 12:54:48 +0000 2020', 'id': 1282659745392885760, 'id_str': '12826

### 2. Declare class for SQLAlchemy's ORM

In [None]:
# Declarative base class shared by all ORM models of this notebook.
Base = declarative_base()

class TwitterUser(Base):
    """ORM model for a Twitter account, mapped to the ``users`` table.

    The column names match the keys returned by ``get_user_from_status``,
    so an instance can be built with ``TwitterUser(**user_dict)``.
    """
    __tablename__ = 'users'
    
    # Twitter user id, used as primary key.
    id = Column(BigInteger, primary_key=True)
    # Account creation date.
    created_at = Column(DateTime)
    screen_name = Column(String)
    name = Column(String)
    description = Column(String)
    followers_count = Column(Integer)
    friends_count = Column(Integer)
    verified = Column(Boolean)
    protected = Column(Boolean)
    statuses_count = Column(Integer)
    
    def __repr__(self):
        """Return a short debug representation (id, screen name, followers)."""
        return "<TwitterUser(id={}, screen_name={}, followers_count={})>".format(self.id, self.screen_name, self.followers_count)

class Tweet(Base):
    """ORM model for a tweet, mapped to the ``tweets`` table.

    The column names match the keys returned by ``get_info_from_tweet``,
    so an instance can be built with ``Tweet(**tweet_dict)``.
    """
    __tablename__ = 'tweets'
    
    # Tweet id, used as primary key; BigInteger because tweet ids
    # (e.g. 1282659745392885760) do not fit in a 32-bit integer.
    id = Column(BigInteger, primary_key=True)
    created_at = Column(DateTime)
    text = Column(String)
    lang = Column(String)
    retweet_count = Column(Integer)
    retweeted = Column(Boolean)
    # Entities payload of the status, stored as JSON.
    entities = Column(JSON)
    
    def __repr__(self):
        """Return a short debug representation (id, creation date, text)."""
        return "<Tweet(id={}, created_at={}, text={})>".format(self.id, self.created_at, self.text)

In [None]:
# The connection URL can be overridden with the DATABASE_URL environment
# variable so that real credentials need not live in the notebook; the
# default targets a local "twitter" database with the postgres/postgres account.
# echo=True logs every SQL statement emitted by the engine.
engine = create_engine(
    os.getenv("DATABASE_URL", 'postgresql://postgres:postgres@localhost:5432/twitter'),
    echo=True,
)

#### Create tables if not already there

In [None]:
# Set to True to start from empty tables. This deletes all stored data.
drop_db_if_there = False

if drop_db_if_there:
    # checkfirst=True only drops a table that exists, so the cell does not
    # fail on a fresh database.
    TwitterUser.__table__.drop(engine, checkfirst=True)
    Tweet.__table__.drop(engine, checkfirst=True)

In [None]:
# Create the tables declared on Base ("users" and "tweets") through the engine.
Base.metadata.create_all(engine)

### 3. Streaming from Twitter API

#### 3.1. Init session to interact with postgres (query/insert)

In [None]:
# Session factory bound to the engine, and the single module-level session
# used by the stream listener and by the query cells.
Session = sessionmaker(bind=engine)
session = Session()

#### 3.2. Define Stream Listener

In [None]:
class ECBStreamListener(tweepy.StreamListener):
    """Stream listener that stores incoming tweets and their authors.

    Tweets flagged by ``bad_tweet`` (cricket-related) are printed and skipped;
    all others are printed and written to the database through the
    module-level ``session``.
    """

    def on_status(self, status):
        """Handle one incoming status: filter it, print it and persist it.

        Args:
            status: Status object delivered by the stream.
        """
        user_dict = get_user_from_status(status)
        tweet_dict = get_info_from_tweet(status)

        text = tweet_dict["text"]

        if bad_tweet(text):
            print("Cricket:\n", text, "\n"*2)
            return

        print("European Central Bank:\n", text, "\n"*2)
        current_user = TwitterUser(**user_dict)
        current_tweet = Tweet(**tweet_dict)

        try:
            # merge() inserts the row or updates the existing one with the
            # same primary key. With add(), a second tweet from an
            # already-stored user violated the primary key of "users" and
            # the rollback discarded the tweet as well.
            session.merge(current_user)
            session.merge(current_tweet)
            session.commit()
        except Exception as e:
            # Best effort: report the problem and keep the stream alive.
            print(e)
            session.rollback()

    def on_error(self, status_code):
        """Print the HTTP status code of a stream error.

        Returns:
            ``False`` on status 420 (rate limited), which disconnects the
            stream; ``None`` otherwise, which lets tweepy reconnect.
        """
        print(status_code)
        if status_code == 420:
            return False


#### 3.3. Init classes

In [None]:
# Listener instance (note: this name is a variable, not the tweepy.StreamListener class).
StreamListener = ECBStreamListener()
# Stream object reusing the credentials of the already-authenticated API client.
myStream = tweepy.Stream(auth = api.auth, listener=StreamListener)

## 4. Run streaming

The cell below blocks until the stream stops unless `is_async=True` is passed.

In [None]:
# Track English tweets matching "ecb"; is_async=True keeps the notebook
# usable while tweets are being received.
myStream.filter(track=["ecb"], languages=["en"], is_async=True)

### 5. Query DB

In [None]:
# Print every stored tweet with its position in the result set.
# A plain loop is used because print() is a side effect; the comprehension
# only built a throwaway list of None values.
for i, tweet in enumerate(session.query(Tweet).all()):
    print("Tweet {}: {}".format(i, tweet.text), "\n--------\n")

In [None]:
# List (screen_name, followers_count) for every stored user.
[(user.screen_name, user.followers_count) for user in session.query(TwitterUser).all()]