<a href="https://colab.research.google.com/github/grfgrf/exercicios/blob/main/pyspark02-txtMapReduce-GoogleColab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Download/instalação dependências Spark/Pyspark
*primeira run aprox 60segundos

1.   apt-get openjdk-8-jdk-headless
2.   download spark-3.2.0-bin-hadoop3.2.tgz
3.   unzip spark-3.2.0-bin-hadoop3.2    
4.   download sherlock.txt
5.   pip install findspark (pyspark)
6.   set JAVA_HOME e SPARK_HOME paths
7.   cria variavel "spark" como sparkSession



In [None]:
%%bash
#***IFs apenas para o notebook não executar novamente em caso de run all cells.

#verifica se openjdk está instalado
if (dpkg -l | grep -qw openjdk-8-jdk-headless) then 
  echo "Ja instalado - openjdk-8-jdk-headless" 
else 
  apt-get install openjdk-8-jdk-headless -qq > /dev/null | echo "openjdk-8-jdk-headless - instalado com sucesso" 
fi

In [None]:
%%bash
#download spark-hadoop
if [ -f "spark-3.2.0-bin-hadoop3.2.tgz" ]; then
  echo "Ja baixado - spark-3.2.0-bin-hadoop3.2.tgz"
else 
  wget -q  https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz | echo "spark-3.2.0-bin-hadoop3.2.tgz - baixado com sucesso"
fi

In [None]:
%%bash
#extrair spark-hadoop
if [ -d "spark-3.2.0-bin-hadoop3.2" ]; then
  echo "Ja descompactado - Pasta spark-3.2.0-bin-hadoop3.2"
else 
  tar xf spark-3.2.0-bin-hadoop3.2.tgz | echo "spark-3.2.0-bin-hadoop3.2.tgz - descompactado com sucesso"
fi

In [None]:
%%bash
#download sherlock.txt
if [ -f "/content/sample_data/sherlock.txt" ]; then
  echo "Ja baixado - sherlock.txt"
else 
  wget -q -O /content/sample_data/sherlock.txt https://www.gutenberg.org/files/1661/1661-0.txt | echo "sherlock.txt - baixado com sucesso"
fi

In [None]:
#usa folder do spark como lib pyspark
try:
    findspark
except NameError:
    !pip install -q findspark
    import findspark
    findspark.init('spark-3.2.0-bin-hadoop3.2')
    print("findspark - instalado com sucesso ")
else:
    print("já instalado - findspark")

In [6]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

In [None]:
#cria spark session 
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local[*]').getOrCreate()

# TOP 10 palavras com RDD

In [None]:
#Exemplo1 com algumas funções Spark
#atencao para unicodes e como limpar caracteres especiais
import re

#wholeTextFiles lê o arquivo inteiro em uma tupla. lista[(path,texto_inteiro)] 
rddExemplo1 = spark.sparkContext.wholeTextFiles('/content/sample_data/sherlock.txt')

#Trata o texto como no exemplo da documentacao: \s+
rddExemplo1 = rddExemplo1.map(lambda x : re.sub("\s+"," ",x[1]))

#Split por espaço em branco
rddExemplo1 = rddExemplo1.flatMap(lambda x: x.split(" "))

#Adiciona coluna com valor 1 para reduce (exemplo MRJob)
rddExemplo1 = rddExemplo1.map(lambda x: (x,1)) 
rddExemplo1.take(5)


In [None]:
#Exemplo1 - Solucao1
# RDD final com ReduceByKey (reduz keys iguais e soma values)
solucao1 = rddExemplo1.reduceByKey(lambda a,b: a+b)

#TOP 10
solucao1.sortBy(lambda x: x[1],ascending=False).take(10)

In [None]:
#Exemplo1 - Solucao2
#coleta rdd em lista
from operator import itemgetter
solucao2 = rddExemplo1.groupByKey().mapValues(len).collect()

#top10
solucao2.sort(key=itemgetter(1),reverse=True)
solucao2[0:10]

# TOP 10 palavras com DataFrame

In [None]:
#Exemplo da documentacao Spark - DataFrame
from pyspark.sql.functions import *

dfExemplo2 =spark.read.format('text').load('/content/sample_data/sherlock.txt')

wordCounts = dfExemplo2.select(explode(split(dfExemplo2.value, "\s+")).alias("word")).groupBy("word").count()

#top 10
wordCounts.filter(wordCounts['word']!="").orderBy(['count'],ascending=False).show(10)


# TOP 10 palavras com DataFrame + SQLquery

In [None]:
from pyspark.sql import Row

rddExemplo3 =spark.read.text('/content/sample_data/sherlock.txt').rdd

#Split RDD nos espaços em branco
rddExemplo3 = rddExemplo3.flatMap(lambda x: x[0].split(" "))

#add nome Coluna para converter em Dataframe
dfExemplo3 = rddExemplo3.map(Row("palavra")).toDF()

#Precisa criar tabela/view temporaria para executar queries SQL
dfExemplo3.createOrReplaceTempView('tabelaTemp')

#Query 
queryTop10 = """SELECT palavra,
                       COUNT(*) as qnt
                  FROM tabelaTemp
                 WHERE palavra <> ""           
              GROUP BY palavra
              ORDER BY 2 DESC
                 LIMIT 10
             """

#Executa query
spark.sql(queryTop10).show()

#dropa tabela temporaria
spark.catalog.dropTempView('tabelaTemp')

#TOP 10 palavras com DataFrame + Python list

In [None]:
from collections import Counter

#leitura .txt
#Dataframe com 1 COL e cada ROW sendo uma linha do .txt
dfExemplo4 =spark.read.text('/content/sample_data/sherlock.txt')


#coluna "value" do Dataframe para python list[]
listLivro = list(dfExemplo4.select("value").toPandas()["value"])


#Formata list[linhaTxt1,...,linhaTxtN]
#   para list[palavra1,...,palavraN]
def listPalavras(listLivro):
  resultado = []
  for paragrafo in listLivro:
   [resultado.append(palavra) for palavra in paragrafo.split()]
  return resultado    

#Conta ocorrencias com collections Counter
contaPalavra1 = Counter(listPalavras(listLivro))

#TOP 10
contaPalavra1.most_common(10)
