# Modelo Spark Apache

### Importe o FindSpark

In [1]:
# Import findspark
# Importar o módúlo findspark de modo à utilizar o  
# método .init, responsável por inicializar o spark.
import findspark
findspark.init()

### Importe SparkSession

In [3]:
# Import SparkSession
# Após o spark ter sido encontrado no sistema, é necessário criar 
# a Sessão Spark, onde é possível configurar os nós do cluster, 
# bem como a memória alocada para cada um deles.
from pyspark.sql import SparkSession

In [4]:
# Build the SparkSession
spark = SparkSession.builder \
   .master("local[*]") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "6gb") \
   .config('spark.sql.debug.maxToStringFields', 2000) \
   .config('spark.debug.maxToStringFields', 2000) \
   .getOrCreate()
   
sc = spark.sparkContext

### Iniciando o Desenvolvimento com Spark

##### Com a sessão spark criada, pode-se trabalhar no ambiente de desenvolvimento. 

* A primeira etapa é importar o conjunto de dados, neste caso, o arquivo chama-se “Salary_Data.csv”, contendo dados de determinados funcionários, com os seus salários e anos de experiência em determinada função.

* Note que os dados foram salvos em uma variável chamada rdd, que significa Resilient Distributed Dataset, a principal estrutura de dados do Spark.
  
* Essa estrutura permite trabalhar com computação distribuída, ou seja, os dados serão distribuídos entre os nós do cluster, e controlados pelo nó master. Desta forma pode-se processá-los em paralelo, aumentando a velocidade de processamento.

In [5]:
# Load in the data
rdd = sc.textFile('../datasets/Salary_Data.csv')

* O método .take é indicado para visualização de uma parcela dos dados. Neste caso, o retorno da função trás 2 entradas do dataset. Note que a primeira entrada é ‘1.1,39343.00’ e a segunda entrada é ‘1.3,46205.00’. Os valores 1.1 e 1.3 representam os anos de experiência de um funcionário, já as entradas 39343.00 e 46205.00 representam os seus respectivos salários.

In [6]:
rdd.take(2)

                                                                                

['1.1,39343.00', '1.3,46205.00']

* Como as entradas vieram juntas na mesma string, é necessário separar os valores pela vírgula. Deste modo, usa-se o método split(“,”), responsável por isso. Também usa-se a função map, que mapeia a operação entre parênteses para todas as linhas do rdd.

* Note que foi usado paradigma funcional de programação com a função lambda line: line.split(“,”). O Spark se beneficia deste paradigma, portanto é necessário utilizá-lo.

In [7]:
# Split lines on commas
rdd = rdd.map(lambda line: line.split(","))

In [8]:
# Inspect the first line
rdd.take(1)

[['1.1', '39343.00']]

Observe que os valores foram separados pela vírgula, como esperado.

Existem os métodos .first e .top, que mostram a primeira linha, e a linha do topo, respectivamente. São métodos semelhantes.

In [9]:
# Inspect the first line 
rdd.first()

['1.1', '39343.00']

In [10]:
# Take top elements
rdd.top(1)

[['9.6', '112635.00']]

Nesta etapa, importa-se o módulo Row, onde o rdd faz a transformação para linhas do tipo Row. Essa transformação é necessária pois serão tratados os nomes das colunas, como YearsExperience e Salary, representados pela linha 0 e linha 1 respectivamente.

Mapeou-se todas as linhas para a formatação de colunas especificadas, e chamou-se o método .toDF(), onde é feita a transformação do rdd para DataFrame (semelhante ao DataFrame da biblioteca pandas).

In [11]:
# Import the necessary modules 
from pyspark.sql import Row

In [12]:
# Map the RDD to a DF
df = rdd.map(lambda line: Row(YearsExperience=line[0], Salary=line[1])).toDF()

Usando o método .show, é possível inspecionar como o DataFrame está.

In [13]:
# Show the top 20 rows 
df.show()

+--------+---------------+
|  Salary|YearsExperience|
+--------+---------------+
|39343.00|            1.1|
|46205.00|            1.3|
|37731.00|            1.5|
|43525.00|            2.0|
|39891.00|            2.2|
|56642.00|            2.9|
|60150.00|            3.0|
|54445.00|            3.2|
|64445.00|            3.2|
|57189.00|            3.7|
|63218.00|            3.9|
|55794.00|            4.0|
|56957.00|            4.0|
|57081.00|            4.1|
|61111.00|            4.5|
|67938.00|            4.9|
|66029.00|            5.1|
|83088.00|            5.3|
|81363.00|            5.9|
|93940.00|            6.0|
+--------+---------------+
only showing top 20 rows



O método .printSchema mostra algumas informações sobre os tipos de dados presentes nas colunas, conforme linha abaixo.

In [14]:
# Print the data types of all `df` columns
# df.dtypes

# Print the schema of `df`
df.printSchema()

root
 |-- Salary: string (nullable = true)
 |-- YearsExperience: string (nullable = true)



Criou-se uma função chamada convertColumn, que recebe como argumento o dataframe df, os nomes das colunas, e o novo tipo para as quais serão feitos os casts das colunas.

