## Databricks Optical Sales Notebook

In [0]:
import pandas as pd
from pyspark.sql.functions import col, regexp_replace, cast, substring, desc, when, upper, format_number, countDistinct, to_date, count

## Starting Bronze Layer

In [0]:
# List the upload folder once; the three selections below all reuse this listing.
all_files = dbutils.fs.ls('dbfs:/FileStore/tables/')


def _newest_first(files, prefix, suffix):
    """Return the entries whose name starts with `prefix` and ends with `suffix`,
    ordered by modification time with the most recent entry first."""
    matching = [entry for entry in files
                if entry.name.startswith(prefix) and entry.name.endswith(suffix)]
    matching.sort(key=lambda entry: entry.modificationTime, reverse=True)
    return matching


# Sales CSV exports (only the SBC files)
sales_files = _newest_first(all_files, 'exportacao_venda', 'sbc.csv')

# Product CSV exports
product_files = _newest_first(all_files, 'exportacao_produto', 'csv')

# Customer CSV exports
customers_files = _newest_first(all_files, 'clientes', 'csv')

In [0]:
# Bronze layer: load the first (most recently modified) file of each kind as-is.
# The three exports are read with the same settings, so they are declared once.
csv_options = dict(inferSchema=True, header=True, encoding='latin1')

bronze_sales = spark.read.csv(sales_files[0].path, **csv_options)
bronze_products = spark.read.csv(product_files[0].path, **csv_options)
bronze_customers = spark.read.csv(customers_files[0].path, **csv_options)

### Finishing Bronze Layer
## Starting Silver Layer

In [0]:
# SALES DF
# Source column -> silver column name. The keys double as the list of required columns.
sales_columns = {
    'Número Venda': 'OrderID',
    'Data': 'Date',
    'Número OS': 'OS_number',
    'Funcionário': 'Employee',
    'Cliente': 'Customer',
    'Item - Referência': 'Ref',
    'Item - Descrição': 'Description',
    'Item - Quantidade': 'Qtd',
    'Item - Valor Original': 'Original_price',
    'Item - Valor Ajuste': 'Price_adjustment',
    'Item - Valor Total Líquido': 'Final_price',
}

# Selecting only the required columns and renaming them
silver_sales = bronze_sales.select(list(sales_columns)).withColumnsRenamed(sales_columns)

# Dropping rows in which every selected column is null
silver_sales = silver_sales.dropna(how='all')

# Adjusting value punctuation (decimal comma -> dot) and column types
silver_sales = silver_sales\
    .withColumn('Original_price', regexp_replace('Original_price', ',', '.').cast('float'))\
    .withColumn('Qtd', regexp_replace('Qtd', ',', '.').cast('int'))\
    .withColumn('Price_adjustment', regexp_replace('Price_adjustment', ',', '.').cast('float'))\
    .withColumn('Final_price', regexp_replace('Final_price', ',', '.').cast('float'))

# Filling null values with 0 in the Price_adjustment column to simplify the analysis
silver_sales = silver_sales.fillna({'Price_adjustment': 0})

# Forward-filling the remaining empty cells in pandas: each gap takes the value of the
# closest filled row above it. DataFrame.ffill() is used instead of
# fillna(method='ffill'), which is deprecated in pandas 2.1+.
# NOTE: toPandas() collects the whole table on the driver.
sales_pdf = silver_sales.toPandas().ffill()

# Creating a Spark DF from the filled pandas DF
silver_sales = spark.createDataFrame(sales_pdf)

# Creating new columns (Month, Year and Price Difference).
# Month is characters 4-5 and Year is characters 7-10 of Date
# (presumably a dd/MM/yyyy text layout - TODO confirm against the export).
silver_sales = silver_sales\
    .withColumn('Month', substring('Date', 4, 2))\
    .withColumn('Year', substring('Date', 7, 4))\
    .withColumn('Price_diff', col('Final_price') - col('Original_price'))

# Adjusting Refs to uppercase (to avoid typing mismatches)
silver_sales = silver_sales.withColumn('Ref', upper('Ref'))

