In [3]:
!pip install textblob 

Collecting textblob
  Downloading textblob-0.17.1-py2.py3-none-any.whl (636 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m636.8/636.8 KB[0m [31m137.8 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting nltk>=3.1
  Downloading nltk-3.7-py3-none-any.whl (1.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m183.6 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting regex>=2021.8.3
  Downloading regex-2022.4.24-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (763 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m763.2/763.2 KB[0m [31m183.9 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: regex, nltk, textblob
Successfully installed nltk-3.7 regex-2022.4.24 textblob-0.17.1


In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.mllib.linalg import Vectors
from textblob import TextBlob
import json
import requests


### Host + Ports (You may change this)

In [None]:
HOST = "host.docker.internal"   # if inside docker container
# HOST = "localhost"            # if outside docker container 

STREAM_PORT = 4040     # the port to which your stream listenner is binded
ELASTIC_PORT = 9200    # the port of your Elasticsearch server

## Spark

### Spark context

In [None]:
# Create a local StreamingContext with batch interval of 10 second
sc = SparkContext.getOrCreate()
sc.setCheckpointDir("spark_checkpoint")    # important to use an invertible window operation later
ssc = StreamingContext(sc, 10)

# create a DStream for the tweets 
raw_tweets = ssc.socketTextStream(HOST, STREAM_PORT)

### Processing Tweets

In [None]:
# acts as a label encoder [class -> int]
tag_to_int = {}
loc_to_int = {}

def process_tweets(tweet):
    tweet = json.loads(tweet)
    
    # for the label encoder
    if tweet['tag'] not in tag_to_int:
        tag_to_int[tweet['tag']] = len(tag_to_int)
        
    if tweet['location'] not in loc_to_int:
        loc_to_int[tweet['location']] = len(loc_to_int)
       
    # sentiment analysis
    polarity, subjectivity = TextBlob(tweet['text']).sentiment
    return {
        'text':tweet['text'],
        'location':loc_to_int[tweet['location']],
        'tag':tag_to_int[tweet['tag']],
        'polarity':polarity,
        'subjectivity':subjectivity
    }

# add the processing to the pipeline
tweets = raw_tweets.map(process_tweets)
train_data = tweets.map(lambda tweet: Vectors.dense([tweet['tag'], tweet['location'], tweet['polarity'], tweet['subjectivity']]))

### Clustering

In [None]:
model = StreamingKMeans(k=4, decayFactor=1.0).setRandomCenters(4, 1.0, 0)

model.trainOn(train_data)
result = model.predictOn(train_data)   # evaluate on the training data cuz don't know what else to evaluate on :D

### Size of clusters (window operation)

In [None]:
pairs = result.map(lambda cluster: (f'cluster-{cluster+1}', 1))
# window of size 30s, and slides by 10s (very arbitrary)
clusterCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)

# print the results
clusterCounts.pprint()

### Start the processing

In [None]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2022-05-06 10:07:00
-------------------------------------------

-------------------------------------------
Time: 2022-05-06 10:07:10
-------------------------------------------
('cluster-4', 6)
('cluster-3', 2)

-------------------------------------------
Time: 2022-05-06 10:07:20
-------------------------------------------
('cluster-4', 13)
('cluster-3', 4)

-------------------------------------------
Time: 2022-05-06 10:07:30
-------------------------------------------
('cluster-4', 16)
('cluster-3', 8)

-------------------------------------------
Time: 2022-05-06 10:07:40
-------------------------------------------
('cluster-4', 16)
('cluster-3', 8)

-------------------------------------------
Time: 2022-05-06 10:07:50
-------------------------------------------
('cluster-4', 19)
('cluster-3', 7)

-------------------------------------------
Time: 2022-05-06 10:08:00
-------------------------------------------
('cluster-4', 23)
('cl

## Elasticsearch

### functions to handle the requests

In [69]:
def elasticsearch_curl(uri='', json_body='', verb='get', verbose=1):
    # verbose : 0 => none, 1 => errors, 2 => all
    uri = f'http://{HOST}:{ELASTIC_PORT}/'+uri
    # pass header option for content type if request has a
    # body to avoid Content-Type error in Elasticsearch v6.0
    headers = {
        'Content-Type': 'application/json',
    }

    try:
        # make HTTP verb parameter case-insensitive by converting to lower()
        if verb.lower() == "get":
            resp = requests.get(uri, headers=headers, data=json_body)
        elif verb.lower() == "post":
            resp = requests.post(uri, headers=headers, data=json_body)
        elif verb.lower() == "put":
            resp = requests.put(uri, headers=headers, data=json_body)

        # read the text object string
        try:
            resp_text = json.loads(resp.text)
        except:
            resp_text = resp.text

        # catch exceptions and print errors to terminal
    except Exception as error:
        resp_text = error
        if verbose>=1:
            print ('\nelasticsearch_curl() error:', error)

    if verbose>=2:
        print(json.dumps(resp_text, sort_keys=False, indent=4))
        
    return resp_text

In [108]:
def insert_doc(rdd):
    """ insert all tweets of the rdd in Elasticsearch

    Args:
        rdd (RDD): the rdd containing the tweets to insert
    """

    # create the body (tweets)
    body = ""
    for tweet in rdd.collect():
        body += '{ "create": { } }' + '\n' + tweet + '\n'

    if body: # if there is at least one tweet

        # send a PUT request to insert the tweets
        response = elasticsearch_curl(
            f'logs-my_app-default/_bulk?pretty',
            verb='put',
            json_body=body,
            verbose=1
        )
        if 'errors' in response.keys() and not response['errors']:
            print(f'{len(response["items"])} tweets inserted')
        else:
            print("Failed to insert")
            
def query(q):
    """ run a query on the tweets

    Args:
        q (dict): a query to run
    """

    # send a GET request
    response = elasticsearch_curl(
                '_search',
                verb='get',
                json_body=json.dumps(q),
                verbose=1
            )
    
    # print the matching results
    hits = response['hits']['hits']
    if len(hits)==0:
        print("No match found")
    for hit in response['hits']['hits']:
        print(json.dumps(hit['_source'], sort_keys=False, indent=4))

### Start inserting tweets

In [None]:
tweets = raw_tweets.foreachRDD(insert_doc)

ssc.start()
ssc.awaitTermination()

7 tweet inserted
9 tweet inserted
9 tweet inserted
12 tweet inserted
7 tweet inserted
7 tweet inserted


### Run some queries

In [109]:
q = {"query": {"match": {"text":"amazon"}}}
query(q);

{
    "text": "[$629.99]\nASUS KO NVIDIA GeForce RTX 3070 V2 OC Edition 8GB GDDR6 Gami\n#rtx3070 #geforce3070\nSource: Amazon Checker v3Z\nReason: Sold by Amazon\n\n\ud83d\uded2: ",
    "location": "Pittsburgh, PA",
    "tag": "amazon"
}
{
    "text": "[$629.99]\nASUS KO NVIDIA GeForce RTX 3070 V2 OC Edition 8GB GDDR6 Gami\n#rtx3070 #geforce3070\nSource: Amazon Checker v3Z\nReason: Sold by Amazon\n\n\ud83d\uded2: ",
    "location": "Pittsburgh, PA",
    "tag": "amazon"
}
{
    "text": "Check out \"Blinding Lights\" by The Weeknd on Amazon Music. ",
    "location": "\u0191\u0289\u036bc\u0367\u043a\u036d\u03b9\u036a\u03b7\u0363 Texas",
    "tag": "amazon"
}
{
    "text": "@BrentBozell I have 4 kids. Why should amazon pay me?",
    "location": "Selmer, TN",
    "tag": "amazon"
}
{
    "text": "@peter3isamazing COME ON FUNKO &amp; AMAZON, DROP A WORKING LINK RIGHT NOW",
    "location": "they/she 25",
    "tag": "amazon"
}
{
    "text": "Deals: Amazon Introduces New BOGO 30% Off Sale on Col

In [125]:
q = {
  "query": {
    "terms": {
      "text": [ "apple", "samsung" ],
      "boost": 1.0
    },
      "terms": {
      "text": [ "amazon" ],
      "boost": 2.0
    }
  }
}
query(q);

{
    "text": "RT @lowkey_harshita: Amazon people are the best \ud83d\ude2d\ud83d\ude2d\n@PrimeVideoIN @AmazonHelp thank you for making our day.\n\n#KaranKundrra ",
    "location": "India",
    "tag": "amazon"
}
{
    "text": "RT @lowkey_harshita: Amazon people are the best \ud83d\ude2d\ud83d\ude2d\n@PrimeVideoIN @AmazonHelp thank you for making our day.\n\n#KaranKundrra ",
    "location": "India",
    "tag": "amazon"
}
{
    "text": "RT @lowkey_harshita: Amazon people are the best \ud83d\ude2d\ud83d\ude2d\n@PrimeVideoIN @AmazonHelp thank you for making our day.\n\n#KaranKundrra ",
    "location": "Mumbai, India",
    "tag": "amazon"
}
{
    "text": "boAt Xtend Smartwatch with Alexa Built-in\n\n1.69\u201d HD Display\nMultiple Watch Faces\nStress Monitor\nSleep Monitor &amp; 5 ATM\n\nPrice - \u20b92,399\n\nOffers - 5% back with Amazon Pay ICICI Bank credit card for Prime members. 3% back for others\n\n\ud83d\ude4fFollow us\n\n",
    "location": "Chandigarh",
    "tag": "amazon"
}
{
    

In [129]:
q = {
  "query": {
    "terms": {
      "text": [ "facebook"],
      "boost": 1.0
    }
  },
  "highlight": {
    "fields": {
      "text": {}
    }
  }
}

query(q);

{
    "text": "Know how to stay in touch with your contacts and get information during an emergency. Radio, TV and official social media accounts are the best places to get information. You can also mark yourself safe on Facebook during a disaster: ",
    "location": "Whitby, ON",
    "tag": "facebook"
}
{
    "text": "@theharshpuniya @PitchGroundHQ @iuditg @elonmusk He is only on Twitter because he decided to buy it!\n\nHe isn't on Instagram, Facebook, snapchat,tinder,tiktok!",
    "location": "Dhule, India",
    "tag": "facebook"
}
{
    "text": "RT @PopCrave: Beyonc\u00e9 earns her first Daytime Emmy nomination in the original song category for her theme song to Facebook Watch\u2019s \u201cTalks w\u2026",
    "location": "Oakland, California ",
    "tag": "facebook"
}
{
    "text": "RT @kdlexfiles: happy 400k views on facebook!!\n\n#KDLexRunToMe ",
    "location": "twenty one",
    "tag": "facebook"
}
{
    "text": "RT @kdlexfiles: happy 400k views on facebook!!\n\n#KDLexRunToMe ",