Logo após a criação da função, define-se a variável columns, como uma lista contendo os nomes das colunas do df, e aplica-se a função para o dataframe em si, convertendo os valores para FloatType.

In [15]:
# Import all from `sql.types`
from pyspark.sql.types import *

In [16]:
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

In [17]:
# Assign all column names to `columns`
columns = ['YearsExperience', 'Salary']

In [18]:
# Conver the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())

In [19]:
df.show()

+-------+---------------+
| Salary|YearsExperience|
+-------+---------------+
|39343.0|            1.1|
|46205.0|            1.3|
|37731.0|            1.5|
|43525.0|            2.0|
|39891.0|            2.2|
|56642.0|            2.9|
|60150.0|            3.0|
|54445.0|            3.2|
|64445.0|            3.2|
|57189.0|            3.7|
|63218.0|            3.9|
|55794.0|            4.0|
|56957.0|            4.0|
|57081.0|            4.1|
|61111.0|            4.5|
|67938.0|            4.9|
|66029.0|            5.1|
|83088.0|            5.3|
|81363.0|            5.9|
|93940.0|            6.0|
+-------+---------------+
only showing top 20 rows



Também é possível mostrar apenas uma coluna com o método .select.

In [20]:
df.select('Salary').show(10)

+-------+
| Salary|
+-------+
|39343.0|
|46205.0|
|37731.0|
|43525.0|
|39891.0|
|56642.0|
|60150.0|
|54445.0|
|64445.0|
|57189.0|
+-------+
only showing top 10 rows



Outra operação bastante conhecida é o groupby, onde pode-se agrupar os dados por um determinado pivô. Neste caso, usa-se a coluna Salary como pivô, efetuando a contagem dos valores e ordenando-os em ordem decrescente.

In [21]:
df.groupBy("Salary").count().sort("Salary",ascending=False).show()



+--------+-----+
|  Salary|count|
+--------+-----+
|122391.0|    1|
|121872.0|    1|
|116969.0|    1|
|113812.0|    1|
|112635.0|    1|
|109431.0|    1|
|105582.0|    1|
|101302.0|    1|
| 98273.0|    1|
| 93940.0|    1|
| 91738.0|    1|
| 83088.0|    1|
| 81363.0|    1|
| 67938.0|    1|
| 66029.0|    1|
| 64445.0|    1|
| 63218.0|    1|
| 61111.0|    1|
| 60150.0|    1|
| 57189.0|    1|
+--------+-----+
only showing top 20 rows



                                                                                

Por último, temos o método .describe, que faz a descrição do df baseado nas colunas, retornando uma contagem dos elementos, a média, desvio padrão, valores mínimo e máximo.

In [22]:
df.describe().show()

+-------+------------------+------------------+
|summary|            Salary|   YearsExperience|
+-------+------------------+------------------+
|  count|                30|                30|
|   mean|           76003.0|5.3133333643277485|
| stddev|27414.429784582302| 2.837888172228781|
|    min|           37731.0|               1.1|
|    max|          122391.0|              10.5|
+-------+------------------+------------------+



Agora é hora de começar a tratar os dados de modo à deixá-los no formato que o algoritmo de Machine Learning espera. Para isso, usa-se o módulo DenseVector. Este DenseVector é uma maneira otimizada de lidar com valores numéricos, acelerando o processamento realizado pelo Spark.
Assim, mapeia-se as linhas do df transformando-as em DenseVector, e cria-se um novo dataframe, chamado df, com as colunas ‘label’ e ‘features’.

Recordando que uma Regressão Linear é um problema de Aprendizado Supervisionado, ou seja, o algoritmo necessita do ‘ground truth’, os rótulos das entradas, de modo que ele possa comparar com sua saída e calcular alguma métrica de erro, como Erro Quadrático Médio (do inglês, Mean Squared Error, MSE), bastante empregado em problemas de Regressão.

In [23]:
# Import all from `sql.functions` 
from pyspark.sql.functions import *

# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Show the first 2 lines of `df`
#df.take(2)

AnalysisException: "cannot resolve '`medianHouseValue`' given input columns: [Salary, YearsExperience];;\n'Project [Salary#14, YearsExperience#11, ('medianHouseValue / 100000) AS medianHouseValue#174]\n+- Project [cast(Salary#0 as float) AS Salary#14, YearsExperience#11]\n   +- Project [Salary#0, cast(YearsExperience#1 as float) AS YearsExperience#11]\n      +- LogicalRDD [Salary#0, YearsExperience#1], false\n"

In [25]:
# Import all from `sql.functions` if you haven't yet
#from pyspark.sql.functions import *

# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))

# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))

# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))

# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
   .withColumn("populationPerHousehold", col("population")/col("households")) \
   .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
   
# Inspect the result
df.first()

Row(longitude=-122.2300033569336, latitude=37.880001068115234, housingMedianAge=41.0, totalRooms=880.0, totalBedrooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, medianHouseValue=4.526, roomsPerHousehold=6.984126984126984, populationPerHousehold=2.5555555555555554, bedroomsPerRoom=0.14659090909090908)

