In [70]:
import json
from urllib.parse import unquote, unquote_plus

import boto3

In [71]:
# Deployment-specific settings. Both are left empty here and must be filled in
# before the later cells run: they are interpolated into the SQS queue URL and
# the S3 bucket name.
aws_region = aws_account_id = ''

In [72]:
# Placeholder for the incoming event payload. A later cell reads
# event['Records'][i]['s3']['object']['key'], so paste a real payload here
# before running the rest of the notebook.
event = dict()

In [73]:
# S3 layout: raw event objects are read from under `input_path` and the
# converted CSV objects are written under `output_path`, both in `s3_bucket`.
input_path = "wikimedia-events/raw-events"
output_path = "wikimedia-events/gremlin-csv"
s3_bucket = "nept-mlops-dev-{}".format(aws_account_id)

# SQS queue that receives one message per processed input object.
loader_queue_url = "https://sqs.{}.amazonaws.com/{}/nept-mlops-dev-gremlin-csv-loader".format(
    aws_region,
    aws_account_id,
)

In [74]:
# Module-level AWS clients shared by the processing functions below:
# S3 for reading raw events / writing CSV output, SQS for the loader message.
s3_client = boto3.client(service_name='s3')
sqs_client = boto3.client(service_name='sqs')

In [75]:
# Collect the object keys named in the event that live under `input_path`.
#
# S3 event notifications deliver object keys URL-encoded with spaces sent as
# '+', so they are decoded with unquote_plus; plain unquote would leave the '+'
# in place and the key would no longer match the stored object.
# A payload without a 'Records' entry (e.g. a test event) yields an empty list
# instead of raising KeyError.
keys = [
    decoded_key
    for decoded_key in (
        unquote_plus(record['s3']['object']['key'])
        for record in event.get('Records', [])
    )
    if decoded_key.startswith(input_path)
]

In [76]:
# Only the first matching key is processed by the cells below; this raises
# IndexError when no key in the event matched the input prefix.
key = keys[0]

In [77]:
import uuid

def _csv_quote(value):
    """Return ``value`` as a double-quoted CSV field.

    Embedded double quotes are doubled (RFC 4180 style) so that a value such
    as a title containing quotation marks cannot end the field early and
    corrupt the row. Values without quotes are emitted exactly as before.
    """
    return '"{}"'.format(str(value).replace('"', '""'))


def get_user_vertex_id(event):
    """Return the vertex id ``user:<user>`` for ``event``.

    Returns None when the event has no 'user' key.
    """
    if 'user' in event:
        return "user:{}".format(event['user'])
    return None


def get_title_vertex_id(event):
    """Return the vertex id ``title:<title>`` for ``event``.

    Returns None when the event has no 'title' key.
    """
    if 'title' in event:
        return "title:{}".format(event['title'])
    return None


class UserVertexConverter(object):
    """Turns an event into the CSV row describing its user vertex."""

    def header(self):
        """Return the CSV header line for the rows produced by ``convert``."""
        return "~id, ~label"

    def convert(self, event):
        """Return a one-element list holding the user vertex row.

        Returns None when the event has no 'user' key.
        """
        user_vertex_id = get_user_vertex_id(event)
        if user_vertex_id:
            return ["{},user".format(_csv_quote(user_vertex_id))]
        return None


class TitleVertexConverter(object):
    """Turns an event into the CSV row describing its title vertex."""

    def header(self):
        """Return the CSV header line for the rows produced by ``convert``."""
        return "~id, ~label"

    def convert(self, event):
        """Return a one-element list holding the title vertex row.

        Returns None when the event has no 'title' key.
        """
        title_vertex_id = get_title_vertex_id(event)
        if title_vertex_id:
            return ["{},title".format(_csv_quote(title_vertex_id))]
        return None


