In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper
spark = SparkSession.builder.getOrCreate()

In [None]:
# conexão manual
from google.colab import drive
drive.mount('/content/drive')

In [2]:
vendedores = spark.read.csv('/content/drive/MyDrive/Material de apoio - M27/vendedores.csv', header=True, inferSchema=True)
itens_pedido = spark.read.csv('/content/drive/MyDrive/Material de apoio - M27/itens_pedido.csv', header=True, inferSchema=True)

In [4]:
vendedores.show()
vendedores.printSchema()

+--------------------+------------+-----------------+---------------+
|         id_vendedor|cep_vendedor|  cidade_vendedor|estado_vendedor|
+--------------------+------------+-----------------+---------------+
|3442f8959a84dea7e...|       13023|         campinas|             SP|
|d1b65fc7debc3361e...|       13844|       mogi guacu|             SP|
|ce3ad9de960102d06...|       20031|   rio de janeiro|             RJ|
|c0f3eea2e14555b6f...|        4195|        sao paulo|             SP|
|51a04a8a6bdcb23de...|       12914|braganca paulista|             SP|
|c240c4061717ac180...|       20920|   rio de janeiro|             RJ|
|e49c26c3edfa46d22...|       55325|           brejao|             PE|
|1b938a7ec6ac5061a...|       16304|        penapolis|             SP|
|768a86e36ad6aae3d...|        1529|        sao paulo|             SP|
|ccc4bbb5f32a6ab2b...|       80310|         curitiba|             PR|
|8cb7c5ddf41f4d506...|       75110|         anapolis|             GO|
|a7a9b880c49781da6..

In [6]:
vendedores_tratados_df = vendedores.withColumn('cep_vendedor', vendedores.cep_vendedor.cast('string'))

vendedores_tratados_df = vendedores_tratados_df.withColumn('cidade_vendedor_tratado', upper(vendedores['cidade_vendedor']))
vendedores_tratados_df.show()
vendedores_tratados_df.printSchema()

+--------------------+------------+-----------------+---------------+-----------------------+
|         id_vendedor|cep_vendedor|  cidade_vendedor|estado_vendedor|cidade_vendedor_tratado|
+--------------------+------------+-----------------+---------------+-----------------------+
|3442f8959a84dea7e...|       13023|         campinas|             SP|               CAMPINAS|
|d1b65fc7debc3361e...|       13844|       mogi guacu|             SP|             MOGI GUACU|
|ce3ad9de960102d06...|       20031|   rio de janeiro|             RJ|         RIO DE JANEIRO|
|c0f3eea2e14555b6f...|        4195|        sao paulo|             SP|              SAO PAULO|
|51a04a8a6bdcb23de...|       12914|braganca paulista|             SP|      BRAGANCA PAULISTA|
|c240c4061717ac180...|       20920|   rio de janeiro|             RJ|         RIO DE JANEIRO|
|e49c26c3edfa46d22...|       55325|           brejao|             PE|                 BREJAO|
|1b938a7ec6ac5061a...|       16304|        penapolis|       

In [7]:
itens_pedido.show()
itens_pedido.printSchema()

+--------------------+--------------+--------------------+--------------------+-------------------+------+-----------+
|           id_pedido|item_id_pedido|          id_produto|         id_vendedor|  data_limite_envio| preco|valor_frete|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-----------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|      13.29|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|      19.93|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|      17.87|
|00024acbcdf0a6daa...|             1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|      12.79|
|00042b26cf59d7ce6...|             1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51| 199.9|      18.14|
|00048cc3ae777c65d...|             1|ef92defde84

In [8]:
itens_pedido_tratado_df = itens_pedido.\
withColumn('preco', col('preco').cast('float')).\
withColumn('valor_frete', col('valor_frete').cast('int')).\
withColumnRenamed('valor_frete', 'frete').\
withColumn('valor_total', col('preco') + col('frete'))

itens_pedido_tratado_df.show()
itens_pedido_tratado_df.printSchema()