In [27]:
# Re-order and select columns

COLUMNS = ["medianHouseValue", "latitude", "longitude", "totalBedrooms", "totalRooms", "bedroomsperhouse", "roomsperhouse", "populationperhousehold",
           "households", "housingMedianAge", "population"]
          ['medianHouseValue', 'latitude', 'longitude', 'totalBedrooms', 'totalRooms', 'bedroomsperhouse', 'roomsperhouse', 'populationperhousehold', 
           'households', 'housingMedianAge', 'population']

df = df.select(COLUMNS)

AnalysisException: cannot resolve 'bedroomsperhouse' given input columns: [bedroomsPerRoom, households, housingMedianAge, latitude, longitude, medianHouseValue, medianIncome, population, populationPerHousehold, roomsPerHousehold, totalBedrooms, totalRooms];
'Project [medianHouseValue#945, latitude#65, longitude#55, totalBedrooms#95, totalRooms#85, 'bedroomsperhouse, 'roomsperhouse, populationperhousehold#972, households#115, housingMedianAge#75, population#105]
+- Project [longitude#55, latitude#65, housingMedianAge#75, totalRooms#85, totalBedrooms#95, population#105, households#115, medianIncome#125, medianHouseValue#945, roomsPerHousehold#961, populationPerHousehold#972, (cast(totalBedRooms#95 as double) / cast(totalRooms#85 as double)) AS bedroomsPerRoom#984]
   +- Project [longitude#55, latitude#65, housingMedianAge#75, totalRooms#85, totalBedrooms#95, population#105, households#115, medianIncome#125, medianHouseValue#945, roomsPerHousehold#961, (cast(population#105 as double) / cast(households#115 as double)) AS populationPerHousehold#972]
      +- Project [longitude#55, latitude#65, housingMedianAge#75, totalRooms#85, totalBedrooms#95, population#105, households#115, medianIncome#125, medianHouseValue#945, (cast(totalRooms#85 as double) / cast(households#115 as double)) AS roomsPerHousehold#961]
         +- Project [longitude#55, latitude#65, housingMedianAge#75, totalRooms#85, totalBedrooms#95, population#105, households#115, medianIncome#125, (cast(medianHouseValue#135 as double) / cast(100000 as double)) AS medianHouseValue#945]
            +- Project [longitude#55, latitude#65, housingMedianAge#75, totalRooms#85, totalBedrooms#95, population#105, households#115, medianIncome#125, cast(medianHouseValue#8 as float) AS medianHouseValue#135]
               +- Project [longitude#55, latitude#65, housingMedianAge#75, totalRooms#85, totalBedrooms#95, population#105, households#115, cast(medianIncome#7 as float) AS medianIncome#125, medianHouseValue#8]
                  +- Project [longitude#55, latitude#65, housingMedianAge#75, totalRooms#85, totalBedrooms#95, population#105, cast(households#6 as float) AS households#115, medianIncome#7, medianHouseValue#8]
                     +- Project [longitude#55, latitude#65, housingMedianAge#75, totalRooms#85, totalBedrooms#95, cast(population#5 as float) AS population#105, households#6, medianIncome#7, medianHouseValue#8]
                        +- Project [longitude#55, latitude#65, housingMedianAge#75, totalRooms#85, cast(totalBedrooms#4 as float) AS totalBedrooms#95, population#5, households#6, medianIncome#7, medianHouseValue#8]
                           +- Project [longitude#55, latitude#65, housingMedianAge#75, cast(totalRooms#3 as float) AS totalRooms#85, totalBedRooms#4, population#5, households#6, medianIncome#7, medianHouseValue#8]
                              +- Project [longitude#55, latitude#65, cast(housingMedianAge#2 as float) AS housingMedianAge#75, totalRooms#3, totalBedRooms#4, population#5, households#6, medianIncome#7, medianHouseValue#8]
                                 +- Project [longitude#55, cast(latitude#1 as float) AS latitude#65, housingMedianAge#2, totalRooms#3, totalBedRooms#4, population#5, households#6, medianIncome#7, medianHouseValue#8]
                                    +- Project [cast(longitude#0 as float) AS longitude#55, latitude#1, housingMedianAge#2, totalRooms#3, totalBedRooms#4, population#5, households#6, medianIncome#7, medianHouseValue#8]
                                       +- LogicalRDD [longitude#0, latitude#1, housingMedianAge#2, totalRooms#3, totalBedRooms#4, population#5, households#6, medianIncome#7, medianHouseValue#8], false


In [None]:
df.show()

In [None]:
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector

In [None]:
# Define the `input_data` 
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[0])))

In [None]:
columns = ["label","features"]

# Replace `df` with the new DataFrame
df = spark.createDataFrame(data=input_data, schema = columns)

In [None]:
# Suponha que você tenha um DataFrame chamado df
# ...

# Acessando os dados da posição 0 usando head() e collect()
dados_posicao_0 = df.head(2)[1:]
print(dados_posicao_0)
