In [1]:
import pandas as pd
import numpy as np
import json
from datetime import datetime as dt
import itertools

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


# README

This script processes dumps from the collections and curated-corpus databases into a format that can be uploaded to elasticsearch indices.

## File requirements

Requires JSON-encoded dumps from the collections and curated-corpus databases (recommend using datagrip). These scripts assume the following folder structure:
```
./corpus/
        ApprovedItem.json
        ApprovedItemAuthor.json

./collections/
              _CollectionToCollectionAuthor.json
              Collection.json
              CollectionAuthor.json
              CollectionLabel.json
              CollectionStory.json
              CollectionStoryAuthor.json
              CurationCategory.json
              IABCategory.json
              Label.json
```

There are a few collections tables which are not required for this script (Image, CollectionPartner, CollectionPartnership).

## Outputs
JSON-encoded payload files which can be converted into bulk upload elasticsearch requests.


# Collections processing

In [2]:
collection = pd.read_json('./collections_dump/Collection.json', 
                          dtype={'curationCategoryId':"Int64", "IABChildCategoryId":"Int64","IABParentCategoryId": "Int64"}
                         ).set_index('id')\
    .replace('\xa0', ' ', regex=True)
collection.head()
len_collection = len(collection)
collection.tail()

Unnamed: 0_level_0,externalId,slug,title,excerpt,intro,imageUrl,publishedAt,status,createdAt,updatedAt,curationCategoryId,IABChildCategoryId,IABParentCategoryId,language
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1
1233,7acf927a-199d-434c-9145-51fdaf735d92,lese-reise-guide-hamburg-deutschland,"Alte Kämpfe, neue Schönheit: Eine Reise nach H...",Eine wunderschöne Stadt ist Hamburg sicher. Ab...,*Bildquelle: Westend61 / Getty Images*,https://s3.us-east-1.amazonaws.com/pocket-coll...,2024-05-23 09:50:16.0,PUBLISHED,2024-05-15 09:23:07.0,2024-05-23 09:50:15.885,16.0,178.0,149.0,DE
1234,cad8d293-d18e-456f-92fc-488e73bf5930,linsensuppen-von-dal-omas-eintopf-und-turkisch...,"Linsensuppen – Dal, Omas Eintopf, Ottolenghi, ...",,Diese Rezepte aus aller Welt sind so vielfälti...,,,DRAFT,2024-05-23 20:00:31.0,2024-05-23 20:13:08.104,6.0,147.0,130.0,DE
1235,5cd02276-00e0-4260-9aa0-8cb27f11c498,pocket-pride-reads,Spotlight on LGBTQIA+ Voices,"Explore fascinating profiles, thoughtful perso...",TKTKT intro text.\n\nImage credit.,https://s3.us-east-1.amazonaws.com/pocket-coll...,2024-05-23 23:36:30.0,PUBLISHED,2024-05-23 23:12:24.0,2024-05-23 23:48:36.510,,,,EN
1236,19818299-b88c-44a4-94b5-4f642149f9ca,pocket-pride-geschichten,LGBTQIA+ Storys im Scheinwerferlicht,Queere Stimmen,TKTKTK intro text\n\nBildquelle: X,https://s3.us-east-1.amazonaws.com/pocket-coll...,2024-05-23 23:48:02.0,PUBLISHED,2024-05-23 23:47:37.0,2024-05-28 13:01:05.779,,,,DE
1237,e7bc6a3f-8371-4a86-9ee5-a32cf55176fe,resteverwertung-so-gehts-essensreste-verarbeiten,Nicht wegschmeißen! Wie du Essensreste am best...,"Nachwachsen lassen, Chips und Suppen daraus ma...",*Bildquelle: Flavia Morlachetti / Getty Images*,https://s3.us-east-1.amazonaws.com/pocket-coll...,,REVIEW,2024-05-26 08:23:22.0,2024-05-27 15:31:28.765,6.0,147.0,130.0,DE


In [3]:
collection_author_join = pd.read_json('./collections_dump/_CollectionToCollectionAuthor.json')\
    .rename(columns={'A': 'CollectionId', 'B': 'AuthorId'})
# Checking if there are multiple authors for one collection or not
# It's not hard to model but if there aren't any... why bother...
multi_authors = len(collection_author_join['CollectionId']) - collection_author_join['CollectionId'].nunique()
if multi_authors == 0:
    print("There aren't multiple authors for one collection")
else: 
    print("Gotta do the modeling for multiple authors")

There aren't multiple authors for one collection


In [4]:
# Non-multiple author join

collections = collection.merge(collection_author_join, left_index=True, right_on='CollectionId', how='left').sort_index()
assert len(collections) == len_collection

