# Setup Geral

Se estiver executando este exercício no Google Colab, execute as próximas duas células. 

Caso esteja executando localmente, não é necessário executar mas certifique-se de que o **pyspark** está instalado e configurado em sua máquina.

In [64]:
%%bash

# Instal Java
apt-get update && apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Install PySpark
pip install -q pyspark

Get:1 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:4 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:5 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:6 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:7 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Hit:8 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:9 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:10 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:12 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic/main Sources [2,216 kB]
Hit:13 https://developer.download.nvidia.com/compute/

In [65]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Teste

O teste a ser realizado é composto de 3 partes:

- um exercício de programação em Python
- alguns exercícios de programação em SQL
- alguns exercícios de programação com PySpark

Você pode escolher qual das partes do exercício vai fazer primeiro. Todo o exercício deve ser completado no período de 90 minutos.

# Python

In [67]:
# SETUP
nomes_alunos = [
    ('Maria', 1),
    ('João', 2),
    ('Pedro', 3),
    ('Gabriella', 4),
    ('Giovana', 5),
    ('Arthur', 6)
]

notas_alunos = {
    1: 9.5,
    2: 5.1,
    3: 8.7,
    4: 7.1,
    5: 4.8,
    6: 6.3
}

Implemente uma função que recebe uma lista de nomes de alunos, um dicionário de notas dos mesmo, sendo que essas estruturas se relacionam por um ID.

A função deve retornar em ordem alfabética, o nome dos alunos que obtiveram notas maior ou igual de uma nota de corte informada.

In [68]:
import pandas as pd


def filtra_alunos_acima_corte(alunos, notas, nota_corte):
    # Desenvolva aqui

    tabelaAlunos = construindo_df_alunos(alunos)
    tabelaNotas = construindo_df_notas(notas)

    tabelaNotasAlunos = pd.concat([tabelaAlunos, tabelaNotas], axis=1, join="inner")
    tabelaNotasAlunos = tabelaNotasAlunos.query("Notas >= @nota_corte")

    listaNomes = tabelaNotasAlunos["Nomes"].tolist()
    listaNomes = sorted(listaNomes)

    return listaNomes


def construindo_df_alunos(nomes_alunos):
  dfAlunos = pd.DataFrame(nomes_alunos, columns =["Nomes", "ID"])
  dfAlunos = dfAlunos.set_index("ID")

  return dfAlunos

def construindo_df_notas(notas_alunos):
  dfNotas = pd.DataFrame.from_dict(notas_alunos, orient="index", columns=["Notas"])
  
  return dfNotas


    
filtra_alunos_acima_corte(nomes_alunos, notas_alunos, 6)

['Arthur', 'Gabriella', 'Maria', 'Pedro']

# SQL

**Setup**

Neste momento você deverá ler os arquivos .csv em anexo para solucionar os problemas abaixo

In [88]:
# Setup Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AtividadeSQL").getOrCreate()

In [76]:
from google.colab import drive 
drive.mount('/content/gdrive')



Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [130]:
from pyspark.sql.types import IntegerType,BooleanType,DateType


def cria_tabela(path, nome_tabela):
    # Desenvolva aqui
    df = spark.read.option("header", "true").option("inferSchema", "true").csv("/content/gdrive/MyDrive/ATM/" + path)

    return df

def tratamentoUsuarios(usuariosRaw):
  usuariosWithoutNulls = usuariosRaw.na.drop("all")
  usuariosTreat = usuariosWithoutNulls.withColumn("cod_usuario",usuariosWithoutNulls.cod_usuario.cast(IntegerType()))

  return usuariosTreat
  

usuariosRaw = cria_tabela("usuarios.csv", "usuarios")
produtos = cria_tabela("produtos.csv", "produtos")
vendas = cria_tabela("vendas.csv", "vendas")


usuarios = tratamentoUsuarios(usuariosRaw)

**Função para execução de queries**

In [132]:
usuarios.createOrReplaceTempView("usuarios")

query = ("""
    SELECT *
    FROM usuarios
""")

newdf = spark.sql(query)
newdf.show()
newdf.printSchema()

