In [1]:
# Number of tweets accumulated before a batch is written out as one JSON object.
BATCH_SIZE = 1_000
# Key prefix under which the batch objects are stored in the bucket.
PREFIX = "test_tweets_2"

In [2]:
import requests
import os
import json
from minio import Minio
import io
from dotenv import load_dotenv
from tqdm import tqdm

# Pull the variables defined in a local .env file into the process environment.
load_dotenv()

# The Twitter API bearer token is read from the TWITTER_BEARER_TOKEN variable.
# Define it in .env, or export it in your terminal before starting Jupyter:
#   export TWITTER_BEARER_TOKEN='<your_bearer_token>'
bearer_token = os.getenv("TWITTER_BEARER_TOKEN")

def bearer_oauth(r):
    """
    Auth callable required by bearer token authentication.

    Sets the ``Authorization`` header (built from the module-level
    ``bearer_token``) and the ``User-Agent`` header on ``r`` in place, then
    returns the same object.
    """
    auth_headers = {
        "Authorization": "Bearer {}".format(bearer_token),
        "User-Agent": "v2FilteredStreamPython",
    }
    for header_name, header_value in auth_headers.items():
        r.headers[header_name] = header_value
    return r


def get_rules():
    """Fetch the filtered-stream rules currently registered for this token.

    Prints the JSON body of the response and returns it parsed.

    Raises:
        Exception: if the endpoint answers with anything other than HTTP 200;
            the message carries the status code and the response text.
    """
    rules_url = "https://api.twitter.com/2/tweets/search/stream/rules"
    response = requests.get(rules_url, auth=bearer_oauth)

    if response.status_code == 200:
        current_rules = response.json()
        print(json.dumps(current_rules))
        return current_rules

    raise Exception(
        "Cannot get rules (HTTP {}): {}".format(response.status_code, response.text)
    )


def delete_all_rules(rules):
    """Delete every stream rule listed in ``rules``.

    Args:
        rules: the parsed response of ``get_rules()``; the rule ids are read
            from ``rules["data"][i]["id"]``.

    Returns:
        None. Nothing is sent when ``rules`` is ``None``, has no ``"data"``
        key, or its ``"data"`` list is empty (there is nothing to delete, and
        a delete request without ids would only produce an error response).
        Otherwise the JSON body of the delete response is printed.

    Raises:
        Exception: if the endpoint answers with anything other than HTTP 200.
    """
    if rules is None or "data" not in rules or not rules["data"]:
        return None

    ids = [rule["id"] for rule in rules["data"]]
    payload = {"delete": {"ids": ids}}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot delete rules (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    print(json.dumps(response.json()))


def set_rules(delete):
    """Register the Ethereum hashtag rule on the filtered stream.

    Args:
        delete: not used by this function; accepted so the call can be
            chained after ``delete_all_rules``.

    Returns:
        None. The JSON body of the response is printed.

    Raises:
        Exception: if the endpoint answers with anything other than HTTP 201.
    """
    # You can adjust the rules if needed
    rule_values = ["#ETH OR #ethereum OR #eth"]
    payload = {"add": [{"value": rule_value} for rule_value in rule_values]}

    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload,
    )
    if response.status_code == 201:
        print(json.dumps(response.json()))
        return

    raise Exception(
        "Cannot add rules (HTTP {}): {}".format(response.status_code, response.text)
    )

def _upload_batch(batch, index, pbar):
    """Serialise ``batch`` as JSON and upload it as batch object ``index``.

    Does nothing when ``batch`` is empty. The object is written to the
    module-level ``bucket`` through the module-level ``s3_client``, and the
    progress bar description is updated before and after the upload.
    """
    if not batch:
        return
    # NOTE(review): the index is left-aligned and space-padded to 8 characters,
    # so names look like "<PREFIX>/1       .json". Kept unchanged so new
    # objects follow the same naming as ones written by earlier runs.
    file_name = f"{PREFIX}/{index: <8}.json"
    payload = json.dumps(batch, indent=4, sort_keys=True).encode("utf-8")
    pbar.set_description(f"saving {len(batch)} tweets as {file_name}")
    # The declared length is the size of the encoded bytes, not the number of
    # characters in the text.
    s3_client.put_object(bucket, file_name, io.BytesIO(payload), len(payload))
    pbar.set_description(f"saved {file_name}")