# Process authors
authors = pd.read_json('./collections_dump/CollectionAuthor.json').set_index('id')[['name']].rename(columns={"name":"AuthorName"})
collections = collections.merge(authors, right_index=True, left_on='AuthorId')
assert len(collections) == len_collection

# Process categories
categories = pd.read_json('./collections_dump/CurationCategory.json').set_index('id')[['name']].rename(columns={"name": "CategoryName"})
collections = collections.merge(categories, right_index=True, left_on='curationCategoryId', how='left')
assert len(collections) == len_collection

# Process IAB
iab = pd.read_json('./collections_dump/IABCategory.json').set_index('id')[['name']].rename(columns={"name":"IABCategory"})
collections = collections.merge(iab, right_index=True, left_on='IABChildCategoryId', how='left').rename(columns={"IABCategory":"IABChildName"})
collections = collections.merge(iab, right_index=True, left_on='IABParentCategoryId', how='left').rename(columns={"IABCategory":"IABParentName"})
assert len(collections) == len_collection

# Process collection labels

collection_labels = pd.read_json("./collections_dump/CollectionLabel.json")[['collectionId', 'labelId']]
labels = pd.read_json("./collections_dump/Label.json").set_index('id')[['name']]
# multiple labels can be applied to one collection
agg_labels = collection_labels.merge(labels, left_on='labelId', right_index=True).groupby('collectionId').agg({'name': lambda x: list(x)}).rename(columns={"name":"LabelName"})
collections = collections.merge(agg_labels, right_index=True, left_index=True, how='left')
assert len(collections) == len_collection

# there are only 'en' and 'de' languages, so we don't need to filter
# just build the corpus appropriately with string concatenation
collections['CorpusIndex'] = collections['language'].apply(lambda x: f'corpus_{x.lower()}')

In [5]:
def collection_to_doc(row):
    return {
        "meta": { '_id': row['externalId'], '_index': row['CorpusIndex']},
        'fields': {
            'title': row['title'],
            'url': row['slug'],
            'excerpt': row['excerpt'],
            'is_syndicated': False,
            'publisher': 'Pocket',
            'authors': row['AuthorName'],
            'published_at': row['publishedAt'] if row['publishedAt'] == None else round(dt.fromisoformat(row['publishedAt']).timestamp()),
            'is_collection': True,
            'is_collection_story': False,
            'created_at': round(dt.fromisoformat(row['createdAt']).timestamp()),
            'collection_labels': row['LabelName'],
            'language': row['language'],
            'curation_category': row['CategoryName'],
            'iab_child': row['IABChildName'],
            'iab_parent': row['IABParentName'],
            'status': row['status']
        }
    }

def collection_story_to_doc(row):
        return {
        "meta": { '_id': row['externalId'], '_index': row['CorpusIndex']},
        'fields': {
            'title': row['title'],
            'url': row['url'],
            'excerpt': row['excerpt'],
            'publisher': row['publisher'],
            'authors': row['AuthorName'],
            'is_collection_story': True,
            'parent_collection_id': row['parentId'],
            'created_at': round(dt.fromisoformat(row['CollectionCreatedAt']).timestamp()),
            'collection_labels': row['LabelName'],
            'language': row['language'],
            'curation_category': row['CategoryName'],
            'iab_child': row['IABChildName'],
            'iab_parent': row['IABParentName'],
            'status': row['status']
        }
    }

In [6]:
collection_payloads = list(collections.replace({np.nan: None}).apply(lambda x: collection_to_doc(x), axis=1))
with open('collection_payloads.json', 'w') as f:
    json.dump(collection_payloads, f)

# Collection Stories Processing

In [8]:
stories = pd.read_json('./collections_dump/CollectionStory.json').set_index('id')
collections_meta = collections.copy(deep=True)[
    ['externalId', 'status', 'IABParentName', 'IABChildName', 'CategoryName', 'LabelName', 'createdAt', 'language', 'CorpusIndex']
].rename(columns={'externalId': 'parentId', 'createdAt': 'CollectionCreatedAt'})
orig_story_len = len(stories)

In [9]:
stories = stories.merge(collections_meta, left_on='collectionId', right_index=True,
                        how='inner') # don't upload orphaned collection stories (there are some -- I don't know why)
stories_len = len(stories)

In [10]:
print(f"orphaned collections: {orig_story_len - stories_len}")
print(orig_story_len)
print(stories_len)

orphaned collections: 15
11808
11793


In [13]:
story_authors = pd.read_json('./collections_dump/CollectionStoryAuthor.json')\
    [['name', 'collectionStoryId']]\
    .rename(columns={'name':"AuthorName"})\
    .groupby('collectionStoryId')\
    .agg({'AuthorName': lambda x:  list(x)})
    