+-----------+-------------------+------------------+--------------------+-------------------+
|cod_usuario|      data_cadastro|      faixa_etaria|              cidade|             estado|
+-----------+-------------------+------------------+--------------------+-------------------+
|         69|2020-07-24 00:00:00|Entre 16 a 21 anos|           São Romão|       Minas Gerais|
|        176|2021-03-19 00:00:00|Entre 28 a 36 anos|            Guanambi|              Bahia|
|        185|2021-03-11 00:00:00|Entre 16 a 21 anos|             Martins|Rio Grande do Norte|
|        191|2021-01-16 00:00:00|Entre 37 a 49 anos|               Icatu|           Maranhão|
|        464|2020-02-19 00:00:00|Entre 22 a 27 anos|            Paulista|            Paraíba|
|        479|2020-02-09 00:00:00|Entre 50 a 61 anos|          Ubaporanga|       Minas Gerais|
|        796|2020-12-19 00:00:00|Entre 62 a 70 anos| Santa Rita de Minas|       Minas Gerais|
|       1691|2021-07-10 00:00:00|Entre 10 a 15 anos|        

In [119]:
vendas.createOrReplaceTempView("vendas")

query = ("""
    SELECT *
    FROM vendas
""")

newdf = spark.sql(query)
newdf.show()
newdf.printSchema()

+-----------+-----------+-------------------+----------+--------+
|cod_usuario|cod_produto|        data_compra|quantidade|   valor|
+-----------+-----------+-------------------+----------+--------+
|        927|         10|2020-10-19 00:00:00|         4|  2799.6|
|       1544|         10|2021-03-28 00:00:00|         2|  1399.8|
|       2833|          2|2020-09-13 00:00:00|        20| 26915.6|
|       2253|         14|2021-06-21 00:00:00|         1|  138.99|
|       1724|         10|2021-07-10 00:00:00|         6|  4199.4|
|       1534|          5|2019-10-17 00:00:00|        19|  8303.0|
|       1977|         14|2020-05-08 00:00:00|         2|  277.98|
|       1916|          3|2020-12-01 00:00:00|        14| 28693.0|
|       2446|          1|2021-06-02 00:00:00|        17| 78045.3|
|       2041|          4|2021-07-25 00:00:00|        19|  1119.1|
|       1731|         11|2021-05-27 00:00:00|         5|   249.5|
|       3076|         12|2021-04-29 00:00:00|        11|   852.5|
|       15

1) Qual foi a quantidade de vendas nos estados de Minas Gerais e São Paulo para cada ano e mês?

In [232]:
vendas.createOrReplaceTempView("vendas")
usuarios.createOrReplaceTempView("usuarios")

query = ("""
  
  SELECT COUNT (data_compra), YEAR(data_compra), MONTH(data_compra)
  FROM (
    SELECT usuarios.cod_usuario, usuarios.estado, vendas.data_compra
    FROM usuarios
    INNER JOIN vendas 
    ON usuarios.cod_usuario = vendas.cod_usuario
    WHERE estado = 'Minas Gerais' or estado = 'São Paulo'
)
 GROUP BY YEAR(data_compra), MONTH(data_compra)
 ORDER BY YEAR(data_compra), MONTH(data_compra)

""")


newdf = spark.sql(query)
newdf.show()



+------------------+-----------------+------------------+
|count(data_compra)|year(data_compra)|month(data_compra)|
+------------------+-----------------+------------------+
|                 1|             2018|                 6|
|                 1|             2018|                 7|
|                 1|             2018|                 8|
|                 2|             2018|                10|
|                 1|             2018|                11|
|                 2|             2018|                12|
|                 2|             2019|                 1|
|                 1|             2019|                 2|
|                 2|             2019|                 4|
|                 1|             2019|                 5|
|                 2|             2019|                 6|
|                 2|             2019|                 7|
|                 2|             2019|                 8|
|                 2|             2019|                 9|
|             

2) Quais são os usuários por Estado que mais compraram em todo o período analisado e qual foi o número de compras realizadas, a quantidade total de itens comprados e valor total pago por usuário?

In [305]:
vendas.createOrReplaceTempView("vendas")
usuarios.createOrReplaceTempView("usuarios")

