# Request on Elasticsearch using Python

### Using Library Elasticsearch from:

https://elasticsearch-py.readthedocs.io/en/7.10.0/index.html

In [6]:
!pip install urllib3 -U

Requirement already up-to-date: urllib3 in c:\users\juan\anaconda3\lib\site-packages (2.2.1)


In [1]:
from elasticsearch import Elasticsearch

es = Elasticsearch(
    ['https://dd10ab92a777447c823f2241f941f0cb.us-central1.gcp.cloud.es.io:443'],
    basic_auth=('Admin', 'Password1'),
)
cloud_id='cf6846b64f08414bb8160517d240d3a5:dXMtY2VudHJhbDEuZ2NwLmNsb3VkLmVzLmlvJGRkMTBhYjkyYTc3NzQ0N2M4MjNmMjI0MWY5NDFmMGNiJDI2OGQyN2IzNmRhODQ5NTNiMWFlYzUxZjU0NTY0NTRh'



In [28]:
# ignore 400 cause by IndexAlreadyExistsException when creating an index
es.indices.create(index='my-index-python', ignore=400)

  es.indices.create(index='my-index-python', ignore=400)


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'my-index-python'})

In [29]:
# ignore 404 and 400
es.indices.delete(index='trafico-madrid', ignore=[400, 404])

  es.indices.delete(index='my-index-python', ignore=[400, 404])


ObjectApiResponse({'acknowledged': True})

### Making Requests

In [2]:
import requests
import json

In [18]:
url = 'https://dd10ab92a777447c823f2241f941f0cb.us-central1.gcp.cloud.es.io:443/'
username = 'Admin'
password = 'Password1'
index='trafico-madrid'
headers = {"Content-Type": "application/json"}
data= """{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  }
}"""

In [35]:
response = requests.put(url+index, data=data, headers=headers, auth=(username, password))

# Actualizar settings

In [None]:
es.indices.delete(index='trafico-madrid', ignore=[400, 404])
url = 'https://dd10ab92a777447c823f2241f941f0cb.us-central1.gcp.cloud.es.io:443/'
username = 'Admin'
password = 'Password1'
index='trafico-madrid'
headers = {"Content-Type": "application/json"}
data= """{
  "settings": {
    "number_of_shards": 3,
    "index.mapping.total_fields.limit": 40,
    "number_of_replicas": 1
  }
}"""
response = requests.put(url+index, data=data, headers=headers, auth=(username, password))
res = response.json()
print(res)

# Obtener los settings con un get

In [41]:
index='trafico-madrid/_settings'
response=requests.get(url+index,headers=headers,auth=(username,password))
res=response.json()
print(res)

{'trafico-madrid': {'settings': {'index': {'routing': {'allocation': {'include': {'_tier_preference': 'data_content'}}}, 'mapping': {'total_fields': {'limit': '40'}}, 'number_of_shards': '3', 'provided_name': 'trafico-madrid', 'creation_date': '1712145172600', 'number_of_replicas': '1', 'uuid': '3oxmclBiQ_Sn75uryADaBA', 'version': {'created': '8503000'}}}}}


# Crear un mapping

In [43]:
index = 'trafico-madrid'
if es.indices.exists(index=index):
    try:
        es.indices.delete(index=index)
        print(f"El índice '{index}' ha sido borrado correctamente.")
    except Exception as excep:
        print(f"Error al borrar el índice '{index}': {excep}")
else:
    print(f"El índice '{index}' no existe.")

mapping = {
    "mappings": {
        "properties": {
            "fecha_hora": {
                "type": "date",
                "format": "yyyy-MM-dd'T'HH:mm:ss'Z'"
            },
            "idelem": {
                "type": "keyword"
            },
            "descripcion": {
                "type": "text"
            },
            "accesoAsociado": {
                "type": "keyword"
            },
            "intensidad": {
                "type": "integer"
            },
            "ocupacion": {
                "type": "integer"
            },
            "carga": {
                "type": "integer"
            },
            "nivelServicio": {
                "type": "keyword"
            },
            "intensidadSat": {
                "type": "integer"
            },
            "error": {
                "type": "keyword"
            },
            "subarea": {
                "type": "keyword"
            },
            "coordenadas": {
                "type": "geo_point"
            }
        }
    }
}
try:
    es.indices.create(index=index, body=mapping)
    print(f"El índice '{index}' ha sido creado correctamente con el mapping actualizado.")