stories = stories.merge(story_authors, left_index=True, right_index=True, how='left')
assert stories_len == len(stories)

In [14]:
stories_payloads = list(stories.replace({np.nan: None}).apply(lambda x: collection_story_to_doc(x), axis=1))
with open('collection_story_payloads.json', 'w') as f:
    json.dump(stories_payloads, f)

# Corpus Processing

In [15]:
items = pd.read_json('./corpus/ApprovedItem.json').set_index('id')
len_items = len(items)

In [16]:
# all the languages are valid, so we don't need to filter
# just build the corpus appropriately with string concatenation
items['CorpusIndex'] = items['language'].apply(lambda x: f'corpus_{x.lower()}')

In [17]:
authors = pd.read_json('./corpus/ApprovedItemAuthor.json')[['name', 'approvedItemId']]\
    .groupby('approvedItemId')\
    .agg({'name': lambda x: list(x)})\
    .rename(columns={'name':'AuthorName'})
items = items.merge(authors, right_index=True, left_index=True, how='left')\
    .replace('\xa0', ' ', regex=True)\
    .replace({np.nan: None})
assert len(items) == len_items

In [18]:
def corpus_to_doc(row):
    return {
        "meta": { '_id': row['externalId'], '_index': row['CorpusIndex']},
        'fields': {
            'title': row['title'],
            'url': row['url'],
            'excerpt': row['excerpt'],
            'is_syndicated': row['isSyndicated'] == 1,
            'publisher': row['publisher'],
            'authors': row['AuthorName'],
            'published_at': row['datePublished'] if row['datePublished'] is None else round(dt.fromisoformat(row['datePublished']).timestamp()),  # this isn't a UTC timestamp, but this is the best I can do...
            'is_collection': row['isCollection'] == 1,
            'is_collection_story': False,
            'created_at': round(dt.fromisoformat(row['createdAt']).timestamp()),
            'topic': row['topic'],
            'language': row['language'],
            'status': row['status']
        }
    }

In [19]:
items_payloads = list(items.apply(lambda x: corpus_to_doc(x), axis=1))
with open('items_payloads.json', 'w') as f:
    json.dump(items_payloads, f)

# Load into Elasticsearch

In [22]:
!pip install elasticsearch==7.10.1
!pip install tqdm

Collecting elasticsearch==7.10.1
  Downloading elasticsearch-7.10.1-py2.py3-none-any.whl.metadata (8.0 kB)
Downloading elasticsearch-7.10.1-py2.py3-none-any.whl (322 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m322.1/322.1 kB[0m [31m11.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: elasticsearch
  Attempting uninstall: elasticsearch
    Found existing installation: elasticsearch 7.10.0
    Uninstalling elasticsearch-7.10.0:
      Successfully uninstalled elasticsearch-7.10.0
Successfully installed elasticsearch-7.10.1

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3.11 -m pip install --upgrade pip[0m

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice

In [23]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
import tqdm
from copy import deepcopy
import os

In [24]:
def flatmap(func, *iterable):
    return itertools.chain.from_iterable(map(func, *iterable))

In [33]:
# replace this with appropriate endpoint
# client = Elasticsearch("http://localhost:4566/user-list-search")

In [34]:
print("indexing documents...")
progress = tqdm.tqdm(unit="docs", total=len(collection_payloads) + len(stories_payloads) + len(items_payloads))
def create_doc(data):
    doc = deepcopy(data['fields'])
    doc['_id'] = data['meta']['_id']
    doc['_index'] = data['meta']['_index']
    return doc

successes = 0
for ok, action in streaming_bulk(
    client=client, actions=map(create_doc, itertools.chain.from_iterable([collection_payloads, stories_payloads, items_payloads]))
):
    progress.update(1)
    successes += ok
print(f"indexed {successes} documents")


indexing documents...



160269docs [08:21, 319.66docs/s]                                                                                                                               | 0/160268 [00:00<?, ?docs/s][A

  0%|                                                                                                                                               | 1/160268 [00:01<52:57:59,  1.19s/docs][A
  0%|▍                                                                                                                                              | 501/160268 [00:01<06:29, 409.95docs/s][A
  1%|▉                                                                                                                                             | 1001/160268 [00:01<03:48, 696.44docs/s][A
  1%|█▎                                                                                                                                            | 1501/160268 [00:02<02:57, 893.30docs/s][A
  1%|█▊                               

indexed 160268 documents



100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 160268/160268 [02:38<00:00, 1002.12docs/s][A