In [66]:
from typing import Any, Dict, Iterable, Iterator

import pandas as pd
from elasticsearch import Elasticsearch, helpers
from utils.index_helper import IndexHelper

In [None]:
# Single local Elasticsearch node (plain HTTP, no credentials passed); every
# indexing and search cell below talks to it through `es`.
ES_NODE = {'host': 'localhost', 'port': 9200, 'scheme': 'http'}
es = Elasticsearch([ES_NODE])

In [None]:
# ISBNs are identifiers, not numbers: they can start with 0 and end with 'X'.
# Forcing str stops pandas' (chunked) type inference from turning some of
# them into integers, which would drop leading zeros and mix types in the column.
clustered_ratings_df = pd.read_csv(
    '../data/produced/clustered_ratings.csv',
    dtype={'isbn': str},
)
clustered_ratings_df

In [None]:
# Distinct-value counts (NaN counted, exactly like len(unique())); they are
# used below as the `size` of the terms aggregations so no bucket is cut off.
unique_clusters = clustered_ratings_df['cluster'].nunique(dropna=False)
print(f'Unique clusters: {unique_clusters}')

unique_isbns = clustered_ratings_df['isbn'].nunique(dropna=False)
print(f'Unique ISBNs: {unique_isbns}')

In [None]:
def index_data(records: Iterable[Dict[str, Any]], index: str) -> Iterator[Dict[str, Any]]:
    """Wrap each record as an action for ``elasticsearch.helpers.bulk``.

    Args:
        records: Documents to index, e.g. the list produced by
            ``DataFrame.to_dict('records')``. Any iterable of dicts works.
        index: Name of the target index.

    Yields:
        ``{"_index": index, "_source": record}`` for each record, in input order.
        No ``_id`` is set, so Elasticsearch generates one per document and
        indexing the same records twice creates duplicates.
    """
    for record in records:
        yield {
            "_index": index,
            "_source": record,
        }

In [None]:
# Start from an empty index: the actions carry no _id, so re-running this cell
# against an existing index would append a second copy of every rating.
if es.indices.exists(index='clustered_ratings'):
    es.indices.delete(index='clustered_ratings')

clustered_ratings_dict = clustered_ratings_df.to_dict('records')
bulk_result = helpers.bulk(es, index_data(clustered_ratings_dict, 'clustered_ratings'))

# Make the freshly indexed documents searchable before the aggregation cell
# runs; otherwise it may only see part of the data until the next refresh.
es.indices.refresh(index='clustered_ratings')

bulk_result

In [None]:
# Aggregation-only query: average rating per (cluster, isbn).
# size=0 skips returning search hits, which are never used; each terms `size`
# is the column's distinct count so every bucket is returned.
response = es.search(
    index="clustered_ratings",
    size=0,
    aggs={
        "cluster_agg": {
            "terms": {
                "field": "cluster",
                "size": unique_clusters
            },
            "aggs": {
                "isbn_agg": {
                    "terms": {
                        "field": "isbn.keyword",
                        "size": unique_isbns
                    },
                    "aggs": {
                        "avg_rating": {
                            "avg": {
                                "field": "rating"
                            }
                        }
                    }
                }
            }
        }
    })

In [63]:
# Flatten the nested cluster -> isbn buckets into one row per (cluster, isbn).
avg_rating_rows = [
    {
        "cluster": cluster_bucket['key'],
        "isbn": isbn_bucket['key'],
        "avg_rating": isbn_bucket['avg_rating']['value'],
    }
    for cluster_bucket in response['aggregations']['cluster_agg']['buckets']
    for isbn_bucket in cluster_bucket['isbn_agg']['buckets']
]

# Explicit columns keep the schema even when there are no buckets (otherwise
# 'avg_rating' would not exist and the rounding would raise KeyError);
# astype(float) makes rounding valid on that empty/object column too.
avg_ratings_df = pd.DataFrame(
    avg_rating_rows, columns=["cluster", "isbn", "avg_rating"]
).assign(avg_rating=lambda frame: frame['avg_rating'].astype(float).round(2))

avg_ratings_df

Unnamed: 0,cluster,isbn,avg_rating
0,1,0971880107,1.28
1,1,044023722X,4.70
2,1,0312195516,6.25
3,1,0060928336,4.87
4,1,0452282152,5.25
...,...,...,...
28216,5,1842550942,8.00
28217,5,1853260495,0.00
28218,5,1853260622,8.00
28219,5,1892958015,0.00


In [None]:
# Persist the per-(cluster, isbn) averages without the pandas row index.
avg_ratings_path = '../data/produced/avg_ratings.csv'
avg_ratings_df.to_csv(avg_ratings_path, index=False)

In [67]:
# Index the averaged ratings via the project helper; per its captured output it
# deletes an existing 'avg_ratings' index before bulk-loading the frame.
ih = IndexHelper(es)
ih.bulk_index(index='avg_ratings', data=avg_ratings_df)

  if self.es.indices.exists(index=index):
  self.es.indices.delete(index=index)


Deleting index...


  helpers.bulk(self.es, _index_data(data_dict, index))
