### CARREGANDO OS DADOS NA CAMADA CURATED

Nessa etapa do processo iremos responder alguns questionamentos usando SPARK SQL, iremos criar novas tabelas com JOIN´s, agregações e agrupamentos para disponilizar na camada CURATED.

### Análise de dados

Com base na solução implantada responda aos seguintes questionamentos:

1.	Escreva uma query que retorna a quantidade de linhas na tabela Sales.SalesOrderDetail pelo campo SalesOrderID, desde que tenham pelo menos três linhas de detalhes.
2.	Escreva uma query que ligue as tabelas Sales.SalesOrderDetail, Sales.SpecialOfferProduct e Production.Product e retorne os 3 produtos (Name) mais vendidos (pela soma de OrderQty), agrupados pelo número de dias para manufatura (DaysToManufacture).
3.	Escreva uma query ligando as tabelas Person.Person, Sales.Customer e Sales.SalesOrderHeader de forma a obter uma lista de nomes de clientes e uma contagem de pedidos efetuados.
4.	Escreva uma query usando as tabelas Sales.SalesOrderHeader, Sales.SalesOrderDetail e Production.Product, de forma a obter a soma total de produtos (OrderQty) por ProductID e OrderDate.
5.	Escreva uma query mostrando os campos SalesOrderID, OrderDate e TotalDue da tabela Sales.SalesOrderHeader. Obtenha apenas as linhas onde a ordem tenha sido feita durante o mês de setembro/2011 e o total devido esteja acima de 1.000. Ordene pelo total devido decrescente.



In [1]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as Func 
from pyspark.sql.functions import *

In [2]:
# Iniciando as SparkSession
spark = SparkSession.builder.master("local[1]").appName('load_curated').getOrCreate()

In [3]:
# Carregando todos os arquivos parquet da camada REFINED
df_person = spark.read.load('gs://bike-factory-datalake/02.REFINED/person.parquet')
df_product = spark.read.load('gs://bike-factory-datalake/02.REFINED/product.parquet')
df_customer = spark.read.load('gs://bike-factory-datalake/02.REFINED/sales.customer.parquet')
df_order_detail = spark.read.load('gs://bike-factory-datalake/02.REFINED/sales.salesorderdetail.parquet')
df_order_header = spark.read.load('gs://bike-factory-datalake/02.REFINED/sales.salesorderheader.parquet')
df_special_offer = spark.read.load('gs://bike-factory-datalake/02.REFINED/sales.specialofferproduct.parquet')

                                                                                

In [6]:
# Criando as tabela para usar Spark SQL
df_person.write.saveAsTable("Person_Person")
df_product.write.saveAsTable("Production_Product")
df_customer.write.saveAsTable("Sales_Customer")
df_order_detail.write.saveAsTable("Sales_SalesOrderDetail")
df_order_header.write.saveAsTable("Sales_SalesOrderHeader")
df_special_offer.write.saveAsTable("Sales_SpecialOfferProduct")

                                                                                

1.	Escreva uma query que retorna a quantidade de linhas na tabela Sales.SalesOrderDetail pelo campo SalesOrderID, desde que tenham pelo menos três linhas de detalhes.

In [7]:
question_1 = spark.sql(
    """SELECT SalesOrderID, count(1) as Qtdy
        FROM Sales_SalesOrderDetail
        GROUP BY SalesOrderID having count(SalesOrderDetailID) >= 3 order by Qtdy asc""")

In [8]:
question_1.show(5)



+------------+----+
|SalesOrderID|Qtdy|
+------------+----+
|       55999|   3|
|       58888|   3|
|       56018|   3|
|       52617|   3|
|       56993|   3|
+------------+----+
only showing top 5 rows



                                                                                

In [9]:
"""Agora vamos gerar um arquivo novo no formato PARQUET para disponibilizar na nossa camada CURATED.
Esse arquivo pode ser carregado no BigQuery para geração de relatórios e insigths porém, 
o arquivo que já se encontra na camara REFINED já poderia ser carregado no BigQuery para rodar queries diretamente nele."""
question_1.write.mode("overwrite").save("gs://bike-factory-datalake/03.CURATED/question_1")

                                                                                

