In [None]:
import os

# Export the pywikibot directory setting for this kernel process.
# NOTE(review): presumably read by pywikibot when `wiki_reader.reader` is
# imported in a later cell, so this cell should stay ahead of it — confirm.
os.environ.update({'PYWIKIBOT_DIR': './wiki_reader/'})

In [None]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType, DoubleType

from spark_app.scorers import score_text, vader_scorer
from spark_app.spark_tools import SparkSentimentStreamer
from pathlib import Path

from ml.OutputProcessing import process_output_files

## Input

In [None]:
# --- What to fetch ----------------------------------------------------------
# `request` is handed to reader.query / reader.query_size and is also appended
# to both directory prefixes below to form the per-request paths.
request = 'peano'
is_category = False      # forwarded as reader.query(is_category=...)
preload_content = True   # forwarded as reader.query(preload_content=...)
limit = None             # forwarded as reader.query(limit=...)
batch_size = 100         # forwarded as reader.query(batch_size=...)

# --- Where data lives -------------------------------------------------------
# Both prefixes are joined to `request` by plain string concatenation, so they
# must keep their trailing slash.
wiki_page_dir = 'requests/'        # reader output dir / Spark input prefix
spark_results_dir = 'responses/'   # Spark output prefix (emptied before each run)

# --- Diagnostics ------------------------------------------------------------
# Passed to both reader.query and SparkSentimentStreamer.
debug_info = True

## Processing

In [None]:
# from ml.LogisticRegressionCached import readFromCache

# (lrModel, pipelineModel) = readFromCache('./ml/train/')

# def score_text_ml(text):
#     df = spark.createDataFrame([(text, 2)], ['text', 'target'])
#     df_transformed = pipelineModel.transform(df) # To fix
#     predictions = lrModel.transform(df_transformed)
#     predictions = predictions.select(['text', 'probability', 'prediction'])
#     pd_predictions = predictions.toPandas()
#     positive_probability = pd_predictions.iloc[0]['probability'][1]
#     overall_probability = 2 * positive_probability - 1
    
#     return overall_probability

In [None]:
def cleanup(path):
    """Ensure ``path`` exists as a directory and remove everything inside it.

    The directory itself (and any missing parents) is created when absent, so
    after the call ``path`` is always an existing, empty directory.

    Args:
        path: Directory to empty, as a ``str`` or ``pathlib.Path``.

    Raises:
        OSError: If the directory cannot be created or an entry cannot be
            removed (e.g. insufficient permissions).
    """
    path = Path(path)
    path.mkdir(parents=True, exist_ok=True)
    for entry in path.iterdir():
        # Only descend into real directories. A symlink that points at a
        # directory must be removed as a link; following it would wipe the
        # link target's contents and then fail on rmdir().
        if entry.is_dir() and not entry.is_symlink():
            cleanup(entry)
            entry.rmdir()
        else:
            # Regular files, symlinks (valid or dangling) and other
            # non-directory entries are all removed with unlink().
            entry.unlink()

In [None]:
def spark_process(request, score_func):
    """Run the Spark sentiment streamer over the data stored for ``request``.

    Input is read from ``wiki_page_dir + request`` and results are written to
    ``spark_results_dir + request``; the output directory is emptied first.

    Args:
        request: Request name; appended to the input and output directory
            prefixes to form the per-request paths.
        score_func: Scoring callable handed to ``SparkSentimentStreamer``
            (e.g. ``vader_scorer``).
    """
    data_in = wiki_page_dir + request
    data_out = spark_results_dir + request
    # Start from an empty output directory so results of a previous run for
    # the same request are not mixed with the new ones.
    cleanup(data_out)

    spark = (
        SparkSession.builder
        .master("local[*]")
        .appName("NetworkWordCount")
        .getOrCreate()
    )
    sc = spark.sparkContext
    ssc = StreamingContext(sc, 1)  # second argument is the batch duration

    streamer = SparkSentimentStreamer(sc, ssc, spark, score_func, data_in, data_out,
                                      debug_info=debug_info)
    try:
        streamer.run()
    finally:
        # Stop the streamer even when run() raises; otherwise whatever it
        # started stays alive in the kernel after a failed run.
        # NOTE(review): assumes stop() is safe to call after a failed run() —
        # confirm against SparkSentimentStreamer.
        streamer.stop()

In [None]:
from concurrent.futures import ThreadPoolExecutor
import wiki_reader.reader as reader

def wiki_wrapper(r, b, l, cat, cont):
    """Call ``reader.query`` with the notebook-wide output dir and debug flag.

    Positional adapter so the call can be passed to
    ``ThreadPoolExecutor.submit`` without keyword arguments.

    Args:
        r: Request, passed as the first positional argument of ``reader.query``.
        b: Forwarded as ``batch_size``.
        l: Forwarded as ``limit``.
        cat: Forwarded as ``is_category``.
        cont: Forwarded as ``preload_content``.

    Returns:
        Whatever ``reader.query`` returns.
    """
    return reader.query(r, out_dir=wiki_page_dir, batch_size=b, debug_info=debug_info,
                        limit=l, is_category=cat, preload_content=cont)


# Size of the current request as reported by the reader; only referenced by
# the (currently disabled) process_output_files step in the final cell.
query_size = reader.query_size(request)

In [None]:
# Run the wiki download and the Spark scoring concurrently: the reader writes
# under `wiki_page_dir` while the streamer reads from the same location.
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [
        executor.submit(wiki_wrapper, request, batch_size, limit, is_category, preload_content),
        executor.submit(spark_process, request, vader_scorer),
        # executor.submit(process_output_files, spark_results_dir + request, query_size),
    ]

# Leaving the `with` block has already waited for every task. Calling
# result() re-raises any exception a task hit, which would otherwise be
# silently discarded together with the unused Future object.
for future in futures:
    future.result()