# Elastic Search Playground
First, load the credentials needed to connect to the Elasticsearch instance and the PostgreSQL database.

In [10]:
import json

# Load the connection settings from JSON files kept outside the notebook.
# Later cells read the keys 'URL', 'USER', 'PWD' and 'CERT' from es_cred and
# 'DB', 'USER', 'PWD' and 'URL' from pg_cred.
# The files are opened through context managers so the handles are closed
# right away instead of being left to the garbage collector.
with open('../es-credentials.json') as es_cred_file:
    es_cred = json.load(es_cred_file)

with open('../pg-credentials.json') as pg_cred_file:
    pg_cred = json.load(pg_cred_file)

Check whether the Elasticsearch instance is running, either via
- the Elasticsearch Python library, or
- a basic HTTPS request.

In [11]:
from elasticsearch import Elasticsearch
import psycopg2

def es_connect(credentials: dict):
    """
    Connect to an elastic search API.

    Args:
        credentials: Mapping with the keys 'URL' (endpoint of the instance),
            'USER' and 'PWD' (used for basic authentication) and 'CERT'
            (handed to the client as ``ca_certs``).

    Returns:
        The ``Elasticsearch`` client built from ``credentials``.

    Raises:
        Exception: Whatever building the client raised. The failure is
            reported first and then re-raised unchanged, so the original
            traceback stays visible in the notebook.
    """
    try:
        print("Connecting to Elastic Search...")
        # Use the credentials passed in by the caller, not a notebook-level global.
        es = Elasticsearch(
            credentials['URL'],
            basic_auth=(credentials['USER'], credentials['PWD']),
            ca_certs=credentials['CERT'],
        )
    except Exception:
        # .get() keeps the report itself from failing when 'URL' is the missing key.
        print("Unable to connect to", credentials.get('URL'))
        # Re-raise instead of calling exit(): exit() is meant for interactive
        # shells and would hide the actual cause of the failure.
        raise
    print("Successfully connected to", credentials['URL'])
    return es


def pg_connect(credentials: dict):
    """
    Connect to a PostgreSQL database.

    Args:
        credentials: Mapping with the keys 'DB' (database name), 'USER',
            'PWD' and 'URL'.

    Returns:
        The connection object returned by ``psycopg2.connect``.

    Raises:
        Exception: Whatever ``psycopg2.connect`` raised. The failure is
            reported first and then re-raised unchanged.
    """
    # NOTE(review): 'URL' is only used in the log messages; no host/port is
    # passed to psycopg2.connect -- confirm that this is intended.
    try:
        print("Connecting to PostgreSQL database...")
        pg = psycopg2.connect(dbname=credentials["DB"], user=credentials['USER'], password=credentials['PWD'])
    except Exception:
        print("Failed to connect to PostgresQL database ", credentials.get('URL'))
        # Stop here: falling through would print a success message and then
        # fail on the unbound connection variable, masking the real error.
        raise
    print("Successfully connected to", credentials['URL'])
    return pg

In [15]:
# Build the Elastic Search client from the credentials loaded earlier.
es_client = es_connect(es_cred)

# Ask the API for its basic info; as the last expression of the cell the
# response is rendered as the cell output.
es_client.info()

Connecting to Elastic Search...
Successfully connected to https://localhost:9200


