In [0]:
import pandas as pd
from pyspark.sql.functions import col, regexp_replace, cast, substring, desc, when, upper, format_number, countDistinct, to_date, count


In [0]:
# Starting Bronze Layer

In [0]:
# Listing the landing folder once; every source below is filtered from this listing
all_files = dbutils.fs.ls('dbfs:/FileStore/tables/')


def _newest_first(files, prefix, suffix):
    """Return the entries of `files` whose name starts with `prefix` and ends
    with `suffix`, sorted by modification time (most recent first).

    Raises:
        FileNotFoundError: when no entry matches, so a missing export is
            reported by name instead of as an IndexError when the result
            is indexed with [0].
    """
    matches = [f for f in files if f.name.startswith(prefix) and f.name.endswith(suffix)]
    if not matches:
        raise FileNotFoundError(
            f"No file matching '{prefix}*{suffix}' found in dbfs:/FileStore/tables/"
        )
    return sorted(matches, key=lambda f: f.modificationTime, reverse=True)


# CSV sales files (recent first) - Only SBC
sales_files = _newest_first(all_files, 'exportacao_venda', 'sbc.csv')

# CSV product files (recent first)
product_files = _newest_first(all_files, 'exportacao_produto', 'csv')

# CSV customers files (recent first)
customers_files = _newest_first(all_files, 'clientes', 'csv')

In [0]:
# Each bronze frame is read from the first (most recent) file of its list,
# using the same CSV reader options for all three sources.
_bronze_csv_options = {'inferSchema': True, 'header': True, 'encoding': 'latin1'}

bronze_sales = spark.read.csv(sales_files[0].path, **_bronze_csv_options)
bronze_products = spark.read.csv(product_files[0].path, **_bronze_csv_options)
bronze_customers = spark.read.csv(customers_files[0].path, **_bronze_csv_options)

In [0]:
# Finishing Bronze Layer
# Starting Silver Layer

In [0]:
# SALES DF

# Source column -> silver column name; only these columns are kept, in this order
sales_columns = {
    'Número Venda': 'OrderID',
    'Data': 'Date',
    'Número OS': 'OS_number',
    'Funcionário': 'Employee',
    'Cliente': 'Customer',
    'Documento': 'CustomerID',
    'Telefones': 'Contact',
    'Item - Referência': 'Ref',
    'Item - Descrição': 'Description',
    'Item - Quantidade': 'Qtd',
    'Item - Valor Original': 'Original_price',
    'Item - Valor Total Líquido': 'Final_price',
}

# Selecting only required columns and renaming them
silver_sales = bronze_sales.select(list(sales_columns)).withColumnsRenamed(sales_columns)

# Dropping rows where every column is null
silver_sales = silver_sales.dropna(how='all')

# Adjusting decimal punctuation (comma -> dot) and column types
silver_sales = (
    silver_sales
    .withColumn('Original_price', regexp_replace('Original_price', ',', '.').cast('float'))
    .withColumn('Qtd', regexp_replace('Qtd', ',', '.').cast('int'))
    .withColumn('Final_price', regexp_replace('Final_price', ',', '.').cast('float'))
)

# Forward-filling empty values in pandas. `DataFrame.ffill()` replaces the
# deprecated `fillna(method='ffill')`, and the pandas frame gets its own name
# so `silver_sales` always refers to a Spark DataFrame.
# NOTE(review): forward fill depends on row order; this assumes toPandas()
# returns rows in file order — confirm for multi-partition inputs.
sales_pdf = silver_sales.toPandas().ffill()

# Creating Spark DF with the filled data
silver_sales = spark.createDataFrame(sales_pdf)

# Creating new columns (Month, Year, Total Price and Price Difference (discounts and additions))
# Month/Year are taken by character position from the Date text.
silver_sales = (
    silver_sales
    .withColumn('Month', substring('Date', 4, 2))
    .withColumn('Year', substring('Date', 7, 4))
    .withColumn('Total_price', col('Qtd') * col('Original_price'))
    .withColumn('Price_diff', col('Final_price') - col('Total_price'))
)

