# **Módulo 40**

___

###  Pandas

In [61]:
import pandas as pd

Criei uma lista com as colunas de interesse, usei pandas para ler o arquivo CSV.

In [62]:
columns = ['Description', 'Population (GB+NI)', 'Unemployment rate']

data = pd.read_csv('https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv', usecols=columns)

Salvei o Arquivo Bruto sem nenhum tipo de tratamento para utiliza-lo com o Apache Spark.

In [63]:
data.to_csv('uk-macroeconomic-data.csv', index=False)

##### Passos realizados:
1. Remoção da primeira linha do DataFrame, que continha dados irrelevantes.
2. Renomeação das colunas.
3. Eliminação de linhas com valores nulos para garantir a integridade dos dados.
4. Ordenação do DataFrame com base na coluna `Year` em ordem decrescente, seguida do reset do índice.


In [64]:
data.drop(index=0, inplace=True)

In [65]:
data =  data.rename(columns={'Description': 'year', 'Population (GB+NI)': 'population', 'Unemployment rate': 'unemployment_rate'})

In [66]:
data = data.dropna()

In [67]:
data = data.sort_values('year', ascending=False).reset_index(drop=True)

In [68]:
data.head()

Unnamed: 0,year,population,unemployment_rate
0,2016,65573,4.9
1,2015,65110,5.38
2,2014,64597,6.18
3,2013,64106,7.61
4,2012,63705,7.97


___

### PySpark

In [69]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import col

Fiz o download do Java e do Spark no meu computador. Em seguida, criei as variáveis de ambiente no sistema, apontando para os diretórios onde os arquivos foram instalados. Por fim, criei o objeto `spark`.

In [70]:
spark = SparkSession.builder.master('local[*]').appName('pyspark-notebook').getOrCreate()

Fiz a leitura do arquivo CSV. Nesse caso, não foi necessário selecionar as colunas de interesse,  
pois isso já havia sido feito no momento do download, no início do código. Foi necessário apenas renomear as colunas.

In [71]:
data_spark = spark.read.csv('uk-macroeconomic-data.csv', sep=',', header=True)

In [72]:
data_spark = data_spark.withColumnRenamed('Description', 'year').\
                        withColumnRenamed('Population (GB+NI)', 'population').\
                        withColumnRenamed('Unemployment rate', 'unemployment_rate')

Filter, Join e Dropna como ensinado nas aulas.

In [73]:
data_drop = data_spark.filter(data_spark.year == 'Units')

data_drop.show()

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
+-----+----------+-----------------+



In [74]:
data_spark =  data_spark.join(other=broadcast(data_drop), on=['year'], how='left_anti')

In [75]:
data_spark = data_spark.dropna()

Ordenei o Dataframe com base na coluna `year` utilizando a função `col` combinada com `desc()` para aplicar a ordenação de forma decrescente.

In [76]:
data_spark = data_spark.orderBy(col('year').desc())

In [77]:
data_spark.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|2016|     65573|             4.90|
|2015|     65110|             5.38|
|2014|     64597|             6.18|
|2013|     64106|             7.61|
|2012|     63705|             7.97|
|2011|     63285|             8.11|
|2010|     62759|             7.87|
|2009|     62260|             7.61|
|2008|     61824|             5.69|
|2007|     61319|             5.33|
+----+----------+-----------------+
only showing top 10 rows

