<a href="https://colab.research.google.com/github/Arcanaido/Arcanaido/blob/main/TransformacaoDados.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
# Abilita o pyspark
!pip install pyspark



In [4]:
# Importa o pyspark, SparkSession e outras funções do pyspark.sql
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper

In [5]:
#Iniciar sessão Spark
spark = SparkSession.builder.getOrCreate()

In [6]:
# Guarda: Leitura do arquivo cvs "vendedores"
vendedores = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/data/vendedores.csv", header=True, inferSchema=True)
# Guarda: Leitura do arquivo csv "itens_pedidos"
itens_pedido = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/data/itens_pedido.csv", header=True, inferSchema=True)

In [7]:
# Apresenta os itens do dataframe vendedores de forma tabular
vendedores.show()
# Printa o schema das colunas (o tipo de dados que cada uma está configurada)
vendedores.printSchema()

+--------------------+------------+-----------------+---------------+
|         id_vendedor|cep_vendedor|  cidade_vendedor|estado_vendedor|
+--------------------+------------+-----------------+---------------+
|3442f8959a84dea7e...|       13023|         campinas|             SP|
|d1b65fc7debc3361e...|       13844|       mogi guacu|             SP|
|ce3ad9de960102d06...|       20031|   rio de janeiro|             RJ|
|c0f3eea2e14555b6f...|        4195|        sao paulo|             SP|
|51a04a8a6bdcb23de...|       12914|braganca paulista|             SP|
|c240c4061717ac180...|       20920|   rio de janeiro|             RJ|
|e49c26c3edfa46d22...|       55325|           brejao|             PE|
|1b938a7ec6ac5061a...|       16304|        penapolis|             SP|
|768a86e36ad6aae3d...|        1529|        sao paulo|             SP|
|ccc4bbb5f32a6ab2b...|       80310|         curitiba|             PR|
|8cb7c5ddf41f4d506...|       75110|         anapolis|             GO|
|a7a9b880c49781da6..

In [8]:
# tabela_alvo.withColumn("coluna_desejada", tabela_alvo.coluna_alvo.funçãoDesejada())
# Obs: Coluna_alvo pode ser a coluna_desejada ou outra coluna existente na tabela_alvo
# Obs: caso a coluna desejada não exista será criada uma na tabela alvo, se já existir ele vai usá-la
vendedores_tratados_df = vendedores.withColumn("cep_vendedor", vendedores.cep_vendedor.cast('string'))

# Segunda forma de usar o withColumn
# tabela_alvo.withColumn("coluna_desejada", funçãoDesejada(tabela_alvo['coluna_alvo']))
vendedores_tratados_df = vendedores_tratados_df.withColumn("cidade_vendedor_tratado", upper(vendedores['cidade_vendedor']))

vendedores_tratados_df.show()
vendedores_tratados_df.printSchema()

+--------------------+------------+-----------------+---------------+-----------------------+
|         id_vendedor|cep_vendedor|  cidade_vendedor|estado_vendedor|cidade_vendedor_tratado|
+--------------------+------------+-----------------+---------------+-----------------------+
|3442f8959a84dea7e...|       13023|         campinas|             SP|               CAMPINAS|
|d1b65fc7debc3361e...|       13844|       mogi guacu|             SP|             MOGI GUACU|
|ce3ad9de960102d06...|       20031|   rio de janeiro|             RJ|         RIO DE JANEIRO|
|c0f3eea2e14555b6f...|        4195|        sao paulo|             SP|              SAO PAULO|
|51a04a8a6bdcb23de...|       12914|braganca paulista|             SP|      BRAGANCA PAULISTA|
|c240c4061717ac180...|       20920|   rio de janeiro|             RJ|         RIO DE JANEIRO|
|e49c26c3edfa46d22...|       55325|           brejao|             PE|                 BREJAO|
|1b938a7ec6ac5061a...|       16304|        penapolis|       

In [9]:
itens_pedido.show()
itens_pedido.printSchema()

+--------------------+--------------+--------------------+--------------------+-------------------+------+-----------+
|           id_pedido|item_id_pedido|          id_produto|         id_vendedor|  data_limite_envio| preco|valor_frete|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-----------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|      13.29|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|      19.93|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|      17.87|
|00024acbcdf0a6daa...|             1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|      12.79|
|00042b26cf59d7ce6...|             1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51| 199.9|      18.14|
|00048cc3ae777c65d...|             1|ef92defde84

In [10]:
itens_pedido_tratados_df = itens_pedido. \
    withColumn('preco', col('preco').cast('float')). \
    withColumn('calor_frete', col('valor_frete').cast('float')). \
    withColumnRenamed('valor_frete', 'frete') .\
    withColumn('valor_total', col('preco') + col('frete'))

itens_pedido_tratados_df.show()
itens_pedido_tratados_df.printSchema()

