In [5]:
# Install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 46 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 46.6 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=9221dd9e43f9d694041a58d53ad8cf01d13671ff0281e86ecf6d28c9a88bc486
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [85]:
import os
from datetime import datetime
from functools import reduce

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col
import pyspark


# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Check Spark Session Information
spark

In [7]:
# id, nome, sexo, estado, cidade
clientes = [
    (1, "Joao", "M", "MG", "Betim"),
    (2, "Luiza", "F", "SP", "São Caetano"),
    (3, "Gabriela", "F", "MG", "Belo Horizonte"),
    (4, "Rafael", "M", "RJ", "Niteroi"),
    (5, "Jean", "M", "RJ", "Cabo Frio"),
    (6, "Daniela", "F", "PR", "Curitiba"),
]

# id_cliente: (id_conta, dt_criacao)
contas = {
    2: (191821, 1608987600),
    1: (191822, 1608552000),
    3: (191823, 1609761600),
    6: (191824, 1609769460),
    5: (191825, 1610449920),
    4: (191826, 1610712000)
}

# conta_id, fl_cartao_credito, fl_investidor, fl_credito
produtos = [
    { "conta_id": 191821, "fl_cartao_credito": 1, "fl_investidor": 0, "fl_credito": 1},
    { "conta_id": 191822, "fl_cartao_credito": 1, "fl_investidor": 1, "fl_credito": 0},
    { "conta_id": 191823, "fl_cartao_credito": 0, "fl_investidor": 0, "fl_credito": 1},
    { "conta_id": 191824, "fl_cartao_credito": 0, "fl_investidor": 1, "fl_credito": 0},
    { "conta_id": 191825, "fl_cartao_credito": 1, "fl_investidor": 1, "fl_credito": 1},
    { "conta_id": 191826, "fl_cartao_credito": 1, "fl_investidor": 1, "fl_credito": 0}
]

# conta_id, valor, dt_transacao
transacoes = [
    {"conta_id": 191821, "valor": 10.0, "dt_transacao": "2021-01-03 16:32:01"},
    {"conta_id": 191821, "valor": 100.0, "dt_transacao": "2021-01-10 17:00:00"},
    {"conta_id": 191821, "valor": 300.0, "dt_transacao": "2021-03-03 09:11:12"},
    {"conta_id": 191821, "valor": 132.0, "dt_transacao": "2021-03-03 10:15:10"},
    {"conta_id": 191822, "valor": 5.0, "dt_transacao": "2021-01-03 23:12:11"},
    {"conta_id": 191822, "valor": 2.0, "dt_transacao": "2021-01-03 11:18:19"},
    {"conta_id": 191823, "valor": 49.99, "dt_transacao": "2021-01-03 15:18:19"},
    {"conta_id": 191823, "valor": 40.0, "dt_transacao": "2021-01-03 18:19:21"},
    {"conta_id": 191823, "valor": 9.30, "dt_transacao": "2021-01-03 22:11:10"},
    {"conta_id": 191826, "valor": 1000.0, "dt_transacao": "2021-01-16 08:12:11"},
    {"conta_id": 191825, "valor": 23.43, "dt_transacao": "2021-01-03 09:11:56"},
    {"conta_id": 191825, "valor": 10.09, "dt_transacao": "2021-01-03 18:19:18"}
]

# Dataframes
df_clientes = spark.createDataFrame(clientes, schema=['id', 'nome', 'sexo', 'estado', 'cidade'])
df_contas = spark.createDataFrame([Row(k, *v) for k, v in contas.items()], schema=['cliente_id', 'conta_id', 'dt_criacao']).withColumn('dt_criacao', col('dt_criacao').cast('timestamp'))
df_produtos = spark.createDataFrame(produtos)
df_transacoes = spark.createDataFrame(transacoes).withColumn('dt_transacao', col('dt_transacao').cast('timestamp'))


# SQL
df_clientes.createOrReplaceTempView('CLIENTES')
df_contas.createOrReplaceTempView('CONTAS')
df_produtos.createOrReplaceTempView('PRODUTOS')
df_transacoes.createOrReplaceTempView('TRANSACOES')

# Pandas
pd_clientes = df_clientes.toPandas()
pd_contas = df_contas.toPandas()
pd_produtos = df_produtos.toPandas()
pd_transacoes = df_transacoes.toPandas()

# Python

**1.** Implemente uma função que retorne a soma do valor transacionado.

