## EXTRAÇÃO DE DADOS


In [None]:
# Vamos trabalhar com pyspark, instalando:
!pip install pyspark



In [None]:
# No meu caso já esta instalado, então para validar consulto assim:
import pyspark
print(pyspark.__version__)

3.5.4


In [None]:
# Precisamos importar SparkSession
from pyspark.sql import SparkSession

In [None]:
# Criando uma nova sessão
spark = SparkSession.builder.appName("sojaSensores").getOrCreate()

In [None]:
# Chamando a sessão que criamos e lendo o arquivo csv; "show(5)" é para mostrar os 5 registros
spark.read.format('csv').option('header',True).load('sample_data/sensores-iot.csv').show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|
+---+-----------+-----------+--------+--------------------+-----------+-----------+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|
+---+-----------+-----------+--------+--------------------+-----------+-----------+
only showing top 5 rows



In [None]:
# Salvando em um Data Frame
df = spark.read.format('csv').option('header',True).load('sample_data/sensores-iot.csv')

In [None]:
# Mostrando o Data Frame apenas 5
df.show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|
+---+-----------+-----------+--------+--------------------+-----------+-----------+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|
+---+-----------+-----------+--------+--------------------+-----------+-----------+
only showing top 5 rows



## TRANSFORMAÇÃO DE DADOS

### DATA WRANGLING
"Refinando os dados": Etapa de (coleta, limpeza, transformação, enriquecimento, validação e exportação) estruturação e enriquecimento de dados brutos, para torná-los adequados para análise.

In [None]:
#Importando as funções do Pyspark
from pyspark.sql import functions as F

In [None]:
# Função de tratamento de data: Criando a coluna tipo data, convertendo a coluna timestamp para data
df = df.withColumn('date',F.to_date(F.col('timestamp')))

In [None]:
df.show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+
only showi

In [None]:
# Criando uma coluna ano do tipo interiro e uma coluna mês e dia
df = (df.withColumn('year', F.year(F.col('date')))
        .withColumn('month', F.month(F.col('date')))
        .withColumn('day', F.dayofmonth(F.col('date')))
        .withColumn('time', F.date_format(F.col('timestamp'), 'HH:mm:ss')))

In [None]:
df.show(5)

+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+--------+
|_c0|  device_id|temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|    time|
+---+-----------+-----------+--------+--------------------+-----------+-----------+----------+----+-----+---+--------+
|  0|sensor-2231|         25|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|05:19:39|
|  1|sensor-3869|         17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|20:35:34|
|  2|sensor-7079|         35|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|18:10:28|
|  3|sensor-1163|         15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|11:38:19|
|  4|sensor-4483|         14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|07:56:16|
+---+-----------+-----------+--------+----------

#### Tratar Dados Nulos

In [None]:
# Seleciona a coluna de temperatura, usa "F.mean" para pegar a media e coletando a informação; e faz o mesmo com Humidade
mean_temp = df.select(F.mean(F.col('temperature'))).collect()[0][0]
mean_humidity = df.select(F.mean(F.col('humidity'))).collect()[0][0]

print("Média de temperatura -> ",mean_temp)
print("Média de humidade -> ",mean_humidity)

Média de temperatura ->  22.491042344217934
Média de humidade ->  55.031311808605125


In [None]:
# Montando o dicionario: El vai preencher quando null a coluna de temperature com dados de mean_temp; O mesmo com humidity
df = df.fillna({'temperature': mean_temp, 'humidity': mean_humidity})

#### Remoção de OUTLIERS

*   Será aceito temperatura entre 0 e 40, fora isso será um OUTLIERS
*   Item da lista



In [None]:
# When equivalente ao "if" ; otherwise equivalente ao "else"
# Se temperatura estiver entre 0 e 40, tá OK, usa a temperatura, senão usa mean_temp
df = df.withColumn('temperature', F.when(F.col('temperature').between(0, 40), F.col('temperature')).otherwise(F.lit(mean_temp)))

In [None]:
df.show(10)

+---+-----------+------------------+--------+--------------------+-----------+-----------+----------+----+-----+---+--------+
|_c0|  device_id|       temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|    time|
+---+-----------+------------------+--------+--------------------+-----------+-----------+----------+----+-----+---+--------+
|  0|sensor-2231|22.491042344217934|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|05:19:39|
|  1|sensor-3869|                17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|20:35:34|
|  2|sensor-7079|22.491042344217934|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|18:10:28|
|  3|sensor-1163|                15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|11:38:19|
|  4|sensor-4483|                14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|07

In [71]:
#Salvando o arquivo em parquet
df.write.parquet('parquet_tradicional_sensores.parquet')

In [72]:
# salvando comprimindo snappy
df.write.option('compression','snappy').parquet('parquet_snappy_sensores.parquet')

In [73]:
# salvando comprimindo gzip
df.write.option('compression','gzip').parquet('parquet_gzip_sensores.parquet')

In [74]:
spark.read.parquet('parquet_tradicional_sensores.parquet').show(5)
spark.read.parquet('parquet_snappy_sensores.parquet').show(5)
spark.read.parquet('parquet_gzip_sensores.parquet').show(5)

+---+-----------+------------------+--------+--------------------+-----------+-----------+----------+----+-----+---+--------+
|_c0|  device_id|       temperature|humidity|           timestamp|   latitude|  longitude|      date|year|month|day|    time|
+---+-----------+------------------+--------+--------------------+-----------+-----------+----------+----+-----+---+--------+
|  0|sensor-2231|22.491042344217934|      49|2004-10-08 05:19:...| 39.0319385|-167.644666|2004-10-08|2004|   10|  8|05:19:39|
|  1|sensor-3869|                17|      42|2010-06-16 20:35:...| 43.4913955|  76.084671|2010-06-16|2010|    6| 16|20:35:34|
|  2|sensor-7079|22.491042344217934|      58|2010-10-03 18:10:...|-39.9795415|-149.006886|2010-10-03|2010|   10|  3|18:10:28|
|  3|sensor-1163|                15|      64|2005-10-10 11:38:...| 28.1871005|-112.581419|2005-10-10|2005|   10| 10|11:38:19|
|  4|sensor-4483|                14|      65|2020-04-29 07:56:...| 89.2243515| -16.076939|2020-04-29|2020|    4| 29|07