In [1]:
#Importação de bibliotecas

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, when, mean
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import col
  
# Attach to the SparkContext provided by the Glue kernel and wrap it in a GlueContext.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Both the job handle and the SparkSession are derived from the same GlueContext.
job = Job(glueContext)
spark = glueContext.spark_session

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.4 
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Session ID: 6492f62a-0f0d-415e-8eba-4978beb8dc3b
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
Waiting for session 6492f62a-0f0d-415e-8eba-4978beb8dc3b to get into ready status...
Session 6492f62a-0f0d-415e-8eba-4978beb8dc3b has been created.



In [2]:
# Reuse the Spark session that already exists in this Glue session.
# A SparkContext is already running at this point, so getOrCreate() returns the
# existing session. A builder option like .master('local[*]') cannot change where
# that context runs, and it wrongly suggested local execution on a multi-worker
# Glue session, so it is dropped here.
spark = SparkSession.builder.appName("bicicletas").getOrCreate()




In [3]:
# S3 location of the source CSV files (bucket "rodrigo-beltrao"); file names are appended to it.
bucket_s3: str = "s3://rodrigo-beltrao/"




In [4]:
# Load the six source extracts from S3. All files share the same CSV dialect:
# ';' delimiter, UTF-8 text, a header row, and Spark-inferred column types.
# NOTE(review): printSchema() shows StandardCost/ListPrice of the product file
# inferred as string — possibly a decimal-comma or quoting issue; confirm.
_csv_read_options = dict(sep=';', encoding='utf-8', header=True, inferSchema=True)

df1 = spark.read.csv(bucket_s3 + "Person.Person.csv", **_csv_read_options)
df2 = spark.read.csv(bucket_s3 + "Production.Product.csv", **_csv_read_options)
df3 = spark.read.csv(bucket_s3 + "Sales.Customer.csv", **_csv_read_options)
df4 = spark.read.csv(bucket_s3 + "Sales.SalesOrderDetail.csv", **_csv_read_options)
df5 = spark.read.csv(bucket_s3 + "Sales.SalesOrderHeader.csv", **_csv_read_options)
df6 = spark.read.csv(bucket_s3 + "Sales.SpecialOfferProduct.csv", **_csv_read_options)




In [5]:
# Print the inferred schema of every loaded DataFrame, in load order.
for frame in (df1, df2, df3, df4, df5, df6):
    frame.printSchema()


root
 |-- BusinessEntityID: integer (nullable = true)
 |-- PersonType: string (nullable = true)
 |-- NameStyle: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- MiddleName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Suffix: string (nullable = true)
 |-- EmailPromotion: integer (nullable = true)
 |-- AdditionalContactInfo: string (nullable = true)
 |-- Demographics: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: timestamp (nullable = true)

