## Instalações e bibliotecas necessárias

In [5]:
!pip install PYSPARK &> /dev/null

In [6]:
!pip install gcsfs &> /dev/null

In [7]:
!pip install sqlalchemy &> /dev/null

In [8]:
!pip install pymysql &> /dev/null

In [9]:
!pip install mysql-connector-python &> /dev/null

In [98]:
!pip install colab-file-browser

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
[31mERROR: Could not find a version that satisfies the requirement colab-file-browser (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for colab-file-browser[0m[31m
[0m

In [2]:
# Bibliotecas Necessárias
from pyspark.sql import SparkSession
from pyspark import SparkConf
import pyspark.sql.functions as F
import os

import pandas as pd

import pymysql
from sqlalchemy import create_engine
import mysql.connector
from mysql.connector import Error
from IPython.lib import passwd

# Bibliotecas do cloud storage
from google.cloud import storage

## Conectores e Funções

In [3]:
# Configuração de chave de segurança

serviceAccount = '/content/desafio-rox-375303-3b169fea64c6.json'
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = serviceAccount

In [4]:
# Ip colab necessário para acesso ao MySQL na GCP

!curl ipecho.net/plain

34.81.120.183

In [5]:
# Acesso ao bucket na gcp
client = storage.Client()

# Variável para receber o nome da bucket
bucket = client.get_bucket('desafio-rox-leandro')

# Variável que vai receber o caminho dos arquivos RAW

path_Person = 'gs://desafio-rox-leandro/RAW/Person.Person.csv'
path_Product = 'gs://desafio-rox-leandro/RAW/Production.Product.csv'
path_Customer = 'gs://desafio-rox-leandro/RAW/Sales.Customer.csv'
path_SalesOrderDetail = 'gs://desafio-rox-leandro/RAW/Sales.SalesOrderDetail.csv'
path_SalesOrderHeader = 'gs://desafio-rox-leandro/RAW/Sales.SalesOrderHeader.csv'
path_SpecialOfferProduct = 'gs://desafio-rox-leandro/RAW/Sales.SpecialOfferProduct.csv'

In [6]:
# Executa o comando e altera o banco no mysql

def executar_sql(conexao, sql): 
  cursor = conexao.cursor(buffered=True)
  try:
    cursor.execute(sql)
    conexao.commit() 
    print('Query executada com sucesso!')
  except mysql.connector.Error as err:
    print('Erro ao executar a Query!',err)

In [7]:
# Executa o comando e retorna uma tupla como resultado da query

def retorno_sql(conexao, sql): 
  cursor = conexao.cursor(buffered=True)
  retorno = None
  try:
    cursor.execute(sql)
    retorno = cursor.fetchall()
    return retorno
  except mysql.connector.Error as err:
    print('Erro ao listar os dados da Query!',err)

## Analises em pyspark para tratamento 

In [8]:
# Criando a sparkSession

spark = (
    SparkSession.builder
         .master('local')
         .appName('desafio_rox')
         .config('spark.ui.port', '4050')
         .config("spark.jars", 'https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop2-latest.jar')
         .getOrCreate()
         )

spark

### df_Person

In [9]:
# Criando o dataframe em spark

df_Person = (
    spark.read.format('csv')
              .option('delimiter', ';')
              .option('header', 'true')
              .option('inferschema', 'true')
              .load(path_Person)
)

In [10]:
# Analisando o esquema do dataframe

df_Person.printSchema()

root
 |-- BusinessEntityID: integer (nullable = true)
 |-- PersonType: string (nullable = true)
 |-- NameStyle: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- MiddleName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Suffix: string (nullable = true)
 |-- EmailPromotion: integer (nullable = true)
 |-- AdditionalContactInfo: string (nullable = true)
 |-- Demographics: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)



In [11]:
# Exibindo a tabela para início da análise

df_Person.show(3)

+----------------+----------+---------+-----+---------+----------+----------+------+--------------+---------------------+--------------------+--------------------+-------------------+
|BusinessEntityID|PersonType|NameStyle|Title|FirstName|MiddleName|  LastName|Suffix|EmailPromotion|AdditionalContactInfo|        Demographics|             rowguid|       ModifiedDate|
+----------------+----------+---------+-----+---------+----------+----------+------+--------------+---------------------+--------------------+--------------------+-------------------+
|               1|        EM|        0| NULL|      Ken|         J|   Sánchez|  NULL|             0|                 NULL|"<IndividualSurve...|92C4279F-1207-48A...|2009-01-07 00:00:00|
|               2|        EM|        0| NULL|    Terri|       Lee|     Duffy|  NULL|             1|                 NULL|"<IndividualSurve...|D8763459-8AA8-47C...|2008-01-24 00:00:00|
|               3|        EM|        0| NULL|  Roberto|      NULL|Tamburello|  N

In [12]:
# Verificando linhas repetidas no dataframe

linhas_repetidas = (df_Person.count() - df_Person.distinct().count())
print(linhas_repetidas)

0


In [13]:
# Buscando por por erros graves como por exemplo /*@#$% em todas as colunas
# trazendo valores únicos e ordenando asc e desc eles estão no topo ou no final

colunas = df_Person.columns
for i in range(len(colunas)):
  df_Person.select(F.col(colunas[i])).distinct().orderBy(colunas[i], ascending=False).show(5)
  df_Person.select(F.col(colunas[i])).distinct().orderBy(colunas[i], ascending=True).show(5)


+----------------+
|BusinessEntityID|
+----------------+
|           20777|
|           20776|
|           20775|
|           20774|
|           20773|
+----------------+
only showing top 5 rows

+----------------+
|BusinessEntityID|
+----------------+
|               1|
|               2|
|               3|
|               4|
|               5|
+----------------+
only showing top 5 rows

+----------+
|PersonType|
+----------+
|        VC|
|        SP|
|        SC|
|        IN|
|        GC|
+----------+
only showing top 5 rows

+----------+
|PersonType|
+----------+
|        EM|
|        GC|
|        IN|
|        SC|
|        SP|
+----------+
only showing top 5 rows

+---------+
|NameStyle|
+---------+
|        0|
+---------+

+---------+
|NameStyle|
+---------+
|        0|
+---------+

+-----+
|Title|
+-----+
| Sra.|
|  Sr.|
| NULL|
|  Ms.|
|   Ms|
+-----+
only showing top 5 rows

+-----+
|Title|
+-----+
|  Mr.|
| Mrs.|
|   Ms|
|  Ms.|
| NULL|
+-----+
only showing top 5 rows

+-------

In [14]:
# COLUNA MiddleName aparenta ter um número 1
# Procurando por número dentro da coluna MiddleName
# Letra L minúscula, curiosamente sendo considerada após a letra Z no código anterior

df_Person.select(F.col('MiddleName')).filter(F.col('MiddleName') == 'l').show(50)


+----------+
|MiddleName|
+----------+
|         l|
|         l|
|         l|
|         l|
|         l|
+----------+



In [15]:
# Pré análise PersonType e Quantidade.

df_Person.groupBy('PersonType').count().show(10)

+----------+-----+
|PersonType|count|
+----------+-----+
|        SC|  753|
|        SP|   17|
|        IN|18484|
|        EM|  273|
|        GC|  289|
|        VC|  156|
+----------+-----+



In [16]:
'''
Dataframe ok, sem alteracao
'''

'\nDataframe ok, sem alteracao\n'

### df_Product

In [17]:
# Criando o dataframe em spark

df_Product = (
    spark.read.format('csv')
              .option('delimiter', ';')
              .option('header', 'true')
              .option('inferschema', 'true')
              .load(path_Product)
)

In [18]:
# Analisando o esquema do dataframe

df_Product.printSchema()

root
 |-- ProductID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- ProductNumber: string (nullable = true)
 |-- MakeFlag: integer (nullable = true)
 |-- FinishedGoodsFlag: integer (nullable = true)
 |-- Color: string (nullable = true)
 |-- SafetyStockLevel: integer (nullable = true)
 |-- ReorderPoint: integer (nullable = true)
 |-- StandardCost: string (nullable = true)
 |-- ListPrice: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- SizeUnitMeasureCode: string (nullable = true)
 |-- WeightUnitMeasureCode: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- DaysToManufacture: integer (nullable = true)
 |-- ProductLine: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- Style: string (nullable = true)
 |-- ProductSubcategoryID: string (nullable = true)
 |-- ProductModelID: string (nullable = true)
 |-- SellStartDate: timestamp (nullable = true)
 |-- SellEndDate: string (nullable = true)
 |-- DiscontinuedDate: string (

In [19]:
# Exibindo a tabela para início da análise


df_Product.show(3)

+---------+---------------+-------------+--------+-----------------+-----+----------------+------------+------------+---------+----+-------------------+---------------------+------+-----------------+-----------+-----+-----+--------------------+--------------+-------------------+-----------+----------------+--------------------+--------------------+
|ProductID|           Name|ProductNumber|MakeFlag|FinishedGoodsFlag|Color|SafetyStockLevel|ReorderPoint|StandardCost|ListPrice|Size|SizeUnitMeasureCode|WeightUnitMeasureCode|Weight|DaysToManufacture|ProductLine|Class|Style|ProductSubcategoryID|ProductModelID|      SellStartDate|SellEndDate|DiscontinuedDate|             rowguid|        ModifiedDate|
+---------+---------------+-------------+--------+-----------------+-----+----------------+------------+------------+---------+----+-------------------+---------------------+------+-----------------+-----------+-----+-----+--------------------+--------------+-------------------+-----------+-------

In [20]:
# Verificando linhas repetidas no dataframe

linhas_repetidas = (df_Product.count() - df_Product.distinct().count())
print(linhas_repetidas)

0


In [21]:
# Buscando por por erros graves como por exemplo /*@#$% em todas as colunas
# trazendo valores únicos e ordenando asc e desc eles estão no topo ou no final

colunas = df_Product.columns
for i in range(len(colunas)):
  df_Product.select(F.col(colunas[i])).distinct().orderBy(colunas[i], ascending=False).show(5)
  df_Product.select(F.col(colunas[i])).distinct().orderBy(colunas[i], ascending=True).show(5)


+---------+
|ProductID|
+---------+
|      999|
|      998|
|      997|
|      996|
|      995|
+---------+
only showing top 5 rows

+---------+
|ProductID|
+---------+
|        1|
|        2|
|        3|
|        4|
|      316|
+---------+
only showing top 5 rows

+--------------------+
|                Name|
+--------------------+
|   Women's Tights, S|
|   Women's Tights, M|
|   Women's Tights, L|
|Women's Mountain ...|
|Women's Mountain ...|
+--------------------+
only showing top 5 rows

+--------------------+
|                Name|
+--------------------+
|        AWC Logo Cap|
|     Adjustable Race|
|All-Purpose Bike ...|
|     BB Ball Bearing|
|        Bearing Ball|
+--------------------+
only showing top 5 rows

+-------------+
|ProductNumber|
+-------------+
|      WB-H098|
|    VE-C304-S|
|    VE-C304-M|
|    VE-C304-L|
|      TT-T092|
+-------------+
only showing top 5 rows

+-------------+
|ProductNumber|
+-------------+
|      AR-5381|
|      BA-8327|
|      BB-7421|
|    

In [22]:
# COLUNA DiscontinuedDate aparenta ter um único valor no caso NULL
# Decidi nao dropar

df_Product.select(F.col('DiscontinuedDate')).filter(F.col('DiscontinuedDate') != 'NULL').show(50)


+----------------+
|DiscontinuedDate|
+----------------+
+----------------+



In [23]:
# Pré análise DaysToManufacture e Quantidade.

df_Product.groupBy('DaysToManufacture').count().show(10)

+-----------------+-----+
|DaysToManufacture|count|
+-----------------+-----+
|                1|  154|
|                4|   97|
|                2|    7|
|                0|  246|
+-----------------+-----+



In [24]:
'''
Dataframe ok, 
alterar , para . em (StandardCost, ListPrice)
'''

'\nDataframe ok, \nalterar , para . em (StandardCost, ListPrice)\n'

### df_Customer

In [25]:
# Criando o dataframe em spark

df_Customer = (
    spark.read.format('csv')
              .option('delimiter', ';')
              .option('header', 'true')
              .option('inferschema', 'true')
              .load(path_Customer)
)

In [26]:
# Analisando o esquema do dataframe

df_Customer.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- PersonID: string (nullable = true)
 |-- StoreID: string (nullable = true)
 |-- TerritoryID: integer (nullable = true)
 |-- AccountNumber: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)



In [27]:
# Exibindo a tabela para início da análise

df_Customer.show(3)

+----------+--------+-------+-----------+-------------+--------------------+--------------------+
|CustomerID|PersonID|StoreID|TerritoryID|AccountNumber|             rowguid|        ModifiedDate|
+----------+--------+-------+-----------+-------------+--------------------+--------------------+
|         1|    NULL|    934|          1|   AW00000001|3F5AE95E-B87D-4AE...|2014-09-12 11:15:...|
|         2|    NULL|   1028|          1|   AW00000002|E552F657-A9AF-4A7...|2014-09-12 11:15:...|
|         3|    NULL|    642|          4|   AW00000003|130774B1-DB21-4EF...|2014-09-12 11:15:...|
+----------+--------+-------+-----------+-------------+--------------------+--------------------+
only showing top 3 rows



In [28]:
# Verificando linhas repetidas no dataframe

linhas_repetidas = (df_Customer.count() - df_Customer.distinct().count())
print(linhas_repetidas)

0


In [29]:
# Buscando por por erros graves como por exemplo /*@#$% em todas as colunas
# trazendo valores únicos e ordenando asc e desc eles estão no topo ou no final

colunas = df_Customer.columns
for i in range(len(colunas)):
  df_Customer.select(F.col(colunas[i])).distinct().orderBy(colunas[i], ascending=False).show(5)
  df_Customer.select(F.col(colunas[i])).distinct().orderBy(colunas[i], ascending=True).show(5)


+----------+
|CustomerID|
+----------+
|     30118|
|     30117|
|     30116|
|     30115|
|     30114|
+----------+
only showing top 5 rows

+----------+
|CustomerID|
+----------+
|         1|
|         2|
|         3|
|         4|
|         5|
+----------+
only showing top 5 rows

+--------+
|PersonID|
+--------+
|    NULL|
|    9999|
|    9998|
|    9997|
|    9996|
+--------+
only showing top 5 rows

+--------+
|PersonID|
+--------+
|   10000|
|   10001|
|   10002|
|   10003|
|   10004|
+--------+
only showing top 5 rows

+-------+
|StoreID|
+-------+
|   NULL|
|    998|
|    996|
|    994|
|    992|
+-------+
only showing top 5 rows

+-------+
|StoreID|
+-------+
|   1000|
|   1002|
|   1004|
|   1006|
|   1008|
+-------+
only showing top 5 rows

+-----------+
|TerritoryID|
+-----------+
|         10|
|          9|
|          8|
|          7|
|          6|
+-----------+
only showing top 5 rows

+-----------+
|TerritoryID|
+-----------+
|          1|
|          2|
|          3|
|  

In [30]:
# COLUNA ModifiedDate aparenta ter um único valor no caso uma data específica
# Decidi nao dropar

df_Customer.select(F.col('ModifiedDate')).distinct().show(5)


+--------------------+
|        ModifiedDate|
+--------------------+
|2014-09-12 11:15:...|
+--------------------+



In [31]:
'''
Dataframe ok, sem alteracao
'''

'\nDataframe ok, sem alteracao\n'

### df_SalesOrderDetail

In [32]:
# Criando o dataframe em spark

df_SalesOrderDetail = (
    spark.read.format('csv')
              .option('delimiter', ';')
              .option('header', 'true')
              .option('inferschema', 'true')
              .load(path_SalesOrderDetail)
)

In [33]:
# Analisando o esquema do dataframe

df_SalesOrderDetail.printSchema()

root
 |-- SalesOrderID: integer (nullable = true)
 |-- SalesOrderDetailID: integer (nullable = true)
 |-- CarrierTrackingNumber: string (nullable = true)
 |-- OrderQty: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- SpecialOfferID: integer (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- UnitPriceDiscount: string (nullable = true)
 |-- LineTotal: double (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)



In [34]:
# Exibindo a tabela para início da análise

df_SalesOrderDetail.show(3)

+------------+------------------+---------------------+--------+---------+--------------+---------+-----------------+---------+--------------------+-------------------+
|SalesOrderID|SalesOrderDetailID|CarrierTrackingNumber|OrderQty|ProductID|SpecialOfferID|UnitPrice|UnitPriceDiscount|LineTotal|             rowguid|       ModifiedDate|
+------------+------------------+---------------------+--------+---------+--------------+---------+-----------------+---------+--------------------+-------------------+
|       43659|                 1|         4911-403C-98|       1|      776|             1| 2024,994|             0,00| 2024.994|B207C96D-D9E6-402...|2011-05-31 00:00:00|
|       43659|                 2|         4911-403C-98|       3|      777|             1| 2024,994|             0,00| 6074.982|7ABB600D-1E77-41B...|2011-05-31 00:00:00|
|       43659|                 3|         4911-403C-98|       1|      778|             1| 2024,994|             0,00| 2024.994|475CF8C6-49F6-486...|2011-05

In [35]:
# Verificando linhas repetidas no dataframe

linhas_repetidas = (df_SalesOrderDetail.count() - df_SalesOrderDetail.distinct().count())
print(linhas_repetidas)

0


In [36]:
# Buscando por por erros graves como por exemplo /*@#$% em todas as colunas
# trazendo valores únicos e ordenando asc e desc eles estão no topo ou no final

colunas = df_SalesOrderDetail.columns
for i in range(len(colunas)):
  df_SalesOrderDetail.select(F.col(colunas[i])).distinct().orderBy(colunas[i], ascending=False).show(5)
  df_SalesOrderDetail.select(F.col(colunas[i])).distinct().orderBy(colunas[i], ascending=True).show(5)


+------------+
|SalesOrderID|
+------------+
|       75123|
|       75122|
|       75121|
|       75120|
|       75119|
+------------+
only showing top 5 rows

+------------+
|SalesOrderID|
+------------+
|       43659|
|       43660|
|       43661|
|       43662|
|       43663|
+------------+
only showing top 5 rows

+------------------+
|SalesOrderDetailID|
+------------------+
|            121317|
|            121316|
|            121315|
|            121314|
|            121313|
+------------------+
only showing top 5 rows

+------------------+
|SalesOrderDetailID|
+------------------+
|                 1|
|                 2|
|                 3|
|                 4|
|                 5|
+------------------+
only showing top 5 rows

+---------------------+
|CarrierTrackingNumber|
+---------------------+
|                 NULL|
|         FFF9-4C3E-A9|
|         FFE9-4F36-B2|
|         FFE3-4820-88|
|         FFD2-4B06-B4|
+---------------------+
only showing top 5 rows

+----------

In [37]:
# Pré análise Ordens Produto e Total.

df_SalesOrderDetail.groupBy('ProductID').agg(F.round(F.sum('LineTotal'), 2).alias('Somatorio_total'), 
                                             F.max('OrderQty').alias('Maximo_ordens')).show(10)


+---------+---------------+-------------+
|ProductID|Somatorio_total|Maximo_ordens|
+---------+---------------+-------------+
|      833|       68167.51|            6|
|      858|       36490.55|           15|
|      897|         800.21|            3|
|      808|       22008.27|           16|
|      883|       98472.72|           21|
|      879|        39591.0|            1|
|      799|      932039.59|            9|
|      804|       61034.61|           14|
|      970|      438867.48|           11|
|      898|        3000.78|            3|
+---------+---------------+-------------+
only showing top 10 rows



In [38]:
'''
Dataframe ok, 
Alterar , pra . em (UnitPrice UnitPriceDiscount)
'''

'\nDataframe ok, \nAlterar , pra . em (UnitPrice UnitPriceDiscount)\n'

### df_SalesOrderHeader

In [39]:
# Criando o dataframe em spark

df_SalesOrderHeader = (
    spark.read.format('csv')
              .option('delimiter', ';')
              .option('header', 'true')
              .option('inferschema', 'true')
              .load(path_SalesOrderHeader)
)

In [40]:
# Analisando o esquema do dataframe

df_SalesOrderHeader.printSchema()

root
 |-- SalesOrderID: integer (nullable = true)
 |-- RevisionNumber: integer (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- DueDate: timestamp (nullable = true)
 |-- ShipDate: timestamp (nullable = true)
 |-- Status: integer (nullable = true)
 |-- OnlineOrderFlag: integer (nullable = true)
 |-- SalesOrderNumber: string (nullable = true)
 |-- PurchaseOrderNumber: string (nullable = true)
 |-- AccountNumber: string (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- SalesPersonID: string (nullable = true)
 |-- TerritoryID: integer (nullable = true)
 |-- BillToAddressID: integer (nullable = true)
 |-- ShipToAddressID: integer (nullable = true)
 |-- ShipMethodID: integer (nullable = true)
 |-- CreditCardID: string (nullable = true)
 |-- CreditCardApprovalCode: string (nullable = true)
 |-- CurrencyRateID: string (nullable = true)
 |-- SubTotal: string (nullable = true)
 |-- TaxAmt: string (nullable = true)
 |-- Freight: string (nullable = true)
 |-- Tota

In [41]:
# Exibindo a tabela para início da análise

df_SalesOrderHeader.show(3)

+------------+--------------+-------------------+-------------------+-------------------+------+---------------+----------------+-------------------+--------------+----------+-------------+-----------+---------------+---------------+------------+------------+----------------------+--------------+----------+---------+--------+----------+-------+--------------------+-------------------+
|SalesOrderID|RevisionNumber|          OrderDate|            DueDate|           ShipDate|Status|OnlineOrderFlag|SalesOrderNumber|PurchaseOrderNumber| AccountNumber|CustomerID|SalesPersonID|TerritoryID|BillToAddressID|ShipToAddressID|ShipMethodID|CreditCardID|CreditCardApprovalCode|CurrencyRateID|  SubTotal|   TaxAmt| Freight|  TotalDue|Comment|             rowguid|       ModifiedDate|
+------------+--------------+-------------------+-------------------+-------------------+------+---------------+----------------+-------------------+--------------+----------+-------------+-----------+---------------+-------

In [42]:
# Verificando linhas repetidas no dataframe

linhas_repetidas = (df_SalesOrderHeader.count() - df_SalesOrderHeader.distinct().count())
print(linhas_repetidas)

0


In [43]:
# Buscando por por erros graves como por exemplo /*@#$% em todas as colunas
# trazendo valores únicos e ordenando asc e desc eles estão no topo ou no final

colunas = df_SalesOrderHeader.columns
for i in range(len(colunas)):
  df_SalesOrderHeader.select(F.col(colunas[i])).distinct().orderBy(colunas[i], ascending=False).show(5)
  df_SalesOrderHeader.select(F.col(colunas[i])).distinct().orderBy(colunas[i], ascending=True).show(5)


+------------+
|SalesOrderID|
+------------+
|       75123|
|       75122|
|       75121|
|       75120|
|       75119|
+------------+
only showing top 5 rows

+------------+
|SalesOrderID|
+------------+
|       43659|
|       43660|
|       43661|
|       43662|
|       43663|
+------------+
only showing top 5 rows

+--------------+
|RevisionNumber|
+--------------+
|             9|
|             8|
+--------------+

+--------------+
|RevisionNumber|
+--------------+
|             8|
|             9|
+--------------+

+-------------------+
|          OrderDate|
+-------------------+
|2014-06-30 00:00:00|
|2014-06-29 00:00:00|
|2014-06-28 00:00:00|
|2014-06-27 00:00:00|
|2014-06-26 00:00:00|
+-------------------+
only showing top 5 rows

+-------------------+
|          OrderDate|
+-------------------+
|2011-05-31 00:00:00|
|2011-06-01 00:00:00|
|2011-06-02 00:00:00|
|2011-06-03 00:00:00|
|2011-06-04 00:00:00|
+-------------------+
only showing top 5 rows

+-------------------+
|     

In [44]:
# COLUNA Comment e Status aparenta ter um único valor 
# Decidi nao dropar

df_SalesOrderHeader.select(F.col('Comment')).distinct().show(5)
df_SalesOrderHeader.select(F.col('Status')).distinct().show(5)

+-------+
|Comment|
+-------+
|   NULL|
+-------+

+------+
|Status|
+------+
|     5|
+------+



In [45]:
# Pré análise TerritoryID e Quantidade.

df_SalesOrderHeader.groupBy('TerritoryID').count().show(10)

+-----------+-----+
|TerritoryID|count|
+-----------+-----+
|          1| 4594|
|          6| 4067|
|          3|  385|
|          5|  486|
|          9| 6843|
|          4| 6224|
|          8| 2623|
|          7| 2672|
|         10| 3219|
|          2|  352|
+-----------+-----+



In [46]:
'''
Dataframe ok, 
Alterar , para . em  (SubTotal TaxAmt Freight TotalDue)
'''

'\nDataframe ok, \nAlterar , para . em  (SubTotal TaxAmt Freight TotalDue)\n'

### df_SpecialOfferProduct

In [47]:
# Criando o dataframe em spark

df_SpecialOfferProduct = (
    spark.read.format('csv')
              .option('delimiter', ';')
              .option('header', 'true')
              .option('inferschema', 'true')
              .load(path_SpecialOfferProduct)
)

In [48]:
# Analisando o esquema do dataframe

df_SpecialOfferProduct.printSchema()

root
 |-- SpecialOfferID: integer (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)



In [49]:
# Exibindo a tabela para início da análise
# SpecialOfferID com valores iguais.

df_SpecialOfferProduct.show(3)

+--------------+---------+--------------------+-------------------+
|SpecialOfferID|ProductID|             rowguid|       ModifiedDate|
+--------------+---------+--------------------+-------------------+
|             1|      680|BB30B868-D86C-455...|2011-04-01 00:00:00|
|             1|      706|B3C9A4B1-2AE6-4CB...|2011-04-01 00:00:00|
|             1|      707|27B711FE-0B77-4EA...|2011-04-01 00:00:00|
+--------------+---------+--------------------+-------------------+
only showing top 3 rows



In [50]:
# Verificando linhas repetidas no dataframe

linhas_repetidas = (df_SpecialOfferProduct.count() - df_SpecialOfferProduct.distinct().count())
print(linhas_repetidas)

0


In [51]:
# Buscando por por erros graves como por exemplo /*@#$% em todas as colunas
# trazendo valores únicos e ordenando asc e desc eles estão no topo ou no final

colunas = df_SpecialOfferProduct.columns
for i in range(len(colunas)):
  df_SpecialOfferProduct.select(F.col(colunas[i])).distinct().orderBy(colunas[i], ascending=False).show(5)
  df_SpecialOfferProduct.select(F.col(colunas[i])).distinct().orderBy(colunas[i], ascending=True).show(5)


+--------------+
|SpecialOfferID|
+--------------+
|            16|
|            15|
|            14|
|            13|
|            12|
+--------------+
only showing top 5 rows

+--------------+
|SpecialOfferID|
+--------------+
|             1|
|             2|
|             3|
|             4|
|             5|
+--------------+
only showing top 5 rows

+---------+
|ProductID|
+---------+
|      999|
|      998|
|      997|
|      996|
|      995|
+---------+
only showing top 5 rows

+---------+
|ProductID|
+---------+
|      680|
|      706|
|      707|
|      708|
|      709|
+---------+
only showing top 5 rows

+--------------------+
|             rowguid|
+--------------------+
|FFE24AE4-9E46-433...|
|FFC1F8D0-F9C4-452...|
|FF93AC1F-49DC-4A6...|
|FE9976C4-2B12-4F7...|
|FE70F9BC-B3F6-44B...|
+--------------------+
only showing top 5 rows

+--------------------+
|             rowguid|
+--------------------+
|0020931C-087C-42F...|
|00365938-3422-494...|
|005D8818-0E25-477...|
|00C21E7

In [52]:
# COLUNA SpecialOfferID aparenta ter somente alguns numeros e estao se repetindo - nao vai servir como pk
# Procurando por número dentro da coluna SpecialOfferID
# muitos id`s se repetem

df_SpecialOfferProduct.select(F.col('SpecialOfferID')).distinct().show(40)


+--------------+
|SpecialOfferID|
+--------------+
|            12|
|             1|
|            13|
|            16|
|             3|
|             5|
|            15|
|             9|
|             4|
|             8|
|             7|
|            10|
|            11|
|            14|
|             2|
+--------------+



In [53]:
'''
Dataframe sem SpecialOfferID correto.
'''

'\nDataframe sem SpecialOfferID correto.\n'

## Tratamento em pandas e Exportacao dos arquivos do bucket para mysql 


### Person.Person

In [54]:
#Escolhendo o arquivo dentro do bucket

bucket.blob('Person.Person.csv')

<Blob: desafio-rox-leandro, Person.Person.csv, None>

In [55]:
# Criando o df em pandas para enviar para o mysql

df_Person = pd.read_csv(path_Person,sep=';')

In [56]:
# Codigo repetido em todos os passos, colab dando erro, foi preciso sempre redefinir o conector.
# Conector host:ip_banco, user:usuario, passwd: senha, db:banco criado

con = mysql.connector.connect(host='34.95.218.222',user='desafio-username',passwd='1234',db='bicicletas')
cur = con.cursor()
engine = create_engine('mysql+pymysql://desafio-username:1234@34.95.218.222/bicicletas')

In [57]:
# Enviando a tabela Person.Person para o MySql 

dfsql_Person = df_Person.to_sql('PersonPerson',con=engine,if_exists='replace',index=False)

In [None]:
# Verificando o conteúdo enviado

cur.execute('select * from PersonPerson')
cur.fetchall() 

### Production.Product

In [59]:
#Escolhendo o arquivo dentro do bucket

bucket.blob('Production.Product.csv')

<Blob: desafio-rox-leandro, Production.Product.csv, None>

In [60]:
# Criando o df em pandas para enviar para o mysql e ajustando colunas se necessário com o pandas
# alterar , para . em (StandardCost, ListPrice)

df_Product = pd.read_csv(path_Product,sep=';')

# Alteração das "," por "." nas colunas abaixo
df_Product["StandardCost"] = df_Product["StandardCost"].replace(",", ".", regex=True)
df_Product["ListPrice"] = df_Product["ListPrice"].replace(",", ".", regex=True)

In [61]:
# Codigo repetido em todos os passos, colab dando erro, foi preciso sempre redefinir o conector.
# Conector host:ip_banco, user:usuario, passwd: senha, db:banco criado

con = mysql.connector.connect(host='34.95.218.222',user='desafio-username',passwd='1234',db='bicicletas')
cur = con.cursor()
engine = create_engine('mysql+pymysql://desafio-username:1234@34.95.218.222/bicicletas')

In [62]:
# Enviando a tabela Production.Product para o MySql  

dfsql_Product = df_Product.to_sql('ProductionProduct',con=engine,if_exists='replace',index=False)

In [None]:
# Verificando o conteúdo enviado

cur.execute('select * from ProductionProduct')
cur.fetchall() 

### Sales.Customer

In [64]:
#Escolhendo o arquivo dentro do bucket

bucket.blob('Sales.Customer.csv')

<Blob: desafio-rox-leandro, Sales.Customer.csv, None>

In [65]:
# Criando o df em pandas para enviar para o mysql

df_Customer = pd.read_csv(path_Customer,sep=';')

In [66]:
# Codigo repetido em todos os passos, colab dando erro, foi preciso sempre redefinir o conector.
# Conector host:ip_banco, user:usuario, passwd: senha, db:banco criado

con = mysql.connector.connect(host='34.95.218.222',user='desafio-username',passwd='1234',db='bicicletas')
cur = con.cursor()
engine = create_engine('mysql+pymysql://desafio-username:1234@34.95.218.222/bicicletas')

In [67]:
# Enviando a tabela SalesCustomer para o MySql 

dfsql_Customer = df_Customer.to_sql('SalesCustomer',con=engine,if_exists='replace',index=False)

In [None]:
# Verificando o conteúdo enviado

cur.execute('select * from SalesCustomer')
cur.fetchall() 

### Sales.SalesOrderDetail

In [69]:
#Escolhendo o arquivo dentro do bucket

bucket.blob('Sales.SalesOrderDetail.csv')

<Blob: desafio-rox-leandro, Sales.SalesOrderDetail.csv, None>

In [70]:
# Criando o df em pandas para enviar para o mysql
# Alterar , pra . em (UnitPrice UnitPriceDiscount)
df_SalesOrderDetail = pd.read_csv(path_SalesOrderDetail,sep=';')

# Alteração das "," por "." nas colunas abaixo
df_SalesOrderDetail["UnitPrice"] = df_SalesOrderDetail["UnitPrice"].replace(",", ".", regex=True)
df_SalesOrderDetail["UnitPriceDiscount"] = df_SalesOrderDetail["UnitPriceDiscount"].replace(",", ".", regex=True)

# Garantindo para OrderQty o tipo INT
df_SalesOrderDetail["OrderQty"] = df_SalesOrderDetail["OrderQty"].apply(int)

In [71]:
# Codigo repetido em todos os passos, colab dando erro, foi preciso sempre redefinir o conector.
# Conector host:ip_banco, user:usuario, passwd: senha, db:banco criado

con = mysql.connector.connect(host='34.95.218.222',user='desafio-username',passwd='1234',db='bicicletas')
cur = con.cursor()
engine = create_engine('mysql+pymysql://desafio-username:1234@34.95.218.222/bicicletas')

In [72]:
# Enviando a tabela SalesSalesOrderDetail para o MySql 

dfsql_SalesOrderDetail = df_SalesOrderDetail.to_sql('SalesSalesOrderDetail',con=engine,if_exists='replace',index=False)

In [None]:
# Verificando o conteúdo enviado

cur.execute('select * from SalesSalesOrderDetail')
cur.fetchall() 

### Sales.SalesOrderHeader

In [74]:
#Escolhendo o arquivo dentro do bucket

bucket.blob('Sales.SalesOrderHeader.csv')

<Blob: desafio-rox-leandro, Sales.SalesOrderHeader.csv, None>

In [75]:
# Criando o df em pandas para enviar para o mysql
# Alterar , para . em  (SubTotal TaxAmt Freight TotalDue)

df_SalesOrderHeader = pd.read_csv(path_SalesOrderHeader,sep=';')

# Alteração das "," por "." nas colunas abaixo
df_SalesOrderHeader["SubTotal"] = df_SalesOrderHeader["SubTotal"].replace(",", ".", regex=True)
df_SalesOrderHeader["TaxAmt"] = df_SalesOrderHeader["TaxAmt"].replace(",", ".", regex=True)
df_SalesOrderHeader["Freight"] = df_SalesOrderHeader["Freight"].replace(",", ".", regex=True)

In [76]:
# Codigo repetido em todos os passos, colab dando erro, foi preciso sempre redefinir o conector.
# Conector host:ip_banco, user:usuario, passwd: senha, db:banco criado

con = mysql.connector.connect(host='34.95.218.222',user='desafio-username',passwd='1234',db='bicicletas')
cur = con.cursor()
engine = create_engine('mysql+pymysql://desafio-username:1234@34.95.218.222/bicicletas')

In [77]:
# Enviando a tabela SalesSalesOrderHeader para o MySql 

dfsql_SalesOrderHeader = df_SalesOrderHeader.to_sql('SalesSalesOrderHeader',con=engine,if_exists='replace',index=False)

In [None]:
# Verificando o conteúdo enviado

cur.execute('select * from SalesSalesOrderHeader')
cur.fetchall() 

### Sales.SpecialOfferProduct

In [79]:
#Escolhendo o arquivo dentro do bucket

bucket.blob('Sales.SpecialOfferProduct.csv')

<Blob: desafio-rox-leandro, Sales.SpecialOfferProduct.csv, None>

In [80]:
# Criando o df em pandas para enviar para o mysql
# SpecialOfferID com valores iguais.

df_SpecialOfferProduct = pd.read_csv(path_SpecialOfferProduct,sep=';')

# Definindo uma nova numeração de ID para SpecialOfferID
for i in range(len(df_SpecialOfferProduct)):
    df_SpecialOfferProduct.at[i, 'SpecialOfferID'] = i+1

In [81]:
# Codigo repetido em todos os passos, colab dando erro, foi preciso sempre redefinir o conector.
# Conector host:ip_banco, user:usuario, passwd: senha, db:banco criado

con = mysql.connector.connect(host='34.95.218.222',user='desafio-username',passwd='1234',db='bicicletas')
cur = con.cursor()
engine = create_engine('mysql+pymysql://desafio-username:1234@34.95.218.222/bicicletas')

In [82]:
# Enviando a tabela SalesSpecialOfferProduct para o MySql 

dfsql_SpecialOfferProduct = df_SpecialOfferProduct.to_sql('SalesSpecialOfferProduct',con=engine,if_exists='replace',index=False)

In [None]:
# Verificando o conteúdo enviado

cur.execute('select * from SalesSpecialOfferProduct')
cur.fetchall() 

## Ligacoes PK FK


In [84]:
# Primary Key para ProductID em ProductionProduct

query = 'ALTER TABLE ProductionProduct ADD PRIMARY KEY (ProductID)'

executar_sql(con,query)

Query executada com sucesso!


In [85]:
# Primary Key para BusinessEntityID em PersonPerson

query = 'ALTER TABLE PersonPerson ADD PRIMARY KEY (BusinessEntityID)'

executar_sql(con,query)

Query executada com sucesso!


In [86]:
# Primary Key para CustomerID em SalesCustomer

query = 'ALTER TABLE SalesCustomer ADD PRIMARY KEY (CustomerID)'

executar_sql(con,query)

Query executada com sucesso!


In [87]:
# Foreign Key na SalesSpecialOfferProduct vindo de ProductID em ProductionProduct 

query ='ALTER TABLE SalesSpecialOfferProduct \
        ADD CONSTRAINT fk_ProductionProduct FOREIGN KEY(ProductID) \
        REFERENCES ProductionProduct(ProductID)'

executar_sql(con,query)

Query executada com sucesso!


In [88]:
# Primary Key para SpecialOfferID em SalesSpecialOfferProduct
# Erro ao executar a Query! 1062 (23000): Duplicate entry '1' for key 'SalesSpecialOfferProduct.PRIMARY' (Resolvido) 

query = 'ALTER TABLE SalesSpecialOfferProduct ADD PRIMARY KEY (SpecialOfferID)'

executar_sql(con,query)

Query executada com sucesso!


In [89]:
# Primary Key para SalesOrderID em SalesSalesOrderHeader

query ='ALTER TABLE SalesSalesOrderHeader ADD PRIMARY KEY (SalesOrderID)'

executar_sql(con,query)

Query executada com sucesso!


In [90]:
# Primary Key para SalesOrderDetailID em SalesSalesOrderDetail

query ='ALTER TABLE SalesSalesOrderDetail ADD PRIMARY KEY (SalesOrderDetailID)'

executar_sql(con,query)

Query executada com sucesso!


In [91]:
# Foreign Key na SalesSalesOrderDetail vindo de SalesOrderID em SalesSalesOrderHeader 

query ='ALTER TABLE SalesSalesOrderDetail \
        ADD CONSTRAINT fk_SalesOrder FOREIGN KEY(SalesOrderID) \
        REFERENCES SalesSalesOrderHeader(SalesOrderID)'

executar_sql(con,query)

Query executada com sucesso!


## Análise de dados - Querys

In [92]:
#1 Escreva uma query que retorna a quantidade de linhas na tabela Sales.SalesOrderDetail 
#  pelo campo SalesOrderID, desde que tenham pelo menos três linhas de detalhes. 

query = "SELECT SalesOrderID, COUNT(*) \
         FROM SalesSalesOrderDetail \
         GROUP BY SalesOrderID \
         HAVING COUNT(SalesOrderID) >= 3"

query1 = retorno_sql(con,query)
print(query1)

[(43659, 12), (43661, 15), (43662, 22), (43664, 8), (43665, 10), (43666, 6), (43667, 4), (43668, 29), (43670, 4), (43671, 11), (43672, 3), (43673, 11), (43675, 9), (43676, 5), (43677, 12), (43678, 19), (43679, 3), (43680, 15), (43681, 21), (43682, 5), (43683, 13), (43684, 6), (43685, 4), (43686, 3), (43688, 11), (43689, 21), (43690, 4), (43692, 28), (43693, 10), (43694, 11), (43695, 8), (43843, 10), (43844, 4), (43845, 10), (43846, 4), (43847, 3), (43848, 5), (43849, 11), (43850, 11), (43851, 12), (43853, 6), (43855, 13), (43857, 14), (43858, 13), (43859, 4), (43860, 12), (43861, 27), (43862, 9), (43863, 4), (43864, 9), (43865, 14), (43866, 3), (43867, 18), (43869, 10), (43870, 5), (43871, 14), (43872, 16), (43873, 28), (43875, 21), (43876, 8), (43877, 10), (43879, 26), (43880, 13), (43881, 8), (43883, 17), (43884, 22), (43886, 21), (43888, 14), (43890, 13), (43891, 9), (43892, 13), (43894, 7), (43895, 15), (43896, 5), (43897, 14), (43898, 28), (43899, 13), (43900, 9), (43901, 10), (43

In [93]:
#2 Escreva uma query que ligue as tabelas Sales.SalesOrderDetail, Sales.SpecialOfferProduct e Production.Product e 
#  retorne os 3 produtos (Name) mais vendidos (pela soma de OrderQty), agrupados pelo número de dias para manufatura (DaysToManufacture).
 

query = 'SELECT ProductionProduct.Name, SUM(SalesSalesOrderDetail.OrderQty) AS TotalQty, ProductionProduct.DaysToManufacture \
          FROM SalesSpecialOfferProduct\
          JOIN SalesSalesOrderDetail \
            ON SalesSpecialOfferProduct.SpecialOfferID = SalesSalesOrderDetail.SpecialOfferID\
          JOIN ProductionProduct \
            ON ProductionProduct.ProductID = SalesSpecialOfferProduct.ProductID\
         GROUP BY ProductionProduct.Name, ProductionProduct.DaysToManufacture\
         ORDER BY TotalQty DESC LIMIT 3'

query2 = retorno_sql(con,query)
print(query2)

[('HL Road Frame - Black, 58', Decimal('238944'), 1), ('HL Road Frame - Red, 58', Decimal('18181'), 1), ('Sport-100 Helmet, Red', Decimal('10713'), 0)]


In [94]:
#3 Escreva uma query ligando as tabelas Person.Person, Sales.Customer e Sales.SalesOrderHeader 
#  de forma a obter uma lista de nomes de clientes e uma contagem de pedidos efetuados. 

query = 'SELECT PersonPerson.FirstName, PersonPerson.LastName, COUNT(SalesSalesOrderHeader.SalesOrderID) AS Qtd_Pedidos \
        FROM SalesCustomer\
        JOIN PersonPerson ON SalesCustomer.PersonID = PersonPerson.BusinessEntityID\
        JOIN SalesSalesOrderHeader ON SalesCustomer.CustomerID = SalesSalesOrderHeader.CustomerID\
        GROUP BY PersonPerson.FirstName, PersonPerson.LastName\
        ORDER BY Qtd_Pedidos DESC'

query3 = retorno_sql(con,query)
print(query3)

[('Dalton', 'Perez', 28), ('Mason', 'Roberts', 28), ('Nancy', 'Chapman', 27), ('Henry', 'Garcia', 27), ('Hailey', 'Patterson', 27), ('Fernando', 'Barnes', 27), ('Charles', 'Jackson', 27), ('Daniel', 'Davis', 27), ('Jason', 'Griffin', 27), ('Samantha', 'Jenkins', 27), ('Ryan', 'Thompson', 27), ('Ashley', 'Henderson', 27), ('Jennifer', 'Simmons', 27), ('April', 'Shan', 25), ('Chloe', 'Campbell', 17), ('Samantha', 'Russell', 17), ('Luke', 'Lal', 17), ('Eduardo', 'Patterson', 17), ('Ana', 'Perry', 17), ('Gina', 'Martin', 17), ('Luis', 'Diaz', 17), ('Andrew', 'Martinez', 16), ('Jared', 'Peterson', 16), ('Alexandra', 'Jenkins', 16), ('Sarah', 'Simmons', 16), ('Antonio', 'Bennett', 16), ('Sierra', 'Young', 16), ('Isabella', 'Russell', 16), ('Jasmine', 'Powell', 16), ('Jada', 'Morgan', 16), ('Arturo', 'Sun', 16), ('José', 'Hernandez', 16), ('Nicholas', 'Brown', 16), ('Jerome', 'Navarro', 16), ('Brandy', 'Chandra', 16), ('Roger', 'Harui', 13), ('Sunil', 'Uppal', 13), ('James', 'Hendergart', 12)

In [95]:
#4 Escreva uma query usando as tabelas Sales.SalesOrderHeader, Sales.SalesOrderDetail e Production.Product, 
#  de forma a obter a soma total de produtos (OrderQty) por ProductID e OrderDate.

query = 'SELECT SalesSalesOrderDetail.ProductID, \
                SUM(SalesSalesOrderDetail.OrderQty) AS OrderQty, \
                SalesSalesOrderHeader.OrderDate \
          FROM SalesSalesOrderHeader\
          JOIN SalesSalesOrderDetail \
            ON SalesSalesOrderHeader.SalesOrderID = SalesSalesOrderDetail.SalesOrderID\
          JOIN ProductionProduct \
            ON ProductionProduct.ProductID = SalesSalesOrderDetail.ProductID\
        GROUP BY SalesSalesOrderDetail.ProductID, SalesSalesOrderHeader.OrderDate\
        ORDER BY OrderQty DESC'

query4 = retorno_sql(con,query)
print(query4)

[(864, Decimal('498'), '2013-06-30 00:00:00.000'), (864, Decimal('465'), '2013-07-31 00:00:00.000'), (884, Decimal('444'), '2013-06-30 00:00:00.000'), (867, Decimal('427'), '2013-06-30 00:00:00.000'), (864, Decimal('424'), '2014-03-31 00:00:00.000'), (884, Decimal('420'), '2013-07-31 00:00:00.000'), (712, Decimal('415'), '2013-06-30 00:00:00.000'), (863, Decimal('409'), '2012-06-30 00:00:00.000'), (715, Decimal('406'), '2013-06-30 00:00:00.000'), (876, Decimal('397'), '2013-07-31 00:00:00.000'), (864, Decimal('383'), '2014-05-01 00:00:00.000'), (864, Decimal('383'), '2013-09-30 00:00:00.000'), (864, Decimal('380'), '2013-10-30 00:00:00.000'), (869, Decimal('374'), '2013-07-31 00:00:00.000'), (712, Decimal('363'), '2013-07-31 00:00:00.000'), (876, Decimal('363'), '2013-06-30 00:00:00.000'), (863, Decimal('358'), '2013-03-30 00:00:00.000'), (863, Decimal('357'), '2012-05-30 00:00:00.000'), (867, Decimal('356'), '2013-07-31 00:00:00.000'), (715, Decimal('354'), '2013-07-31 00:00:00.000'),

In [96]:
#5 Escreva uma query mostrando os campos SalesOrderID, OrderDate e TotalDue da tabela Sales.SalesOrderHeader. 
#  Obtenha apenas as linhas onde a ordem tenha sido feita durante o mês de setembro/2011 e o total devido esteja acima de 1.000. 
#  Ordene pelo total devido decrescente.

query = "SELECT SalesOrderID, OrderDate, ROUND(TotalDue , 2) AS Total_devido \
        FROM SalesSalesOrderHeader \
          WHERE OrderDate \
          BETWEEN '2011-09-01' AND '2011-09-30' AND TotalDue > 1000 \
        ORDER BY TotalDue DESC"

query5 = retorno_sql(con,query)
print(query5)


[(44324, '2011-09-01 00:00:00.000', 3953.0), (44478, '2011-09-29 00:00:00.000', 3953.0), (44326, '2011-09-01 00:00:00.000', 3953.0), (44327, '2011-09-02 00:00:00.000', 3953.0), (44328, '2011-09-02 00:00:00.000', 3953.0), (44329, '2011-09-02 00:00:00.000', 3953.0), (44330, '2011-09-02 00:00:00.000', 3953.0), (44331, '2011-09-03 00:00:00.000', 3953.0), (44332, '2011-09-03 00:00:00.000', 3953.0), (44477, '2011-09-29 00:00:00.000', 3953.0), (44334, '2011-09-04 00:00:00.000', 3953.0), (44476, '2011-09-29 00:00:00.000', 3953.0), (44473, '2011-09-29 00:00:00.000', 3953.0), (44338, '2011-09-04 00:00:00.000', 3953.0), (44339, '2011-09-04 00:00:00.000', 3953.0), (44340, '2011-09-04 00:00:00.000', 3953.0), (44472, '2011-09-29 00:00:00.000', 3953.0), (44343, '2011-09-05 00:00:00.000', 3953.0), (44344, '2011-09-06 00:00:00.000', 3953.0), (44345, '2011-09-06 00:00:00.000', 3953.0), (44347, '2011-09-06 00:00:00.000', 3953.0), (44348, '2011-09-07 00:00:00.000', 3953.0), (44349, '2011-09-07 00:00:00.00

## Salvando os dados na pasta TRUSTED

In [97]:
dfssave = [
          df_Person,
          df_Product,
          df_Customer,
          df_SalesOrderDetail,
          df_SalesOrderHeader,
          df_SpecialOfferProduct]

name_save = [
          'Person.Person',
          'Production.Product',
          'Sales.Customer',
          'Sales.SalesOrderDetail',
          'Sales.SalesOrderHeader',
          'Sales.SpecialOfferProduct'

]

for i in range(len(dfssave)):
  dfssave[i].to_csv(f'gs://desafio-rox-leandro/TRUSTED/{name_save[i]}.trusted.csv', index=False)