In [None]:
import os
from timeit import default_timer as timer
fromm datetime import datetime

import numpy as np

os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda3/bin/python"

os.environ['HADOOP_LOG_DIR'] = '/Users/elissandro/Projetos/Especializacao/MDMR/logs'
os.environ['HADOOP_CONF_DIR'] = '/Users/elissandro/Projetos/Especializacao/MDMR/conf/spark'
os.environ['HADOOP_USER_NAME'] = 'elissandro'

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover
    
spark = SparkSession \
    .builder \
    .appName('Trabalho Final - DMR') \
    .config('spark.driver.host', '192.168.1.37') \
    .master('yarn') \
    .getOrCreate() 

In [None]:
# Ajustando locale da JVM para en-US, estava misturando os locale e ficando en-BR...
# Fonte: https://stackoverflow.com/questions/55246080/pyspark-stopwordsremover-parameter-locale-given-invalid-value
locale = spark.sparkContext._jvm.java.util.Locale
locale.setDefault(locale.forLanguageTag("en-US"))

In [None]:
def load_file(filePath, type, delimiter = None):
    if type == 'text':
        df = spark.read.load(filePath, format='text')
    else:
        df = spark.read.csv(filePath, inferSchema=True, header=True, sep=delimiter)
    return df

def concat_all_reviews_v1(reviews):
    """
        Concatena todas as reviews (linhas no DF) em um RDD no formato (1, TEXTO).
        Retorna um RDD
    """
    rdd = reviews.rdd.map(lambda r: (1, r[0])).reduceByKey(lambda a, b: a + ' ' + b)
    return rdd

def concat_all_reviews_v2(reviews):
    """
        Concatena todas as reviews (linhas no DF) em um texto único.
        Retorna uma string
    """
    rdd = reviews.rdd.flatMap(lambda v: v[0].split()).reduce(lambda a, b: a + ' ' + b)
    return rdd

def tokenize_reviews(reviews):
    tokenization = Tokenizer(inputCol='review', outputCol='review_words')
    return tokenization.transform(reviews)

def remove_stop_words_reviews(reviews):
    stopword_removal = StopWordsRemover(inputCol='review_words',outputCol='review_words_refined')
    return stopword_removal.transform(reviews)

def counter_words_rdd_flatMap(df, index):
    """
        Filtra e conta palavras usando RDD flatMap
    """
    filtered = refined_reviews \
                .rdd \
                .flatMap(lambda v: [(w, 1) for w in v[index] if w != '' and len(w) > 3]) \
                .reduceByKey(lambda c1, c2: c1 + c2)
    return filtered

def counter_words_sql_expr(df, colunmName):
    """
        Filtra e conta palavras usando SparkSQL
    """
    from pyspark.sql.functions import expr
    
    filter_expr = f"filter({colunmName}, x -> x != '' and char_length(x) > 3)"
    filtered = df.withColumn(colunmName, expr(filter_expr)).rdd.flatMap(lambda v: v[0])

    return filtered.groupBy(colunmName).count()


In [61]:
hdfs_folder = 'hdfs://192.168.1.37:9000/trabalho-final'

local_path = '/Users/elissandro/Projetos/Especializacao/MDMR/logs/exec_logs'
if not os.path.exists(local_path):
    os.makedirs(local_folder)
    
exec_logs_folder = f"file://{local_path}"

from pathlib import Path
Path("local_folder").mkdir(parents=True, exist_ok=True)

now = datetime.now()

files = ["mobile_test.tsv"]

lines = []
exec_info = []
execs_per_file = 10

for f in files:    
    print(f"Loading file: {f}")
    content = load_file(f"{hdfs_folder}/{f}", type='tsv', delimiter='\t')
    for r in np.arange(0, execs_per_file):
        print("Round " + str(r))

        start = timer()
        print("Removing empty reviews...")
        content = content.select('review_body').filter(content.review_body != '')
        
        print("Concating content...")
        all_reviews_rdd = concat_all_reviews_v1(content)
        
        print("Creating DF to Mlib...")
        # Cria DataFrame para uso com a Mlib
        all_reviews_df = spark.createDataFrame(data=all_reviews_rdd, schema=['id', 'review'])
        
        print("Tokenization...")
        all_reviews_df_tokenized = tokenize_reviews(all_reviews_df)
        
        print("Removing StopWords...")
        refined_reviews = remove_stop_words_reviews(all_reviews_df_tokenized)
        
        print("Filter and counter Words...")
        filtered = counter_words_rdd_flatMap(refined_reviews, 3)
        
        end = timer()
        exec_time = end - start
        l = (f, str(r), "{:.4f}".format(exec_time))
        lines.append(l)
        print(f"Execution time:{exec_time} sec")
        print("---------------------------")
        
    print("Writing exec logs...")
    logs_execs = spark.createDataFrame(data=lines, schema=['file', 'round', 'time'])
    # logs_execs.write.csv(f"{exec_logs_folder}/exec_{f}_{now.strftime('%Y%m%d-%H%M%S')}.csv")
    logs_execs.toPandas() \
        .to_csv(f"exec_{now.strftime('%Y%m%d-%H%M%S')}.csv", index = False, header=True)
#        .to_csv(f"{exec_logs_folder}/exec_{now.strftime('%Y%m%d-%H%M%S')}.csv", index = False, header=True)

filtered.take(10)


Loading file: mobile_test.tsv
Round 0
Removing empty reviews...
Concating content...
Creating DF to Mlib...
Tokenization...
Removing StopWords...
Filter and counter Words...
Execution time:1.0862637780010118 sec
---------------------------
Round 1
Removing empty reviews...
Concating content...
Creating DF to Mlib...
Tokenization...
Removing StopWords...
Filter and counter Words...
Execution time:0.7496607160010171 sec
---------------------------
Round 2
Removing empty reviews...
Concating content...
Creating DF to Mlib...
Tokenization...
Removing StopWords...
Filter and counter Words...
Execution time:1.0919096670004365 sec
---------------------------
Round 3
Removing empty reviews...
Concating content...
Creating DF to Mlib...
Tokenization...
Removing StopWords...
Filter and counter Words...
Execution time:1.7772060750012315 sec
---------------------------
Round 4
Removing empty reviews...
Concating content...
Creating DF to Mlib...
Tokenization...
Removing StopWords...
Filter and cou

[('advertised.', 4),
 ('everything', 3),
 ('works', 16),
 ('perfectly,', 1),
 ('happy', 4),
 ('camera.', 1),
 ('matter', 1),
 ('fact', 1),
 ('going', 2),
 ('another', 4)]

In [None]:
spark.stop()

In [54]:


from pathlib import Path
Path('local_folder').mkdir(parents=True, exist_ok=True)


In [56]:

print(local_folder)
if not os.path.exists(local_folder):
    os.makedirs(local_folder)

/Users/elissandro/Projetos/Especializacao/MDMR/logs/exec_logs