# Rearranging columns
silver_sales = silver_sales.select([
    'OrderID',
    'Date',
    'OS_number',
    'Employee',
    'Customer',
    'CustomerID',
    'Contact',
    'Ref',
    'Description',
    'Qtd',
    'Original_price',
    'Total_price',
    'Price_diff',
    'Final_price',
    'Month',
    'Year',
])

# Adjusting Refs to uppercase (to avoid typing mismatches)
silver_sales = silver_sales.withColumn('Ref', upper('Ref'))

# Removing erroneously declared rows (known junk OrderID / Year values).
# A row is kept only when every comparison is true, same as chained where() calls.
silver_sales = silver_sales.where(
    (col('OrderID') != 'ARMAÇÃO DA HOLY')
    & (col('Year') != 'OBRO')
    & (col('Year') != '-0')
    & (col('Year') != '')
)

# Creating Sales Table Temp View
silver_sales.createOrReplaceTempView('temp_silver_sales')

In [0]:
# PRODUCT DF

# Source column -> silver column name; only these columns are kept, in this order
product_columns = {
    'Referência': 'Ref',
    'Descrição': 'Description',
    'Fornecedor': 'Supplier',
    'Grupo': 'Group',
    'Subgrupo': 'Style',
    'Grife': 'Brand',
    'Preco de custo': 'Cost_price',
    'Preco de Venda': 'Sale_price',
}

# Selecting only required columns, renaming them and dropping all-null rows
silver_products = (
    bronze_products
    .select(list(product_columns))
    .withColumnsRenamed(product_columns)
    .dropna(how='all')
)

# Adjusting decimal punctuation (comma -> dot) and column types
silver_products = (
    silver_products
    .withColumn('Cost_price', regexp_replace(col('Cost_price'), ',', '.').cast('float'))
    .withColumn('Sale_price', regexp_replace(col('Sale_price'), ',', '.').cast('float'))
)

# Adding Margin column
silver_products = silver_products.withColumn('Margin', col('Sale_price') - col('Cost_price'))

# Adjusting Refs to uppercase (to avoid typing mismatches)
silver_products = silver_products.withColumn('Ref', upper('Ref'))

# Removing duplicates on the normalised Ref. This runs AFTER upper() so that
# references differing only by letter case collapse into one row; de-duplicating
# before normalisation would let them collide afterwards and multiply rows in
# any join on Ref. Which of the duplicate rows is kept is not defined.
silver_products = silver_products.dropDuplicates(['Ref'])

# Checking for any duplicate Ref
total_count = silver_products.count()
distinct_count = silver_products.select("Ref").distinct().count()

if total_count == distinct_count:
    print("All values ​​in 'Ref' are unique.")
else:
    print(f"There are {total_count - distinct_count} duplicates in column 'Ref'.")

# Creating Products Table Temp View
silver_products.createOrReplaceTempView('temp_silver_products')

All values ​​in 'Ref' are unique.


In [0]:
# CUSTOMERS DF

# Source column -> silver column name; only these columns are kept, in this order
customer_columns = {
    'Documento': 'CustomerID',
    'Nome / Razão Social': 'Name',
    'Idade': 'Age',
    'Número de Vendas': 'Sales_qtd',
    'Última Venda': 'Last_sale',
    'Valor Total de Vendas': 'Total_sales',
}

silver_customers = (
    bronze_customers
    # Removing fully identical rows
    .dropDuplicates()
    # Keeping only the required columns, then renaming them
    .select(list(customer_columns))
    .withColumnsRenamed(customer_columns)
    # Dropping rows where every column is null
    .dropna(how='all')
    # Decimal comma -> dot, then numeric type
    .withColumn('Total_sales', regexp_replace(col('Total_sales'), ',', '.').cast('float'))
    # Year of the last sale, taken by character position from Last_sale
    .withColumn('Last_year_sale', substring(col('Last_sale'), 7, 4))
    # Null IDs become 'ND' (Not Declared)
    .fillna({'CustomerID': 'ND'})
)

