In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import *
from pyspark.sql import functions as Func
import datetime

In [2]:
# Read the raw table (Brazilian E-Commerce Public Dataset by Olist.csv).
# Reuse the active SparkSession if the kernel already provides one, otherwise
# create it, so this cell also works on a fresh plain-Python kernel where the
# name `spark` is not predefined.
spark = SparkSession.builder.getOrCreate()

# header=True takes the column names from the first line of the file.
# No schema is supplied or inferred, so every column is loaded as a string.
df = spark.read.option("header", True).csv(
    "/home/anthony/Brazilian E-Commerce Public Dataset by Olist.csv"
)
df.schema

                                                                                

StructType([StructField('_c0', StringType(), True), StructField('order_id', StringType(), True), StructField('order_item_id', StringType(), True), StructField('customer_id', StringType(), True), StructField('customer_unique_id', StringType(), True), StructField('customer_zip_code_prefix', StringType(), True), StructField('customer_city', StringType(), True), StructField('customer_state', StringType(), True), StructField('product_id', StringType(), True), StructField('product_category_name', StringType(), True), StructField('product_name_lenght', StringType(), True), StructField('product_description_lenght', StringType(), True), StructField('product_photos_qty', StringType(), True), StructField('product_weight_g', StringType(), True), StructField('product_length_cm', StringType(), True), StructField('product_height_cm', StringType(), True), StructField('product_width_cm', StringType(), True), StructField('seller_id', StringType(), True), StructField('seller_city', StringType(), True), S

In [3]:
# First treatment: build a new DataFrame holding only the columns that are
# used later to produce the insights.
selected_columns = [
    "order_id",
    "customer_id",
    "customer_unique_id",
    "customer_city",
    "customer_state",
    "product_category_name",
    "seller_city",
    "seller_state",
    "payment_type",
    "payment_installments",
    "price",
    "freight_value",
    "payment_value",
    "order_purchase_timestamp",
    "order_approved_at",
    "day_of_purchase",
    "month_of_purchase",
    "year_of_purchase",
    "order_unique_id",
]
dff = df.select(*selected_columns)

# Parse the date columns into timestamps so that only the year component can
# be rewritten afterwards.
for ts_column in ("order_approved_at", "order_purchase_timestamp"):
    dff = dff.withColumn(
        ts_column,
        Func.to_timestamp(Func.col(ts_column), "yyyy-MM-dd HH:mm:ss"),
    )

In [4]:
# Function used to move timestamps from the year 2018 to 2023.
def change_year2018(date):
    """Return ``date`` shifted to 2023 when its year is 2018.

    Args:
        date: A ``datetime.datetime`` value, or ``None`` (a null cell reaches
            a Python UDF as ``None``).

    Returns:
        ``None`` when ``date`` is ``None``; a copy of ``date`` with the year
        set to 2023 when its year is 2018; otherwise ``date`` unchanged.
    """
    # Null timestamps must pass through instead of failing on ``date.year``.
    if date is None:
        return None
    if date.year == 2018:
        # replace() keeps every other field (including sub-second precision).
        # 2018 is not a leap year, so the target date always exists in 2023.
        return date.replace(year=2023)
    return date

# Function used to move timestamps from the year 2017 to 2022.
def change_year2017(date):
    """Return ``date`` shifted to 2022 when its year is 2017.

    Args:
        date: A ``datetime.datetime`` value, or ``None`` (a null cell reaches
            a Python UDF as ``None``).

    Returns:
        ``None`` when ``date`` is ``None``; a copy of ``date`` with the year
        set to 2022 when its year is 2017; otherwise ``date`` unchanged.
    """
    # Null timestamps must pass through instead of failing on ``date.year``.
    if date is None:
        return None
    if date.year == 2017:
        # replace() keeps every other field (including sub-second precision).
        # 2017 is not a leap year, so the target date always exists in 2022.
        return date.replace(year=2022)
    return date

# Function used to move timestamps from the year 2016 to 2021.
def change_year2016(date):
    """Return ``date`` shifted to 2021 when its year is 2016.

    Args:
        date: A ``datetime.datetime`` value, or ``None`` (a null cell reaches
            a Python UDF as ``None``).

    Returns:
        ``None`` when ``date`` is ``None``; a copy of ``date`` with the year
        set to 2021 when its year is 2016 (29 February becomes 28 February);
        otherwise ``date`` unchanged.
    """
    # Null timestamps must pass through instead of failing on ``date.year``.
    if date is None:
        return None
    if date.year != 2016:
        return date
    try:
        # replace() keeps every other field (including sub-second precision).
        return date.replace(year=2021)
    except ValueError:
        # 2016 is a leap year and 2021 is not: 29 February has no counterpart,
        # so fall back to the last existing day of that month.
        return date.replace(year=2021, day=28)

# With the functions defined, wrap each one in its own Spark UDF that returns
# a timestamp: one UDF per source year (2018, 2017 and 2016).
change_year_udf2018 = Func.udf(change_year2018, returnType=TimestampType())
change_year_udf2017 = Func.udf(change_year2017, returnType=TimestampType())
change_year_udf2016 = Func.udf(change_year2016, returnType=TimestampType())

In [5]:
# Overwrite order_approved_at with its year-shifted value by applying the
# three UDFs one after another (2018, then 2017, then 2016).
for shift_year_udf in (change_year_udf2018, change_year_udf2017, change_year_udf2016):
    dff = dff.withColumn(
        "order_approved_at",
        shift_year_udf(Func.col("order_approved_at")),
    )

In [6]:
# Overwrite order_purchase_timestamp with its year-shifted value by applying
# the three UDFs one after another (2018, then 2017, then 2016).
for shift_year_udf in (change_year_udf2018, change_year_udf2017, change_year_udf2016):
    dff = dff.withColumn(
        "order_purchase_timestamp",
        shift_year_udf(Func.col("order_purchase_timestamp")),
    )

In [7]:
# Update the "year_of_purchase" column so that it carries the same shifted
# year as the timestamp columns. The values are compared and written as
# strings; rows with any other value keep what they had.
for old_year, new_year in (("2018", "2023"), ("2017", "2022"), ("2016", "2021")):
    current_year = Func.col("year_of_purchase")
    dff = dff.withColumn(
        "year_of_purchase",
        Func.when(current_year == old_year, new_year).otherwise(current_year),
    )

In [8]:
# Recompute "day_of_purchase" as the weekday name ("EEEE" pattern), because a
# given calendar date falls on a different weekday after the year shift
# (e.g. 2017 versus 2022).
# The weekday is derived from order_purchase_timestamp, the column that
# day_of_purchase describes. Deriving it from order_approved_at gives the
# approval weekday instead, and null whenever the approval time is missing.
# NOTE: this has to run while order_purchase_timestamp is still a timestamp,
# i.e. before that column is reformatted into a "yyyy/MM/dd HH:mm:ss" string.
dff = dff.withColumn(
    "day_of_purchase",
    Func.date_format(Func.col("order_purchase_timestamp"), "EEEE"),
)

In [9]:
# Render order_purchase_timestamp with "/" instead of "-" as the date
# separator. date_format() returns text, so the column becomes a string.
# order_approved_at is not reformatted here and stays a timestamp column.
purchase_as_text = Func.date_format(
    Func.col("order_purchase_timestamp"), "yyyy/MM/dd HH:mm:ss"
)
dff = dff.withColumn("order_purchase_timestamp", purchase_as_text)

In [10]:
# Save the treated table with the columns that will be used downstream.
# mode="overwrite" replaces the output of a previous run; with the default
# save mode the write raises an error as soon as the target directory exists,
# so the notebook could not be run a second time.
dff.write.csv("/home/anthony/pyspark/tratamento/", header=True, mode="overwrite")

# Print the final column names and types for inspection.
dff.printSchema()

[Stage 1:>                                                          (0 + 2) / 2]

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: string (nullable = true)
 |-- price: string (nullable = true)
 |-- freight_value: string (nullable = true)
 |-- payment_value: string (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- day_of_purchase: string (nullable = true)
 |-- month_of_purchase: string (nullable = true)
 |-- year_of_purchase: string (nullable = true)
 |-- order_unique_id: string (nullable = true)



                                                                                

In [None]:
# Display the schema of the treated DataFrame as a StructType object.
dff.schema