except Exception as excep:
    print(f"Error al crear el índice '{index}': {excep}")

El índice 'trafico-madrid' ha sido borrado correctamente.
El índice 'trafico-madrid' ha sido creado correctamente con el mapping actualizado.


In [52]:
index_1="trafico-madrid/_doc"
data="""{
    "fecha_hora":"2015-01-01T12:10:25Z",
    "idelem":"11111",
    "descripcion":"No hay descripcion",
    "accesoAsociado":"222222",
    "intensidad":80,
    "ocupacion":1,
    "carga":4,
    "nivelServicio":0,
    "intensidadSat":3000,
    "error":"S",
    "subarea":"2222",
    "coordenadas":{
        "lat":28.04,
        "lon":13.56
    }
}"""



response=requests.post(url+index_1,data=data,headers=headers,auth=(username,password))
res=response.json()
print(res)

{'_index': 'trafico-madrid', '_id': 'wVjio44B1QPrHfuDuNIc', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}


In [55]:
index="trafico-madrid/_search"
data="""{
  "query": {
    "match": {
      "carga": "4"
    }
  }
}
"""
response=requests.get(url+index,data=data,headers=headers,auth=(username,password))
res=response.json()
print(res)

{'took': 1, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 1, 'relation': 'eq'}, 'max_score': 1.0, 'hits': [{'_index': 'trafico-madrid', '_id': 'wVjio44B1QPrHfuDuNIc', '_score': 1.0, '_source': {'fecha_hora': '2015-01-01T12:10:25Z', 'idelem': '11111', 'descripcion': 'No hay descripcion', 'accesoAsociado': '222222', 'intensidad': 80, 'ocupacion': 1, 'carga': 4, 'nivelServicio': 0, 'intensidadSat': 3000, 'error': 'S', 'subarea': '2222', 'coordenadas': {'lat': 28.04, 'lon': 13.56}}}]}}


In [60]:
index="trafico-madrid/_delete_by_query"
data="""{
  "query": {
    "match": {
      "_id": "wVjio44B1QPrHfuDuNIc"
    }
  }
}
"""
response=requests.post(url+index,data=data,headers=headers,auth=(username,password))
res=response.json()
print(res)

{'took': 16, 'timed_out': False, 'total': 1, 'deleted': 1, 'batches': 1, 'version_conflicts': 0, 'noops': 0, 'retries': {'bulk': 0, 'search': 0}, 'throttled_millis': 0, 'requests_per_second': -1.0, 'throttled_until_millis': 0, 'failures': []}


# 1.4: Consulta del API e ingesta de datos en ELASTICSEARCH 

In [4]:
from xml.etree import ElementTree
response=requests.get("https://informo.madrid.es/informo/tmadrid/pm.xml")
root=ElementTree.fromstring(response.content)
fecha=root[0].text
print(fecha)

08/04/2024 10:10:08


In [5]:
out=[]
for pm in root[1:-1]:
    d={}
    for i in pm:
        d[i.tag]=i.text
    out.append(d)

In [132]:
o=out[1:2]

In [133]:
procesar(o)

{'idelem': '6808', 'intensidad': '240', 'ocupacion': '3', 'carga': '20', 'nivelServicio': '0', 'velocidad': '38', 'error': 'N', 'st_x': '441891,868185106', 'st_y': '4481354,05712662'}


In [125]:
import datetime
import utm
from unidecode import unidecode

index = "trafico-madrid"