def get_stream(set):
    """Read the filtered stream and upload tweets in batches of ``BATCH_SIZE``.

    Each non-empty line of the stream is parsed as JSON and appended to the
    current batch; every full batch is uploaded as one JSON object. When the
    stream ends, the connection breaks, or the run is interrupted, the
    partially filled batch is uploaded as well so collected tweets are not
    lost. The streaming connection is closed when the function exits.

    Args:
        set: not used by this function (the name also shadows the builtin);
            kept so existing calls keep working.

    Raises:
        Exception: if the stream endpoint answers with anything other than
            HTTP 200.
        requests.exceptions.RequestException: re-raised after the pending
            batch has been uploaded when the connection breaks mid-stream.
    """
    with requests.get(
        "https://api.twitter.com/2/tweets/search/stream", auth=bearer_oauth, stream=True,
    ) as response:
        print(response)
        print(response.status_code)
        if response.status_code != 200:
            raise Exception(
                "Cannot get stream (HTTP {}): {}".format(
                    response.status_code, response.text
                )
            )
        i = 0
        batch = []
        with tqdm(total=BATCH_SIZE) as pbar:
            try:
                for response_line in response.iter_lines():
                    if not response_line:
                        # Empty line (presumably a keep-alive): nothing to store.
                        continue
                    batch.append(json.loads(response_line))
                    pbar.update(1)
                    if len(batch) >= BATCH_SIZE:
                        _upload_batch(batch, i, pbar)
                        i += 1
                        batch = []
                        pbar.reset()
            except (requests.exceptions.RequestException, KeyboardInterrupt):
                # Connection dropped or the cell was interrupted: persist what
                # has been collected so far, then let the error propagate.
                _upload_batch(batch, i, pbar)
                raise
            # The stream ended without an error: flush the remainder.
            _upload_batch(batch, i, pbar)

In [3]:
# Object-storage credentials are taken from the environment.
s3_access_key = os.getenv("S3_ACCESS_KEY")
s3_secret_key = os.getenv("S3_SECRET_KEY")

# Bucket that receives the tweet batches.
bucket = "twitter-data-123456"

# MinIO client pointed at the storage.googleapis.com endpoint.
s3_client = Minio(
    "storage.googleapis.com",
    access_key=s3_access_key,
    secret_key=s3_secret_key,
)

In [4]:
# Reset the stream's filter rules: fetch the current ones, delete them, then
# register the rule defined in set_rules.
rules = get_rules()
delete = delete_all_rules(rules)
# The result (always None) is deliberately not bound to the name `set`, which
# would shadow the builtin `set` for the rest of the kernel session.
set_rules(delete)

{"data": [{"id": "1487245685946818561", "value": "#ETH OR #ethereum OR #eth"}], "meta": {"sent": "2022-02-04T11:28:43.230Z", "result_count": 1}}
{"meta": {"sent": "2022-02-04T11:28:44.795Z", "summary": {"deleted": 1, "not_deleted": 0}}}
{"data": [{"value": "#ETH OR #ethereum OR #eth", "id": "1489561559911260161"}], "meta": {"sent": "2022-02-04T11:28:46.087Z", "summary": {"created": 1, "not_created": 0, "valid": 1, "invalid": 0}}}


In [5]:
# Start collecting; this runs until the stream ends, fails, or is interrupted.
# get_stream does not read its argument, so None is passed explicitly.
get_stream(None)

<Response [200]>
200


saved test_tweets_2/1       .json:  51%|██████████████████████████████████████████▍                                        | 512/1000 [01:09<01:06,  7.35it/s]


ChunkedEncodingError: ("Connection broken: ConnectionResetError(54, 'Connection reset by peer')", ConnectionResetError(54, 'Connection reset by peer'))