class InteractionEdgeConverter(object):
    """Turns an event into the CSV row for the user -> title edge."""

    def header(self):
        """Return the CSV header line for the rows produced by ``convert``."""
        return "~id, ~from, ~to, ~label"

    def convert(self, event):
        """Return a one-element list holding the edge row for ``event``.

        The row has a freshly generated UUID as edge id, the user vertex id as
        source, the title vertex id as target and ``event['type']`` as label.
        Returns None when the event lacks any of 'type', 'user' or 'title'.
        """
        if 'type' not in event:
            return None
        user_vertex_id = get_user_vertex_id(event)
        title_vertex_id = get_title_vertex_id(event)
        if not (user_vertex_id and title_vertex_id):
            return None
        fields = (
            str(uuid.uuid4()),  # random id: every call yields a distinct edge
            user_vertex_id,
            title_vertex_id,
            event['type'],
        )
        return [",".join(_csv_quote(field) for field in fields)]

class Converters(object):
    """Registry of the converter instances, grouped into vertex and edge converters."""

    # Converter instances are created once, at class-definition time, and are
    # shared by every Converters instance.
    user_vertex_converter = UserVertexConverter()
    title_vertex_converter = TitleVertexConverter()
    interaction_edge_converter = InteractionEdgeConverter()

    def to_vertexes(self):
        """Return the ``(name, converter)`` pairs for the vertex converters."""
        names = ('user', 'title')
        converters = (self.user_vertex_converter, self.title_vertex_converter)
        return list(zip(names, converters))

    def to_edges(self):
        """Return the ``(name, converter)`` pairs for the edge converters."""
        interaction_pair = ('interaction', self.interaction_edge_converter)
        return [interaction_pair]

In [78]:
def process_content_with_converter(content_json, converter_type, converter_name, converter, original_key_suffix):
    """Convert every record with ``converter`` and upload the result to S3.

    The uploaded text starts with ``converter.header()`` followed by every
    line the converter produced; records for which ``convert`` returns a falsy
    value contribute nothing. Lines are joined with newlines, UTF-8 encoded
    and written to ``s3_bucket`` under a key built from ``output_path``,
    ``original_key_suffix``, ``converter_type`` and ``converter_name``.

    Returns the S3 key the content was written to.
    """
    destination_key = "{}{}-{}-{}".format(output_path, original_key_suffix, converter_type, converter_name)
    rows = [converter.header()]
    for record in content_json:
        produced = converter.convert(record)
        if not produced:
            continue
        rows.extend(produced)
    payload = "\n".join(rows).encode('utf-8')
    s3_client.put_object(Body=payload, Bucket=s3_bucket, Key=destination_key)
    return destination_key

In [79]:
def process_file(key):
    """Convert one raw-events S3 object into CSV files and notify the loader queue.

    Reads ``key`` from ``s3_bucket``, parses it as one JSON document per line,
    runs every vertex and edge converter over the parsed records (each run
    uploads one output object), then sends a single SQS message to
    ``loader_queue_url`` whose JSON body lists the produced objects as
    ``s3://`` URLs under "vertex_files" and "edge_files".

    Args:
        key: S3 object key; expected to start with ``input_path``.
    """
    # Keep the whole remainder of the key after the input prefix. Slicing off
    # the last character as well would let two inputs that differ only in
    # that character map to the same output keys.
    original_key_suffix = key[len(input_path):]
    content = s3_client.get_object(Bucket=s3_bucket, Key=key)['Body'].read().decode('utf-8')
    # Blank lines are skipped: json.loads would raise on them.
    content_json = [json.loads(line) for line in content.splitlines() if line.strip()]

    converters = Converters()
    vertex_keys = [
        process_content_with_converter(content_json, 'vertexes', converter_name, converter, original_key_suffix)
        for (converter_name, converter) in converters.to_vertexes()
    ]
    # Edge outputs go into their own list so that "edge_files" is populated
    # and edge files are not reported to the loader as vertex files.
    edge_keys = [
        process_content_with_converter(content_json, 'edges', converter_name, converter, original_key_suffix)
        for (converter_name, converter) in converters.to_edges()
    ]

    loader_message = {
        "vertex_files": ["s3://{}/{}".format(s3_bucket, output_key) for output_key in vertex_keys],
        "edge_files": ["s3://{}/{}".format(s3_bucket, output_key) for output_key in edge_keys]
    }
    sqs_client.send_message(
        QueueUrl=loader_queue_url,
        MessageBody=json.dumps(loader_message)
    )

In [80]:
# Run the conversion for the selected key: writes the CSV objects to S3 and
# sends the loader message to SQS (real AWS side effects).
process_file(key)