root
 |-- ProductID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- ProductNumber: string (nullable = true)
 |-- MakeFlag: integer (nullable = true)
 |-- FinishedGoodsFlag: integer (nullable = true)
 |-- Color: string (nullable = true)
 |-- SafetyStockLevel: integer (nullable = true)
 |-- ReorderPoint: integer (nullable = true)
 |-- StandardCost: string (nullable = true)
 |-- ListPrice: string (nullable = true

In [6]:
# Row count of each DataFrame; every count() is a separate Spark action.
row_count_targets = [
    ("Número de linhas em df1:", df1),
    ("Número de linhas em df2:", df2),
    ("Número de linhas em df3:", df3),
    ("Número de linhas em df4:", df4),
    ("Número de linhas em df5:", df5),
    ("Número de linhas em df6:", df6),
]
for label, frame in row_count_targets:
    print(label, frame.count())

Número de linhas em df1: 19972
Número de linhas em df2: 504
Número de linhas em df3: 19820
Número de linhas em df4: 121317
Número de linhas em df5: 31465
Número de linhas em df6: 538


In [7]:
# Preview the first five rows of each DataFrame.
for frame in (df1, df2, df3, df4, df5, df6):
    frame.show(n=5)

+----------------+----------+---------+-----+---------+----------+----------+------+--------------+---------------------+--------------------+--------------------+-------------------+
|BusinessEntityID|PersonType|NameStyle|Title|FirstName|MiddleName|  LastName|Suffix|EmailPromotion|AdditionalContactInfo|        Demographics|             rowguid|       ModifiedDate|
+----------------+----------+---------+-----+---------+----------+----------+------+--------------+---------------------+--------------------+--------------------+-------------------+
|               1|        EM|        0| NULL|      Ken|         J|   Sánchez|  NULL|             0|                 NULL|"<IndividualSurve...|92C4279F-1207-48A...|2009-01-07 00:00:00|
|               2|        EM|        0| NULL|    Terri|       Lee|     Duffy|  NULL|             1|                 NULL|"<IndividualSurve...|D8763459-8AA8-47C...|2008-01-24 00:00:00|
|               3|        EM|        0| NULL|  Roberto|      NULL|Tamburello|  N

In [8]:
# Basic descriptive statistics (count, mean, stddev, min, max) for each DataFrame.
for frame in (df1, df2, df3, df4, df5, df6):
    summary = frame.describe()
    summary.show()

+-------+------------------+----------+---------+-----+---------+----------+--------+------+------------------+---------------------+--------------------+--------------------+
|summary|  BusinessEntityID|PersonType|NameStyle|Title|FirstName|MiddleName|LastName|Suffix|    EmailPromotion|AdditionalContactInfo|        Demographics|             rowguid|
+-------+------------------+----------+---------+-----+---------+----------+--------+------+------------------+---------------------+--------------------+--------------------+
|  count|             19972|     19972|    19972|19972|    19972|     19972|   19972| 19972|             19972|                19972|               19972|               19972|
|   mean|10763.079411175646|      null|      0.0| null|     null|      null|    null|  null|0.6300821149609453|                 null|                null|                null|
| stddev| 5814.133271948063|      null|      0.0| null|     null|      null|    null|  null|0.7814331436634213|         

In [9]:
# Distinct-value count for every column of every DataFrame (one summary row each).
# The comprehension variable is `c`, not `col`, so it no longer shadows the
# imported pyspark.sql.functions.col that the rest of the notebook relies on.
for frame in (df1, df2, df3, df4, df5, df6):
    frame.select([countDistinct(c).alias(c) for c in frame.columns]).show()

+----------------+----------+---------+-----+---------+----------+--------+------+--------------+---------------------+------------+-------+------------+
|BusinessEntityID|PersonType|NameStyle|Title|FirstName|MiddleName|LastName|Suffix|EmailPromotion|AdditionalContactInfo|Demographics|rowguid|ModifiedDate|
+----------------+----------+---------+-----+---------+----------+--------+------+--------------+---------------------+------------+-------+------------+
|           19972|         6|        1|    7|     1018|        72|    1206|     7|             3|                   11|       19114|  19972|        1285|
+----------------+----------+---------+-----+---------+----------+--------+------+--------------+---------------------+------------+-------+------------+

+---------+----+-------------+--------+-----------------+-----+----------------+------------+------------+---------+----+-------------------+---------------------+------+-----------------+-----------+-----+-----+-----------------

In [10]:
"""Obs: Apesar do código feito em pyspark não retornar nenhum valor nulo, foi identificado na
análise exploratória valores "NULL" e "null". Isso possivelmente deve significar
que os dataframes realmente não devem ter valores nulos ou quebrados, mas que alguém
digitou as palavras "NULL" e "null" para identificar que não foi possível obter
essas informações. Vou verificar se isso se concretiza mais adiante no código."""


# Count true (SQL) nulls per column: isNull() cast to int is 1 for a null row,
# and sum (pyspark.sql.functions.sum, imported at the top) adds those up.
for frame in (df1, df2, df3, df4, df5, df6):
    null_counts = [sum(col(c).isNull().cast("int")).alias(c) for c in frame.columns]
    frame.select(*null_counts).show()

+----------------+----------+---------+-----+---------+----------+--------+------+--------------+---------------------+------------+-------+------------+
|BusinessEntityID|PersonType|NameStyle|Title|FirstName|MiddleName|LastName|Suffix|EmailPromotion|AdditionalContactInfo|Demographics|rowguid|ModifiedDate|
+----------------+----------+---------+-----+---------+----------+--------+------+--------------+---------------------+------------+-------+------------+
|               0|         0|        0|    0|        0|         0|       0|     0|             0|                    0|           0|      0|           0|
+----------------+----------+---------+-----+---------+----------+--------+------+--------------+---------------------+------------+-------+------------+

+---------+----+-------------+--------+-----------------+-----+----------------+------------+------------+---------+----+-------------------+---------------------+------+-----------------+-----------+-----+-----+-----------------

In [11]:
# Fill nulls with the placeholder 'N/A'. fillna() with a string value only
# applies to string-typed columns; other columns are left as they are.
(df1_filled, df2_filled, df3_filled,
 df4_filled, df5_filled, df6_filled) = [
    source.fillna('N/A') for source in (df1, df2, df3, df4, df5, df6)
]




In [12]:
df3_filled.show(n=20, truncate=True)  # default preview: first 20 rows, long cells truncated

+----------+--------+-------+-----------+-------------+--------------------+--------------------+
|CustomerID|PersonID|StoreID|TerritoryID|AccountNumber|             rowguid|        ModifiedDate|
+----------+--------+-------+-----------+-------------+--------------------+--------------------+
|         1|    NULL|    934|          1|   AW00000001|3F5AE95E-B87D-4AE...|2014-09-12 11:15:...|
|         2|    NULL|   1028|          1|   AW00000002|E552F657-A9AF-4A7...|2014-09-12 11:15:...|
|         3|    NULL|    642|          4|   AW00000003|130774B1-DB21-4EF...|2014-09-12 11:15:...|
|         4|    NULL|    932|          4|   AW00000004|FF862851-1DAA-404...|2014-09-12 11:15:...|
|         5|    NULL|   1026|          4|   AW00000005|83905BDC-6F5E-4F7...|2014-09-12 11:15:...|
|         6|    NULL|    644|          4|   AW00000006|1A92DF88-BFA2-467...|2014-09-12 11:15:...|
|         7|    NULL|    930|          1|   AW00000007|03E9273E-B193-448...|2014-09-12 11:15:...|
|         8|    NULL

In [13]:
# Try to fill missing PersonID values with the sentinel -1.
# NOTE(review): the displayed output still shows NULL in PersonID, i.e. the column
# holds the literal text 'NULL' rather than SQL nulls, so this fill changes nothing.
df3_filled = df3_filled.na.fill({'PersonID': -1})

# Re-display to check whether the fill took effect.
df3_filled.show()

+----------+--------+-------+-----------+-------------+--------------------+--------------------+
|CustomerID|PersonID|StoreID|TerritoryID|AccountNumber|             rowguid|        ModifiedDate|
+----------+--------+-------+-----------+-------------+--------------------+--------------------+
|         1|    NULL|    934|          1|   AW00000001|3F5AE95E-B87D-4AE...|2014-09-12 11:15:...|
|         2|    NULL|   1028|          1|   AW00000002|E552F657-A9AF-4A7...|2014-09-12 11:15:...|
|         3|    NULL|    642|          4|   AW00000003|130774B1-DB21-4EF...|2014-09-12 11:15:...|
|         4|    NULL|    932|          4|   AW00000004|FF862851-1DAA-404...|2014-09-12 11:15:...|
|         5|    NULL|   1026|          4|   AW00000005|83905BDC-6F5E-4F7...|2014-09-12 11:15:...|
|         6|    NULL|    644|          4|   AW00000006|1A92DF88-BFA2-467...|2014-09-12 11:15:...|
|         7|    NULL|    930|          1|   AW00000007|03E9273E-B193-448...|2014-09-12 11:15:...|
|         8|    NULL

In [14]:
"""Foi descoberto que, neste caso, "NULL" não é um espaço em branco ou mal formatado, é apenas uma
palavra digitada para identificar que a informação está ausente. Vou substituir a palavra "NULL" por 
uma string vazia."""

# Column names of df3_filled, kept as a named list.
columnsdf3 = df3_filled.columns

# Replace the literal placeholder text 'NULL'/'null' with an empty string in
# every column. A single select() builds one projection. A withColumn() call in a
# loop adds one projection per column, which the PySpark docs warn can produce
# very large plans.
# NOTE(review): when() yields '' while otherwise() keeps the column's own type,
# so Spark likely coerces non-string columns to string here — confirm with
# df3_filled.printSchema().
df3_filled = df3_filled.select([
    when((col(c) == 'NULL') | (col(c) == 'null'), '').otherwise(col(c)).alias(c)
    for c in columnsdf3
])




In [15]:
# Re-display df3_filled to confirm the 'NULL'/'null' text became empty strings.
df3_filled.show(n=20, truncate=True)

+----------+--------+-------+-----------+-------------+--------------------+--------------------+
|CustomerID|PersonID|StoreID|TerritoryID|AccountNumber|             rowguid|        ModifiedDate|
+----------+--------+-------+-----------+-------------+--------------------+--------------------+
|         1|        |    934|          1|   AW00000001|3F5AE95E-B87D-4AE...|2014-09-12 11:15:...|
|         2|        |   1028|          1|   AW00000002|E552F657-A9AF-4A7...|2014-09-12 11:15:...|
|         3|        |    642|          4|   AW00000003|130774B1-DB21-4EF...|2014-09-12 11:15:...|
|         4|        |    932|          4|   AW00000004|FF862851-1DAA-404...|2014-09-12 11:15:...|
|         5|        |   1026|          4|   AW00000005|83905BDC-6F5E-4F7...|2014-09-12 11:15:...|
|         6|        |    644|          4|   AW00000006|1A92DF88-BFA2-467...|2014-09-12 11:15:...|
|         7|        |    930|          1|   AW00000007|03E9273E-B193-448...|2014-09-12 11:15:...|
|         8|        

In [16]:
# Capture the column names of the remaining cleaned DataFrames
# (df3's list is taken in its own cell).
columnsdf1, columnsdf2, columnsdf4, columnsdf5, columnsdf6 = (
    frame.columns
    for frame in (df1_filled, df2_filled, df4_filled, df5_filled, df6_filled)
)




In [17]:
# Replace the literal placeholder text 'NULL'/'null' with an empty string in
# every column of df1_filled. This uses one select() instead of a withColumn()
# loop: each withColumn() adds a projection, and the PySpark docs warn that
# doing this in a loop can build very large plans.
# NOTE(review): when() yields '' while otherwise() keeps the column's own type,
# so Spark likely coerces non-string columns to string here — confirm with printSchema().
df1_filled = df1_filled.select([
    when((col(c) == 'NULL') | (col(c) == 'null'), '').otherwise(col(c)).alias(c)
    for c in columnsdf1
])




In [18]:
# Replace the literal placeholder text 'NULL'/'null' with an empty string in
# every column of df2_filled. This uses one select() instead of a withColumn()
# loop: each withColumn() adds a projection, and the PySpark docs warn that
# doing this in a loop can build very large plans.
# NOTE(review): when() yields '' while otherwise() keeps the column's own type,
# so Spark likely coerces non-string columns to string here — confirm with printSchema().
df2_filled = df2_filled.select([
    when((col(c) == 'NULL') | (col(c) == 'null'), '').otherwise(col(c)).alias(c)
    for c in columnsdf2
])




In [19]:
# Replace the literal placeholder text 'NULL'/'null' with an empty string in
# every column of df4_filled. This uses one select() instead of a withColumn()
# loop: each withColumn() adds a projection, and the PySpark docs warn that
# doing this in a loop can build very large plans.
# NOTE(review): when() yields '' while otherwise() keeps the column's own type,
# so Spark likely coerces non-string columns to string here — confirm with printSchema().
df4_filled = df4_filled.select([
    when((col(c) == 'NULL') | (col(c) == 'null'), '').otherwise(col(c)).alias(c)
    for c in columnsdf4
])




In [20]:
# Replace the literal placeholder text 'NULL'/'null' with an empty string in
# every column of df5_filled. This uses one select() instead of a withColumn()
# loop: each withColumn() adds a projection, and the PySpark docs warn that
# doing this in a loop can build very large plans.
# NOTE(review): when() yields '' while otherwise() keeps the column's own type,
# so Spark likely coerces non-string columns to string here — confirm with printSchema().
df5_filled = df5_filled.select([
    when((col(c) == 'NULL') | (col(c) == 'null'), '').otherwise(col(c)).alias(c)
    for c in columnsdf5
])




In [21]:
# Replace the literal placeholder text 'NULL'/'null' with an empty string in
# every column of df6_filled. This uses one select() instead of a withColumn()
# loop: each withColumn() adds a projection, and the PySpark docs warn that
# doing this in a loop can build very large plans.
# NOTE(review): when() yields '' while otherwise() keeps the column's own type,
# so Spark likely coerces non-string columns to string here — confirm with printSchema().
df6_filled = df6_filled.select([
    when((col(c) == 'NULL') | (col(c) == 'null'), '').otherwise(col(c)).alias(c)
    for c in columnsdf6
])




In [22]:
# Show the first rows of the other cleaned DataFrames (df3_filled is not included
# here) to confirm the 'NULL'/'null' text became empty strings.
for frame in (df1_filled, df2_filled, df4_filled, df5_filled, df6_filled):
    frame.show()

+----------------+----------+---------+-----+---------+----------+----------+------+--------------+---------------------+--------------------+--------------------+-------------------+
|BusinessEntityID|PersonType|NameStyle|Title|FirstName|MiddleName|  LastName|Suffix|EmailPromotion|AdditionalContactInfo|        Demographics|             rowguid|       ModifiedDate|
+----------------+----------+---------+-----+---------+----------+----------+------+--------------+---------------------+--------------------+--------------------+-------------------+
|               1|        EM|        0|     |      Ken|         J|   Sánchez|      |             0|                     |"<IndividualSurve...|92C4279F-1207-48A...|2009-01-07 00:00:00|
|               2|        EM|        0|     |    Terri|       Lee|     Duffy|      |             1|                     |"<IndividualSurve...|D8763459-8AA8-47C...|2008-01-24 00:00:00|
|               3|        EM|        0|     |  Roberto|          |Tamburello|   

In [24]:
# Destination prefix for the cleaned files. It sits inside the same bucket the
# sources are read from and is derived from bucket_s3, so the bucket name is
# defined in one place only.
salvar = bucket_s3 + "arquivos-tratados-com-spark/"




In [25]:
# Names of the output sub-folders (under `salvar`), one per cleaned DataFrame.
# Spark writes each name as a directory containing the CSV part file(s).
(df1_nome, df2_nome, df3_nome,
 df4_nome, df5_nome, df6_nome) = (
    'Person.Person.csv',
    'Production.Product.csv',
    'Sales.Customer.csv',
    'Sales.SalesOrderDetail.csv',
    'Sales.SalesOrderHeader.csv',
    'Sales.SpecialOfferProduct.csv',
)




In [26]:
# Write each cleaned DataFrame to its own folder under `salvar`. coalesce(1)
# yields a single partition (one part file); overwrite mode replaces earlier
# output, and each file gets a header row.
_outputs = (
    (df1_filled, df1_nome),
    (df2_filled, df2_nome),
    (df3_filled, df3_nome),
    (df4_filled, df4_nome),
    (df5_filled, df5_nome),
    (df6_filled, df6_nome),
)
for frame, folder in _outputs:
    frame.coalesce(1).write.mode('overwrite').csv(salvar + folder, header=True)


