In [1]:
import boto3
import pyspark
import math
import pandas as pd

In [2]:
# Reuse the running SparkContext when this cell is executed again in the same
# kernel; constructing a second context directly would raise an error.
sc = pyspark.SparkContext.getOrCreate(pyspark.SparkConf().setAppName("flaflu"))
sc

In [3]:
def limpa_conteudo(conteudo):
    to_remove = [
        "!", ".", ",", ":", "@", "#", "$", "%", "/", "\\", "|", "´", "`",
        "*", "&", "(", ")", "[", "]", "}", "{", "+", "-", "<", ">", "?", 
        "°", "=", '"', "_", "'", ";", "^", "~", "¨",
    ]
    for i in to_remove:
        conteudo = conteudo.replace(i, " ")
    return conteudo

In [4]:
def conta_documento(item):
    """Emit one `(word, 1)` pair per distinct word found in a document.

    `item` is a pair whose second element is the document text. The text is
    cleaned with `limpa_conteudo` and split on whitespace; only purely
    alphabetic tokens longer than three characters are kept.

    Words are lower-cased *before* de-duplication, so case variants of the
    same word ("Flamengo" / "flamengo") contribute a single pair and the
    document is counted only once for that word.
    """
    conteudo = limpa_conteudo(item[1])
    palavras_unicas = {
        palavra.lower()
        for palavra in conteudo.split()
        if palavra.isalpha() and len(palavra) > 3
    }
    return [(palavra, 1) for palavra in palavras_unicas]

In [5]:
def calcula_idf(item):
    """Turn a `(word, document_count)` pair into `(word, idf)`.

    The IDF is `log10(N_docs / document_count)`, where `N_docs` is read from
    module scope.
    """
    termo, docs_com_termo = item
    return (termo, math.log10(N_docs / docs_com_termo))

In [6]:
def filtra_doc(item):
    """Tell whether the count in `item[1]` lies strictly between the thresholds.

    The bounds are the module-level `DOC_COUNT_MIN` and `DOC_COUNT_MAX`; both
    are exclusive.
    """
    n_documentos = item[1]
    return DOC_COUNT_MIN < n_documentos < DOC_COUNT_MAX

In [7]:
def conta_palavra(item):
    """Emit a `(word, 1)` pair for every qualifying word occurrence in a document.

    `item` is a pair whose second element is the document text. The text is
    cleaned with `limpa_conteudo` and split on whitespace; purely alphabetic
    tokens longer than three characters are lower-cased and emitted.
    Repeated words are kept, one pair per occurrence.
    """
    texto_limpo = limpa_conteudo(item[1])
    pares = []
    for token in texto_limpo.split():
        if token.isalpha() and len(token) > 3:
            pares.append((token.lower(), 1))
    return pares

In [8]:
def calcula_freq(item):
    """Turn a `(word, count)` pair into `(word, log10(1 + count))`."""
    termo, ocorrencias = item
    return (termo, math.log10(ocorrencias + 1))

In [9]:
def gera_rdd_freq(rdd, palavra):
    """Build an RDD of `(word, normalised frequency)` for documents containing `palavra`.

    Records whose second element contains `palavra` are kept, their words are
    counted with `conta_palavra`, the counts are summed per word and finally
    passed through `calcula_freq`.

    NOTE(review): the `in` test is a case-sensitive substring check on the
    unprocessed text, whereas the counted words are lower-cased — confirm that
    documents containing only a capitalised form are meant to be left out.
    """
    docs_com_palavra = rdd.filter(lambda doc: palavra in doc[1])
    contagens = docs_com_palavra.flatMap(conta_palavra).reduceByKey(
        lambda a, b: a + b
    )
    return contagens.map(calcula_freq)

In [10]:
def gera_relevancia(rdd_freq, rdd_idf):
    """Join frequencies with IDF values by word and return `(word, freq * idf)` pairs.

    Only words present in both inputs survive the join.
    """

    def _pontuar(par):
        # par is (word, (freq, idf)) as produced by the join.
        termo, valores = par[0], par[1]
        return (termo, valores[0] * valores[1])

    return rdd_freq.join(rdd_idf).map(_pontuar)