+--------------------+--------------+--------------------+--------------------+-------------------+------+-----+-----------+------------------+
|           id_pedido|item_id_pedido|          id_produto|         id_vendedor|  data_limite_envio| preco|frete|calor_frete|       valor_total|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-----+-----------+------------------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|13.29|      13.29|  72.1900015258789|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|19.93|      19.93| 259.8299938964844|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|17.87|      17.87|            216.87|
|00024acbcdf0a6daa...|             1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|12.79|      12.79|25.779999771

In [12]:
from pyspark.sql.functions import to_date, date_format

# to_date -> Converte uma string ou um stimestamp em um objeto do tipo DateType
# Para que serve: Útil quando você quer tratar apenas a data (sem hora) e precisa filtrar, agrupar ou ordernar por datas

# date_format -> Converte um campo de data ou um timestamp em uma string formatada
# Para que serve: Quando você quer extrair partes específicas de uma data (como "ano-mês", "dia", "Hora", etc.) ou exibir em um formato customizado

# Guarda: Um dataframe que teve a coluna "data_limite_envio" covertida para o tipo DateType e depois nomeada para data
itens_pedido_data_df = itens_pedido_tratados_df.withColumn("data", to_date(col("data_limite_envio")))

# Guarda: Um dataframe cque teve a coluna data_limite_envio para uma string organizada em dia - mês - ano e nomeada data_hr
itens_pedido_data_df = itens_pedido_data_df.withColumn('data_hr', date_format(col('data_limite_envio'), 'dd/MM/yyyy'))
# Guarda: Um dataframe que teve a coluna data_limite_envio para uma string organizada em hora:minuto:segundo
itens_pedido_data_df = itens_pedido_data_df.withColumn('hora', date_format(col('data_limite_envio'), "HH:mm:ss"))
# Obs: 'MM' é para mês enquanto 'mm' é para minutos

itens_pedido_data_df.show()
itens_pedido_data_df.printSchema()

+--------------------+--------------+--------------------+--------------------+-------------------+------+-----+-----------+------------------+----------+----------+--------+
|           id_pedido|item_id_pedido|          id_produto|         id_vendedor|  data_limite_envio| preco|frete|calor_frete|       valor_total|      data|   data_hr|    hora|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-----+-----------+------------------+----------+----------+--------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|13.29|      13.29|  72.1900015258789|2017-09-19|19/09/2017|09:45:35|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|19.93|      19.93| 259.8299938964844|2017-05-03|03/05/2017|11:05:13|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|17.87|      17.87| 

In [14]:
from pyspark.sql.functions import unix_timestamp, from_unixtime

# unix_timestamp -> Converte uma string de data/hora em um timestamp
# Para que serve: Qando você precisa trabalhar com tempo como número (por exemplo, calcular diferença entre em segundos entre duas datas)

# from_unitime -> Converte um número (segundos desde a época Unix) de volta para um timestamp legível
# Para que serve: Para transformar de volta valores de segundos em data legível - útil após cálculos com unix_timestamp

itens_pedido_data_reverso_df = itens_pedido_data_df.withColumn('timestamp', unix_timestamp(col('data_hr'), "dd/MM/yyyy"))

itens_pedido_data_reverso_df = itens_pedido_data_reverso_df.withColumn('data_formatada', from_unixtime('timestamp', 'yyyy-MM-dd')).withColumn('data_formatada', col('data_formatada').cast('date'))

itens_pedido_data_reverso_df.show()
itens_pedido_data_reverso_df.printSchema()

+--------------------+--------------+--------------------+--------------------+-------------------+------+-----+-----------+------------------+----------+----------+--------+----------+--------------+
|           id_pedido|item_id_pedido|          id_produto|         id_vendedor|  data_limite_envio| preco|frete|calor_frete|       valor_total|      data|   data_hr|    hora| timestamp|data_formatada|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-----+-----------+------------------+----------+----------+--------+----------+--------------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|13.29|      13.29|  72.1900015258789|2017-09-19|19/09/2017|09:45:35|1505779200|    2017-09-19|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|19.93|      19.93| 259.8299938964844|2017-05-03|03/05/2017|11:05:13|1493769600|    2017-05

In [18]:
itens_pedido_data_df.write.mode('overwrite').option('header', 'true').csv('/content/drive/MyDrive/Colab Notebooks/data/output/itens_pedido_tratado_csv')

In [19]:
spark.read.option('header', 'true').csv('/content/drive/MyDrive/Colab Notebooks/data/output/itens_pedido_tratado_csv').show(5)

+--------------------+--------------+--------------------+--------------------+--------------------+-----+-----+-----------+------------------+----------+----------+--------+
|           id_pedido|item_id_pedido|          id_produto|         id_vendedor|   data_limite_envio|preco|frete|calor_frete|       valor_total|      data|   data_hr|    hora|
+--------------------+--------------+--------------------+--------------------+--------------------+-----+-----+-----------+------------------+----------+----------+--------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19T09:45:...| 58.9|13.29|      13.29|  72.1900015258789|2017-09-19|19/09/2017|09:45:35|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03T11:05:...|239.9|19.93|      19.93| 259.8299938964844|2017-05-03|03/05/2017|11:05:13|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18T14:48:...|199.0|17.87|      17.87| 

In [20]:
spark.stop()