def procesar(data):
    
    for i in data:

        try:
            #fecha_hora = datetime.datetime.strptime(fecha, "%d-%m-%YT%H:%M:%SZ")
            idelem = i["idelem"]
            descripcion = unidecode(i["descripcion"].replace("Ø", "n.").replace("´","").replace("'",""))
            acceso_asociado = i["accesoAsociado"]
            try:
                intensidad = float(i["intensidad"])
            except KeyError:
                pass
            except:
                del i["intensidad"]
            try:
                ocupacion = float(i["ocupacion"])
            except KeyError:
                pass
            except:
                del i["ocupacion"]
            try:
                carga = float(i["carga"])
            except KeyError:
                pass
            except:
                del i["carga"]
            try:
                intensidad_sat = float(i["intensidadSat"])
            except KeyError:
                pass
            except:
                del i["intensidadSat"]
            
            nivel_servicio = i["nivelServicio"]
            error = i["error"]
            subarea = i["subarea"]

        
            latitud, longitud = utm.to_latlon(float(i["st_x"].replace(',', '.')), float(i["st_y"].replace(',', '.')), 30, 'T')

            # Lo que irá a Elastic
            data= {
                "idelem": idelem,
                "descripcion": descripcion,
                "accesoAsociado": acceso_asociado,
                "intensidad": intensidad,
                "ocupacion": ocupacion,
                "carga": carga,
                "nivelServicio": nivel_servicio,
                "intensidadSat": intensidad_sat,
                "error": error,
                "subarea": subarea,
                "coordenadas": {"lat": latitud, "lon": longitud}
            }
            es.index(index=index, body=data)
            
        except Exception as e:
            print("Error en el dato:")

In [134]:
index="trafico-madrid/_search"
data="""{
  "query": {
    "match": {
      "idelem": "7000"
    }
  }
}
"""
response=requests.get(url+index,data=data,headers=headers,auth=(username,password))
res=response.json()
print(res)

{'took': 1, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 1, 'relation': 'eq'}, 'max_score': 8.003029, 'hits': [{'_index': 'trafico-madrid', '_id': 'dkcspY4BPZ1dPWaNmzAp', '_score': 8.003029, '_source': {'idelem': '7000', 'descripcion': '(MICRO)PSO. FERROVIARIOS n.3 (GIRO SENTIDO ALBERTO PALACIOS)', 'accesoAsociado': '0', 'intensidad': 42.0, 'ocupacion': 60.0, 'carga': 61.0, 'nivelServicio': '2', 'intensidadSat': 1500.0, 'error': 'N', 'subarea': '1738', 'coordenadas': {'lat': 40.34310942829344, 'lon': -3.7178268075649976}}}]}}


# 1.6: Ingesta de muchos elementos de manera simultánea 

In [23]:
import json
def njsonear(df):
    mjson=""
    for i in df:
        mjson+='{ "index": {"_index": "trafico-madrid"}}\n'
        mjson+=json.dumps(i) +'\n'
    return mjson

In [24]:
mjson=njsonear(out)

In [26]:
url = 'https://dd10ab92a777447c823f2241f941f0cb.us-central1.gcp.cloud.es.io:443/'
index='trafico-madrid'
api="""/_bulk"""
headers={"Content-type":"application/json"}
response=requests.post(url+index+api,data=mjson,headers=headers,auth=(username,password))
response.json()

{'errors': False,
 'took': 394,
 'items': [{'index': {'_index': 'trafico-madrid',
    '_id': 'eEfavI4BPZ1dPWaNJkG4',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 2, 'failed': 0},
    '_seq_no': 4486,
    '_primary_term': 1,
    'status': 201}},
  {'index': {'_index': 'trafico-madrid',
    '_id': 'eUfavI4BPZ1dPWaNJkG4',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 2, 'failed': 0},
    '_seq_no': 4487,
    '_primary_term': 1,
    'status': 201}},
  {'index': {'_index': 'trafico-madrid',
    '_id': 'ekfavI4BPZ1dPWaNJkG4',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 2, 'failed': 0},
    '_seq_no': 4488,
    '_primary_term': 1,
    'status': 201}},
  {'index': {'_index': 'trafico-madrid',
    '_id': 'e0favI4BPZ1dPWaNJkG4',
    '_version': 1,
    'result': 'created',
    '_shards': {'total': 2, 'successful': 2, 'failed': 0},
    '_seq_no': 4489,
    '_primary_term': 1,