ObjectApiResponse({'name': 'f6240d32ea65', 'cluster_name': 'docker-cluster', 'cluster_uuid': 'YIiFu2p-QOWJhSPb-Zcavw', 'version': {'number': '8.5.2', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': 'a846182fa16b4ebfcc89aa3c11a11fd5adf3de04', 'build_date': '2022-11-17T18:56:17.538630285Z', 'build_snapshot': False, 'lucene_version': '9.4.1', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

In [13]:
import requests

# access API via http request
# The credentials are sent as basic auth and the certificate is used to verify
# the TLS connection. An explicit timeout (in seconds) keeps the cell from
# hanging indefinitely when the instance is unreachable.
requests.get(
    es_cred['URL'],
    auth=(es_cred['USER'], es_cred['PWD']),
    verify=es_cred['CERT'],
    timeout=10,
)

<Response [200]>

This project uses the Elasticsearch Python package. Let's check the overall status of the Elasticsearch instance.

In [16]:
# Request the cluster health summary; the response (including its 'status'
# field) is rendered as the cell output.
es_client.cluster.health()

ObjectApiResponse({'cluster_name': 'docker-cluster', 'status': 'yellow', 'timed_out': False, 'number_of_nodes': 1, 'number_of_data_nodes': 1, 'active_primary_shards': 15, 'active_shards': 15, 'relocating_shards': 0, 'initializing_shards': 0, 'unassigned_shards': 2, 'delayed_unassigned_shards': 0, 'number_of_pending_tasks': 0, 'number_of_in_flight_fetch': 0, 'task_max_waiting_in_queue_millis': 0, 'active_shards_percent_as_number': 88.23529411764706})

---
## Create Index for Tweets

In [None]:
# Create the index that will hold the tweets.
# ignore_status=400 makes the cell re-runnable: if the index already exists the
# error response is returned (and displayed) instead of raising.
es_client.options(ignore_status=400).indices.create(index="tweets")

## Delete Index

In [17]:
# Delete the index named "test".
# ignore_status=404 makes the cell re-runnable: if the index does not exist the
# error response is returned (and displayed) instead of raising.
es_client.options(ignore_status=404).indices.delete(index="test")

ObjectApiResponse({'acknowledged': True})

---
## Feed data into ES

Now we want to feed data from the Twitter PostgreSQL database into Elastic Search.

In [10]:
def iterate(cursor, attributes, size=100):
    """
    Generate one dictionary per row fetched from a database cursor.

    Rows are pulled in batches of ``size`` via ``cursor.fetchmany`` until an
    empty batch comes back. Each row is converted into a dictionary by pairing
    the names in ``attributes`` with the row's values by position.
    """
    batch = cursor.fetchmany(size)
    while batch:
        for row in batch:
            # the i-th attribute name labels the i-th value of the row
            yield {name: row[position] for position, name in enumerate(attributes)}
        batch = cursor.fetchmany(size)

In [67]:
# connect to postgres and retrieve cursor object
pg_cursor = pg_connect(credentials=pg_cred).cursor()

# attributes of every document that is fed into Elastic Search
# CAVEAT: order matters! Rows are mapped to these names by position, so the
# order has to match the column order of the query result.
ATTRIBUTES = ["id", "conversation_id", "author_id", "retweet_count", "reply_count", "like_count", "created_at", "txt", "word_count"]

# "word_count" is computed by the query below and appended as the last column,
# so it must not be selected from the table a second time.
TABLE_COLUMNS = [attr for attr in ATTRIBUTES if attr != "word_count"]

# filter the tweets to have at least a certain number of words
WORD_COUNT = 25

# formulate query to add number of words within a tweet text
# (raw string: \w and \s belong to the SQL regular expression, not to Python;
# the trailing ")" closes the "(" opened in pg_query below)
word_count_query = (
    rf"SELECT {', '.join(TABLE_COLUMNS)}, array_length(string_to_array(regexp_replace(txt,  '[^\w\s]', '', 'g'), ' '), 1) AS word_count FROM tweet)"
)

# compose final query
pg_query = (
    "SELECT * FROM ( "
    f"{word_count_query} AS t "
    f"WHERE word_count >= {WORD_COUNT} "
)

# execute the query
print("Executing Query\n", pg_query)
pg_cursor.execute(query=pg_query)

Connecting to PostgreSQL database...
Successfully connected to localhost:5432
Executing Query
 SELECT * FROM ( SELECT id, conversation_id, author_id, retweet_count, reply_count, like_count, created_at, txt, array_length(string_to_array(regexp_replace(txt,  '[^\w\s]', '', 'g'), ' '), 1) AS word_count FROM tweet) AS t WHERE word_count >= 25 


In [13]:
from tqdm import tqdm

# insert data from Postgres to ES, one document per row of the query result
# NOTE(review): this sends one request per document; elasticsearch.helpers.bulk
# may be considerably faster -- consider switching.
for data in tqdm(iterate(cursor=pg_cursor, attributes=ATTRIBUTES, size=1000)):
    # insert data into ES
    try:
        # reuse the tweet id as document id to avoid duplicates
        es_client.index(index="tweets", document=data, id=data["id"])
    except Exception as exc:
        # Report which document failed and why, then re-raise so the loop stops
        # with the original traceback (exit() is meant for interactive shells
        # and would hide the cause).
        print(f"Exception during data insertion! id={data.get('id')!r}: {exc!r}")
        raise

190826it [1:44:01, 30.57it/s]


---
## Search data

In [5]:
# Full-text "match" query against the tweet text field.
es_query = {
    "match": {
        "txt": "Maß Bier",
    },
}


In [9]:
# Run the query against the "tweets" index, limited to two hits.
res = es_client.search(
    index="tweets",
    query=es_query,
    size=2,
)
# Show only the list of matching documents from the response.
res["hits"]["hits"]

[{'_index': 'tweets',
  '_id': '1443707088392409095',
  '_score': 16.841162,
  '_source': {'id': 1443707088392409095,
   'conversation_id': 1443707088392409095,
   'author_id': 235703405,
   'retweet_count': 6,
   'reply_count': 5,
   'like_count': 82,
   'created_at': '2021-10-01T00:39:27+02:00',
   'txt': 'Ich finde, CSU’ler wie #Ramsauer wären auf dem Oktoberfest bei einem Schweinshaxen und einer Maß Bier besser aufgehoben als bei einer Talkshow von Markus #Lanz!'}},
 {'_index': 'tweets',
  '_id': '1423614844863979527',
  '_score': 9.578211,
  '_ignored': ['txt.keyword'],
  '_source': {'id': 1423614844863979527,
   'conversation_id': 1423614844863979527,
   'author_id': 940691491835564033,
   'retweet_count': 1,
   'reply_count': 0,
   'like_count': 9,
   'created_at': '2021-08-06T14:00:03+02:00',
   'txt': 'Heute ist der Internationale Tag des #Bieres, d.h.:\n1️⃣ #Freunde treffen, um gemeinsam Bier zu genießen.\n2️⃣Die Männer und Frauen zu ehren, welche das Bier brauen und serviere