# Streaming from Twitter into local host & port for sparkStream python code to read and analyzie

Run this file first, then sparkStreamWorking.pynb

In [1]:
import socket
import requests
import os
import json
import sys


In [2]:
bearer_token =''



In [3]:
def bearer_oauth(r):
    """
    Method required by bearer token authentication.
    """

    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2FilteredStreamPython"
    return r

def delete_all_rules(rules):
    if rules is None or "data" not in rules:
        return None

    ids = list(map(lambda rule: rule["id"], rules["data"]))
    payload = {"delete": {"ids": ids}}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot delete rules (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    print(json.dumps(response.json()))


def set_rules(delete):
    # You can adjust the rules if needed
    sample_rules = [
        {"value": "-is:retweet has:geo (from:HealthPolicyNew OR from:INHIMSS OR from:emilycfreeman OR from:HL7IndiaTSC OR from:healthbees OR from:MEDITECHSA OR from:SWBenefitsAssoc OR from:digihealthinfo)",
        },
        {"value": "critical care","lang": "EN", "place_country":"US"}


    ]
    payload = {"add": sample_rules}
    response = requests.post(
        "https://api.twitter.com/2/tweets/search/stream/rules",
        auth=bearer_oauth,
        json=payload,
    )
    if response.status_code != 201:
        raise Exception(
            "Cannot add rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print(json.dumps(response.json()))

    
def get_rules():
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream/rules", auth=bearer_oauth
    )
    if response.status_code != 200:
        raise Exception(
            "Cannot get rules (HTTP {}): {}".format(response.status_code, response.text)
        )
    print(json.dumps(response.json()))
    return response.json()

    
    
def get_stream(set, conn):
    response = requests.get(
        "https://api.twitter.com/2/tweets/search/stream", auth=bearer_oauth, stream=True,
    )
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(
            "Cannot get stream (HTTP {}): {}".format(
                response.status_code, response.text
            )
        )
    for response_line in response.iter_lines():
        if response_line:
            json_response = json.loads(response_line)
            tweet_text = json.dumps(json_response["data"]["text"])
            print(tweet_text)
            #print(json.dumps(json_response, indent=4, sort_keys=True))
            conn.sendall((tweet_text+"\n").encode("utf-8"))
            #conn.send((tweet_text+'\n').encode("utf-8"))
        
def create_url():
    return "https://api.twitter.com/2/tweets/sample/stream"



In [4]:
def main():
    url = create_url()
    rules = get_rules()
    delete = delete_all_rules(rules)
    set = set_rules(delete)
    print("set", set)
    # server (local machine) creates listening socket
    #s = socket.socket()
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #host = "0.0.0.0" 
    host = "127.0.0.1"
    port = 5555
    #s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((host, port))
    print('socket is ready')
    # server (local machine) listens for connections
    s.listen()
    print('socket is listening')
    # return the socket and the address on the other side of the connection (client side)
    c_socket, addr = s.accept()
    #c_socket.setblocking(False)
    print("Received request from: " + str(addr))
    print('start sending data from Twitter to socket')
    get_stream(set,c_socket)
    print("******data********")



if __name__ == "__main__":
    main()


{"data": [{"id": "1519471391887745025", "value": "-is:retweet has:geo (from:HealthPolicyNew OR from:INHIMSS OR from:emilycfreeman OR from:HL7IndiaTSC OR from:healthbees OR from:MEDITECHSA OR from:SWBenefitsAssoc OR from:digihealthinfo)"}, {"id": "1519471391887745026", "value": "hurricane"}], "meta": {"sent": "2022-04-28T00:25:59.730Z", "result_count": 2}}
{"meta": {"sent": "2022-04-28T00:26:00.808Z", "summary": {"deleted": 2, "not_deleted": 0}}}
{"data": [{"value": "-is:retweet has:geo (from:HealthPolicyNew OR from:INHIMSS OR from:emilycfreeman OR from:HL7IndiaTSC OR from:healthbees OR from:MEDITECHSA OR from:SWBenefitsAssoc OR from:digihealthinfo)", "id": "1519472969864597505"}, {"value": "hurricane", "id": "1519472969864597504"}], "meta": {"sent": "2022-04-28T00:26:02.583Z", "summary": {"created": 2, "not_created": 0, "valid": 2, "invalid": 0}}}
set None
socket is ready
socket is listening
Received request from: ('127.0.0.1', 57162)
start sending data from Twitter to socket
200
"RT @

"RT @Annette_Taddeo: Ron DeSantis cites the upcoming hurricane season as reasons for a special session on insurance \u2014 we\u2019re from Florida, we\u2026"
"Tropical storms and hurricanes that begin with the letter 'I' have had more names retired than any other letter. https://t.co/79iQFrbplR"
"RT @Captn_Hurricane: Alignment of 5 planets, Earth, Mars, Venus, Saturn and Jupiter, as seen from Africa at 4 am. https://t.co/g1XWoa53h3"
"No pause for thought as Villarreal run into relentless Red hurricane | Jonathan Liew https://t.co/8lritefCMW https://t.co/LFZgM9fQkv"
"The World Meteorological Organization has retired #Ida from the list of hurricane names. https://t.co/souXkNpDdf"
"@normcharlatan @JeremiahOshan If so, is there a hurricane of jacarandas?"
"you could be my luck\neven in a hurricane of frowns\ni know that we'll be safe and sound"
"RT @WorldAtWarRuss1: Destruction of the MLRS \"Hurricane\" of the Armed Forces of Ukraine in the Kharkov \n\n#Kyiv \n#Ukraine \n#UkraineRussiaWar\

"@hurricane_jane9 I hate this"
"@bhani7 That would have been a hurricane! Lol"
"RT @PChidambaram_IN: The Umran Malik hurricane is blowing away everything in its way\n\nThe sheer pace and aggression is a sight to behold\n\nA\u2026"
"RT @1DOPEKEV: I be asking myself how hot does a club gota be for Hurricane Chris to say \u201cIt's so hot up in the club that I ain't got no sho\u2026"
"RT @Deoliver47: 4 years and 216 days post Hurricane Maria in Puerto Rico, Puerto Rican women are now faced with a different type of storm.\u2026"
"Keep in mind that you're not going to see an intense -NAO for weeks on end. It's just the general lack of intense trade winds which promotes a warming pattern across the MDR. This does have hurricane season implications if it can persist."
"RT @1DOPEKEV: I be asking myself how hot does a club gota be for Hurricane Chris to say \u201cIt's so hot up in the club that I ain't got no sho\u2026"
"RT @MojoMoomey: Friday Night #MojoRocks #BoomRadio\ud83d\udca3\ud83d\udca5

"@fredschwab1 Thank you!!  Got home Sunday.  Been spaying hfrs all week.  Finally some nice weather without the hurricane here."


KeyboardInterrupt: 

# Rules

    {"value": "hurricane OR earthquake", "lang": "EN", "place_country":"US",
        "tag": "theme:info has:geo original from weather agencies and gauges"},
        {"value": "disaster has:images"},