2.	Escreva uma query que ligue as tabelas Sales.SalesOrderDetail, Sales.SpecialOfferProduct e Production.Product e retorne os 3 produtos (Name) mais vendidos (pela soma de OrderQty), agrupados pelo número de dias para manufatura (DaysToManufacture).


In [10]:
question_2 = spark.sql("""
                       SELECT PP.Name, PP.DaysToManufacture, sum(SOD.orderqty) as SumOrderQty
                       FROM Production_Product as PP
                       INNER JOIN Sales_SpecialOfferProduct as SOP on SOP.ProductID = PP.ProductID
                       INNER JOIN Sales_SalesOrderDetail as SOD on SOD.ProductID = PP.ProductID
                       GROUP BY PP.Name, PP.DaysToManufacture
                       ORDER BY 3 desc""")

In [11]:
# Mostrando apenas o 3 produtos mais vendidos
question_2.show(3)

+--------------------+-----------------+-----------+
|                Name|DaysToManufacture|SumOrderQty|
+--------------------+-----------------+-----------+
|Sport-100 Helmet,...|                0|      33715|
|        AWC Logo Cap|                0|      33244|
|Sport-100 Helmet,...|                0|      32660|
+--------------------+-----------------+-----------+
only showing top 3 rows



In [12]:
# Vamos gravar essa tabela completa com os JOIN´s em uma nova tabela na nossa camada CURATED.
question_2.write.mode("overwrite").save("gs://bike-factory-datalake/03.CURATED/question_2")

                                                                                

3.	Escreva uma query ligando as tabelas Person.Person, Sales.Customer e Sales.SalesOrderHeader de forma a obter uma lista de nomes de clientes e uma contagem de pedidos efetuados.


In [13]:
# Ao avaliar o modelo relacional disponibilizada, o mesmo informa que a FK da tabela Sales.Customer na Person.Person é o campo PersonID, porém essa campo não existe.
# Contudo, foi possível fazer o relacionamento entre elas usando o campo BusinessEntityID da tabela Person.Person.
question_3 = spark.sql("""
                        SELECT PE.CompleteName, count(SOH.SalesOrderID) as OrderQtdy
                        FROM Sales_SalesOrderHeader as SOH
                        INNER JOIN Sales_Customer as C on C.CustomerID = SOH.CustomerID
                        INNER JOIN Person_Person as PE on PE.BusinessEntityID = C.CustomerID
                        GROUP BY PE.CompleteName ORDER BY PE.CompleteName""") 


In [14]:
question_3.show()

+-----------------+---------+
|     CompleteName|OrderQtdy|
+-----------------+---------+
|  Aaron  Edwards |        3|
|Aaron  Hernandez |        2|
|     Aaron  Hill |        2|
|  Aaron  Roberts |        2|
|   Aaron A Allen |        1|
|   Aaron B Adams |        1|
|Aaron C Campbell |        2|
|   Aaron C Scott |        2|
|   Aaron E Baker |        1|
|   Aaron E Evans |        3|
|  Aaron J Carter |        2|
|Aaron J McDonald |        1|
|    Aaron K Hall |        1|
|    Aaron L King |        2|
|   Aaron L Perez |        1|
|  Aaron L Wright |        1|
|Aaron M Gonzalez |        2|
|   Aaron M Young |        1|
| Aaron P Collins |        4|
|   Aaron R Green |        1|
+-----------------+---------+
only showing top 20 rows



In [15]:
# Persistindo os dados dessa nova tabela com os JOIN´s na camada CURATED
question_3.write.mode("overwrite").save("gs://bike-factory-datalake/03.CURATED/question_3")

                                                                                

4.	Escreva uma query usando as tabelas Sales.SalesOrderHeader, Sales.SalesOrderDetail e Production.Product, de forma a obter a soma total de produtos (OrderQty) por ProductID e OrderDate.

In [16]:
question_4 = spark.sql("""
                       SELECT SOD.ProductID ,SOH.OrderDate,sum(SOD.OrderQty) as OrderQty
                        FROM Sales_SalesOrderHeader as SOH
                        INNER JOIN Sales_SalesOrderDetail as SOD on SOD.SalesOrderID  = SOH.SalesOrderID
                        INNER JOIN Production_Product as PP on PP.ProductID = SOD.ProductID
                        GROUP BY SOD.ProductID, SOH.OrderDate
                        ORDER BY OrderDate""")

