<a href="https://colab.research.google.com/github/FGalvao77/Tutorial-Apache-Spark-Machine-Learning-com-PySpark/blob/main/Tutorial_Apache_Spark_Machine_Learning_com_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<br>

## **Tutorial Apache Spark: _Machine Learning_ com `PySpark`**
---
---

O tutorial do **Apache Spark** apresenta o processamento, a análise e o Machine Leaning de _Big Data_ com o `PySpark`.

**Apache Spark e Python para Big Data e Machine Learning**

<br>

O `Apache Spark` é conhecido como um mecanismo rápido, fácil de usar e geral para processamento de `Big Data` que possui módulos integrados para _streaming_, _SQL_, _Machine Learning (ML)_ e processamento de gráficos. Essa tecnologia é uma habilidade em demanda para **engenheiros de dados**, mas também os **cientistas de dados** podem se beneficiar do aprendizado do **Spark** ao fazer `Análise Exploratória de Dados` (**EDA** - _Exploratory Data Analysis_), extração de recursos e, é claro, ML.

Neste tutorial, você fará a interface do Spark com o Python por meio do PySpark, a API do Spark Python que expõe o modelo de programação do Spark ao Python. Mais concretamente, você se concentrará em:

- Instalar o PySpark localmente em seu computador pessoal e configurá-lo para que você possa trabalhar com o shell interativo do Spark para fazer análises rápidas e interativas de seus dados.
- Aprendendo a trabalhar com o básico do Spark: você verá como pode criar `RDDs` e realizar operações básicas neles.
- Introdução ao PySpark no Jupyter Notebook e/ou Google Colab e carregamento em um conjunto de dados da vida real.
- Explorando e pré-processando os dados que você carregou na primeira etapa, a ajuda de DataFrames, que exige que você faça uso do _Spark SQL_, que permite consultar dados estruturados dentro de programas Spark.
- Criando um modelo de regressão linear com Spark ML para alimentá-lo com os dados, após o qual você poderá fazer previsões. E por fim,
- Avaliando o modelo de aprendizado de máquina que você criou.

**Instalando o Apache Spark e as dependências necessárias**

<br>

Instalar o Spark e fazê-lo funcionar pode ser um desafio, você pode instalá-lo e configurar no seu PC.

A primeira coisa que você deseja fazer é verificar se atende aos pré-requisitos. O Spark é escrito em linguagem de programação Scala e é executado no ambiente Java Virtual Machine (JVM). É por isso que você precisa verificar se possui um [Java Development Kit (JDK)](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) instalado. Você faz isso porque o JDK fornecerá uma ou mais implementações da JVM. De preferência, você deseja escolher o mais recente, que, no momento da redação deste artigo, é o JDK8.

Em seguida, você está lendo para baixar o Spark!

**Baixando `pyspark` com pip**

Em seguida, você pode baixar e instalar o PySpark com a ajuda de pip. Isso é bastante fácil e muito parecido com a instalação de qualquer outro pacote em Python. Você apenas executa o comando usual e o trabalho pesado é feito para você:

```python 
!pip install pyspark
```

Nesse tutorial iremos utilizar o PySpark no ambiente do `Google Colab`, por ser mais prático! Mas mesmo assim, será necessário algumas configurações extras.

In [1]:
# Instalando as dependências necessárias e as bibliotecas "findspark" e "pyspark"
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

!pip install -q findspark
!pip install -q pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
# Configurando as dependências de ambiente
import os

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.1.2-bin-hadoop2.7'

**Os dados**

