In [1]:
%%sh
sudo pip install spark
sudo pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting spark
  Downloading spark-0.2.1.tar.gz (41 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 41.0/41.0 kB 2.4 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: spark
  Building wheel for spark (setup.py): started
  Building wheel for spark (setup.py): finished with status 'done'
  Created wheel for spark: filename=spark-0.2.1-py3-none-any.whl size=58747 sha256=9c026bd0dcbcd0701852acc27820bdc2c77d89aea1c5840e7814cfb7761980ed
  Stored in directory: /root/.cache/pip/wheels/63/88/77/b4131110ea4094540f7b47c6d62a649807d7e94800da5eab0b
Successfully built spark
Installing collected packages: spark
Successfully installed spark-0.2.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar

In [2]:
import pandas as pd
import spark,pyspark
from pyspark.sql import *
from pyspark.sql import functions as f
from pyspark.sql import types as t 
from datetime import datetime, date
from pyspark.sql.functions import trim
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.master('local').appName('AC5 DataEng').getOrCreate()

In [4]:
dfapark = spark.read.format('parquet').load('/content/vendas.parquet')
dfapark.show()

+-------------+----------+-------------+----------+---+---+----------+----+-----+------+----+----------------+--------------------+-----------+------------+-------------+-----------------+-----------------+
|      cod_ean|cod_pessoa|cod_transacao|      data|vlr|qtd|      nome| sms|email|classe|sexo|          cidade|         des_produto|des_familia|   des_secao|des_categoria|des_sub_categoria|__index_level_0__|
+-------------+----------+-------------+----------+---+---+----------+----+-----+------+----+----------------+--------------------+-----------+------------+-------------+-----------------+-----------------+
|7896901200013|      4644|      8064284|2020-10-26| 67|  3|NOME_a4644|true|false|  Ouro|   F|       São Paulo|                null|       null|        null|         null|             null|                0|
|7897001010014|      3578|      8067745|2020-10-26| 54|  7|NOME_o3578|true|false|  Ouro|   M|  Belo Horizonte|                null|       null|        null|         null|  

In [5]:
dfapark.printSchema()

root
 |-- cod_ean: string (nullable = true)
 |-- cod_pessoa: long (nullable = true)
 |-- cod_transacao: long (nullable = true)
 |-- data: date (nullable = true)
 |-- vlr: long (nullable = true)
 |-- qtd: long (nullable = true)
 |-- nome: string (nullable = true)
 |-- sms: boolean (nullable = true)
 |-- email: boolean (nullable = true)
 |-- classe: string (nullable = true)
 |-- sexo: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- des_produto: string (nullable = true)
 |-- des_familia: string (nullable = true)
 |-- des_secao: string (nullable = true)
 |-- des_categoria: string (nullable = true)
 |-- des_sub_categoria: string (nullable = true)
 |-- __index_level_0__: long (nullable = true)



In [6]:
dfapark = dfapark.withColumn("data",dfapark.data.cast("date"))
dfapark.show()

+-------------+----------+-------------+----------+---+---+----------+----+-----+------+----+----------------+--------------------+-----------+------------+-------------+-----------------+-----------------+
|      cod_ean|cod_pessoa|cod_transacao|      data|vlr|qtd|      nome| sms|email|classe|sexo|          cidade|         des_produto|des_familia|   des_secao|des_categoria|des_sub_categoria|__index_level_0__|
+-------------+----------+-------------+----------+---+---+----------+----+-----+------+----+----------------+--------------------+-----------+------------+-------------+-----------------+-----------------+
|7896901200013|      4644|      8064284|2020-10-26| 67|  3|NOME_a4644|true|false|  Ouro|   F|       São Paulo|                null|       null|        null|         null|             null|                0|
|7897001010014|      3578|      8067745|2020-10-26| 54|  7|NOME_o3578|true|false|  Ouro|   M|  Belo Horizonte|                null|       null|        null|         null|  

In [7]:
dfapark = dfapark.withColumn("ano",f.year('data'))
dfapark = dfapark.withColumn("mes",f.month('data'))
dfapark = dfapark.withColumn("dia",f.dayofmonth('data'))
dfapark.show()

+-------------+----------+-------------+----------+---+---+----------+----+-----+------+----+----------------+--------------------+-----------+------------+-------------+-----------------+-----------------+----+---+---+
|      cod_ean|cod_pessoa|cod_transacao|      data|vlr|qtd|      nome| sms|email|classe|sexo|          cidade|         des_produto|des_familia|   des_secao|des_categoria|des_sub_categoria|__index_level_0__| ano|mes|dia|
+-------------+----------+-------------+----------+---+---+----------+----+-----+------+----+----------------+--------------------+-----------+------------+-------------+-----------------+-----------------+----+---+---+
|7896901200013|      4644|      8064284|2020-10-26| 67|  3|NOME_a4644|true|false|  Ouro|   F|       São Paulo|                null|       null|        null|         null|             null|                0|2020| 10| 26|
|7897001010014|      3578|      8067745|2020-10-26| 54|  7|NOME_o3578|true|false|  Ouro|   M|  Belo Horizonte|          

In [9]:
f.when(dfapark.sms.isNull(),False).otherwise(dfapark.sms)


Column<'CASE WHEN (sms IS NULL) THEN false ELSE sms END'>

In [10]:
dfapark = dfapark.withColumn("total",f.lit(dfapark.vlr*dfapark.qtd))

In [11]:
dfapark.show()

+-------------+----------+-------------+----------+---+---+----------+----+-----+------+----+----------------+--------------------+-----------+------------+-------------+-----------------+-----------------+----+---+---+-----+
|      cod_ean|cod_pessoa|cod_transacao|      data|vlr|qtd|      nome| sms|email|classe|sexo|          cidade|         des_produto|des_familia|   des_secao|des_categoria|des_sub_categoria|__index_level_0__| ano|mes|dia|total|
+-------------+----------+-------------+----------+---+---+----------+----+-----+------+----+----------------+--------------------+-----------+------------+-------------+-----------------+-----------------+----+---+---+-----+
|7896901200013|      4644|      8064284|2020-10-26| 67|  3|NOME_a4644|true|false|  Ouro|   F|       São Paulo|                null|       null|        null|         null|             null|                0|2020| 10| 26|  201|
|7897001010014|      3578|      8067745|2020-10-26| 54|  7|NOME_o3578|true|false|  Ouro|   M|  B

In [12]:
dfapark.write.partitionBy('ano','mes','dia').parquet('/content/vendas_tratadas')

In [13]:
dfapark.registerTempTable('vendas_table')



In [14]:
dfaparkclon = dfapark 
produto = dfaparkclon[['des_produto','des_familia', 'des_secao', 'des_categoria','des_sub_categoria']]
produto=produto.dropna (how='all')
produto = produto.distinct()
produto.show()

+--------------------+-----------+-----------+-------------------+-----------------+
|         des_produto|des_familia|  des_secao|      des_categoria|des_sub_categoria|
+--------------------+-----------+-----------+-------------------+-----------------+
|          CENOURA KG| PERECIVEIS|      F L V|TUBERCULOS E RAIZES|                 |
|COXA SOBRECOXA FR...| PERECIVEIS|    ACOUGUE|               AVES|                 |
| COSTELINHA SUINA KG| PERECIVEIS|    ACOUGUE|        CARNE SUINA|                 |
| TOMATE PIZZADORO KG| PERECIVEIS|      F L V|            LEGUMES|                 |
|   ALHO GRANEL T6 KG| PERECIVEIS|      F L V|               ALHO|                 |
|LEITE UHT CATIVA ...|  MERCEARIA|  ALTO GIRO|   LEITE LONGA VIDA|                 |
|COXAO MOLE BOVINO...| PERECIVEIS|    ACOUGUE|       CARNE BOVINA|                 |
|LING TOSC PERDIGA...| PERECIVEIS|FRIOS GERAL|    FRIOS EMBUTIDOS|                 |
|CERVEJA SKOL 350 ...|  MERCEARIA|    BEBIDAS|           CERVEJAS

In [17]:
sparks = SparkSession.builder.getOrCreate()
subsdfspk = spark.sql('select distinct cidade from vendas_table')
subsdfspk.write.mode("overwrite").format("parquet").save("/content/cidade")
subsdfspk.show()

+----------------+
|          cidade|
+----------------+
|          Santos|
|        Curitiba|
|            null|
|        Campinas|
|  Belo Horizonte|
|        Salvador|
|     Campo Largo|
|       São Paulo|
|          Recife|
|          Manaus|
|  Rio de Janeiro|
|    Campo Alegre|
|São Bento do Sul|
|    Porto Alegre|
+----------------+



In [18]:
produto = produto.select(
    trim(produto.des_produto).alias("des_produto"),
    trim(produto.des_familia).alias("des_familia"),
    trim(produto.des_secao).alias("des_secao"),
    trim(produto.des_categoria).alias("des_categoria"),
    trim(produto.des_sub_categoria).alias("des_sub_categoria"))
produto.show()

+--------------------+-----------+-----------+-------------------+-----------------+
|         des_produto|des_familia|  des_secao|      des_categoria|des_sub_categoria|
+--------------------+-----------+-----------+-------------------+-----------------+
|          CENOURA KG| PERECIVEIS|      F L V|TUBERCULOS E RAIZES|                 |
|COXA SOBRECOXA FR...| PERECIVEIS|    ACOUGUE|               AVES|                 |
| COSTELINHA SUINA KG| PERECIVEIS|    ACOUGUE|        CARNE SUINA|                 |
| TOMATE PIZZADORO KG| PERECIVEIS|      F L V|            LEGUMES|                 |
|   ALHO GRANEL T6 KG| PERECIVEIS|      F L V|               ALHO|                 |
|LEITE UHT CATIVA ...|  MERCEARIA|  ALTO GIRO|   LEITE LONGA VIDA|                 |
|COXAO MOLE BOVINO...| PERECIVEIS|    ACOUGUE|       CARNE BOVINA|                 |
|LING TOSC PERDIGA...| PERECIVEIS|FRIOS GERAL|    FRIOS EMBUTIDOS|                 |
|CERVEJA SKOL 350 ...|  MERCEARIA|    BEBIDAS|           CERVEJAS

In [19]:
produto.write.format("parquet").mode("overwrite").save("/content/produto")

In [20]:
venda_mulheres=dfapark.where(dfapark.sexo=='F')
venda_mulheres.write.format("json").mode("overwrite").save("/content/venda_mulheres.json")

In [21]:
venda_unicas = dfapark.filter("qtd == 1 AND vlr<10")
venda_unicas.write.format('csv').save('venda_unica')