> ```python
> # conta_id, valor, dt_transacao
> transacoes = [
>     {"conta_id": 191821, "valor": 10.0, "dt_transacao": "2021-01-03 16:32:01"},
>     {"conta_id": 191821, "valor": 100.0, "dt_transacao": "2021-01-10 17:00:00"},
>     {"conta_id": 191821, "valor": 300.0, "dt_transacao": "2021-03-03 09:11:12"},
>     {"conta_id": 191821, "valor": 132.0, "dt_transacao": "2021-03-03 10:15:10"},
>     {"conta_id": 191822, "valor": 5.0, "dt_transacao": "2021-01-03 23:12:11"},
>     {"conta_id": 191822, "valor": 2.0, "dt_transacao": "2021-01-03 11:18:19"},
>     {"conta_id": 191823, "valor": 49.99, "dt_transacao": "2021-01-03 15:18:19"},
>     {"conta_id": 191823, "valor": 40.0, "dt_transacao": "2021-01-03 18:19:21"},
>     {"conta_id": 191823, "valor": 9.30, "dt_transacao": "2021-01-03 22:11:10"},
>     {"conta_id": 191826, "valor": 1000.0, "dt_transacao": "2021-01-16 08:12:11"},
>     {"conta_id": 191825, "valor": 23.43, "dt_transacao": "2021-01-03 09:11:56"},
>     {"conta_id": 191825, "valor": 10.09, "dt_transacao": "2021-01-03 18:19:18"}
> ]
> ```

In [8]:
def sum_vl_transacoes(transacoes):
  # TODO: Implemente sua solucao aqui

  res = 0
  for i in transacoes:
    res += i["valor"]

  return res

sum_vl_transacoes(transacoes)

1681.81

# SQL

> CLIENTES
> 
> 
> |    |   id | nome     | sexo   | estado   | cidade         |
> |---:|-----:|:---------|:-------|:---------|:---------------|
> |  0 |    1 | Joao     | M      | MG       | Betim          |
> |  1 |    2 | Luiza    | F      | SP       | São Caetano    |
> |  2 |    3 | Gabriela | F      | MG       | Belo Horizonte |
> |  3 |    4 | Rafael   | M      | RJ       | Niteroi        |
> |  4 |    5 | Jean     | M      | RJ       | Cabo Frio      |
> |  5 |    6 | Daniela  | F      | PR       | Curitiba       |
> 
> CONTAS
> 
> |    |   cliente_id |   conta_id | dt_criacao          |
> |---:|-------------:|-----------:|:--------------------|
> |  0 |            2 |     191821 | 2020-12-26 13:00:00 |
> |  1 |            1 |     191822 | 2020-12-21 12:00:00 |
> |  2 |            3 |     191823 | 2021-01-04 12:00:00 |
> |  3 |            6 |     191824 | 2021-01-04 14:11:00 |
> |  4 |            5 |     191825 | 2021-01-12 11:12:00 |
> |  5 |            4 |     191826 | 2021-01-15 12:00:00 |
> 
> PRODUTOS
> 
> |    |   conta_id |   fl_cartao_credito |   fl_credito |   fl_investidor |
> |---:|-----------:|--------------------:|-------------:|----------------:|
> |  0 |     191821 |                   1 |            1 |               0 |
> |  1 |     191822 |                   1 |            0 |               1 |
> |  2 |     191823 |                   0 |            1 |               0 |
> |  3 |     191824 |                   0 |            0 |               1 |
> |  4 |     191825 |                   1 |            1 |               1 |
> |  5 |     191826 |                   1 |            0 |               1 |
> 
> 
> TRANSACOES
> 
> |    |   conta_id | dt_transacao        |   valor |
> |---:|-----------:|:--------------------|--------:|
> |  0 |     191821 | 2021-01-03 16:32:01 |   10    |
> |  1 |     191821 | 2021-01-10 17:00:00 |  100    |
> |  2 |     191821 | 2021-03-03 09:11:12 |  300    |
> |  3 |     191821 | 2021-03-03 10:15:10 |  132    |
> |  4 |     191822 | 2021-01-03 23:12:11 |    5    |
> |  5 |     191822 | 2021-01-03 11:18:19 |    2    |
> |  6 |     191823 | 2021-01-03 15:18:19 |   49.99 |
> |  7 |     191823 | 2021-01-03 18:19:21 |   40    |
> |  8 |     191823 | 2021-01-03 22:11:10 |    9.3  |
> |  9 |     191826 | 2021-01-16 08:12:11 | 1000    |
> | 10 |     191825 | 2021-01-03 09:11:56 |   23.43 |
> | 11 |     191825 | 2021-01-03 18:19:18 |   10.09 |



### Resposta

**1.** Retorne o nome do cliente e a data de criação da conta.

In [13]:
# TODO: Implemente sua solucao aqui
spark.sql('''
    SELECT DISTINCT nome, dt_criacao
    FROM CLIENTES
    LEFT JOIN CONTAS ON CLIENTES.id = CONTAS.cliente_id
    '''
).toPandas()

Unnamed: 0,nome,dt_criacao
0,Joao,2020-12-21 12:00:00
1,Jean,2021-01-12 11:12:00
2,Gabriela,2021-01-04 12:00:00
3,Rafael,2021-01-15 12:00:00
4,Daniela,2021-01-04 14:11:00
5,Luiza,2020-12-26 13:00:00


**2.** Retorne o nome dos clientes que possuem pelo menos 2 transações em janeiro de 2021

