In [33]:
from moredata.enricher import Enricher, EnricherBuilder
from moredata.enricher.elasticsearch_connector import ElasticsearchConnector, IndexHandler, ReindexHandler, Pipeline, PipelineHandler
from moredata.models.data import JsonData
from moredata.parser import parse_document
from moredata.datasets import get_path
from moredata.utils import read_json_from_file, write_json_generator_to_json

from elasticsearch import Elasticsearch

In [6]:
# Address of the Elasticsearch node that every later cell talks to.
HOST, PORT = 'localhost', 9200

# Single client instance shared by the index, policy and pipeline helpers below.
es = Elasticsearch(hosts=[dict(host=HOST, port=PORT)], timeout=10000)

## Bulk-load the datasets to enrich

In [13]:
import pandas as pd

# Main listings: keep only the rows that have both a latitude and a longitude,
# then export them as a JSON array of records.
df = pd.read_csv(get_path("airbnb-berlin-main")).dropna(subset=['latitude', 'longitude'])
df.to_json('./data/airbnb-berlin.json', orient='records')

# Extra listing attributes: exported as-is, no row filtering.
df = pd.read_csv(get_path("airbnb-berlin-extra"))
df.to_json('./data/airbnb-berlin-extra.json', orient='records')

In [7]:
def create_airbnb_index(index):
    """Create the index behind ``index`` from the mapping in ./mappings/airbnb.json.

    Args:
        index: object exposing ``create_index(mapping=...)``; the notebook
            passes an ``IndexHandler``.
    """
    index.create_index(mapping=read_json_from_file('./mappings/airbnb.json'))

def bulk_airbnb(index):
    """Load ./data/airbnb-berlin.json into ``index``.

    The file is wrapped in a ``JsonData`` using ``parse_document`` as parser and
    its ``parse`` attribute is handed to ``index.load_index`` with
    ``geo_location=True`` and ``streaming=True``.

    Args:
        index: object exposing ``load_index``; the notebook passes an
            ``IndexHandler``.
    """
    listings = JsonData(data_file='./data/airbnb-berlin.json', parser=parse_document)
    index.load_index(listings.parse, geo_location=True, streaming=True)

In [11]:
# Handler bound to the shared `es` client. "airbnb" and "house" are passed positionally
# (presumably the index name and the document type — confirm against IndexHandler).
index_main_airbnb = IndexHandler(es, "airbnb", "house")
# Create the index from ./mappings/airbnb.json, then load ./data/airbnb-berlin.json into it.
# Order matters: the index is created before any document is loaded.
create_airbnb_index(index_main_airbnb)
bulk_airbnb(index_main_airbnb)

In [20]:
def create_airbnb_extra_info_index(index):
    """Create the index behind ``index`` from the mapping in ./mappings/airbnb-extra.json.

    Args:
        index: object exposing ``create_index(mapping=...)``; the notebook
            passes an ``IndexHandler``.
    """
    index.create_index(mapping=read_json_from_file('./mappings/airbnb-extra.json'))
def bulk_airbnb_extra_info(index):
    """Load ./data/airbnb-berlin-extra.json into ``index``.

    The file is wrapped in a ``JsonData`` using ``parse_document`` as parser and
    its ``parse`` attribute is handed to ``index.load_index`` with
    ``streaming=True`` (no ``geo_location`` flag is passed for this dataset).

    Args:
        index: object exposing ``load_index``; the notebook passes an
            ``IndexHandler``.
    """
    extra_info = JsonData(data_file='./data/airbnb-berlin-extra.json', parser=parse_document)
    index.load_index(extra_info.parse, streaming=True)

In [21]:
# Handler bound to the shared `es` client. "airbnb-extra" and "info" are passed positionally
# (presumably the index name and the document type — confirm against IndexHandler).
# This handler is reused later when the ElasticsearchConnector is built.
index_extra_airbnb = IndexHandler(es, "airbnb-extra", "info")
# Create the index from ./mappings/airbnb-extra.json, then load ./data/airbnb-berlin-extra.json.
create_airbnb_extra_info_index(index_extra_airbnb)
bulk_airbnb_extra_info(index_extra_airbnb)

In [29]:
def create_policy(client):
    """Create and execute the "airbnb-extra-policy" policy.

    Builds a ``PolicyHandler`` with match type "match" on field "id" over the
    "airbnb-extra" index, exposing the fields "amenities", "accommodates",
    "beds" and "bedrooms". It is wrapped in a ``Policy`` named
    "airbnb-extra-policy", on which ``create_policy()`` and then
    ``execute_policy()`` are called.

    Args:
        client: passed as the first positional argument to ``Policy``; the
            notebook passes the shared Elasticsearch client.
    """
    from moredata.enricher.elasticsearch_connector.policy_handler import Policy, PolicyHandler

    handler = PolicyHandler(
        type_match="match",
        match_field="id",
        index="airbnb-extra",
        enrich_fields=["amenities", "accommodates", "beds", "bedrooms"],
    )
    policy = Policy(client, policy_handler=handler, name="airbnb-extra-policy")

    # The policy is created first and executed afterwards, in that order.
    policy.create_policy()
    policy.execute_policy()

In [30]:
# Create and execute the "airbnb-extra-policy" policy using the shared `es` client.
create_policy(es)

In [31]:
# Documents to enrich: the main listings file, parsed with parse_document.
data = JsonData(data_file='./data/airbnb-berlin.json', parser=parse_document)

# Pipeline "airbnb-extra-enricher": configured with match field "id", target field
# "extra" and the policy name "airbnb-extra-policy".
extra_pipeline_handler = PipelineHandler(
    description="enriching airbnb data with extra info",
    match_field="id",
    target_field_name="extra",
    policy_name="airbnb-extra-policy",
)
extra_pipeline = Pipeline(
    client=es,
    name="airbnb-extra-enricher",
    pipeline_handler=extra_pipeline_handler,
)

