<a href="https://colab.research.google.com/github/Deriss/Tutoriais-Big-Data/blob/main/Tutorial_PySpark_Manipula%C3%A7%C3%A3o_de_Dataframes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Tutorial PySpark

O Apache Spark é um mecanismo para a análise e processamento de Big Data. O Spark facilita o processamento distribuido e em paralelo, além de ter uma biblioteca própria para aplicar técnicas de Machine Learning em grandes volumes de dados.
Neste tutorial serão apresentadas as funções básicas do PySpark para a manipulação de dados utilizando DataFrames.

## Dependencias

Para poder utilizar o PySpark no Google Colab, primeiro precisamos instalar algumas dependencias

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null # java 8
!wget -q http://archive.apache.org/dist/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz #baixar o spark com Hadoop


In [None]:
!tar xf spark-3.3.3-bin-hadoop3.tgz # extrair os archivos do Spark
!pip install -q findspark # instalar biblioteca para achar o Spark no sistema

In [None]:
# Criando as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.3-bin-hadoop3"

# Inicializar o PySpark

Para testar o PySpark criaremos uma sesão local

In [None]:
# iniciando uma sessão local
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()  # O  * em local[*] indica que vamos utilizar todas as unidades de processamento disponíveis
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

# Carregar dados

Podemos carregar dados de diferentes fontes. Nesse tutorial vamos trabalhar com dados de exemplo do Spark disponibilizados em um arquivo csv.

In [None]:

# carregar dados exemplo
df_spark = spark.read.csv("./sample_data/california_housing_train.csv", inferSchema=True, header=True)

# ver o Schema indicando os tipos de dados de cada coluna
df_spark.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



# Exploração do dataset

Como o PySpark é focado em análise de grandes volumes de dados, o ideal é visualizar só uma parte do Dataframe para evitar problemas de memória. Existem diferentes métodos que podemos utilizar para obter algumas linhas do Dataframe.

O método `take(n)` retorna as primeiras n linhas do Dataframe

In [None]:
df_spark.take(5)

[Row(longitude=-114.31, latitude=34.19, housing_median_age=15.0, total_rooms=5612.0, total_bedrooms=1283.0, population=1015.0, households=472.0, median_income=1.4936, median_house_value=66900.0),
 Row(longitude=-114.47, latitude=34.4, housing_median_age=19.0, total_rooms=7650.0, total_bedrooms=1901.0, population=1129.0, households=463.0, median_income=1.82, median_house_value=80100.0),
 Row(longitude=-114.56, latitude=33.69, housing_median_age=17.0, total_rooms=720.0, total_bedrooms=174.0, population=333.0, households=117.0, median_income=1.6509, median_house_value=85700.0),
 Row(longitude=-114.57, latitude=33.64, housing_median_age=14.0, total_rooms=1501.0, total_bedrooms=337.0, population=515.0, households=226.0, median_income=3.1917, median_house_value=73400.0),
 Row(longitude=-114.57, latitude=33.57, housing_median_age=20.0, total_rooms=1454.0, total_bedrooms=326.0, population=624.0, households=262.0, median_income=1.925, median_house_value=65500.0)]

O método `show()` pode ser utilizado para visualizar as primeiras n linhas em forma de tabela.

In [None]:
df_spark.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

O método `collect()` é utilizado para retornar todos os elementos de um DataFrame. Por esse motivo, só deve ser utilizado para DataFrames pequenos, geralmente depois de ter processado os dados iniciais para evitar problemas de falta de memória. Também podemos utilizar ele para obter só algumas linhas.

In [None]:
# Obter a primeira linha do dataframe utilizando o collect()
df_spark.collect()[0]


Row(longitude=-114.31, latitude=34.19, housing_median_age=15.0, total_rooms=5612.0, total_bedrooms=1283.0, population=1015.0, households=472.0, median_income=1.4936, median_house_value=66900.0)

In [None]:
# Obter as três primeiras linhas do dataframe utilizando o collect()
df_spark.collect()[0:3]

[Row(longitude=-114.31, latitude=34.19, housing_median_age=15.0, total_rooms=5612.0, total_bedrooms=1283.0, population=1015.0, households=472.0, median_income=1.4936, median_house_value=66900.0),
 Row(longitude=-114.47, latitude=34.4, housing_median_age=19.0, total_rooms=7650.0, total_bedrooms=1901.0, population=1129.0, households=463.0, median_income=1.82, median_house_value=80100.0),
 Row(longitude=-114.56, latitude=33.69, housing_median_age=17.0, total_rooms=720.0, total_bedrooms=174.0, population=333.0, households=117.0, median_income=1.6509, median_house_value=85700.0)]

In [None]:
# Obter as três primeiras colunas da primeira linha do dataframe utilizando o collect()
df_spark.collect()[1][0:3]

(-114.47, 34.4, 19.0)

# Seleção de variáveis

Para selecionar só algumas variáveis do Dataframe podemos utilizar o método `select()` com diferentes formas para determinar as variáveis:

In [None]:
df_spark.select('housing_median_age').show(5)

+------------------+
|housing_median_age|
+------------------+
|              15.0|
|              19.0|
|              17.0|
|              14.0|
|              20.0|
+------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col
df_spark.select(col('housing_median_age')).show(5)

