<a href="https://colab.research.google.com/github/DarthHugh/desafio_python/blob/master/desafio_python.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparo do ambiente


In [None]:
!git clone https://github.com/projeto22/challenge-python.git

In [None]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
 
# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

In [None]:
# iniciar uma sessão local 
from pyspark.sql import SparkSession
sc = SparkSession.builder.master('local[*]').getOrCreate()


# Dicionário

In [None]:
import os
import re
from pyspark.sql.types import StringType
from pyspark.sql.functions import monotonically_increasing_id 


files = [f for f in os.listdir('challenge-python/dataset')]

lista_final = []


for filename in files:
  with open('challenge-python/dataset/'+filename, 'r', encoding='cp1252') as f:
    # Lê - tudo minuscula - separa palavras numa lista
    words = f.read().lower().split()  

    # tira plurais e caracteres especiais, de forma que o dicionário armazene apenas as palavras
    words = [word.replace("'s", '') for word in words]
    words = [re.sub('\W+','', word ) for word in words]

    #à cada iteração Junta lista por lista de palavras
    lista_final.extend(words) 

#dataframe com todas as palavras distintas
df_palavras = sc.createDataFrame(lista_final, StringType()).distinct()

In [None]:
# Adiciona o código de cada palavra
rdd_df = df_palavras.rdd.zipWithIndex()
df_final = rdd_df.toDF()

df_final = df_final.withColumn('word', df_final['_1'].getItem("value"))
df_final = df_final.withColumnRenamed('_2','id_word')

# Monta a estrutura do dicionário
df_dicionario = df_final.select('word','id_word')
# Roube e adicionei cabeçalho ao arquivo salvo para facilitar a leitura no próximo Job :p sei nem se poderia hehehe
df_dicionario.write.mode('overwrite').option("sep","\t").option("header", "true").csv('challenge-python/dicionario')

# INDICIE REVERSO

In [None]:
import re
from pyspark.sql.types import StructType,StructField, IntegerType
import pyspark.sql.functions as sql

df_recover_dic = sc.read.csv("challenge-python/dicionario", inferSchema=True, header=True, sep='\t')
df_recover_dic = df_recover_dic.withColumn('id_word', df_recover_dic.id_word.cast('int'))

In [None]:
# Cria dataframe vazio, com dois campos
schema = StructType([
  StructField('id_word', IntegerType()),
  StructField('id_file', IntegerType()),
  ])
df_pares_final = sc.createDataFrame(sc.sparkContext.emptyRDD(),schema)

'''
Para cada arquivo - separa todas as palavras em uma lista, faz join com dicionário para recuperar os IDs 
(considerando que não tenho acesso ao script de construção do dicionário)
Monta os pares de código de palavras e código do arquivo
Incrementa no dataframe criado anteriormente '''
for filename in files:
  with open('challenge-python/dataset/'+filename, 'r', encoding='cp1252') as f:
    words = f.read().lower().split()  
    words = [word.replace("'s", '') for word in words]
    words = [re.sub('\W+','', word ) for word in words]
    pares = []

    df_pares = sc.createDataFrame(words, StringType())

    filename = int(filename)
    df_join = df_pares.join(df_recover_dic, df_pares.value == df_recover_dic.word, how='left')
    df_join = df_join.withColumn('id_file', sql.lit(filename)).select('id_word','id_file')

    df_pares_final = df_pares_final.union(df_join)

In [None]:
# Tira os distintos, ordena e agrupa listas de arquivos por código de palavra
df_pares_final = df_pares_final.distinct().orderBy('id_word','id_file')
df_pares_final = df_pares_final.filter(df_pares_final.id_word.isNotNull())
df_pares_final = df_pares_final.groupby('id_word').agg(sql.collect_list('id_file').alias('files')).orderBy('id_word')

df_pares_final.write.mode('overwrite').option("sep","\t").option("header", "true").parquet('challenge-python/indice_reverso')