In [11]:
def pega_top_100(rdd):
    """Return up to 100 records of `rdd`, ordered by their second element, largest first."""

    def _valor_negado(par):
        # Negating the value makes the ascending takeOrdered yield a descending ranking.
        return -par[1]

    return rdd.takeOrdered(100, key=_valor_negado)

In [12]:
# The source RDD feeds several independent actions below (the count, the IDF
# pipeline and both per-team frequency pipelines); caching it avoids
# re-reading the sequence file for each of them.
rdd = sc.sequenceFile("part-00000").cache()
N_docs = rdd.count()  # total number of records, used as the IDF numerator
N_docs

36133

In [13]:
# Document-count thresholds read by filtra_doc; both bounds are exclusive there.
DOC_COUNT_MIN = 10  # a word must occur in more than 10 documents
DOC_COUNT_MAX = N_docs * 0.7  # ...and in fewer than 70% of all documents

In [16]:
# IDF per word: count the documents each word appears in, keep the words whose
# document count lies strictly between the thresholds, then apply calcula_idf.
# The result is cached because it is reused by the sample below and by every
# relevance join later on.
rdd_idf = (
    rdd.flatMap(conta_documento)
    .reduceByKey(lambda x, y: x + y)
    .filter(filtra_doc)
    .map(calcula_idf)
    .cache()
)

# Export a five-row sample of (word, IDF) pairs for inspection.
idf_samples = rdd_idf.take(5)
df_idf = pd.DataFrame(idf_samples, columns=["Palavra", "IDF"])
df_idf.to_excel("AmostraIDF.xlsx")

In [29]:
# Normalised word frequencies for the documents that mention each team.
rdd_freq_fla = gera_rdd_freq(rdd, "flamengo")
rdd_freq_flu = gera_rdd_freq(rdd, "fluminense")

# NOTE(review): intersection() compares whole (word, frequency) pairs, so a
# word present in both RDDs with different frequencies is not included in
# rdd_freq_flaflu and therefore remains in both "*Only" RDDs below. Confirm
# whether a key-based intersection was intended.
rdd_freq_flaflu = rdd_freq_fla.intersection(rdd_freq_flu)
rdd_freq_flaOnly = rdd_freq_fla.subtractByKey(rdd_freq_flaflu)
rdd_freq_fluOnly = rdd_freq_flu.subtractByKey(rdd_freq_flaflu)

# Export a five-row sample of each team's frequencies for inspection.
colunas_freq = ["Palavra", "Freq. Normalizada"]

freqNorm_FLA = rdd_freq_fla.take(5)
df_freqNormFLA = pd.DataFrame(freqNorm_FLA, columns=colunas_freq)
df_freqNormFLA.to_excel("AmostraFreqFLA.xlsx")

freqNorm_FLU = rdd_freq_flu.take(5)
df_freqNormFLU = pd.DataFrame(freqNorm_FLU, columns=colunas_freq)
df_freqNormFLU.to_excel("AmostraFreqFLU.xlsx")

In [None]:
# Relevance (normalised frequency x IDF) for the shared word set and for each
# team-specific word set.
rdd_relevancia = gera_relevancia(rdd_freq_flaflu, rdd_idf)
rdd_relevanciaFLA = gera_relevancia(rdd_freq_flaOnly, rdd_idf)
rdd_relevanciaFLU = gera_relevancia(rdd_freq_fluOnly, rdd_idf)

# Collect the 100 highest-scoring words of each set on the driver.
top_relevancia = pega_top_100(rdd_relevancia)
top_relevanciaFLA = pega_top_100(rdd_relevanciaFLA)
top_relevanciaFLU = pega_top_100(rdd_relevanciaFLU)

tops = [top_relevancia, top_relevanciaFLA, top_relevanciaFLU]
csv_names = ["top100_intersection.csv", "top100_FLA.csv", "top100_FLU.csv"]

# Write one CSV per ranking into the working directory.
for top, name in zip(tops, csv_names):
    df = pd.DataFrame(top, columns=["Palavra", "Relevancia"])
    df.to_csv(name)

In [None]:
# Stop the SparkContext created at the top of the notebook.
sc.stop()