# Uma (muito breve) introdução à Python

![Python logo, disponível em: https://www.python.org/static/community_logos/python-logo-master-v3-TM-flattened.png](https://www.python.org/static/community_logos/python-logo-master-v3-TM-flattened.png)

Python é a linguagem "queridinha" dos Cientistas de Dados devido a seu catálogo vasto de bibliotecas de manipulação de dados, matemática e frameworks que abstraem muitos detalhes de programação. Além disso, a integração com as bibliotecas é extremamente simples, e sua sintaxe é simples e fácil de aprender. 


## Variáveis
Python é uma linguagem **fortemente tipada**, ou seja, faz uso dos tipos convencionais (int, float, char, etc.), porém ela é também **dinamicamente tipada**, o que significa que você não precisa declarar o tipo de uma variável. Vamos ver alguns exemplos.

In [None]:
nome = "Goku"
altura = 1.85
filhos = ['Gohan', 'Goten']

print(f'Oi eu sou {nome}. Tenho {altura} e meus filhos sao: {filhos}')
print(f'Nome {type(nome)}, Altura {type(altura)}, Filhos {type(filhos)}')

## Dicionários
Python fornece uma estrutura de dados muito útil para cientistas de dados (e programadores no geral) chamada de Dicionário. Você pode enchergar ela como um `Hash Map`, onde temos pares *chave-valor*. O grande diferencial é que esses pares podem ser de tipos diferentes.

In [None]:
meu_dict = {
    'nome':'Goku',
    'altura': 1.85,
    'filhos': ['Gohan', 'Goten']
}

print(meu_dict)

## Condicionais, loops e escopo
Códigos em Python não utilizam chaves para identificação de escopo. a identificação é feita a partir da **identação** do código. Para ilustrar, vamos fazer um código que detecta valores pares ou ímpares em uma lista:

In [None]:
nums = [1,2,3,4,5,6,7,8,9,10]

for num in nums:
  if num % 2 == 0:
    print(f"{num} eh par")
  else:
    print(f'{num} eh impar')

## Funções e compreensão de lista

Python trabalha com funções da mesma forma que outras linguagens. Você declara suas funções e as utiliza onde for necessário, passando os parâmetros corretos. 
Python ainda fornece uma facilidade para a manipulação de listas, chamada **compreensão de listas**. Nisso, você pode converter um `for` explícito em apenas uma linha de código. Vamos ver um exemplo de compreensão de listas com

In [None]:
def par_ou_impar(num):
  if num % 2 == 0:
    return f"{num} eh par"
  else:
    return f'{num} eh impar'

resultado = [ par_ou_impar(num) for num in nums]
print(resultado)

def eh_par(num):
  if num % 2 == 0:
    return True
  return False

pares = [num for num in nums if eh_par(num)]
print(pares)


# Analisando e transformando dados com Spark

Iremos usar a biblioteca de Ciência de Dados Distribuída chamada PySpark (versão Python do Spark). O Spark possui diversas funcionalidades implementadas para manipulação de dados e execução de *pipelines* de Aprendizado de Máquina em **Sistemas Distribuídos**. Um Sistema Distribuído é um que engloba vários computadores interconectados operando como uma única unidade. Sistemas assim são necessários para projetos de Big Data, pois o volume de dados excede a capacidade de uma única máquina processar. 

Computação distribuída foge do escopo deste Notebook, portanto iremos abstrair o conceito. A razão de trabalharmos com Spark é que esta é uma biblioteca extremamente simples e didática para ensinar as principais operações de ciência de dados. Spark é construído em Scala, porém fornece aplicações em diversas linguagens. Uma delas é Python, chamada PySpark. Spark fornece uma série de módulos:

![Pilha de APIs, disponível em: https://www.oreilly.com/library/view/spark-the-definitive/9781491912201/](https://izhangzhihao.github.io/assets/images/spark-05.png)

Na aula de hoje veremos um pouco sobre `Dataframes` e `SparkSQL`, pois operam "em conjunto".

**Dataframes** são estruturas tabulares, assim como os DataFrames de Pandas. A diferença, aqui, é que Dataframes em Spark são _distribuídos_ e construídos em cima de RDDs (a unidade básica e de mais baixo nível onde as demais APIs de Spark são construídas). 

**SparkSQL** é um conjunto de funcionalidades que são operadas em Dataframes. Veremos que podemos manipular Dataframes tanto programaticamente quanto por linguagem SQL. Além disso, SparkSQL oferece uma série de outras ferramentas para a realização de operações tabulares distribuídas.


## Instalando e importando PySpark

Para instalá-la, basta fazer o download  e seguir os passos em: http://spark.apache.org/downloads.html, ou instalar usando Pypi:

	pip install pyspark

Para utilizá-la em conjunto com um Jupyter Notebook, você precisa baixar a biblioteca Jupyter.

	pip install jupyter
    
Depois, baixar também a biblioteca findspark

	pip install findspark

E pronto! Agora, no começo de cada notebook você deverá importar tanto `findspark` quanto `pyspark`:

In [None]:
! pip install pyspark
! pip install findspark

import findspark
findspark.init()

import pyspark

## O objeto SparkSession

Para manipularmos Dataframes e utilizar as funções do SparkSQL, porém, precisamos criar um objeto `SparkSession`. O `SparkSession` é um objeto que serve como ponto de entrada para as APIs estruturadas do Spark. 

![SparkSession, disponível em: https://www.dcc.fc.up.pt/~edrdo/aulas/bdcc/classes/spark_arch/images/python_and_spark.png](https://www.dcc.fc.up.pt/~edrdo/aulas/bdcc/classes/spark_arch/images/python_and_spark.png)

Para inicializar um SparkSession, importamos sua definição de `pyspark.sql` e criamos um objeto com `SparkSession.builder.getOrCreate()`. Esse método checa se já existe um `SparkSession` ativo, e se não existir, cria um novo.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
print(spark)

## Criando Dataframes

Dataframes são objetos tabulares distribuídos. Você pode criar um dataframe lendo arquivos csv, JSON, [ORC](https://orc.apache.org/) e [Parquet](https://github.com/apache/parquet-format). Vamos começar fazendo um exemplo de leitura simplificada de um arquivo csv. Vamos ler um conjunto de dados sobre vôos realizados nos Estados Unidos no ano de 2014.

In [None]:
flights_df = spark.read.csv('./Datasets/Flight/flights.csv', header=True)
flights_df.show(10)


## Schemas

O Spark pode fazer múltiplas leituras de um arquivo CSV para automaticamente tentar **inferir** qual é o _Schema_ do Dataframe, ou seja, qual é a tipagem de cada coluna e se ela pode ter ou não valores _null_. 

In [None]:
flights_df.printSchema()

flights_df = spark.read.csv('./Datasets/Flight/flights.csv', header=True, inferSchema=True)
flights_df.printSchema()

## Detectando valores errados

Parece que o Spark não inferiu corretamente o tipo de algumas colunas! Isso pode acontecer se tivermos múltiplos tipos de dados numa única coluna. Quando isso acontece, ele atribui por padrão o tipo `string`. Vamos tentar detectar quais valores estão dando problema.

Para isso, iremos utilizar duas funções de Dataframes do Spark: `select()` e `filter()`. Vamos ver um pouco de cada.

### Select

O comando `select()` opera em nível de **coluna**, filtrando quais colunas você quer exibir. Ele pode ainda criar novas colunas, com métodos que analisam valores de uma ou mais colunas linha a linha. É o equivalente ao comando `SELECT` de uma consulta SQL. 

In [None]:
flights_df.select(flights_df.tailnum, 'origin', flights_df['dest']).show(5)

flights_df.select('tailnum', 'origin', 'dest', (flights_df.air_time / 60).alias('air_time_hours')).show()

### Filter

A função `filter()` opera em nível de **linhas**, e utiliza operações lógicas que retornam resultados _binários_. É similar ao comando `WHERE` de uma consulta SQL.

In [None]:
flights_df.filter(flights_df.dep_delay < 0).show()

air_hours = (flights_df.air_time / 60).alias('air_time_hours')

flights_df.select('tailnum', 'origin', 'dest', air_hours).filter('air_time_hours < 2').show(3)



### Usando funções customizadas em `filter()`

Para podermos entender o que está errado com os valores que deveriam ser `Integers` em algumas das colunas do nosso `Dataframe`, nós vamos primeiro ter que criar uma função que verifique se os valores de uma coluna são números ou não. O Spark fornece um mecanismo chamado `UDF` (_User-Defined Functions_) para que seus usuários criem funções customizadas. Para tal, precisamos importar o pacote `pyspark.sql.functions`, definir nossas funções e encapsulá-las em um `UDF` usando o seguinte formato:

`F.udf(funcão, tipo_de_retorno)`

Vamos criar uma função que verifica se um valor é inteiro ou não, e aplicar esta função em um filtro para exibir dados que **não** são inteiros.

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as types

def is_not_integer(num):
  num = num.replace('-','')
  return not num.isdigit()

is_not_integer_udf = F.udf(is_not_integer, types.BooleanType())
flights_df.filter(is_not_integer_udf(flights_df.dep_time)).show(10)

## Removendo valores errôneos

Detectamos que em nosso `Dataframe`, valores nulos (`NaN`s) estão representados pelo String **NA**. Vamos tentar remover todas as linhas que tiverem valores **NA**.

Para facilitar nossa vida, Spark fornece opções para a definição de valores nulos durante a leitura de arquivos. Vamos reler nosso _flights.csv_ definindo como `null` células da nossa tabela que tiverem o String **NA**.

Uma outra opção seria filtrar o Dataframe coluna a coluna para valores `!= NA`, mas isso é um trabalho repetitivo e entediante que pode ser resolvido de uma maneira mais simples.

In [None]:
flights_df = spark.read.csv('./Datasets/Flight/flights.csv', header=True, inferSchema=True, nullValue='NA')
flights_df.printSchema()

flights_df.filter(F.isnull(flights_df.dep_time)).show()
clean_flights_df = flights_df.dropna()
clean_flights_df.filter(F.isnull(clean_flights_df.dep_time)).show()

## Juntando tabelas diferentes

A função `join()` une dois Dataframes a partir de uma coluna em comum. Há vários tipos diferentes de joins no Spark:

![Tipos de join. Disponível em: https://medium.com/bild-journal/pyspark-joins-explained-9c4fba124839](https://miro.medium.com/max/622/1*6d7MzkjxS0eBWjOJN5TaAQ.jpeg)

Todos os tipos de join mostrados acima podem ser usados junto com o método. Vamos ver dois exemplos: `'left_outer'` e `'inner'`. 

In [None]:
airplanes_df = spark.read.csv('./Datasets/Flight/airplanes.csv', header=True, inferSchema=True, nullValue='NA')
airplanes_df = airplanes_df.withColumnRenamed('year', 'year_plane')
airplanes_df.show(10)

flights_planes_df = clean_flights_df.join(airplanes_df, on='tailnum', how='inner')
flights_planes_df.show(5)

## Removendo colunas desnecessárias e linhas duplicadas

Algumas colunas de nosso Dataframe podem, na verdade, influenciar negativamente no treinamento de um modelo preditivo. Algumas informações podem de fato ser removidas. Para tal, iremos utilizar o método `drop()`.

Só para nos precavermos, vamos remover também linhas duplicadas utilizando o método `distinct()`, e remover linhas com valores nulos que podem têr vindo de `airplanes.csv`.

## Adicionando e transformando colunas

Vamos fazer mais algumas modificações que irão auxiliar futuramente nosso modelo a prever se um vôo irá atrasar ou não. Algoritmos de aprendizado de máquina não funcionam bem quando dados estão em dimensões diferentes entre colunas. É importante que façamos modificações para que as colunas estejam na mesma dimensão e assim uma não acabe introduzindo viés sobre a outra. 

Iremos trabalhar com normalização dos dados em algumas colunas aqui. Outras, que representam dados **categóricos**, serão manipuladas a seguir. 

### Normalização em _z-score_

Esse tipo de normalização transforma os dados em valores que representam sua variação em relação à média. 

$$ z = \frac{x_i - \mu}{\delta} $$

![Normalização em z-score, disponível em: https://miro.medium.com/max/692/1*er9yh82tMZ85RWOSkKb-xA.png](https://miro.medium.com/max/692/1*er9yh82tMZ85RWOSkKb-xA.png)

Para transformar nossos dados, vamos precisar definir uma `UDF` que realize esse cálculo. Vamos precisar também definir a média e o desvio padrão de nossas colunas. Podemos utilizar o comando `agg()` pra isso. Finalmente, precisamos criar colunas com esses valores transformados utilizando o método `withColumn()`.

### Criando a coluna _alvo_

Vamos aproveitar esse momento e criar a nossa coluna _alvo_. Essa coluna será o que iremos utilizar como objetivo para o algoritmo de aprendizado tentar prever. Essa coluna alvo será uma coluna com valores binários que irá informar se o vôo atrasou ou não.

## Visualizando Dados graficamente

Infelizmente, Spark não suporta visualização de dados nativamente. Para visualizá-los, podemos:
- Utilizar bibliotecas de terceiros, como a `pyspark_dist_explore` (não recomendado, última versão em 2019), ou
- Retirar uma amostra dos dados e transformá-la em `Pandas Dataframes`, e aproveitar as bibliotecas `Matplotlib` ou `Seaborn`

# Aprendizado de Máquina com Spark

Spark oferece um ambiente completo para aprendizado de máquina em sua biblioteca `MLlib`, que implementa diversas tarefas de modo distribuído e escalável. Veremos alguns exemplos de suas funcionalidades, que podem ser divididas em três grandes categorias:
- Transformações de Características
- Algoritmos
- Otimização 

## Transformações de Características

Spark oferece uma grande quantidade de transformações de características que podem ser aplicadas em Dataframes. Essas transformações vão além das funcionalidades vistas antes na biblioteca SQL. 


As transformações de características estão localizadas no módulo `pyspark.ml.feature`. No nosso caso, nós iremos ver quatro transformações necessárias para executarmos nosso algoritmo preditor de atrasos de vôos:
- Indexador de String
- Transformador de valores em _buckets_
- Transformador de representações _One-Hot_
- Criador de vetores

### StringIndexer

O Indexador de String transforma o conteúdo de cada célula de uma coluna de Strings em um valor categórico. Essa decisão é feita com base na frequência do elemento. 

Vamos modificar as colunas String de nossa tabela.

### Bucketizer

Às vezes é interessante transformar valores contínuos em discretos para uma generalização melhor do modelo. O modelo assim passa a diferenciar por _categorias_ de valores e não tentar entender um comportamento a partir da variação em um valor. Ou ainda, é interessante diminuir o número de categorias presentes em uma variável já discreta.

Essa tarefa é conhecida como _bucketing_ ou _binning_. Vamos fazer essa transformação com as colunas `dep_time` e `arr_time`.

### OneHotEncoder

Dados categóricos não podem ser manipulados pela maioria de algoritmos de aprendizado de máquina, pelo simples fato de que eles não possuem relação matemática alguma entre si. Para podermos usar dados categóricos na maioria dos algoritmos, precisamos transformá-los em uma representação **one-hot**. 

Spark realiza essa conversão através do `OneHotEncoder`, porém o que faz de fato é gerar uma representação **dummy**: 

![Representação one-hot. Retirado de: https://www.kaggle.com/getting-started/187540](https://www.googleapis.com/download/storage/v1/b/kaggle-forum-message-attachments/o/inbox%2F5315434%2Fa9886ea90db74aad0b2f86d2686c337b%2Fohe-vs-dummy.png?generation=1601465979026694&alt=media)

Vamos converter os índices e buckets que criamos em representações one-hot.

### VectorAssembler

Spark requer que toda a informação que será passada para um algoritmo de Aprendizado de Máquina seja convertida em um único vetor. Para fazer isso, usaremos `VectorAssembler`.

## Algoritmos de aprendizado de máquina em Spark

Spark possui três grandes categorias de algoritmos de aprendizado de máquina. 
- Classificação e Regressão (Aprendizado Supervisionado)
- Agrupamento (Aprendizado Não-Supervisionado)
- Filtragem Colaborativa (Sistemas de Recomendação)

Para nosso exemplo de preditor de atraso em vôos, iremos considerar a primeira categoria. Iremos utilizar o algoritmo de Regressão Logística, uma das unidades básicas para a criação das famosas Redes Neurais.

### Divindo a base de dados entre treino e teste

Para rodarmos nosso algoritmo de aprendizado, nós precisamos primeiro dividir nossa tabela em um conjunto de `treino` (que será aplicado ao algoritmo para ele gerar um modelo), e um conjunto de `teste` (onde iremos avaliar sua eficácia). Outra possibilidade é realizar uma `validação cruzada` dos dados. 

![Avaliação de algoritmos de ML. Retirado de: https://miro.medium.com/max/720/1*4G__SV580CxFj78o9yUXuQ.png](https://miro.medium.com/max/720/1*4G__SV580CxFj78o9yUXuQ.png)



### Regressão Logística

O algoritmo de regressão logística tenta estimar um modelo que trace uma curva em formato de S nos nossos dados, utilizando uma função chamada sigmoide para realizar classificação binária:  

![Disponível em: https://www.javatpoint.com/logistic-regression-in-machine-learning](https://static.javatpoint.com/tutorial/machine-learning/images/logistic-regression-in-machine-learning.png)



### Avaliando o modelo

Para avaliar a performance dos algoritmos, podemos utilizar dois objetos:`MulticlassClassificationEvaluator` e `BinaryClassificationEvaluator`. O primeiro lida com avaliação de modelos capazes discretizar entre múltiplas classes e contém métricas como `precisão` e `revocação`, entre outras. Já o segundo, foca em análise de classificação binária e possui implementações específicas para esse caso, como `AUC` (área abaixo da curva ROC). 

Precisão e Revocação:

![Disponível em: https://miro.medium.com/max/878/1*Ub0nZTXYT8MxLzrz0P7jPA.png](https://miro.medium.com/max/878/1*Ub0nZTXYT8MxLzrz0P7jPA.png)


Área abaixo da curva ROC (AUC):

![Disponível em: https://glassboxmedicine.files.wordpress.com/2019/02/roc-curve-v2.png](https://glassboxmedicine.files.wordpress.com/2019/02/roc-curve-v2.png)


# O que vem a seguir?

Vimos que nosso sistema está relativamente bom, pelas métricas analizadas, com quase 86% de precisão, e com uma AUC de 92.5% (ou seja, detectou muito mais instâncias corretas do que erradas, a medida que ambas taxas sobem). Porém, o trabalho de um cientista de dados não para por aí: ele deve investigar se consegue melhorar ainda mais essas taxas. Para tal, ele pode tentar algumas coisas a mais:

- **Explorar ainda mais os dados:** combinar características, excluir características desnecessárias ou adicionar características de outras fontes de dados (por exemplo, informação sobre os aeroportos em si).
- **Tentar outros parâmetros manualmente:** nós usamos aqui os valores padrões dos parâmetros da Regressão Logística. Podemos manipulá-los e verificar se há uma melhora no modelo. 
- **Realizar validação cruzada e exploração de parâmetros em grade:** utilizar da validação cruzada para reduzir viés ao explorar múltiplas combinações de conjuntos de treino, teste e validação. Aplicar exploração em grade dentro desse cenário para analizar todas as possíveis combinações de valores de parâmetros e escolher a opção mais relevante.
- **Utilizar outros algoritmos:** verificar como o sistema se comporta aplicando outros algoritmos, como árvores de decisão, algoritmos de vizinhança ou redes neurais.