In [43]:
# TODO: Implemente sua solucao aqui
spark.sql('''
    SELECT nome, COUNT(dt_transacao)
    FROM CONTAS
    LEFT JOIN CLIENTES ON CLIENTES.id = CONTAS.cliente_id
    LEFT JOIN TRANSACOES ON CONTAS.conta_id = TRANSACOES.conta_id
    GROUP BY nome HAVING COUNT(dt_transacao) >= 2
    '''
).toPandas()

Unnamed: 0,nome,count(dt_transacao)
0,Gabriela,3
1,Joao,2
2,Luiza,4
3,Jean,2


**3.** Retorne a contagem distinta dos clientes ativos (pelo menos uma transação) em janeiro de 2021

In [67]:
# TODO: Implemente sua solucao aqui
spark.sql('''
    SELECT DISTINCT nome
    FROM CONTAS
    LEFT JOIN CLIENTES ON CLIENTES.id = CONTAS.cliente_id
    LEFT JOIN TRANSACOES ON CONTAS.conta_id = TRANSACOES.conta_id
    WHERE dt_transacao IS NOT NULL 
    AND dt_transacao <= "2021-01-31" AND dt_transacao >= "2021-01-01"
    '''
).toPandas()

Unnamed: 0,nome
0,Gabriela
1,Joao
2,Luiza
3,Jean
4,Rafael


**4.** Calcular o valor médio de transação por cliente ativo em janeiro de 2021

In [69]:
# TODO: Implemente sua solucao aqui
spark.sql('''
    SELECT nome, AVG(valor)
    FROM CONTAS
    LEFT JOIN CLIENTES ON CLIENTES.id = CONTAS.cliente_id
    LEFT JOIN TRANSACOES ON CONTAS.conta_id = TRANSACOES.conta_id
    WHERE dt_transacao IS NOT NULL 
    AND dt_transacao <= "2021-01-31" AND dt_transacao >= "2021-01-01"
    GROUP BY nome
    '''
).toPandas()

Unnamed: 0,nome,avg(valor)
0,Gabriela,33.096667
1,Joao,3.5
2,Luiza,55.0
3,Jean,16.76
4,Rafael,1000.0


# Pandas

```python
pd_clientes
pd_contas
pd_produtos
pd_transacoes
```

**1.** Retornar o nome dos clientes que possuem todos os produtos

In [71]:
# TODO: Implemente sua solucao aqui
tmp = pd_clientes.set_index("id").join(pd_contas.set_index("cliente_id"))
tmp.reset_index(inplace = True)
tmp = tmp.set_index("conta_id").join(pd_produtos.set_index("conta_id"))
tmp = tmp[(tmp["fl_cartao_credito"] == 1) & (tmp["fl_credito"] == 1) & (tmp["fl_investidor"] == 1)]
tmp["nome"]

conta_id
191825    Jean
Name: nome, dtype: object

**2.** Retorne a soma do valor transacionado por Estado



In [77]:
# TODO: Implemente sua solucao aqui
tmp = pd_clientes.set_index("id").join(pd_contas.set_index("cliente_id"))
tmp.reset_index(inplace = True)
tmp = tmp.set_index("conta_id").join(pd_transacoes.set_index("conta_id"))
tmp.reset_index(inplace = True)
tmp.groupby("estado").sum()["valor"]

estado
MG     106.29
PR       0.00
RJ    1033.52
SP     542.00
Name: valor, dtype: float64

# Spark

```python
df_clientes
df_contas
df_produtos
df_transacoes
```

**1.** Retorne o nome do cliente e a data de criação da conta.

> **Dica:** Caso necessário utilize `pyspark.sql.functions.join()`.

In [92]:
# TODO: Implemente sua solucao aqui
df_contas.join(df_clientes, df_contas.cliente_id == df_clientes.id, "inner").select("nome", "dt_criacao").show()

+--------+-------------------+
|    nome|         dt_criacao|
+--------+-------------------+
|    Joao|2020-12-21 12:00:00|
|   Luiza|2020-12-26 13:00:00|
|Gabriela|2021-01-04 12:00:00|
|  Rafael|2021-01-15 12:00:00|
|    Jean|2021-01-12 11:12:00|
| Daniela|2021-01-04 14:11:00|
+--------+-------------------+



**2.** Retornar o nome do cliente que mais gastou no mês de janeiro

In [148]:
# TODO: Implemente sua solucao aqui
from pyspark.sql.functions import desc
df = df_contas.join(df_clientes, df_contas.cliente_id == df_clientes.id, "inner").join(df_transacoes, "conta_id")
df.where(df.dt_transacao <= "2021-01-31").where(df.dt_transacao >= "2021-01-01").groupby("nome").sum("valor").sort(desc("sum(valor)")).show(1)

+------+----------+
|  nome|sum(valor)|
+------+----------+
|Rafael|    1000.0|
+------+----------+
only showing top 1 row

