# 81. Introdução ao Spark

**Spark**
> NPL com Spark
> 
> Utilizando bibliotecas Nativas de ML do Spark
> 
> Classificação: Spam

**Como Utilizar Spark**
> Instalar a versão Open Source
>> https://spark.apache.org/
>
> Provedor em Nuvem
>> AWS
>>
>> Azure
>>
>> Databricks

**Databricks**
> Dos criadores do Spark
>
> Community Edition
>
> https://community.cloud.databricks.com/login.html

**Spark**
> Ferramenta de processamento de dados distribuidos em clusters
> 
> Roda em memória
> 
> Veloz
> 
> Escalável
> 
> Particionável
>
> Escala horizontal - Cluster
>
> Replicação/Tolerância a Falha
> 
> Particionamento

**Spark vs Python, R ou Banco de Dados**
> Custo computacional: CPU, Memória, Rede, Etc.
> 
> Arquitetura voltada a processar dados.
> 
> Melhor performance, porém não substitui o Python, SQL ou um SGBD.

**Linguagens**
> Scala.
> 
> Java.
> 
> Python.
> 
> R.
> 
> SQL.

**Por que Spark?**
> NLP são tarefas com alto custo computacional
> 
> Spark tem alta performance pela sua natureza 'distribuida'.
> 
> Com pyspark, tem-se tudo do Python+Spark.

**Arquitetura e Componentes**
> Machine Learning (Mlib).
> 
> SQL (Spark SQL).
> 
> Processamento em Streaming.
> 
> Processamento de Grafos (GraphX).

**Spark SQL**
> Permite ler dados tabulares de várias fontes (CSV, Json, Parquet, Orc, etc).
> 
> Pode usar sintaxe SQL.

**Streaming: Spark Structured Streaming**
> Dados estruturados.

**Grafos acíclicos dirigidos**
> O spark constrói gráficos acíclicos dirigidos.

**Elementos**
> SparkSession: Sessão.
> 
> Aplication: Programa.

**Transformações e Ações**
> Um dataframe é imutável: traz tolerância a falha.
> 
> Uma transformação gera um novo dataframe.
> 
> O processamento de transformação de fato só ocorre quando há uma ação: Lazy evaluation.

**Lazy Evaluation**
> Filter
>> Union
>>> Sample
>>>> Show



# 82. Etapas de Processamento

> Ingressar no Databricks
> 
> Criar um Cluster
> 
> Importar Dados e Criar Tabela
> 
> Criar Notebook

**Pipeline**
> Importar módulos
> 
> Criar sessão do Spark
> 
> Criar dataframe do Spark
> 
> Transformar variável dependente (category)
> 
> Tokenizar
> 
> Word2vec
> 
> Dividir treino e teste
> 
> Criar modelo RandomForest
> 
> Prever dados de teste
> 
> Avaliar Performance

# 84. Pré-Processamento

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import Tokenizer, StringIndexer, Word2Vec
spark = SparkSession.builder.appName("NLP").getOrCreate()

In [None]:
# spam = spark.sql("select * from spam")

spam = spark.read.csv(r"D:\Users\Nayan Couto\Cloud Drive\Documentos\Arquivos PDF, PPT, DOC\CURSOS\Processamento de Linguagem Natural\NLP\10_NLP_SPARK\spam.csv", inferSchema=True, header=True)

In [None]:
#spam.show(10, truncate=False)
spam.show(10)

In [None]:
stringmodel = StringIndexer(inputCol="Category", outputCol="CategoryIndex")

In [None]:
spamnew = stringmodel.fit(spam).transform(spam)

In [None]:
spamnew.show(10)

In [None]:
tokens = Tokenizer(inputCol="Message", outputCol="MessageToken")
spamtoken = tokens.transform(spamnew)

In [None]:
spamtoken.show(10)

In [None]:
spamtoken.select("MessageToken").show(10,truncate=False)

# 85. Criando e Avaliando o Modelo

In [None]:
from pyspark.ml import Pipeline

# Criar o objeto word2vec
word2vec = Word2Vec(inputCol="MessageToken", outputCol="Messagew2v")

# Criar o pipeline
pipeline = Pipeline(stages=[word2vec])

# Treinar o pipeline
spamresult = pipeline.fit(spamtoken).transform(spamtoken)

In [None]:
spamresult.show(10)

In [None]:
spamresult.select("Messagew2v").show(10, truncate=False)

In [None]:
spamTreino, spamTeste = spamresult.randomSplit([0.7,0.3])

In [None]:
spamTreino.show(10)

In [None]:
spamTeste.show(10)

In [None]:
rf = RandomForestClassifier(labelCol="CategoryIndex", featuresCol="Messagew2v", numTrees=500)

In [None]:
modelo = rf.fit(spamTreino)

In [None]:
previsoes = modelo.transform(spamTeste)

In [None]:
previsoes.show(10)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
avaliar = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="CategoryIndex", metricName="areaUnderROC")
areaUnderRoc = avaliar.evaluate(previsoes)
print(areaUnderRoc)

# Notebook Spark no databricks

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1231607683567896/2083876346600474/7009669397555107/latest.html