+--------------------+--------------+--------------------+--------------------+-------------------+------+-----+-----------+
|           id_pedido|item_id_pedido|          id_produto|         id_vendedor|  data_limite_envio| preco|frete|valor_total|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-----+-----------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|   13|       71.9|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|   19|      258.9|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|   17|      216.0|
|00024acbcdf0a6daa...|             1|7634da152a4610f15...|9d7a1d34a50524090...|2018-08-15 10:10:18| 12.99|   12|      24.99|
|00042b26cf59d7ce6...|             1|ac6c3623068f30de0...|df560393f3a51e745...|2017-02-13 13:57:51| 199.9|   18|      217.9|


In [9]:
# dados temporais
from pyspark.sql.functions import to_date, date_format

itens_pedido_data_df = itens_pedido_tratado_df.withColumn('data', to_date(col('data_limite_envio')))

itens_pedido_data_df = itens_pedido_data_df.withColumn('data_br', date_format(col('data_limite_envio'), 'dd/MM/yyyy'))
itens_pedido_data_df = itens_pedido_data_df.withColumn('hora', date_format(col('data_limite_envio'), 'HH:mm:ss'))

itens_pedido_data_df.show()
itens_pedido_data_df.printSchema()


+--------------------+--------------+--------------------+--------------------+-------------------+------+-----+-----------+----------+----------+--------+
|           id_pedido|item_id_pedido|          id_produto|         id_vendedor|  data_limite_envio| preco|frete|valor_total|      data|   data_br|    hora|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-----+-----------+----------+----------+--------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|   13|       71.9|2017-09-19|19/09/2017|09:45:35|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|   19|      258.9|2017-05-03|03/05/2017|11:05:13|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48:30| 199.0|   17|      216.0|2018-01-18|18/01/2018|14:48:30|
|00024acbcdf0a6daa...|             1|7634da152a4610f15...|9d7a1d

In [11]:
from pyspark.sql.functions import unix_timestamp, from_unixtime # Essa função realiza o inverso da função de data comum

itens_pedido_data_reverso_df = itens_pedido_data_df.withColumn('timestamp', unix_timestamp(col('data_br'),'dd/MM/yyyy'))
itens_pedido_data_reverso_df = itens_pedido_data_reverso_df.withColumn('data_formatada', from_unixtime('timestamp', 'yyyy-MM-dd')).withColumn('data_formatada',col('data_formatada').cast('date'))

itens_pedido_data_reverso_df.show()
itens_pedido_data_reverso_df.printSchema()

+--------------------+--------------+--------------------+--------------------+-------------------+------+-----+-----------+----------+----------+--------+----------+--------------+
|           id_pedido|item_id_pedido|          id_produto|         id_vendedor|  data_limite_envio| preco|frete|valor_total|      data|   data_br|    hora| timestamp|data_formatada|
+--------------------+--------------+--------------------+--------------------+-------------------+------+-----+-----------+----------+----------+--------+----------+--------------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35|  58.9|   13|       71.9|2017-09-19|19/09/2017|09:45:35|1505779200|    2017-09-19|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13| 239.9|   19|      258.9|2017-05-03|03/05/2017|11:05:13|1493769600|    2017-05-03|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 

In [16]:
itens_pedido_data_df.write.mode('overwrite').option('header', 'true').csv('output/itens_pedido_tratado')

In [17]:
spark.read.option('header', 'true').csv('output/itens_pedido_tratado').show()

+--------------------+--------------+--------------------+--------------------+--------------------+------+-----+-----------+----------+----------+--------+
|           id_pedido|item_id_pedido|          id_produto|         id_vendedor|   data_limite_envio| preco|frete|valor_total|      data|   data_br|    hora|
+--------------------+--------------+--------------------+--------------------+--------------------+------+-----+-----------+----------+----------+--------+
|00010242fe8c5a6d1...|             1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19T09:45:...|  58.9|   13|       71.9|2017-09-19|19/09/2017|09:45:35|
|00018f77f2f0320c5...|             1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03T11:05:...| 239.9|   19|      258.9|2017-05-03|03/05/2017|11:05:13|
|000229ec398224ef6...|             1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18T14:48:...| 199.0|   17|      216.0|2018-01-18|18/01/2018|14:48:30|
|00024acbcdf0a6daa...|             1|7634da152a4610f15...|

In [19]:
spark.stop()