In [17]:
question_4.show()



+---------+-------------------+--------+
|ProductID|          OrderDate|OrderQty|
+---------+-------------------+--------+
|      773|2011-05-31 00:00:00|      17|
|      776|2011-05-31 00:00:00|      16|
|      777|2011-05-31 00:00:00|      23|
|      732|2011-05-31 00:00:00|      16|
|      716|2011-05-31 00:00:00|      19|
|      772|2011-05-31 00:00:00|       5|
|      743|2011-05-31 00:00:00|       1|
|      770|2011-05-31 00:00:00|      29|
|      753|2011-05-31 00:00:00|      14|
|      710|2011-05-31 00:00:00|       5|
|      729|2011-05-31 00:00:00|      16|
|      747|2011-05-31 00:00:00|       4|
|      748|2011-05-31 00:00:00|       2|
|      764|2011-05-31 00:00:00|      14|
|      758|2011-05-31 00:00:00|      46|
|      775|2011-05-31 00:00:00|      22|
|      715|2011-05-31 00:00:00|      49|
|      762|2011-05-31 00:00:00|      44|
|      767|2011-05-31 00:00:00|       1|
|      707|2011-05-31 00:00:00|      24|
+---------+-------------------+--------+
only showing top

                                                                                

In [18]:
# Persistindo os dados dessa nova tabela com os JOIN´s na camada CURATED
question_4.write.mode("overwrite").save("gs://bike-factory-datalake/03.CURATED/question_4")

                                                                                

5.	Escreva uma query mostrando os campos SalesOrderID, OrderDate e TotalDue da tabela Sales.SalesOrderHeader. Obtenha apenas as linhas onde a ordem tenha sido feita durante o mês de setembro/2011 e o total devido esteja acima de 1.000. Ordene pelo total devido decrescente.


In [19]:
question_5 = spark.sql("""
                       SELECT SalesOrderID, OrderDate, TotalDue
                        FROM Sales_SalesOrderHeader
                        WHERE OrderDate BETWEEN '2011-09-01' AND '2011-09-30'
                        AND TotalDue > 1000
                        ORDER BY TotalDue desc""")

In [20]:
question_5.show()

+------------+-------------------+---------+
|SalesOrderID|          OrderDate| TotalDue|
+------------+-------------------+---------+
|       44348|2011-09-07 00:00:00|3953.9884|
|       44372|2011-09-09 00:00:00|3953.9884|
|       44349|2011-09-07 00:00:00|3953.9884|
|       44350|2011-09-07 00:00:00|3953.9884|
|       44371|2011-09-09 00:00:00|3953.9884|
|       44351|2011-09-07 00:00:00|3953.9884|
|       44328|2011-09-02 00:00:00|3953.9884|
|       44352|2011-09-07 00:00:00|3953.9884|
|       44330|2011-09-02 00:00:00|3953.9884|
|       44332|2011-09-03 00:00:00|3953.9884|
|       44370|2011-09-09 00:00:00|3953.9884|
|       44357|2011-09-07 00:00:00|3953.9884|
|       44338|2011-09-04 00:00:00|3953.9884|
|       44358|2011-09-07 00:00:00|3953.9884|
|       44340|2011-09-04 00:00:00|3953.9884|
|       44359|2011-09-08 00:00:00|3953.9884|
|       44347|2011-09-06 00:00:00|3953.9884|
|       44324|2011-09-01 00:00:00|3953.9884|
|       44326|2011-09-01 00:00:00|3953.9884|
|       44

In [21]:
# Persistindo os dados dessa nova tabela com os JOIN´s na camada CURATED
question_5.write.mode("overwrite").save("gs://bike-factory-datalake/03.CURATED/question_5")

                                                                                

In [22]:
spark.sql("DROP TABLE Person_Person")
spark.sql("DROP TABLE Production_Product")
spark.sql("DROP TABLE Sales_Customer")
spark.sql("DROP TABLE Sales_SalesOrderDetail")
spark.sql("DROP TABLE Sales_SalesOrderHeader")
spark.sql("DROP TABLE Sales_SpecialOfferProduct")

DataFrame[]

In [23]:
spark.stop()