In [None]:
!curl -o ml-100k.zip http://files.grouplens.org/datasets/movielens/ml-100k.zip

In [None]:
!unzip ml-100k.zip

In [None]:
import os
from functools import reduce

import pandas as pd
from elasticsearch import Elasticsearch

In [None]:
r_cols = ['user_id', 'movie_id', 'rating', 'timestamp']
# u.data is tab-separated and headerless; the timestamp column holds Unix epoch seconds.
ratings = (
    pd.read_csv('ml-100k/u.data', sep='\t', names=r_cols, encoding='latin-1')
    .assign(timestamp=lambda frame: pd.to_datetime(frame['timestamp'].astype(int), unit='s'))
)
ratings

In [None]:
m_cols = [
    # identifiers and metadata
    'movie_id', 'title', 'release_date', 'video_release_date', 'imdb_url',
    # genre flag columns
    'unknown', 'action', 'adventure', 'animation', 'children', 'comedy',
    'crime', 'documentary', 'drama', 'fantasy', 'film_noir', 'horror',
    'musical', 'mystery', 'romance', 'sci_fi', 'thriller', 'war', 'western',
]
# u.item is pipe-delimited latin-1; both date columns are written like '01-Jan-1995' (%d-%b-%Y).
movies = pd.read_csv('ml-100k/u.item', sep='|', names=m_cols, encoding='latin-1')
movies = movies.assign(**{
    col: pd.to_datetime(movies[col], format='%d-%b-%Y')
    for col in ('release_date', 'video_release_date')
})
movies

In [None]:
u_cols = ['user_id', 'age', 'gender', 'occupation', 'zip_code']
# Read zip_code as text: ZIP codes are identifiers, and a numeric parse would drop leading
# zeros (e.g. '01002' -> 1002) whenever the column happened to contain only digits.
users = pd.read_csv('ml-100k/u.user', sep='|', names=u_cols, encoding='latin-1',
                    dtype={'zip_code': str})
users

In [None]:
# Start from only the raw rating columns so re-running this cell does not merge users/movies
# a second time (which would add duplicate *_x / *_y columns).
# validate='many_to_one' fails fast if users or movies ever contain duplicate join keys.
ratings = (
    ratings[r_cols]
    .merge(users, on='user_id', validate='many_to_one')
    .merge(movies, on='movie_id', validate='many_to_one')
)
ratings

In [None]:
# Inspect the merged column dtypes (timestamp and the release-date columns should be datetime64).
ratings.dtypes

In [None]:
def deep_get(dictionary, keys, default=None):
    """Look up a value in nested dicts by path, returning ``default`` when it is absent.

    Parameters
    ----------
    dictionary : dict
        Root of the nested structure.
    keys : str or sequence of str
        Dotted path such as ``'request.id.request'``, or an already-split sequence of keys.
    default : any, optional
        Returned when a path segment is missing, when an intermediate value is not a
        dict, or when the resolved value is None.

    Returns
    -------
    The value found at the path, or ``default``.
    """
    path = keys.split('.') if isinstance(keys, str) else keys
    node = dictionary
    for key in path:
        if not isinstance(node, dict):
            return default
        # Use None (not `default`) as the miss marker, so a dict-valued default is never
        # descended into as if it were part of the document.
        node = node.get(key)
    return default if node is None else node

In [None]:
def get_log_index(data, prefix='logana_log'):
    """Return the per-day index name ``<prefix>.<YYYY-MM-DD>`` for a logana document.

    Parameters
    ----------
    data : dict
        Document whose ``'@timestamp'`` is expected to be an ISO-8601 string such as
        ``'1997-12-04T15:55:49'``; its first 10 characters form the date suffix.
    prefix : str, default 'logana_log'
        Index-name prefix.

    Returns
    -------
    str
        The index name.

    Raises
    ------
    ValueError
        If ``'@timestamp'`` is missing or empty. Slicing None would otherwise fail with an
        opaque TypeError, and an empty string would yield the bogus index ``'<prefix>.'``.
    """
    timestamp = data.get('@timestamp')
    if not timestamp:
        raise ValueError("document has no '@timestamp'; cannot derive the index name")
    return f"{prefix}.{timestamp[0:10]}"

In [None]:
def to_isoformat(data, column):
    """Return ``data[column]`` as an ISO-8601 string, or None when it is missing or null.

    ``data`` only needs a ``.get`` method (a dict or a pandas row both work). A present
    value must provide ``.isoformat()``, e.g. a ``pandas.Timestamp``.
    """
    raw = data.get(column)
    return None if pd.isnull(raw) else raw.isoformat()

# Genre flag columns of the movies table, in u.item column order. Each one is emitted under
# doc_movie.boolean as True when the row's value equals 1.
MOVIE_GENRES = (
    'unknown', 'action', 'adventure', 'animation', 'children', 'comedy',
    'crime', 'documentary', 'drama', 'fantasy', 'film_noir', 'horror',
    'musical', 'mystery', 'romance', 'sci_fi', 'thriller', 'war', 'western',
)


def _to_keyword(value):
    """Return ``str(value)``, or None when ``value`` is missing (None/NaN/NaT).

    Plain ``str()`` would turn a missing value into the literal keyword ``'nan'``.
    """
    if pd.isnull(value):
        return None
    return str(value)


