-sandbox
<img width = '500' src="https://databricks.com/wp-content/uploads/2021/06/db-pride-logo.svg" style="float: left: margin: 30px"/>        <img src="https://files.training.databricks.com/images/Apache-Spark-Logo_TM_200px.png" style="float: left: margin: 10px"/>                

<br>
> # Bronze para Silver

<br>
## 05 - Transações de estoque

- Ler a tabela Delta bronze de pedidos de estoque. 
- Desduplique e grave o DataFrame resultante na tabela Delta silver logo após executar algumas transformações de data e hora.

-sandbox
<h2 style="color:red">Informações:</h2> <p>


** - Objetivo:**
   - o que vamos desenvolver: tabela de declarações desduplicadas com colunas de data/hora fixas
   - por que é útil: resolver erros de duplicação selecionando o valor de saída desejado 
    - neste caso: entrada de dados mais antiga
- descrever como o Spark Windows funciona 
- como usar a função de classificação para desduplicação

## ![Spark Logo Tiny](https://files.training.databricks.com/images/105/logo_spark_tiny.png) Etapas a serem concluídas:

<br>
- Ler a tabela de Pedidos de Estoque de **`bronzeStockOrdersPath`** em um DataFrame.
- Mostrar registors duplicatos: 
  - alguns pedidos de estoque (**`transaction_id`**) foram duplicados 
    - para cada **`transaction_id`**, mantenha o registro mais antigo usando em **`ordertime`**
- Atribua o resultado a um novo nome de variável e faça as seguintes transformações:
  - Atualmente **`ordertime`** está no formato unixtime. Crie uma nova coluna chamada de **`order_timestamp`** a partir da coluna **`ordertime`** e converta para timestamp.
- Existem funções internas úteis no Spark. Aqui podemos usar alguns deles:
  - Crie uma nova coluna chamada **`year`** a partir da coluna **`order_timestamp`**, que mostra o ano.
  - Crie uma nova coluna chamada **`month`** a partir da coluna **`order_timestamp`**, que mostra o mês.
  - Crie uma nova coluna chamada **`day`** a partir da coluna **`order_timestamp`**, que mostra o dia.
  - Crie uma nova coluna chamada **`dow`** a partir da coluna **`order_timestamp`**, que mostra o dia da semana.
- Grave o DataFrame final em **`targetDirectory`** como uma tabela Delta usando o modo de substituição.


O esquema do DataFrame deve ser:

|name|type|
|---|---|
|clicked_items|ArrayType(ArrayType(StringType,true),true)|
|investor|LongType|
|ordertime|LongType|
|price|DoubleType|
|ticker|StringType|
|transaction_id|LongType|
|type|StringType|
|volume|LongType|
|order_timestamp|TimestampType|
|year|IntegerType|
|month|IntegerType|
|day|IntegerType|
|dow|IntegerType|

In [0]:
def func_balancasComerciais(spark, bronzeStockOrdersPath, targetDirectory):
  
  
  from pyspark.sql.window import Window
  from pyspark.sql.functions import rank, col, from_unixtime, year, month, dayofmonth, dayofweek

  
  # Ler a tabela Delta bronze de pedidos de estoque.
  stockOrders = (spark.read
                      .format("delta")
                      .load(bronzeStockOrdersPath))
  
  
  # a coluna "transaction_id" que está em "stockOrders" tem dados duplicados 
  # mantenha os dados mais antiga por "ordertime", atribua um novo nome de variável.
  # Crie uma nova coluna chamada "order_timestamp" a partir da coluna "ordertime" e converta em timestamp.
  # Extraia "year" a partir da coluna "order_timestamp", que mostra o ano.
  # Extraia "month" a partir da coluna "order_timestamp", que mostra o mês.
  # Extraia "day" a partir da coluna "order_timestamp", que mostra o dia.
  # Extraia "dow" a partir da coluna "order_timestamp", que mostra o dia da semana.
  result = (stockOrders.withColumn("rank", rank().over(Window.partitionBy("transaction_id").orderBy(col("ordertime"))))
                       .filter(col("rank") == 1)
                       .drop("rank")
                       .withColumn("order_timestamp", from_unixtime("ordertime").cast("timestamp"))
                       .withColumn("year", year(col("order_timestamp")))
                       .withColumn("month", month(col("order_timestamp")))
                       .withColumn("day", dayofmonth(col("order_timestamp")))
                       .withColumn("dow", dayofweek(col("order_timestamp"))))
  

  # Grave o DataFrame final em "targetDirectory" como uma tabela Delta usando o modo de substituição e a opção de substituição de esquema.
  result.write \
        .mode("overwrite") \
        .format("delta") \
        .option("overwriteSchema", "true") \
        .save(targetDirectory)   


  # Retorna o DataFrame
  return result

In [0]:
finalDF = func_balancasComerciais(spark, bronzeStockOrdersPath, targetDirectory)

In [0]:
display(finalDF)

clicked_items,investor,ordertime,price,ticker,transaction_id,type,volume,order_timestamp,year,month,day,dow
"List(List(RSG, 47), List(ADBE, 40), List(ALLE, 46))",608,1567331046,111.6824514218388,ADSK,581346334,BUY,6,2019-09-01T09:44:06.000+0000,2019,9,1,1
"List(List(PKG, 21))",409,1567331224,111.69668307782608,ADSK,581346335,SELL,22,2019-09-01T09:47:04.000+0000,2019,9,1,1
"List(List(URI, 17))",462,1567332454,111.76945039914756,ADSK,581346340,BUY,2,2019-09-01T10:07:34.000+0000,2019,9,1,1
"List(List(PBCT, 38), List(DVN, 48), List(PGR, 39))",994,1567335136,110.91184489949394,ADSK,581346350,SELL,8,2019-09-01T10:52:16.000+0000,2019,9,1,1
"List(List(CMG, 10), List(DRI, 8))",148,1567338719,111.9635185543684,ADSK,581346362,BUY,8,2019-09-01T11:51:59.000+0000,2019,9,1,1
"List(List(AET, 10))",1324,1567346970,111.41901572263458,ADSK,581346387,SELL,5,2019-09-01T14:09:30.000+0000,2019,9,1,1
"List(List(T, 10), List(EXC, 51), List(JNPR, 35))",1297,1567348386,110.79998968963808,ADSK,581346392,SELL,7,2019-09-01T14:33:06.000+0000,2019,9,1,1
"List(List(ESRX, 5))",982,1567353556,111.64089266775686,ADSK,581346406,SELL,19,2019-09-01T15:59:16.000+0000,2019,9,1,1
"List(List(UA, 24), List(NDAQ, 10), List(MAA, 47))",859,1567353752,110.98051551473723,ADSK,581346407,BUY,27,2019-09-01T16:02:32.000+0000,2019,9,1,1
"List(List(NEM, 49))",1059,1567414800,111.71,ADSK,581346417,BUY,25,2019-09-02T09:00:00.000+0000,2019,9,2,2


<h2><img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Validação</h2>

In [0]:
realityCheck(func_balancasComerciais, spark, bronzeStockOrdersPath, targetDirectory)

Points,Test,Result
1,Returns correct schema,
1,Returns DataFrame with correct number of rows,
1,Returns DataFrame with correct results,
1,Silver table in place,
1,Silver table has correct content,


## <img src="https://files.training.databricks.com/images/105/logo_spark_tiny.png"> Próximo <p>

> * [Balanças comerciais]($./06-Balancas comerciais)

-sandbox

<a href="http://www.apache.org/">Apache Software Foundation</a> <p>
<a href="https://databricks.com/privacy-policy">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use">Terms of Use</a>