# Removing erroneously declared rows: one known bad OrderID and the Year values
# that are not years. Rows with a null OrderID or Year are dropped as well,
# exactly as with the individual != comparisons.
silver_sales = silver_sales\
    .where(col('OrderID') != 'ARMAÇÃO DA HOLY')\
    .where(~col('Year').isin('OBRO', '-0', ''))

In [0]:
# PRODUCT DF
# Source column -> silver column name; the keys are also the columns to keep.
product_columns = {
    'Referência': 'Ref',
    'Descrição': 'Description',
    'Fornecedor': 'Supplier',
    'Grupo': 'Group',
    'Grife': 'Brand',
    'Preco de custo': 'Cost_price',
    'Preco de Venda': 'Sale_price',
}

# Keep only the required columns, then rename them
silver_products = bronze_products.select(list(product_columns))
silver_products = silver_products.withColumnsRenamed(product_columns)

# Discard rows in which every kept column is null
silver_products = silver_products.dropna(how='all')

# Prices: replace the decimal comma with a dot, then cast to float
for price_col in ('Cost_price', 'Sale_price'):
    silver_products = silver_products.withColumn(
        price_col,
        regexp_replace(col(price_col), ',', '.').cast('float'),
    )

# Margin = sale price minus cost price
silver_products = silver_products.withColumn('Margin', col('Sale_price') - col('Cost_price'))

# Uppercase Refs (to avoid typing mismatches)
silver_products = silver_products.withColumn('Ref', upper('Ref'))

In [0]:
# CUSTOMERS DF
# Source column -> silver column name; the keys are also the columns to keep.
customer_columns = {
    'Documento': 'ID',
    'Nome / Razão Social': 'Name',
    'Idade': 'Age',
    'Número de Vendas': 'Sales_qtd',
    'Última Venda': 'Last_sale',
    'Valor Total de Vendas': 'Total_sales',
}

# Keep only the required columns, then rename them
silver_customers = bronze_customers.select(list(customer_columns))
silver_customers = silver_customers.withColumnsRenamed(customer_columns)

# Discard rows in which every kept column is null
silver_customers = silver_customers.dropna(how='all')

# Total_sales: replace the decimal comma with a dot, then cast to float
silver_customers = silver_customers.withColumn(
    'Total_sales',
    regexp_replace(col('Total_sales'), ',', '.').cast('float'),
)

# Last_year_sale = characters 7-10 of Last_sale
silver_customers = silver_customers.withColumn('Last_year_sale', substring(col('Last_sale'), 7, 4))

# Null IDs become "ND" (Not Declared)
silver_customers = silver_customers.fillna({'ID': 'ND'})

### Finishing Silver Layer
## Starting Gold Layer

In [0]:
# Total sales per year, largest total first
yearly_totals = silver_sales.groupBy('Year').agg({'Final_price': 'sum'})

# Sort on the numeric total before it is turned into formatted text
sales_year = yearly_totals\
    .orderBy(desc('sum(Final_price)'))\
    .withColumnRenamed('sum(Final_price)', 'Total Sales')\
    .withColumn('Total Sales', format_number(col('Total Sales'), 2))

sales_year.show()
# We can see a growth in sales over the years, especially from 2021 to 2022.

+----+------------+
|Year| Total Sales|
+----+------------+
|2023|1,286,711.48|
|2022|1,191,250.71|
|2021|  831,866.70|
|2024|  792,268.80|
|2020|  638,617.72|
|2019|      100.00|
+----+------------+



In [0]:
# Top 10 best-selling products by units sold.
# Qtd is summed rather than counted: counting gives the number of sale lines,
# so a line with Qtd = 2 would weigh the same as a line with Qtd = 1.
top_best_selling = silver_sales.groupBy('Ref', 'Description')\
    .agg({'Qtd': 'sum'})\
    .withColumnRenamed('sum(Qtd)', 'Total Qtd Sold')\
    .orderBy(desc('Total Qtd Sold'))

top_best_selling.show(10, truncate=False)

In [0]:
# 20 best customers (of all time) by total amount bought.
# Generic placeholder names are excluded so the ranking only contains identified
# customers; this is the same set the recurring-customers analysis filters out.
placeholder_customers = ['--- Cliente não informado ---', 'Cliente Padrão', 'Consumidor']