# Reindex settings: source "airbnb", target "airbnb-enriched", via the pipeline above.
extra_reindex_handler = ReindexHandler(
    index="airbnb",
    target_index="airbnb-enriched",
    pipeline_name="airbnb-extra-enricher",
)

# Wire the pieces into the connector and wrap it in the Enricher used by later cells.
extra_connector = ElasticsearchConnector(
    index_handler=index_extra_airbnb,
    pipeline=extra_pipeline,
    reindex_handler=extra_reindex_handler,
)
elk_extra_enricher = Enricher(connector=extra_connector)

In [36]:
# Run the enricher configured above over `data`.
# NOTE(review): the result is later handed to write_json_generator_to_json, so it is
# presumably a one-shot generator — confirm before iterating it more than once.
data_enriched = elk_extra_enricher.enrich(data)

In [37]:
# Persist the enriched records under the "./data/airbnb_enriched" path prefix.
# NOTE(review): 1000000 is presumably the maximum number of records per output file —
# confirm against write_json_generator_to_json. A later cell reads
# "./data/airbnb_enriched-0.json", so a numeric suffix is appended to the prefix.
write_json_generator_to_json("./data/airbnb_enriched", data_enriched, 1000000)

In [39]:
# Load the "-0" enriched output file back into a DataFrame.
# NOTE(review): this rebinds `data_enriched`, which previously held the result of
# elk_extra_enricher.enrich(data), to a pandas DataFrame.
data_enriched = pd.read_json("./data/airbnb_enriched-0.json", orient='records')

In [41]:
# Expand the dicts in the "extra" column into top-level columns (one level deep only),
# dropping the "id" column from the expanded frame before appending it.
extra_columns = pd.json_normalize(data_enriched['extra'], max_level=0).drop(columns=['id'])

# Side-by-side view: original columns followed by the expanded "extra" columns.
pd.concat([data_enriched, extra_columns], axis=1)

Unnamed: 0,neighbourhood_cleansed,geo_location,latitude,neighbourhood,price,extra,name,id,room_type,longitude,bedrooms,amenities,beds,accommodates
0,Brunnenstr. Nord,POINT (13.39749 52.54425),52.54425,"Berlin, Germany",$20.00,"{'bedrooms': '1.0', 'amenities': '[""Dishes and...",bright & airy Pberg/Mitte 3 months or more,1944,Private room,13.39749,1.0,"[""Dishes and silverware"", ""Kitchen"", ""Oven"", ""...",1.0,1.0
1,Brunnenstr. Süd,POINT (13.40256 52.53454),52.53454,"Berlin, Germany",$59.00,"{'bedrooms': '1.0', 'amenities': '[""Smoke alar...",Berlin-Mitte Value! Quiet courtyard/very central,2015,Entire home/apt,13.40256,1.0,"[""Smoke alarm"", ""Cooking basics"", ""Lockbox"", ""...",0.0,3.0
2,Prenzlauer Berg Südwest,POINT (13.41758 52.535),52.53500,"Berlin, Germany",$90.00,"{'bedrooms': '1.0', 'amenities': '[""Dedicated ...",Fabulous Flat in great Location,3176,Entire home/apt,13.41758,1.0,"[""Dedicated workspace"", ""Hangers"", ""Kitchen"", ...",2.0,4.0
3,Schöneberg-Nord,POINT (13.34906 52.49885),52.49885,"Berlin, Germany",$29.00,"{'bedrooms': '1.0', 'amenities': '[""Host greet...",BerlinSpot Schöneberg near KaDeWe,3309,Private room,13.34906,1.0,"[""Host greets you"", ""Washer"", ""Hangers"", ""Hot ...",1.0,1.0
4,Frankfurter Allee Süd FK,POINT (13.45477 52.51171),52.51171,"Berlin, Germany",$79.00,"{'bedrooms': '1.0', 'amenities': '[""Smoke alar...",Stylish East Side Loft in Center with AC & 2 b...,6883,Entire home/apt,13.45477,1.0,"[""Smoke alarm"", ""Cooking basics"", ""Dishes and ...",1.0,2.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
19853,Hellersdorf-Nord,POINT (13.60749 52.54192),52.54192,,$22.00,"{'bedrooms': '1.0', 'amenities': '[""Dedicated ...",Rollstuhlgeeignete Wohnung für zwei in Hönow M...,48597066,Entire home/apt,13.60749,1.0,"[""Dedicated workspace"", ""Hangers"", ""Private en...",1.0,2.0
19854,Tempelhof,POINT (13.41783 52.45901),52.45901,,$80.00,"{'bedrooms': '1.0', 'amenities': '[""Hot water ...",Lichtdurchflutete Wohnung,48599795,Entire home/apt,13.41783,1.0,"[""Hot water kettle"", ""Dryer"", ""Hangers"", ""Hot ...",1.0,2.0
19855,Alexanderplatz,POINT (13.4068 52.51105),52.51105,,"$1,840.00","{'bedrooms': '1.0', 'amenities': '[""Air condit...",Bright Twin Capacity 1 At Mitte,48600069,Private room,13.40680,1.0,"[""Air conditioning"", ""Hot water"", ""Elevator"", ...",1.0,1.0
19856,Frankfurter Allee Süd FK,POINT (13.46587 52.49805),52.49805,,$25.00,"{'bedrooms': '1.0', 'amenities': '[""Dedicated ...",One Room Apartment,48602039,Entire home/apt,13.46587,1.0,"[""Dedicated workspace"", ""Hangers"", ""Kitchen"", ...",,1.0