Para nosso tutorial, você usará o conjunto de dados [California Housing](https://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html) . Observe, é claro, que na verdade são dados 'pequenos' e que usar o Spark nesse contexto pode ser um exagero. Este tutorial é apenas para fins educacionais e destina-se a fornecer uma ideia de como você pode usar o PySpark para criar um modelo de aprendizado de máquina.


Agora se você preferir poderá baixar os dados [aqui](http://lib.stat.cmu.edu/datasets/). Procure a pasta `houses.zip`, faça o download e descompacte-o para que você possa acessar as pastas de dados.

Em seguida, você usará o método `textFile()` para ler os dados da pasta que você baixou para **RDDs**. Este método pega um URI para o arquivo, que é neste caso o caminho local de sua máquina, e o lê como uma coleção de linhas. Para sua comodidade, você lerá não apenas o arquivo `.data`, mas também o arquivo `.domain` que contém o cabeçalho. Isso permitirá que você verifique novamente a ordem das variáveis.

```python
# Load in the data | Carregando os dados
rdd = sc.textFile('/Users/yourName/Downloads/CaliforniaHousing/cal_housing.data')

# Load in the header | Lendo o cabeçalho
header = sc.textFile('/Users/yourName/Downloads/CaliforniaHousing/cal_housing.domain')
```

Mas para aplicação no `Colab`, utilizarei o conjunto de dados já presente no ambiente, ou seja, um conjunto de dados de treino e outro para teste. 

In [3]:
# Importando os dados de treino e teste, esses já presente no ambiente do Colab
train_data = '/content/sample_data/california_housing_train.csv'
test_data = '/content/sample_data/california_housing_test.csv'

### **Carregando, preparando e explorando os dados**

Mesmo sabendo um pouco mais sobre seus dados, você deve reservar um tempo para prosseguir e explorá-los mais profundamente; Antes de fazer isso, no entanto, você configurará seu Jupyter Notebook com Spark e dará alguns primeiros passos para definir o SparkContext.

**PySpark no notebook Jupyter / Google Colab**

Nesta parte do tutorial, você não usará o ishell, mas criará seu próprio aplicativo. Você fará isso em um Jupyter Notebook. Você já tem tudo o que precisa instalado, então não precisa fazer muito para que o PySpark funcione no Jupyter.

Você pode simplesmente iniciar o aplicativo de notebook da mesma forma que sempre faz, executando `$jupyter notebook`. Então, você faz um novo notebook e simplesmente importa a biblioteca `findspark` e usa a função `init()`. Nesse caso, você fornecerá o caminho `/usr/local/spark` porque `init()` tem certeza de que esse é o caminho onde instalou o Spark.

**Dica:** se você não tem ideia se seu caminho está definido corretamente ou onde você instalou o Spark no seu PC, você sempre pode usar `findspark.find()` para detectar automaticamente o local onde o Spark está instalado.

In [4]:
# Importando e inicializando a bilioteca
import findspark

findspark.init()

In [5]:
# Importando o módulo para iniciar uma sessão Spark e definindo os seus parâmetros
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local') \
    .appName('Linear Regression Model') \
    .config('spark.executor.memory', '1gb') \
    .getOrCreate()

In [6]:
# Instanciando a variável "sc"
sc = spark.sparkContext

sc

In [7]:
# Importando a biblioteca para análise e manipulação de dados
import pandas as pd

In [8]:
# Instanciando o conjunto de dados de treino
train = pd.read_csv(
    filepath_or_buffer=train_data
)

# Visualizando as primeiras observações
train.head()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,-114.31,34.19,15.0,5612.0,1283.0,1015.0,472.0,1.4936,66900.0
1,-114.47,34.4,19.0,7650.0,1901.0,1129.0,463.0,1.82,80100.0
2,-114.56,33.69,17.0,720.0,174.0,333.0,117.0,1.6509,85700.0
3,-114.57,33.64,14.0,1501.0,337.0,515.0,226.0,3.1917,73400.0
4,-114.57,33.57,20.0,1454.0,326.0,624.0,262.0,1.925,65500.0


In [9]:
# Contabilizando a quantidade de linhas e colunas e, visualizando o nome dos atributos
train.shape, train.columns

((17000, 9),
 Index(['longitude', 'latitude', 'housing_median_age', 'total_rooms',
        'total_bedrooms', 'population', 'households', 'median_income',
        'median_house_value'],
       dtype='object'))

In [10]:
# Instanciando o conjunto de dados de teste
test = pd.read_csv(
    filepath_or_buffer=test_data
)

# Visualizando as primeiras observações
test.head()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,-122.05,37.37,27.0,3885.0,661.0,1537.0,606.0,6.6085,344700.0
1,-118.3,34.26,43.0,1510.0,310.0,809.0,277.0,3.599,176500.0
2,-117.81,33.78,27.0,3589.0,507.0,1484.0,495.0,5.7934,270500.0
3,-118.36,33.82,28.0,67.0,15.0,49.0,11.0,6.1359,330000.0
4,-119.67,36.33,19.0,1241.0,244.0,850.0,237.0,2.9375,81700.0


In [11]:
# Contabilizando a quantidade de linhas e colunas e, visualizando o nome dos atributos
test.shape, test.columns

((3000, 9),
 Index(['longitude', 'latitude', 'housing_median_age', 'total_rooms',
        'total_bedrooms', 'population', 'households', 'median_income',
        'median_house_value'],
       dtype='object'))

In [12]:
# Realizando a concatenação dos conjuntos de dados - treino e teste
california_housing = pd.concat(
    objs=[train, test],
    ignore_index=True
)

# Visualizando as últimas observações
california_housing.tail()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
19995,-119.86,34.42,23.0,1450.0,642.0,1258.0,607.0,1.179,225000.0
19996,-118.14,34.06,27.0,5257.0,1082.0,3496.0,1036.0,3.3906,237200.0
19997,-119.7,36.3,10.0,956.0,201.0,693.0,220.0,2.2895,62000.0
19998,-117.12,34.1,40.0,96.0,14.0,46.0,14.0,3.2708,162500.0
19999,-119.63,34.42,42.0,1765.0,263.0,753.0,260.0,8.5608,500001.0


In [13]:
# Contabilizando a quantidade de linhas e colunas e, visualizando o nome dos atributos
california_housing.shape, california_housing.columns

((20000, 9),
 Index(['longitude', 'latitude', 'housing_median_age', 'total_rooms',
        'total_bedrooms', 'population', 'households', 'median_income',
        'median_house_value'],
       dtype='object'))

In [14]:
# Salvando o conjunto de dados final em um objeto do tipo ".csv" com o nome de "house"
california_housing.to_csv(path_or_buf='house.csv')

In [15]:
# Inspecionando o ambiente
%ls

house.csv     [0m[01;34mspark-3.1.2-bin-hadoop2.7[0m/
[01;34msample_data[0m/  spark-3.1.2-bin-hadoop2.7.tgz


In [16]:
# Visualizando as primeiras observações do conjunto de dados "house.csv"
!head house.csv

,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value
0,-114.31,34.19,15.0,5612.0,1283.0,1015.0,472.0,1.4936,66900.0
1,-114.47,34.4,19.0,7650.0,1901.0,1129.0,463.0,1.82,80100.0
2,-114.56,33.69,17.0,720.0,174.0,333.0,117.0,1.6509,85700.0
3,-114.57,33.64,14.0,1501.0,337.0,515.0,226.0,3.1917,73400.0
4,-114.57,33.57,20.0,1454.0,326.0,624.0,262.0,1.925,65500.0
5,-114.58,33.63,29.0,1387.0,236.0,671.0,239.0,3.3438,74000.0
6,-114.58,33.61,25.0,2907.0,680.0,1841.0,633.0,2.6768,82400.0
7,-114.59,34.83,41.0,812.0,168.0,375.0,158.0,1.7083,48500.0
8,-114.59,33.61,34.0,4789.0,1175.0,3134.0,1056.0,2.1782,58400.0


In [17]:
# Carregando o conjunto de dados em um objeto do tipo data frame spark
df = spark.read.csv(path='house.csv', header=True, inferSchema=True, sep=',')

In [18]:
# Visualizando o tipo do objeto "df"
type(df)

pyspark.sql.dataframe.DataFrame

Agora que você tem seu DataFrame df, você pode inspecioná-lo com os seguintes métodos:
- `first()`
- `take()`
- também com o `head()` e 
- `show()`:

In [19]:
# Visualizando as primeiras observações
df.show()

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|_c0|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  0|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  1|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  2|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  3|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  4|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           6

In [20]:
# Visualizando o tipo de dado em cada atributo
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [21]:
# Com método ".dtypes" também é possível visualizar o tipo de dado nos atributos
df.dtypes

[('_c0', 'int'),
 ('longitude', 'double'),
 ('latitude', 'double'),
 ('housing_median_age', 'double'),
 ('total_rooms', 'double'),
 ('total_bedrooms', 'double'),
 ('population', 'double'),
 ('households', 'double'),
 ('median_income', 'double'),
 ('median_house_value', 'double')]

In [23]:
# Visualizando o nome dos atributos
df.columns

['_c0',
 'longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [24]:
# Visualizando todas as colunas menos a primeira
df.columns[1:]

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [25]:
# Instanciando as colunas de interesse
cols = df.columns[1:]

cols

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [26]:
# Realizando a seleção das colunas de interesse instanciadas no objeto "cols"
df = df.select(cols)

# Visualizando as primeiras observações do conjunto de dados após a seleção
df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

In [27]:
# Contabilizando a quantidade de linhas e colunas e, o nome dos atributos
df.count(), len(df.columns)

(20000, 9)

In [28]:
# Definindo uma função para visualizar a dimensão de um conjunto de dados
def dataframe_dim(data: object):
    num_rows = data.count()
    num_cols = len(data.columns)
    return num_rows, num_cols

In [29]:
# Aplicando a função "dataframe_dim" no objeto "df"
dataframe_dim(data=df)

(20000, 9)

In [30]:
# Selecionando as colunas "population" e "total_bedrooms" e, visualizando as primeiras observações
df.select('population', 'total_bedrooms').show(n=10)

+----------+--------------+
|population|total_bedrooms|
+----------+--------------+
|    1015.0|        1283.0|
|    1129.0|        1901.0|
|     333.0|         174.0|
|     515.0|         337.0|
|     624.0|         326.0|
|     671.0|         236.0|
|    1841.0|         680.0|
|     375.0|         168.0|
|    3134.0|        1175.0|
|     787.0|         309.0|
+----------+--------------+
only showing top 10 rows



In [33]:
# Realizando o agrupamento dos dados da coluna "housing_median_age" com a função ".gropuBy()"
# E, posteriormente contabilizando e ordenando os dados de exibição
# De maior idade média para a menor com o argumento "ascending=False"
df.groupBy('housing_median_age').count() \
    .sort('housing_median_age', ascending=False).show()

+------------------+-----+
|housing_median_age|count|
+------------------+-----+
|              52.0| 1225|
|              51.0|   43|
|              50.0|  128|
|              49.0|  132|
|              48.0|  169|
|              47.0|  197|
|              46.0|  237|
|              45.0|  286|
|              44.0|  347|
|              43.0|  342|
|              42.0|  359|
|              41.0|  288|
|              40.0|  292|
|              39.0|  357|
|              38.0|  382|
|              37.0|  525|
|              36.0|  830|
|              35.0|  810|
|              34.0|  669|
|              33.0|  598|
+------------------+-----+
only showing top 20 rows



In [34]:
# Visualizando a estatística descritiva do conjunto de dados
df.describe().show()

+-------+-------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|          longitude|         latitude|housing_median_age|       total_rooms|   total_bedrooms|        population|       households|     median_income|median_house_value|
+-------+-------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              20000|            20000|             20000|             20000|            20000|             20000|            20000|             20000|             20000|
|   mean|-119.56617200000018|35.62674950000045|          28.62775|        2637.05155|         537.9918|        1425.55765|        499.52545|3.8721321550000423|      207082.71675|
| stddev|  2.003608838779282|2.136140953719693|12.582229355812826|2176.3147574716336|420.6311188674329|11

> Observe os valores mínimo e máximo de todos os atributos (numéricos). Você vê que vários atributos têm uma ampla gama de valores: você precisará normalizar seu conjunto de dados.

### **Pré-processamento de dados**

Com todas essas informações coletadas em sua pequena análise exploratória de dados, você sabe o suficiente para pré-processar seus dados para alimentá-los ao modelo.

- Você não deve se preocupar com valores ausentes; todos os valores zero foram excluídos do conjunto de dados.
- Você provavelmente deve padronizar seus dados, pois você viu que o intervalo de valores mínimos e máximos é bastante grande.
- Existem possivelmente alguns atributos adicionais que você pode adicionar, como um recurso que registra o número de quartos por cômodo ou os cômodos por residência.
- Sua variável dependente também é bastante grande; Para facilitar sua vida, você terá que ajustar um pouco os valores.

**Pré-processando os valores-alvo**
<br>

Primeiro, vamos começar com `median_house_value`, sua variável dependente. Para facilitar seu trabalho com os valores-alvo, você expressará os valores das casas em unidades de 100.000. Isso significa que um alvo como 452600.000000 deve se tornar 4.526:

In [35]:
# Importando o modulo necessário para aplicar a manipulação do atributo de interesse
from pyspark.sql.functions import col

In [36]:
# Ajustando os valores da coluna "median_house_value"
df = df.withColumn('median_house_value', 
                   col('median_house_value') / 100_000)

In [38]:
# Com a função ".take()", visualizando as 5 primeiras linhas
df.take(5)

[Row(longitude=-114.31, latitude=34.19, housing_median_age=15.0, total_rooms=5612.0, total_bedrooms=1283.0, population=1015.0, households=472.0, median_income=1.4936, median_house_value=0.669),
 Row(longitude=-114.47, latitude=34.4, housing_median_age=19.0, total_rooms=7650.0, total_bedrooms=1901.0, population=1129.0, households=463.0, median_income=1.82, median_house_value=0.801),
 Row(longitude=-114.56, latitude=33.69, housing_median_age=17.0, total_rooms=720.0, total_bedrooms=174.0, population=333.0, households=117.0, median_income=1.6509, median_house_value=0.857),
 Row(longitude=-114.57, latitude=33.64, housing_median_age=14.0, total_rooms=1501.0, total_bedrooms=337.0, population=515.0, households=226.0, median_income=3.1917, median_house_value=0.734),
 Row(longitude=-114.57, latitude=33.57, housing_median_age=20.0, total_rooms=1454.0, total_bedrooms=326.0, population=624.0, households=262.0, median_income=1.925, median_house_value=0.655)]

In [40]:
# Visualizando a estatística descritiva somente do atributo alvo após sua normalização
df.select('median_house_value').describe().show()

+-------+------------------+
|summary|median_house_value|
+-------+------------------+
|  count|             20000|
|   mean|  2.07082716750003|
| stddev| 1.155570558561624|
|    min|           0.14999|
|    max|           5.00001|
+-------+------------------+



**Engenharia de recursos**

<br>
Agora que você ajustou os valores em "median_house_value, também pode adicionar as variáveis ​​adicionais. Você adicionará as seguintes colunas ao conjunto de dados:

- Cômodos por domicílio que se refere ao número de cômodos dos domicílios por grupo de quarteirões;
- População por domicílio , que basicamente dá uma indicação de quantas pessoas moram em domicílios por grupo de quarteirão, e
- Quartos por cômodo, o que lhe dará uma ideia de quantos cômodos são quartos por grupo de quarteirões.

Ao trabalhar com DataFrames, você pode usar melhor o método `select()` para selecionar as colunas com as quais trabalhará, ou seja **total_rooms**, **households**, e **population**. Além disso, você deve indicar que está trabalhando com colunas adicionando a função `col()` ao seu código. Caso contrário, você não poderá fazer operações elementares como a divisão que você tem em mente para essas três variáveis.

In [41]:
# Divide "total_rooms" por "households"
roomsPerHousehold = df.select(col('total_rooms') / col('households'))

roomsPerHousehold.show()

+--------------------------+
|(total_rooms / households)|
+--------------------------+
|        11.889830508474576|
|         16.52267818574514|
|         6.153846153846154|
|        6.6415929203539825|
|         5.549618320610687|
|         5.803347280334728|
|         4.592417061611374|
|         5.139240506329114|
|         4.535037878787879|
|         5.523985239852398|
|         4.540048543689321|
|         4.549199084668192|
|         6.118483412322274|
|        5.1732776617954075|
|         4.826666666666667|
|        6.3740648379052365|
|                 6.5546875|
|        1.6296296296296295|
|                    4.3375|
|         6.466666666666667|
+--------------------------+
only showing top 20 rows



In [42]:
# Divide "population" por "households"
populationPerHousehold = df.select(col('population') / col('households'))

populationPerHousehold.show()

+-------------------------+
|(population / households)|
+-------------------------+
|       2.1504237288135593|
|       2.4384449244060473|
|       2.8461538461538463|
|       2.2787610619469025|
|        2.381679389312977|
|        2.807531380753138|
|       2.9083728278041074|
|       2.3734177215189876|
|       2.9678030303030303|
|        2.904059040590406|
|        2.953883495145631|
|       2.7048054919908466|
|        2.748815165876777|
|       2.8100208768267225|
|       3.1633333333333336|
|        2.506234413965087|
|                2.6015625|
|       2.3703703703703702|
|                 2.421875|
|       1.9333333333333333|
+-------------------------+
only showing top 20 rows



In [43]:
# Divide "total_bedrooms" por "total_rooms"
bedroomsPerRoom = df.select(col('total_bedrooms') / col('total_rooms'))

bedroomsPerRoom.show()

+------------------------------+
|(total_bedrooms / total_rooms)|
+------------------------------+
|           0.22861724875267284|
|           0.24849673202614378|
|           0.24166666666666667|
|           0.22451698867421718|
|           0.22420907840440166|
|           0.17015140591204037|
|           0.23391812865497075|
|           0.20689655172413793|
|           0.24535393610357067|
|           0.20641282565130262|
|            0.2141138732959102|
|           0.24295774647887325|
|           0.19209914794732766|
|           0.18724778046811946|
|            0.2610497237569061|
|           0.22965571205007826|
|           0.19189511323003575|
|                          0.75|
|            0.2780979827089337|
|           0.24742268041237114|
+------------------------------+
only showing top 20 rows



In [44]:
# Adicionando as colunas criadas ao data frame
df = df.withColumn('roomsPerHousehold', col('total_rooms') / col('households')) \
    .withColumn('populationPerHousehold', col('population') / col('households')) \
    .withColumn('bedroomsPerRoom', col('total_bedrooms') / col('total_rooms')) 

In [45]:
# Visualizando a primeira linha
df.first()

Row(longitude=-114.31, latitude=34.19, housing_median_age=15.0, total_rooms=5612.0, total_bedrooms=1283.0, population=1015.0, households=472.0, median_income=1.4936, median_house_value=0.669, roomsPerHousehold=11.889830508474576, populationPerHousehold=2.1504237288135593, bedroomsPerRoom=0.22861724875267284)

In [47]:
# Visualizando a primeira linha e somente os atributos criados
df.select('roomsPerHousehold', 
          'populationPerHousehold',
          'bedroomsPerRoom').first()

Row(roomsPerHousehold=11.889830508474576, populationPerHousehold=2.1504237288135593, bedroomsPerRoom=0.22861724875267284)

> Você vê que, para a primeira linha, há cerca de ~ 11,9 cômodos por domicílio, os domicílios no grupo de quarteirões consistem em cerca de ~2,1 pessoas e a quantidade de quartos é bastante baixa com ~ 0,2.

In [46]:
# Visualizando o nome das colunas
df.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'roomsPerHousehold',
 'populationPerHousehold',
 'bedroomsPerRoom']

Em seguida, - e isso já está prevendo um problema que você pode ter ao padronizar os valores em seu conjunto de dados - você também reordenará os valores. Como você não deseja necessariamente padronizar seus valores de destino, certifique-se de isolá-los em seu conjunto de dados.

Nesse caso, você precisará fazer isso usando o método `select()` e passando os nomes das colunas na ordem que for mais adequada. Nesse caso, a variável de destino "median_house_value" é colocada primeiro, para que não seja afetada pela padronização.

Observe também que este é o momento de deixar de fora as variáveis ​​que você pode não querer considerar em sua análise. Nesse caso, vamos deixar de fora variáveis ​​como `longitude`, `latitude` e `housing_median_age` e `total_rooms`.

In [48]:
# Realizando a seleção dos atributos
df = df.select(
    'median_house_value',
    'total_bedrooms',
    'population',
    'households',
    'median_income',
    'roomsPerHousehold',
    'populationPerHousehold',
    'bedroomsPerRoom'
)

**Estandardização**

<br>
Agora que você reordenou os dados, está pronto para normalizá-los. Ou quase, pelo menos. Há apenas mais uma etapa que você precisa seguir que é separar os recursos da variável de destino. Em essência, isso se resume a isolar a primeira coluna em seu DataFrame do restante das colunas.

Nesse caso, você usará a função `map()` que usa com **RDDs** para executar essa ação. Você também vê que faz uso da função `DenseVector()`, mas afinal para que isso? 

    Um vetor denso é um vetor local apoiado por uma matriz dupla que representa seus valores de entrada. 
    Em outras palavras, é usado para armazenar matrizes de valores para uso no PySpark.

Em seguida, você volta a criar um DataFrame `input_data` e renomeia as colunas passando uma lista como um segundo argumento. Esta lista consiste nos nomes das colunas **"label"** e **"features"**.

In [49]:
# Importando o módulo para realizar a criação do vetor denso
from pyspark.ml.linalg import DenseVector

In [50]:
# Realizando o mapeamento dos atributos e com a função "lambda" aplicando a vetorização
input_data = df.rdd.map(
    lambda x: (x[0], DenseVector(x[1:]))
)

In [51]:
# Instanciando um novo objeto data frame com os dados vetorizados
df = spark.createDataFrame(
    data=input_data,
    schema=['label', 'features']
)

In [52]:
# Visualizando as primeiras observações
df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|0.669|[1283.0,1015.0,47...|
|0.801|[1901.0,1129.0,46...|
|0.857|[174.0,333.0,117....|
|0.734|[337.0,515.0,226....|
|0.655|[326.0,624.0,262....|
| 0.74|[236.0,671.0,239....|
|0.824|[680.0,1841.0,633...|
|0.485|[168.0,375.0,158....|
|0.584|[1175.0,3134.0,10...|
|0.481|[309.0,787.0,271....|
|0.865|[801.0,2434.0,824...|
| 0.62|[483.0,1182.0,437...|
|0.486|[248.0,580.0,211....|
|0.704|[464.0,1346.0,479...|
| 0.45|[378.0,949.0,300....|
|0.691|[587.0,1005.0,401...|
|0.949|[322.0,666.0,256....|
| 0.25|[33.0,64.0,27.0,0...|
| 0.44|[386.0,775.0,320....|
|0.275|[24.0,29.0,15.0,1...|
+-----+--------------------+
only showing top 20 rows



Em seguida, você pode finalmente dimensionar os dados. Você pode usar o Spark ML para fazer isso, esta biblioteca tornará o aprendizado de máquina em Big Data escalável e fácil. Você encontrará ferramentas como algoritmos de ML e tudo o que precisa para criar pipelines de ML práticos. Nesse caso, você não precisa fazer muito pré-processamento, portanto, um pipeline talvez seja um exagero, mas se quiser investigá-lo, considere definitivamente visitar esta [página](https://spark.apache.org/docs/latest/ml-pipeline.html).

As colunas de entrada são os recursos, e a coluna de saída com o redimensionado que será incluído no `scaled_df` será nomeada **"features_scaled"**.

In [53]:
# Importando o módulo
from pyspark.ml.feature import StandardScaler

In [54]:
# Selecionando a coluna de entrada e nomeando a coluna de saída
standardScaler = StandardScaler(
    inputCol='features',
    outputCol='features_scaled'
)

In [55]:
# Treinando o modelo com o conjunto de dados
scaler = standardScaler.fit(df)

In [56]:
# Realizando o redimensionamento dos dados
scaled_df = scaler.transform(df)

In [57]:
# Visualizando duas linhas após o  redimensionamento dos dados
scaled_df.take(2)

[Row(label=0.669, features=DenseVector([1283.0, 1015.0, 472.0, 1.4936, 11.8898, 2.1504, 0.2286]), features_scaled=DenseVector([3.0502, 0.8974, 1.2365, 0.786, 4.7528, 0.5281, 3.939])),
 Row(label=0.801, features=DenseVector([1901.0, 1129.0, 463.0, 1.82, 16.5227, 2.4384, 0.2485]), features_scaled=DenseVector([4.5194, 0.9982, 1.2129, 0.9577, 6.6048, 0.5988, 4.2815]))]

In [58]:
# Visualizando a estatística descritiva 
scaled_df.describe().show()

+-------+-----------------+
|summary|            label|
+-------+-----------------+
|  count|            20000|
|   mean| 2.07082716750003|
| stddev|1.155570558561624|
|    min|          0.14999|
|    max|          5.00001|
+-------+-----------------+



### **Construindo um modelo de aprendizado de máquina com Spark ML**

Com todo o pré-processamento concluído, finalmente é hora de começar a construir seu modelo de Regressão Linear! Como sempre, primeiro você precisa dividir os dados em conjuntos de treinamento e teste. Felizmente, isso não é problema com o método `randomSplit()`.

In [59]:
# Separando os dados em treino e de validação
train_data, val_data = scaled_df.randomSplit(
    weights=[0.8, 0.2], seed=1522
)

In [60]:
# Visualizando a dimensão dos conjuntod de dados apóes seu particionamento
dataframe_dim(data=train_data), dataframe_dim(data=val_data),

((16032, 3), (3968, 3))

In [61]:
# Visualizando as primeiras observações do conjunto de dados de treino
train_data.show()

+-------+--------------------+--------------------+
|  label|            features|     features_scaled|
+-------+--------------------+--------------------+
|0.14999|[28.0,18.0,8.0,0....|[0.06656663937606...|
|0.14999|[73.0,85.0,38.0,1...|[0.17354873837331...|
|0.14999|[239.0,490.0,164....|[0.56819381467428...|
|0.14999|[267.0,628.0,225....|[0.63476045405035...|
|  0.175|[168.0,259.0,138....|[0.39939983625640...|
|  0.225|[451.0,1230.0,375...|[1.07219836995022...|
|  0.225|[1743.0,6835.0,14...|[4.14377330116017...|
|   0.25|[33.0,64.0,27.0,0...|[0.07845353926465...|
|  0.266|[309.0,808.0,294....|[0.73461041311445...|
|  0.269|[543.0,1423.0,482...|[1.29091732790015...|
|  0.275|[24.0,29.0,15.0,1...|[0.05705711946520...|
|  0.283|[3114.0,12427.0,2...|[7.40316125060974...|
|    0.3|[183.0,500.0,177....|[0.43506053592215...|
|  0.325|[49.0,51.0,20.0,4...|[0.11649161890811...|
|  0.325|[332.0,667.0,288....|[0.78929015260193...|
|  0.342|[153.0,112.0,47.0...|[0.36373913659065...|
|  0.344|[12

In [62]:
# Visualizando as primeiras observações do conjunto de dados de validação
val_data.show()

+-----+--------------------+--------------------+
|label|            features|     features_scaled|
+-----+--------------------+--------------------+
|0.225|[73.0,216.0,63.0,...|[0.17354873837331...|
|0.225|[79.0,167.0,53.0,...|[0.18781301823961...|
|  0.3|[448.0,338.0,182....|[1.06506623001707...|
|0.325|[275.0,508.0,253....|[0.65377949387208...|
|0.329|[386.0,436.0,213....|[0.91766867139863...|
|0.346|[208.0,660.0,188....|[0.49449503536506...|
|0.388|[103.0,470.0,96.0...|[0.24487013770481...|
|0.388|[263.0,1274.0,241...|[0.62525093413948...|
|0.394|[255.0,677.0,213....|[0.60623189431775...|
|0.398|[316.0,672.0,241....|[0.75125207295847...|
|0.425|[30.0,542.0,32.0,...|[0.07132139933150...|
| 0.43|[228.0,1222.0,224...|[0.54204263491940...|
|0.438|[78.0,284.0,73.0,...|[0.18543563826190...|
| 0.44|[835.0,1908.0,686...|[1.98511228139342...|
|0.444|[441.0,1530.0,391...|[1.04842457017305...|
|0.444|[465.0,1193.0,447...|[1.10548168963825...|
|0.446|[453.0,1613.0,400...|[1.07695312990565...|


In [63]:
# importando o módulo de regressão linear
from pyspark.ml.regression import LinearRegression

In [64]:
# Definindo os parâmetros do modelo
linearReg = LinearRegression(
    labelCol='label',
    maxIter=10, 
    regParam=0.3,
    elasticNetParam=0.8
)

In [65]:
# Treinando o modelo no conjunto de dados de treino
linearModel = linearReg.fit(dataset=train_data)

In [66]:
# Realizando a predição do modelo após seu treinamento no conjunto de dados de validação
predicted = linearModel.transform(dataset=val_data)

In [69]:
# Visualizando as primeiras observações do objeto "predicted"
predicted.show(truncate=True)

+-----+--------------------+--------------------+------------------+
|label|            features|     features_scaled|        prediction|
+-----+--------------------+--------------------+------------------+
|0.225|[73.0,216.0,63.0,...|[0.17354873837331...|1.7404451323951722|
|0.225|[79.0,167.0,53.0,...|[0.18781301823961...|1.2173172622444626|
|  0.3|[448.0,338.0,182....|[1.06506623001707...|1.3343981294820373|
|0.325|[275.0,508.0,253....|[0.65377949387208...|1.4535067317868342|
|0.329|[386.0,436.0,213....|[0.91766867139863...|1.3160374051062278|
|0.346|[208.0,660.0,188....|[0.49449503536506...|1.6174199459194385|
|0.388|[103.0,470.0,96.0...|[0.24487013770481...| 1.535588336220294|
|0.388|[263.0,1274.0,241...|[0.62525093413948...|1.5367549783137644|
|0.394|[255.0,677.0,213....|[0.60623189431775...| 1.425979533819471|
|0.398|[316.0,672.0,241....|[0.75125207295847...| 1.528144048576244|
|0.425|[30.0,542.0,32.0,...|[0.07132139933150...|1.4574788703431742|
| 0.43|[228.0,1222.0,224...|[0.542

In [73]:
# Selecionando a coluna "prediction" e aplicando a função ".map()" juntamente com a "lambda"
predictions = predicted.select('prediction').rdd.map(
    lambda x: x[0]
)

predictions

PythonRDD[190] at RDD at PythonRDD.scala:53

In [74]:
# Selecionando a coluna "label" e aplicando a função ".map()" juntamente com a "lambda"
labels = predicted.select('label').rdd.map(
    lambda x: x[0]
)

labels

PythonRDD[195] at RDD at PythonRDD.scala:53

In [75]:
# Com a função ".zip()" realizando a concatenação dos objetos "predictions" e "labels"
predictionAndLabel = predictions.zip(labels).collect()

In [76]:
# Visualizando as 15 primeiras observações do obbjeto "predictionAndLabel"
predictionAndLabel[:15]

[(1.7404451323951722, 0.225),
 (1.2173172622444626, 0.225),
 (1.3343981294820373, 0.3),
 (1.4535067317868342, 0.325),
 (1.3160374051062278, 0.325),
 (1.59344822861789, 0.344),
 (1.483506099904647, 0.379),
 (1.535588336220294, 0.388),
 (1.6021424899335153, 0.392),
 (1.2894268544980199, 0.396),
 (1.146290980506271, 0.425),
 (1.4154519777855348, 0.427),
 (1.4159797444468667, 0.437),
 (1.5927815759930497, 0.44),
 (1.4528123019692922, 0.441)]

### **Avaliando o modelo**

<br>
Olhar para os valores previstos é uma coisa, mas outra e melhor é olhar para algumas métricas para ter uma ideia melhor de quão bom é o seu modelo. Você pode começar imprimindo os coeficientes e a interceptação do modelo.

In [77]:
# Visualizando o coeficiente angular da reta de regressão
linearModel.coefficients

DenseVector([0.0, 0.0, 0.0, 0.2778, 0.0, 0.0, 0.0])

In [78]:
# Visualizando o valor do intercepto
linearModel.intercept

0.9974052276252747

Em seguida, você também pode usar o atributo **summary** para obter o `rootMeanSquaredError`, `r2` e o `r2adj`.

- RMSE (Root Mean Squared Error - "raiz do erro quadrático médio") mede quanto erro existe entre dois conjuntos de dados comparando um valor previsto e um valor observado ou conhecido. Quanto menor for um valor de RMSE, mais próximos serão os valores previstos e observados.

- R2 (“R ao quadrado”) ou o coeficiente de determinação é uma medida que mostra o quão perto os dados estão da linha de regressão ajustada. Esta pontuação estará sempre entre 0 e 100% (ou 0 a 1 neste caso), onde 0% indica que o modelo não explica nada da variabilidade dos dados de resposta em torno de sua média, e 100% indica o contrário, explica toda a variabilidade. Isso significa que, em geral, quanto maior o R-quadrado, melhor o modelo se ajusta aos seus dados.

- R-quadrado ajustado é uma versão modificada do R-quadrado que foi ajustada para o número de preditores no modelo. O R-quadrado ajustado aumenta somente se o novo termo melhorar o modelo mais do que seria esperado pelo acaso. Ele diminui quando um preditor melhora o modelo menos do que o esperado por acaso.

In [79]:
# Visualizando a raiz do erro quadrático médio
linearModel.summary.rootMeanSquaredError

0.8841928587948994

In [80]:
# Visualizando o R ao quadrado
linearModel.summary.r2

0.41845382365605144

In [81]:
# Visualizando o R ao quadrado ajustado
linearModel.summary.r2adj

0.4181997782719771

E suma, há definitivamente algumas melhorias necessárias para o seu modelo! Se você quiser continuar com este modelo, você pode "brincar" com os parâmetros que você passou para o seu modelo, as variáveis ​​que você incluiu no seu DataFrame original, e etc.

Lembre-se que, todo projeto de análise de dados e construção de modelos de aprendizado de máquina é importante sempre manter um espírito crítico e usar métodos cientificos com embasemento teórico e prático!

Terminamos nosso tutorial e por enquanto é isso! 

In [82]:
# Parando o "SparkSession"
spark.stop()

In [83]:
# Visualizando as variávies de ambiente
%whos

Variable                 Type                     Data/Info
-----------------------------------------------------------
DenseVector              type                     <class 'pyspark.ml.linalg.DenseVector'>
LinearRegression         ABCMeta                  <class 'pyspark.ml.regression.LinearRegression'>
SparkSession             type                     <class 'pyspark.sql.session.SparkSession'>
StandardScaler           ABCMeta                  <class 'pyspark.ml.feature.StandardScaler'>
bedroomsPerRoom          DataFrame                DataFrame[(total_bedrooms / total_rooms): double]
california_housing       DataFrame                       longitude  latitud<...>n[20000 rows x 9 columns]
col                      function                 <function col at 0x7fda98d2f160>
cols                     list                     n=9
dataframe_dim            function                 <function dataframe_dim at 0x7fda98d1eca0>
df                       DataFrame                DataFrame[label: d

### **Material de referência**

- https://www.datacamp.com/tutorial/apache-spark-tutorial-machine-learning