# Sort on the numeric sum before it is replaced by its formatted text
best_customers = silver_sales.where(~col('Customer').isin(placeholder_customers))\
    .groupBy('Customer').agg({'Final_price': 'sum'})\
    .orderBy(desc('sum(Final_price)'))\
    .withColumn('sum(Final_price)', format_number(col('sum(Final_price)'), 2))

best_customers.show(20, truncate=False)

In [0]:
# Recurring customers: number of distinct orders per customer, most orders first.
# Generic placeholder names are left out of the count.
generic_customers = ['--- Cliente não informado ---', 'Cliente Padrão', 'Consumidor']
identified_sales = silver_sales.filter(~col('Customer').isin(generic_customers))

recurring_customers = identified_sales.groupBy('Customer')\
    .agg(countDistinct('OrderID').alias('Transaction_count'))\
    .orderBy(desc('Transaction_count'))

recurring_customers.show(truncate=False)

In [0]:
# Sales total per employee for 2024, best seller first
sales_2024 = silver_sales.filter(col('Year') == '2024')

# Sort on the numeric total, then format it for display
employee_2024 = sales_2024.groupBy("Employee")\
    .agg({'Final_price': 'sum'})\
    .withColumnRenamed('sum(Final_price)', 'Total_sales')\
    .orderBy(desc('Total_sales'))\
    .withColumn('Total_sales', format_number('Total_sales', 2))

employee_2024.show()

+-----------+-----------+
|   Employee|Total_sales|
+-----------+-----------+
|Matheus Sbc| 336,049.52|
|       Alef| 248,516.09|
|  Renan Sbc| 101,220.92|
| Amanda Sbc|  53,968.17|
|Vendas Holy|  31,026.00|
|Juliana Sbc|  21,488.10|
+-----------+-----------+



In [0]:
# Products with the biggest discounts: sum of (Final_price - Original_price) per product.
discount_totals = silver_sales.groupBy('Ref', 'Description').agg({'Price_diff': 'sum'})

# Ascending order puts the most negative sums (largest discounts) first;
# the sort runs on the numeric value before it is formatted as text.
biggest_discounts = discount_totals\
    .orderBy('sum(Price_diff)')\
    .withColumn('sum(Price_diff)', format_number(col('sum(Price_diff)'), 2))

biggest_discounts.show(truncate=False)

+-----+-----------------------------------------------------+---------------+
|Ref  |Description                                          |sum(Price_diff)|
+-----+-----------------------------------------------------+---------------+
|Z262 |Zeiss Antirreflexo Silver                            |-7,099.59      |
|Z187 |Zeiss Progressive Light 3Dv Blueguard 1.50           |-6,320.95      |
|Z93  |Zeiss Visão Simples Acabadas Clear View Platinum 1.50|-6,189.61      |
|Z167 |Zeiss Progressive Smartlife Essential Blueguard 1.50 |-5,343.84      |
|Z260 |Zeiss Antirreflexo Blueprotect                       |-4,425.92      |
|Z261 |Zeiss Antirreflexo Platinum                          |-3,932.85      |
|HB010|Hb Vision 1.56 Blue Light Uv                         |-3,835.89      |
|Z89  |Zeiss Visão Simples Acabadas Clear View Silver 1.50  |-3,779.09      |
|Z197 |Zeiss Progressive Light 3D Blueguard 1.50            |-3,653.39      |
|Z92  |Zeiss Visão Simples Acabadas Clear View Silver 1.67  |-3,

### Finishing Gold Layer

## Saving Dataframes

In [0]:
# Saving the silver DataFrames as CSV in DBFS.
# The DataFrames built in this notebook are silver_sales, silver_products and
# silver_customers (df_vendas / df_produtos / df_client are never defined here).
# Spark paths use the dbfs:/ scheme, matching the read path used for the inputs;
# a Spark path starting with /dbfs/ would be resolved as dbfs:/dbfs/... instead.
# Each target is written by Spark as a directory of part files.
silver_sales.write.mode('overwrite').csv('dbfs:/FileStore/df_vendas.csv', header=True)
silver_products.write.mode('overwrite').csv('dbfs:/FileStore/df_produtos.csv', header=True)
silver_customers.write.mode('overwrite').csv('dbfs:/FileStore/df_clientes.csv', header=True)