# Creating Customers Table Temp View
silver_customers.createOrReplaceTempView('temp_silver_customers')

In [0]:
# Finishing Silver Layer
# Starting Gold Layer

In [0]:
%sql

-- Gold table: each sales row enriched with customer and product attributes.
-- Both joins are inner joins, so sales rows without a matching CustomerID
-- or Ref are not written to gold_sales.
CREATE OR REPLACE TABLE gold_sales AS
SELECT
    -- order / customer identification
    s.OrderID,
    s.Date,
    s.OS_number,
    s.Employee,
    s.Customer,
    s.CustomerID,
    -- customer attributes
    c.Age            AS Customer_Age,
    c.Sales_qtd      AS Customer_Sales_qtd,
    c.Last_sale      AS Customer_Last_sale,
    c.Total_sales    AS Customer_Total_sales,
    c.Last_year_sale AS Customer_Last_year_sale,
    -- product attributes
    s.Ref            AS Product_Ref,
    s.Description    AS Product_Description,
    p.Supplier       AS Product_Supplier,
    p.Group          AS Product_Group,
    p.Style          AS Product_Style,
    p.Brand          AS Product_Brand,
    p.Cost_price     AS Product_Cost_price,
    p.Sale_price     AS Product_Sale_price,
    p.Margin         AS Product_Margin,
    -- sale measures
    s.Qtd,
    s.Original_price,
    s.Total_price,
    s.Price_diff,
    s.Final_price,
    s.Month,
    s.Year
FROM temp_silver_sales AS s
INNER JOIN temp_silver_customers AS c
    ON s.CustomerID = c.CustomerID
INNER JOIN temp_silver_products AS p
    ON s.Ref = p.Ref


num_affected_rows,num_inserted_rows


In [0]:
%sql

-- Sum of Final_Price per Year, rounded and formatted without decimals,
-- newest year first (figures taken in early November 2024)
SELECT
    Year,
    FORMAT_NUMBER(ROUND(SUM(Final_Price)), 0) AS Total_Sales
FROM gold_sales
GROUP BY Year
ORDER BY Year DESC


Year,Total_Sales
2024,1119996
2023,1184269
2022,1138643
2021,777015
2020,548524


In [0]:
%sql

-- Top 10 customers by number of gold_sales rows, excluding the default
-- customer; only the first name is shown, followed by ' Censured'.
-- NOTE(review): COUNT(OrderID) counts rows, not distinct orders — confirm
-- that this is the intended meaning of total_purchases.
SELECT
    SPLIT(Customer, ' ')[0] || ' Censured' AS Censured_name,
    COUNT(OrderID) AS total_purchases
FROM gold_sales
WHERE Customer NOT LIKE 'Cliente Padrão'
GROUP BY Customer
ORDER BY total_purchases DESC
LIMIT 10


Censured_name,total_purchases
Alexandre Censured,22
Shirlei Censured,19
Alexandre Censured,19
Maria Censured,18
Ingrid Censured,18
Fabiana Censured,17
Dionísio Censured,17
Priscila Censured,17
Rita Censured,16
Soraia Censured,16


In [0]:
%sql

-- Ranking product styles by number of gold_sales rows (rows without a style are ignored)
SELECT
    Product_Style,
    COUNT(OrderID) AS Total_Count
FROM gold_sales
WHERE Product_Style IS NOT NULL
GROUP BY Product_Style
ORDER BY Total_Count DESC

Product_Style,Total_Count
Quadrado,271
Redondo,225
Gatinho Quadrado,58
Gatinho,54
Quadrado Nylon,36
Gatinho Redondo,33
Aviador,31
Redondo Titanium,28
Gatinho Balgriff,26
Gatinho Quadrado Titanium,18
