In [1]:
import logging
import os
import subprocess
import traceback

import click
import pyspark
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark import SparkConf, SparkContext

def _get_spark_handle(num_cores=2, executor_memory="4g"):
    """Return a local SparkSession for this notebook.

    Uses ``SparkSession.builder...getOrCreate()``, so an already-running
    session is reused rather than a second one being started.

    Args:
        num_cores: number of local worker threads (``local[N]`` master URL).
        executor_memory: value for ``spark.executor.memory``.

    Returns:
        pyspark.sql.SparkSession
    """
    # The Arrow switch is spelled "...arrow.enabled" (Spark 2.3/2.4); the old
    # "...arrow.enable" key matched nothing and was silently ignored.
    # Spark 3.x prefers "spark.sql.execution.arrow.pyspark.enabled" but still
    # honours this deprecated name.
    # NOTE(review): in local mode executors run inside the driver JVM, so
    # spark.driver.memory is probably the setting that matters here — confirm.
    spark = (
        pyspark.sql.SparkSession.builder
        .master("local[%d]" % num_cores)
        .appName("Spark Preprocessing")
        .config("spark.executor.memory", executor_memory)
        .config("spark.sql.execution.arrow.enabled", "true")
        .config("spark.ui.showConsoleProgress", "true")
        .getOrCreate()
    )
    return spark


def _spark_handle():
    """Shorthand accessor for the notebook's shared local SparkSession."""
    return _get_spark_handle()


def _read_file(path, **options):
    """Load JSON records at ``path`` into a Spark DataFrame.

    Args:
        path: file or directory handed to ``DataFrameReader.json``.
        **options: optional JSON reader options (e.g. ``multiLine=True``)
            forwarded to ``DataFrameReader.options``. None are passed by
            default, which matches the previous behaviour.

    Returns:
        pyspark.sql.DataFrame
    """
    return _spark_handle().read.options(**options).json(path)


# Location of the StepContent JSON dump. Set the STEP_CONTENT_PATH environment
# variable to point elsewhere instead of editing this per-user absolute path.
STEP_CONTENT_PATH = os.environ.get(
    'STEP_CONTENT_PATH',
    '/Users/allen.wu/git/kkstream/StepContent__nocontent_title_desc')
df = _read_file(STEP_CONTENT_PATH)


In [2]:
# Preview the key article columns: first 20 rows, long values truncated.
df.select(["url", "title", "terms", "imageUrl", "traffic"]).show(n=20)

+--------------------+--------------------+--------------------+--------------------+-------+
|                 url|               title|               terms|            imageUrl|traffic|
+--------------------+--------------------+--------------------+--------------------+-------+
|http://www.nbcsan...|Travel Insurance:...|C~Penalty|27714|U...|https://media.nbc...|    151|
|https://www.rd.co...|Travel Jokes - Fu...|E~$99|8388|U^^Qua...|                null|   2215|
|https://petapixel...|Travel Like a Pho...|E~founder|137935|...|                null|   2033|
|https://www.rd.co...|Travel Point Perk...|C~Airline|46569|U...|                null|    897|
|https://www.rd.co...|Travel Tips: 10 S...|                null|                null|    183|
|https://www.rd.co...|Travel Tips: Avoi...|                null|                null|   1745|
|https://www.sfgat...|Travel Troublesho...|C~Want|128004|U^^...|https://s.hdnux.c...|    302|
|https://www.sfgat...|Travel Troublesho...|E~Sea Grottoes|6|

In [4]:
# Show the schema Spark inferred from the JSON. Every field except `traffic`
# (long) comes back as a string, so numeric-looking fields such as
# dailyTraffic / day_count presumably need casting before use — confirm.
df.printSchema()
    

root
 |-- articleIds: string (nullable = true)
 |-- countryCode: string (nullable = true)
 |-- createTime: string (nullable = true)
 |-- dailyTraffic: string (nullable = true)
 |-- day_count: string (nullable = true)
 |-- entities: string (nullable = true)
 |-- first_level_taxonomy: string (nullable = true)
 |-- imageUrl: string (nullable = true)
 |-- original_url: string (nullable = true)
 |-- publisherIds: string (nullable = true)
 |-- taxonomy: string (nullable = true)
 |-- terms: string (nullable = true)
 |-- title: string (nullable = true)
 |-- title_desc: string (nullable = true)
 |-- traffic: long (nullable = true)
 |-- url: string (nullable = true)



In [14]:
import datetime
from taboola.common.datasethelper.document_reader import DocumentReader


def datetime_stamp():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
    return '{:%Y-%m-%d %H:%M:%S}'.format(datetime.datetime.now())