tempView1 = ("""
  CREATE OR REPLACE TEMP VIEW view1
  AS
  SELECT usuarios.cod_usuario, usuarios.estado, vendas.data_compra, vendas.valor
    FROM usuarios
    INNER JOIN vendas 
    ON usuarios.cod_usuario = vendas.cod_usuario;
""")


tempView2 = ("""
  CREATE OR REPLACE TEMP VIEW view2
  AS
  SELECT SUM(valor) total_vendido, cod_usuario, estado
  FROM view1
  GROUP BY estado, cod_usuario
""")


query = ("""
  SELECT MAX (total_vendido) maior_venda, estado
  FROM view2
  GROUP BY estado
  ORDER BY maior_venda DESC
""")


spark.sql(tempView1)
spark.sql(tempView2)
newdf = spark.sql(query)
newdf.show()

+------------------+-------------------+
|       maior_venda|             estado|
+------------------+-------------------+
|         221747.56|Rio Grande do Norte|
|         187513.66|     Santa Catarina|
|184101.88999999998|           Maranhão|
|          165197.3|        Mato Grosso|
|         152271.96| Mato Grosso do Sul|
|         142389.09|              Goiás|
|         139997.47|              Piauí|
|         137228.01|             Paraná|
|          135556.7|         Pernambuco|
|          134413.0|          São Paulo|
|          132507.8|       Minas Gerais|
|          126282.4|              Bahia|
|         121494.67|              Ceará|
|          97028.45|          Tocantins|
|          92027.37|  Rio Grande do Sul|
| 73556.09999999999|            Paraíba|
|          51359.34|              Amapá|
+------------------+-------------------+



3) Quais são os usuários que não fizeram nenhuma compra?

In [190]:
vendas.createOrReplaceTempView("vendas")
usuarios.createOrReplaceTempView("usuarios")

query = ("""
    SELECT usuarios.cod_usuario
    FROM usuarios
    LEFT JOIN vendas 
    ON usuarios.cod_usuario = vendas.cod_usuario
    WHERE vendas.cod_usuario IS NULL
""")


newdf = spark.sql(query)
newdf.show()

+-----------+
|cod_usuario|
+-----------+
|       5098|
|       5229|
|       5482|
|       5504|
|       5614|
|       5923|
|       6985|
|       7901|
|       9980|
|      10080|
|      10152|
|      10170|
|      10929|
|      11002|
|      11546|
|      12298|
|      12354|
|      13117|
|      13476|
|      14486|
+-----------+
only showing top 20 rows

3342


4) Qual é o ticket médio (média de valor gasto) e o número total de usuários que fizeram pelo menos uma compra por faixa etária?

In [282]:
vendas.createOrReplaceTempView("vendas")
usuarios.createOrReplaceTempView("usuarios")

tempView1 = ("""
  CREATE OR REPLACE TEMP VIEW view1
  AS
  SELECT COUNT(cod_usuario) compradores_unicos, faixa_etaria, SUM (valor) valor_gasto
    FROM(   
      SELECT usuarios.cod_usuario, faixa_etaria, valor
      FROM usuarios
      LEFT JOIN vendas 
      ON usuarios.cod_usuario = vendas.cod_usuario
      WHERE vendas.cod_usuario IS NOT NULL)
      GROUP BY faixa_etaria
""")

query = ("""
    SELECT faixa_etaria, valor_gasto, compradores_unicos,  valor_gasto/compradores_unicos AS ticket_medio
    FROM view1
""")


spark.sql(tempView1)
newdf = spark.sql(query)
newdf.show()


+------------------+------------------+------------------+------------------+
|      faixa_etaria|       valor_gasto|compradores_unicos|      ticket_medio|
+------------------+------------------+------------------+------------------+
|Entre 22 a 27 anos| 780993.4700000001|                64|12203.022968750001|
|Entre 10 a 15 anos|         382275.39|                43|  8890.12534883721|
|Entre 16 a 21 anos| 636297.3500000001|                50|12725.947000000002|
|Entre 50 a 61 anos| 867958.0599999999|                80|       10849.47575|
|Entre 37 a 49 anos| 702542.0099999998|                42| 16727.19071428571|
|Entre 62 a 70 anos|         390492.43|                39|10012.626410256411|
|Entre 28 a 36 anos|461050.07999999984|                56|  8233.03714285714|
|   Mais de 70 anos|327790.00999999995|                29|11303.103793103446|
+------------------+------------------+------------------+------------------+



