In [2]:
##### install elasticsearch
# !pip install elasticsearch

Collecting elasticsearch
  Downloading elasticsearch-7.15.2-py2.py3-none-any.whl (379 kB)
     |████████████████████████████████| 379 kB 921 kB/s            
Installing collected packages: elasticsearch
Successfully installed elasticsearch-7.15.2


In [3]:
import os
import sys
import elasticsearch
import pandas as pd
from elasticsearch import Elasticsearch

## Connect the elastic search

In [14]:
# here localhost will not work
# because elastic search is running as a docker container and jupyter is also running as a docker container
# instead of localhost --> use IP of your machine
es = Elasticsearch([{'host': '192.168.0.103', 'port': 9200}])

In [15]:
# check whether we are connected to elastic search or not
es.ping()



True

## Create Index

In [20]:
# create index equivalent to database in RDBMS
es.indices.create(index='my-foo', ignore=400)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'my-foo'}

In [22]:
# get all the indices
res = es.indices.get_alias("*")
for name in res:
    print(name)

my-foo
.kibana-event-log-7.15.2-000001
.tasks
.apm-agent-configuration
.apm-custom-link
.kibana_7.15.2_001
.kibana_task_manager_7.15.2_001


  res = es.indices.get_alias("*")


## Delete Index

In [23]:
es.indices.delete(index='my-foo', ignore=[400,404])

{'acknowledged': True}

## Upload JSON doc
### This is just for learning purpose. This is not how it will be done in real scenarios

In [24]:
e1 = {
    "first_name": "Vaibhav",
    "last_name": "Jayaswal",
    "Age": 24,
    "Work": "Software Developer"
}

e2 = {
    "first_name": "Saurabh",
    "last_name": "Jayaswal",
    "Age": 19,
    "Work": "Student"
}

In [25]:
# create a index first
es.indices.create(index='typeperson', ignore=400)

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'typeperson'}

In [26]:
res1 = es.index(index='typeperson', doc_type='testpeople', body=e1, id=1)

  res1 = es.index(index='typeperson', doc_type='testpeople', body=e1, id=1)


In [27]:
res1

{'_index': 'typeperson',
 '_type': 'testpeople',
 '_id': '1',
 '_version': 1,
 'result': 'created',
 '_shards': {'total': 2, 'successful': 1, 'failed': 0},
 '_seq_no': 0,
 '_primary_term': 1}

In [28]:
res2 = es.index(index='typeperson', doc_type='testpeople', body=e2, id=2)

  res2 = es.index(index='typeperson', doc_type='testpeople', body=e2, id=2)


# How to ingest data in actual scenarios

In [60]:
import numpy as np
import datetime
from elasticsearch import helpers
from tqdm import tqdm
from ast import literal_eval

In [32]:
os.listdir()

['netflix_titles.csv', '.ipynb_checkpoints', 'Elastic Search.ipynb']

In [33]:
df = pd.read_csv('netflix_titles.csv')

In [41]:
df.head(2)

Unnamed: 0,show_id,type,title,director,cast,country,date_added,release_year,rating,duration,listed_in,description
0,s1,Movie,Dick Johnson Is Dead,Kirsten Johnson,,United States,"September 25, 2021",2020,PG-13,90 min,Documentaries,"As her father nears the end of his life, filmm..."
1,s2,TV Show,Blood & Water,,"Ama Qamata, Khosi Ngema, Gail Mabalane, Thaban...",South Africa,"September 24, 2021",2021,TV-MA,2 Seasons,"International TV Shows, TV Dramas, TV Mysteries","After crossing paths at a party, a Cape Town t..."


In [35]:
df.columns

Index(['show_id', 'type', 'title', 'director', 'cast', 'country', 'date_added',
       'release_year', 'rating', 'duration', 'listed_in', 'description'],
      dtype='object')

In [38]:
# use showid as id in index because it is unique
df.shape

(8807, 12)

In [39]:
df["show_id"].nunique()

8807

## clean the data

In [42]:
df.isna().sum()

show_id            0
type               0
title              0
director        2634
cast             825
country          831
date_added        10
release_year       0
rating             4
duration           3
listed_in          0
description        0
dtype: int64

In [43]:
df = df.dropna()

In [44]:
df.isna().sum()

show_id         0
type            0
title           0
director        0
cast            0
country         0
date_added      0
release_year    0
rating          0
duration        0
listed_in       0
description     0
dtype: int64

In [45]:
df.shape

(5332, 12)

## convert data into appropriate format that ES (elasticsearch) understand 

In [46]:
# convert df to dict - orient will be 'records' - e.g. {'col1': 1, 'col2': 0.5}, {'col1': 2, 'col2': 0.75}]
df2 = df.to_dict('records')

In [48]:
df2[0]

{'show_id': 's8',
 'type': 'Movie',
 'title': 'Sankofa',
 'director': 'Haile Gerima',
 'cast': 'Kofi Ghanaba, Oyafunmike Ogunlano, Alexandra Duah, Nick Medley, Mutabaruka, Afemo Omilami, Reggie Carter, Mzuri',
 'country': 'United States, Ghana, Burkina Faso, United Kingdom, Germany, Ethiopia',
 'date_added': 'September 24, 2021',
 'release_year': 1993,
 'rating': 'TV-MA',
 'duration': '125 min',
 'listed_in': 'Dramas, Independent Movies, International Movies',
 'description': 'On a photo shoot in Ghana, an American model slips back in time, becomes enslaved on a plantation and bears witness to the agony of her ancestral past.'}

## Settings and Mapping

In [96]:
Settings = {
    "settings":{
        "number_of_shards": 1,
        "number_of_replicas": 0
    },
    "mappings":{
        "properties":{
            "director":{
                "type": "text"
            },
            "duration":{
                "type": "text"
            }
        }
    }
}

In [97]:
#add settings
my_ind_settings = es.indices.create(index="netflix_ml", ignore=[400,404], body=Settings) 

  my_ind_settings = es.indices.create(index="netflix_ml", ignore=[400,404], body=Settings)


In [98]:
my_ind_settings

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'netflix_ml'}

## convert the data into ELK format

In [99]:
# create generator
def generator(df2):
    for c, line in enumerate(df2):
        yield{
            '_index': 'netflix_ml',
            '_type': '_doc',
            '_id': line.get("show_id", None),
            '_source': {
                "title": line.get("title", ""),
                "director": line.get("director", ""),
                "description": line.get("description", ""),
                "duration": line.get("duration", None),
                "cast": line.get("cast", None)
            }
        }
    raise StopIteration

In [100]:
mycustom = generator(df2)

In [101]:
mycustom

<generator object generator at 0xffff66aea890>

## Upload data into Elastic Search

In [102]:
# whenever we upload a doc, we know a director is string but ES does not know that, therefore we use mapping

# when we do bulk insert, it will automatically create the mapping


In [103]:
try:
    res = helpers.bulk(es, generator(df2))
    print("Working")
except Exception as error:
    pass

# GET netflix_ml/_mapping   --> bulk load will add mapping by default which can be seen by the given command in ES 
# dev tools

<Elasticsearch([{'host': '192.168.0.103', 'port': 9200}])>