def print_dt_stamped(value):
    """Print ``value`` prefixed with the current timestamp.

    Args:
        value: message to print. Non-string values are rendered with ``%s``
            instead of raising ``TypeError`` on string concatenation.
    """
    # A single-argument print(...) behaves identically under Python 2 and 3.
    # %-formatting also promotes to unicode under Python 2 when value is unicode.
    print("%s: %s" % (datetime_stamp(), value))


def load_train_docs(lines_file_dir):
    """Read every document under ``lines_file_dir`` and return its text.

    Documents are pulled from a ``DocumentReader`` in batches of 10000. For
    each document, ``documents[i][1]`` is joined with single spaces into one
    string. A progress line is printed after each batch.

    Args:
        lines_file_dir: path handed to ``DocumentReader``.

    Returns:
        list of str, one entry per document, in reader order.

    Raises:
        IOError: if ``open_reader()`` returns None.
    """
    document_reader = DocumentReader(lines_file_dir, header_skip=0, batch_size=10000, max_words=None, shuffle=False,
                                     min_words=None).open_reader()
    if document_reader is None:
        raise IOError('Failed reading documents from train_file_dir {}.'.format(lines_file_dir))
    docs = []
    try:
        while not document_reader.is_eof():
            doc_tags, documents, _ = document_reader.get_batch()
            if not doc_tags:
                break
            # NOTE(review): assumes documents[i][1] is the token list belonging
            # to doc_tags[i] — confirm against DocumentReader.get_batch.
            docs.extend(' '.join(documents[i][1]) for i in range(len(doc_tags)))
            # len(docs) equals the running total of documents read so far.
            print_dt_stamped("documents read: " + str(len(docs)) + ", first index: " + str(doc_tags[0][0]))
    finally:
        # Release the reader even when a batch raises mid-read.
        document_reader.close_reader()
    return docs

# Input location for load_train_docs. Set the TRAIN_LINES_PATH environment
# variable to point elsewhere instead of editing this per-user absolute path.
TRAIN_LINES_PATH = os.environ.get(
    'TRAIN_LINES_PATH',
    "/Users/jun.x/devfiles/topic_disco/20180323_106/all/StepLines__nocontent_v2")
train_docs = load_train_docs(TRAIN_LINES_PATH)


In [16]:
# NOTE(review): `articles` is not defined in any cell shown here, so this cell
# (and the indexing cell below it) raises NameError on a fresh kernel.
# TODO: add the cell that builds it. index_articles pairs articles[i] with
# train_docs[i], so the two must line up position-for-position.
for article in articles[:10]:
    print(article.get_url())
# print(doc) rather than print(str(doc)): under Python 2, str() raises
# UnicodeEncodeError for non-ASCII unicode documents.
for doc in train_docs[:10]:
    print(doc)

In [17]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

def _build_index_action(index_name, article, words):
    """Build one ``helpers.bulk`` action dict for ``article`` with its ``words``."""
    return {
        "_index": index_name,
        "_type": "article",
        "_source": {
            "url": article.get_url(),
            "title": article.get_title(),
            "words": words,
            "image_url": article.get_image_url(),
            "traffic": article.get_traffic()
        }
    }


def index_articles(es_endpoint, index_name, articles, train_docs, batch_size=10000):
    """Bulk-index ``articles`` into Elasticsearch, one document per article.

    Article ``i`` is paired with ``train_docs[i]`` as its ``words`` field.
    Actions are sent with ``helpers.bulk`` every ``batch_size`` documents,
    and once more at the end for any remainder.

    Args:
        es_endpoint: ``host:port`` of the Elasticsearch node.
        index_name: target index name.
        articles: iterable of objects exposing get_url / get_title /
            get_image_url / get_traffic.
        train_docs: sequence indexable by article position.
        batch_size: number of actions per bulk request (default 10000).

    Returns:
        int: total success count reported by ``bulk``.
    """
    # NOTE(review): elasticsearch-py interprets `timeout` in seconds, so 5000 is
    # roughly 83 minutes per request — confirm that this is intended.
    es = Elasticsearch(hosts=[es_endpoint], timeout=5000)

    count = 0
    actions = []
    for i, article in enumerate(articles):
        actions.append(_build_index_action(index_name, article, train_docs[i]))
        # Flush on size. The old `i > 0 and i % 10000 == 0` check made the
        # first batch one action larger than the rest.
        if len(actions) >= batch_size:
            print("Indexing " + str(len(actions)) + " documents, i: " + str(i))
            success, _ = bulk(es, actions, index=index_name, raise_on_error=True)
            count += success
            actions = []

    # Send the remainder; skip the no-op request when nothing is left.
    if actions:
        success, _ = bulk(es, actions, index=index_name, raise_on_error=True)
        count += success
    print("insert %s lines" % count)
    return count

# Index every article (paired with its training words) into the local node.
index_articles(es_endpoint="127.0.0.1:9200", index_name="topic_disco_full",
               articles=articles, train_docs=train_docs)