In [None]:
spark.stop()

# PySpark

**setup**:

In [25]:
from google.colab import drive 
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [23]:
# Setup Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AtividadePySpark").getOrCreate()

In [26]:
from pyspark.sql.types import IntegerType, BooleanType, DateType


def cria_tabela(path, nome_tabela):
    # Desenvolva aqui
    df = spark.read.option("header", "true").option("inferSchema", "true").csv("/content/gdrive/MyDrive/ATM/" + path)

    return df

def tratamentoUsuarios(usuariosRaw):
  usuariosWithoutNulls = usuariosRaw.na.drop("all")
  usuariosTreat = usuariosWithoutNulls.withColumn("cod_usuario",usuariosWithoutNulls.cod_usuario.cast(IntegerType()))

  return usuariosTreat
  

usuariosRaw = cria_tabela("usuarios.csv", "usuarios")
produtos = cria_tabela("produtos.csv", "produtos")
vendas = cria_tabela("vendas.csv", "vendas")


usuarios = tratamentoUsuarios(usuariosRaw)

Responda às perguntas a seguir utilizando **Spark DATAFRAMES**.

1) Qual foi o total de compras realizadas, o total de itens comprados e a receita total obtida em todo o período analisado?

In [28]:

df = vendas.agg({"valor": "sum", "quantidade":"sum"})

df.withColumnRenamed("sum(valor)", "Receita Total").withColumnRenamed("sum(quantidade)", "Total de Produtos Vendidos").show()
rows = vendas.count()
print("Numero de vendas: ", rows)




+--------------------+--------------------------+
|       Receita Total|Total de Produtos Vendidos|
+--------------------+--------------------------+
|2.1584900165999958E8|                    209149|
+--------------------+--------------------------+

Numero de vendas:  20000


2) Quais são os 3 produtos mais comprados dos estados da região Sul e Sudeste, a quantidade de itens comprados, o valor total pago e a média de preço paga?

In [216]:
from pyspark.sql.functions import when, rank, col
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType

listaSudeste = ["São Paulo", "Minas Gerais", "Rio de Janeiro", "Espírito Santo"]
listaSul = ["Santa Catarina", "Rio Grande do Sul", "Paraná"]
regioesPermitidas = ["Sul", "Sudeste"]
columnasDrop = ("data_cadastro", "faixa_etaria", "cidade",  "estado", "data_compra")

def funcaoWindow(df, coluna):

  windowValor = Window.partitionBy(df["Regiao"]).orderBy(df[coluna].desc())
  df = df.select('*', rank().over(windowValor).alias('Ordem')).filter(col("Ordem") <= 3)

  dfMedia = df.withColumn(
      "Média_valor_vendido",
       df['Valor_total_pago'] / df['Quantidade_total_vendida'])
  
  dfCasted = dfMedia.withColumn("Média_valor_vendido",col("Média_valor_vendido").cast(FloatType())
    ).withColumn("Valor_total_pago", col("Valor_total_pago").cast(FloatType()))

  return dfCasted


newDf = usuarios.withColumn("Regiao",
                            when(usuarios['estado'].isin(listaSudeste), "Sudeste"
                            ).when(usuarios['estado'].isin(listaSul), "Sul"
                          ).otherwise("Outro"))

joinedDf = newDf.join(vendas, newDf.cod_usuario == vendas.cod_usuario, "inner")

filterDf = joinedDf.filter(joinedDf["Regiao"].isin(regioesPermitidas))

filterDf = filterDf.drop(*columnasDrop)

groupedDf = filterDf.groupBy("cod_produto", "Regiao").sum("valor", "quantidade"
  ).withColumnRenamed("sum(valor)", "Valor_total_pago"
  ).withColumnRenamed("sum(quantidade)", "Quantidade_total_vendida")

  
print("Ranking por Valor Recebido")
funcaoWindow(groupedDf, 'Valor_total_pago').show()

print("Ranking por Quantidade Vendida")
funcaoWindow(groupedDf, 'Quantidade_total_vendida').show()