+------------------+
|housing_median_age|
+------------------+
|              15.0|
|              19.0|
|              17.0|
|              14.0|
|              20.0|
+------------------+
only showing top 5 rows



In [None]:
from pyspark.sql.functions import col
df_spark.select(df_spark.housing_median_age).show(5)

+------------------+
|housing_median_age|
+------------------+
|              15.0|
|              19.0|
|              17.0|
|              14.0|
|              20.0|
+------------------+
only showing top 5 rows



Podemos selecionar mais de uma variável utilizando virgulas para separar o nome de cada variável

In [None]:
df_spark.select('housing_median_age','population').show(5)

+------------------+----------+
|housing_median_age|population|
+------------------+----------+
|              15.0|    1015.0|
|              19.0|    1129.0|
|              17.0|     333.0|
|              14.0|     515.0|
|              20.0|     624.0|
+------------------+----------+
only showing top 5 rows



# Filtrar o Dataframe

Podemos filtrar as linhas de um dataset utilizando o método `filter()` ou o método `where()`. Os dois métodos funcionam da mesma forma.

In [None]:
df_spark.filter(df_spark['housing_median_age'] == 14.0).show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -115.37|   32.82|              14.0|     1276.0|         270.0|     867.0|     261.0|       1.9375|           80900.0|
|  -115.53|   32.73|              14.0|     1527.0|         325.0|    1453.0|     332.0|        1.735|           61200.0|
|  -115.58|   32.79|              14.0|     1687.0|         507.0|     762.0|     451.0|       1.6635|           64400.0|
|  -116.22|    36.0|              14.0|     1372.0|         386.0|     436.0|     213.0|       1.1471|           32900.0|
+---------+--------+----

In [None]:
df_spark.where(df_spark['housing_median_age'] < 15.0).show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -115.37|   32.82|              14.0|     1276.0|         270.0|     867.0|     261.0|       1.9375|           80900.0|
|   -115.5|   32.75|              13.0|      330.0|          72.0|     822.0|      64.0|       3.4107|          142500.0|
|  -115.51|   32.68|              11.0|     2872.0|         610.0|    2644.0|     581.0|        2.625|           72700.0|
|  -115.52|   32.97|              10.0|     1879.0|         387.0|    1376.0|     337.0|       1.9911|           67500.0|
+---------+--------+----

# Criar novas variáveis

Para criar novas variáveis a partir de outras variáveis do Dataframe, podemos utilizar o método `withColumn()` que possui dois parâmetros: o nome da (nova) coluna e a função a aplicar para gerar a nova coluna

In [None]:
from pyspark.sql.functions import col

In [None]:
# Criar nova variável
df_spark = df_spark.withColumn('mean_people_per_household', col('population')/col('households'))
df_spark.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|mean_people_per_household|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|       2.1504237288135593|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|       2.4384449244060473|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|       2.8461538461538463|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|          

Para criar uma nova coluna com um único valor,é preciso usar a função lit() com o valor a adicionar como parâmetro

In [34]:
# Criando uma coluna de 0
from pyspark.sql.functions import lit
df_spark.withColumn('zeros_column', lit(0))

longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,mean_people_per_household,zeros_column
-114.31,34.19,15.0,5612.0,1283.0,1015.0,472.0,1.4936,66900.0,2.1504237288135597,0
-114.47,34.4,19.0,7650.0,1901.0,1129.0,463.0,1.82,80100.0,2.4384449244060478,0
-114.56,33.69,17.0,720.0,174.0,333.0,117.0,1.6509,85700.0,2.8461538461538463,0
-114.57,33.64,14.0,1501.0,337.0,515.0,226.0,3.1917,73400.0,2.2787610619469025,0
-114.57,33.57,20.0,1454.0,326.0,624.0,262.0,1.925,65500.0,2.381679389312977,0
-114.58,33.63,29.0,1387.0,236.0,671.0,239.0,3.3438,74000.0,2.807531380753138,0
-114.58,33.61,25.0,2907.0,680.0,1841.0,633.0,2.6768,82400.0,2.9083728278041074,0
-114.59,34.83,41.0,812.0,168.0,375.0,158.0,1.7083,48500.0,2.373417721518988,0
-114.59,33.61,34.0,4789.0,1175.0,3134.0,1056.0,2.1782,58400.0,2.9678030303030303,0
-114.6,34.83,46.0,1497.0,309.0,787.0,271.0,2.1908,48100.0,2.904059040590406,0


# Mudar o nome de uma variável

In [None]:
df_spark = df_spark.withColumnRenamed('mean_people_per_household', 'new_mean_people_per_household')

# Remover variáveis

In [None]:
df_spark = df_spark.drop('new_mean_people_per_household')


In [None]:
df_spark.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

# Cálculos simples

In [38]:
from pyspark.sql.functions import min, max, mean, stddev
df_spark.select(min(col('housing_median_age')))

min(housing_median_age)
1.0


In [40]:
df_spark.select(max(col('housing_median_age')))

max(housing_median_age)
52.0


In [41]:
df_spark.select(mean(col('housing_median_age')),stddev(col('housing_median_age')))

avg(housing_median_age),stddev_samp(housing_median_age)
28.58935294117647,12.586936981660406