def create_logana_request(data):
    """Build a logana request/response document for one merged rating row.

    Parameters
    ----------
    data : pandas.Series or dict
        One row of the merged ratings/users/movies frame (insert_requests passes the row
        Series from ``iterrows``). Only ``.get(column)`` is used.

    Returns
    -------
    dict
        - ``request``: the ``<user_id>_<movie_id>`` request id, the user id and the user
          attributes.
        - ``response.results.doc_movie``: the rated movie (keywords, dates, rating, genre
          flags).
        - ``@timestamp``: the rating time as an ISO-8601 string, or None if missing.
    """
    user_id = data.get('user_id')
    movie_id = data.get('movie_id')
    return {
        'request': {
            'id': {
                'request': f"{user_id}_{movie_id}",
                'user': user_id,
            },
            'attributes': {
                'keyword': {
                    'gender': data.get('gender'),
                    'occupation': data.get('occupation'),
                    'zip_code': data.get('zip_code'),
                },
                'integer': {
                    'age': data.get('age'),
                },
            },
            'conditions': {},
        },
        'response': {
            'results': {
                'doc_movie': {
                    'id': movie_id,
                    'keyword': {
                        'title': data.get('title'),
                        # Some movies have no IMDb URL; emit None rather than the string 'nan'.
                        'imdb_url': _to_keyword(data.get('imdb_url')),
                        # NOTE(review): zip_code is a user attribute (it is also under
                        # request.attributes); duplicating it on the movie doc looks like a
                        # copy-paste slip. Kept so the indexed schema does not change; confirm
                        # before removing.
                        'zip_code': _to_keyword(data.get('zip_code')),
                    },
                    'date': {
                        'release_date': to_isoformat(data, 'release_date'),
                        'video_release_date': to_isoformat(data, 'video_release_date'),
                    },
                    'integer': {
                        'rating': data.get('rating'),
                    },
                    # bool() keeps each flag a plain Python bool even if the row holds numpy values.
                    'boolean': {genre: bool(data.get(genre) == 1) for genre in MOVIE_GENRES},
                },
            },
            'attributes': {},
        },
        '@timestamp': to_isoformat(data, 'timestamp'),
    }
    

In [None]:
# Elasticsearch endpoint; override it with the ES_HOST environment variable instead of editing the notebook.
# NOTE(review): the Elasticsearch default HTTP port is 9200; confirm 9220 matches the target cluster.
es = Elasticsearch(os.environ.get('ES_HOST', 'localhost:9220'))

def bulk_insert(bulk_data, client=None):
    """Send one ``_bulk`` request and report the items that did not succeed.

    Parameters
    ----------
    bulk_data : list
        Alternating action/metadata entries and document sources, as built by insert_requests.
    client : optional
        Elasticsearch client to use; defaults to the module-level ``es`` client.

    Returns
    -------
    list
        The response items whose status is outside 2xx (empty when everything succeeded).
        Each failed item is also printed.
    """
    client = es if client is None else client
    res = client.bulk(bulk_data, params={"request_timeout": 60})
    failed = []
    if res.get('errors'):
        for item in res.get('items', []):
            # Each response item is keyed by its action type ('index', 'create', ...), so read
            # whichever single key is present rather than assuming 'index'.
            result = next(iter(item.values()), {})
            status = result.get('status', 0)
            if 200 <= status < 300:
                continue
            print(f'{item}')
            failed.append(item)
    return failed

In [None]:
def insert_requests(df, id_field, bulk_size=5000):
    """Convert every row of ``df`` to a logana document and bulk-index it into Elasticsearch.

    Parameters
    ----------
    df : pandas.DataFrame
        Merged ratings frame; each row is passed to create_logana_request.
    id_field : str
        Dotted path inside the generated document whose value becomes the ES ``_id``
        (resolved with deep_get).
    bulk_size : int, default 5000
        Number of documents per ``_bulk`` request; must be >= 1.

    Returns
    -------
    int
        Number of documents sent.

    Raises
    ------
    ValueError
        If ``bulk_size`` is less than 1.
    """
    if bulk_size < 1:
        raise ValueError(f'bulk_size must be >= 1, got {bulk_size}')

    bulk_data = []
    count = 0
    for _, row in df.iterrows():
        logana_obj = create_logana_request(row)
        _index = get_log_index(logana_obj)
        _id = deep_get(logana_obj, id_field)
        # The bulk body alternates an action entry with the document source it applies to.
        bulk_data.append({"index": {"_index": _index, "_id": _id}})
        bulk_data.append(logana_obj)
        count += 1
        if count % bulk_size == 0:
            print(f'docs: {count}')
            bulk_insert(bulk_data)
            bulk_data = []

    # Flush the final partial batch. `count` already includes these rows, so it must not be
    # incremented again here (doing so double-counted the remainder).
    if bulk_data:
        bulk_insert(bulk_data)
        print(f'docs: {count}')
    return count

# Index every merged rating, using the "<user_id>_<movie_id>" request id as the document _id.
insert_requests(ratings, id_field='request.id.request')