Ranking por Valor Recebido
+-----------+-------+----------------+------------------------+-----+-------------------+
|cod_produto| Regiao|Valor_total_pago|Quantidade_total_vendida|Ordem|Média_valor_vendido|
+-----------+-------+----------------+------------------------+-----+-------------------+
|          1|Sudeste|        390226.5|                      85|    1|             4590.9|
|         21|Sudeste|        383942.4|                      64|    2|             5999.1|
|          6|Sudeste|         71992.0|                      80|    3|              899.9|
|         21|    Sul|        335949.6|                      56|    1|             5999.1|
|          1|    Sul|        224954.1|                      49|    2|             4590.9|
|          3|    Sul|         73782.0|                      36|    3|             2049.5|
+-----------+-------+----------------+------------------------+-----+-------------------+

Ranking por Quantidade Vendida
+-----------+-------+----------------+---

3) Para cada produto, quantos usuários fizeram pelo menos uma compra desse produto e qual é o valor mínimo e máximo pago por eles?

In [None]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

joinedDf = usuarios.join(vendas, usuarios.cod_usuario == vendas.cod_usuario, "inner").drop(vendas.cod_usuario).orderBy("cod_produto")

maxGroupedDf = joinedDf.groupBy("cod_produto").max("valor").orderBy("cod_produto")
minGroupedDf = joinedDf.groupBy("cod_produto").min("valor").orderBy("cod_produto")

window = Window.partitionBy(joinedDf["cod_produto"]).orderBy(joinedDf["cod_produto"].desc())

df = joinedDf.withColumn('Usuarios_distintos', F.approx_count_distinct("cod_usuario").over(window))

uniqueUser = df.groupBy("cod_produto").max("Usuarios_distintos").orderBy("cod_produto")

print("Maior valor pago em cada produto: ")
maxGroupedDf.show()

print("Menor valor pago em cada produto: ")
minGroupedDf.show()

print("Quantidade de usuarios que compraram cada produto: ")
uniqueUser.show()




4) Aplique um desconto de 10% em todas as vendas dos usuários que fizeram pelo menos 3 compras de produtos na mesma categoria, a partir da 4ª compra realizada. Exiba apenas os usuários que terão o desconto aplicado, mantendo todas as compras, o valor original e o valor com o desconto aplicado.

In [253]:

from pyspark.sql.functions import dense_rank


tmpDf = usuarios.join(vendas, usuarios.cod_usuario == vendas.cod_usuario, "inner").drop(vendas.cod_usuario)
joinedDf = tmpDf.join(produtos, produtos.cod_produto == tmpDf.cod_produto, "inner").drop(produtos.cod_produto)

window = Window.partitionBy(joinedDf["cod_usuario"], joinedDf["categoria_produto"]).orderBy(df["data_compra"].desc())

orderedDf = joinedDf.select("cod_usuario", "valor", "cod_produto","categoria_produto", dense_rank().over(window).alias('Ordem'))

respostaDf = orderedDf.withColumn("Valor_Desconto", 
                                  when(col("Ordem") > 3,
                                       col("valor") * 0.9
                                       ).otherwise(col("valor"))
                                  ).orderBy("cod_usuario", "categoria_produto").show()





+-----------+--------+-----------+--------------------+-----+--------------+
|cod_usuario|   valor|cod_produto|   categoria_produto|Ordem|Valor_Desconto|
+-----------+--------+-----------+--------------------+-----+--------------+
|         28|19507.35|          7|    Casa e bem-estar|    1|      19507.35|
|         28|  1550.0|         12|    Casa e bem-estar|    2|        1550.0|
|         28|  219.57|          8|    Eletrodomesticos|    1|        219.57|
|         28|  2195.7|          8|    Eletrodomesticos|    2|        2195.7|
|         28|  1818.6|         20|         Eletronicos|    1|        1818.6|
|         28|   149.7|         11|               Jogos|    1|         149.7|
|         28|  138.99|         14|       Moda e beleza|    1|        138.99|
|         28|   29.94|         15| Produtos de limpeza|    1|         29.94|
|         28|  3504.6|         17|          Tecnologia|    1|        3504.6|
|         69| 41993.7|         21|               Lazer|    1|       41993.7|

# FIM!