In [0]:
spark

###Mostrando os pontos de montagem no cluster Databricks

In [0]:
display(dbutils.fs.mounts())

mountPoint,source,encryptionType
/databricks-datasets,databricks-datasets,
/Volumes,UnityCatalogVolumes,
/mnt/datalake7a68c04c876ba15d/bronze,wasbs://bronze@datalake7a68c04c876ba15d.blob.core.windows.net,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/databricks/mlflow-registry,databricks/mlflow-registry,
/mnt/datalake7a68c04c876ba15d/silver,wasbs://silver@datalake7a68c04c876ba15d.blob.core.windows.net,
/mnt/datalake7a68c04c876ba15d/landing-zone,wasbs://landing-zone@datalake7a68c04c876ba15d.blob.core.windows.net,
/mnt/datalake7a68c04c876ba15d/gold,wasbs://gold@datalake7a68c04c876ba15d.blob.core.windows.net,
/Volume,DbfsReserved,


### Acessando nome da Storage Account

In [0]:
storageAccountName = "datalake7a68c04c876ba15d"

###Desmontando os pontos de montagem não utilizados

In [0]:
dbutils.fs.unmount(f'/mnt/{storageAccountName}/landing-zone')

/mnt/datalake7a68c04c876ba15d/landing-zone has been unmounted.


True

###Mostrando os pontos de montagem no cluster Databricks

In [0]:
display(dbutils.fs.mounts())

mountPoint,source,encryptionType
/databricks-datasets,databricks-datasets,
/Volumes,UnityCatalogVolumes,
/mnt/datalake7a68c04c876ba15d/bronze,wasbs://bronze@datalake7a68c04c876ba15d.blob.core.windows.net,
/databricks/mlflow-tracking,databricks/mlflow-tracking,
/databricks-results,databricks-results,
/databricks/mlflow-registry,databricks/mlflow-registry,
/mnt/datalake7a68c04c876ba15d/silver,wasbs://silver@datalake7a68c04c876ba15d.blob.core.windows.net,
/mnt/datalake7a68c04c876ba15d/gold,wasbs://gold@datalake7a68c04c876ba15d.blob.core.windows.net,
/Volume,DbfsReserved,
/volumes,DbfsReserved,


## Transformações de dados

### Mostrando todos os arquivos da camada bronze

In [0]:
display(dbutils.fs.ls(f"/mnt/{storageAccountName}/bronze"))

path,name,size,modificationTime
dbfs:/mnt/datalake7a68c04c876ba15d/bronze/agendamentos/,agendamentos/,0,0
dbfs:/mnt/datalake7a68c04c876ba15d/bronze/cargas/,cargas/,0,0
dbfs:/mnt/datalake7a68c04c876ba15d/bronze/clientes/,clientes/,0,0
dbfs:/mnt/datalake7a68c04c876ba15d/bronze/motoristas/,motoristas/,0,0
dbfs:/mnt/datalake7a68c04c876ba15d/bronze/rotas/,rotas/,0,0
dbfs:/mnt/datalake7a68c04c876ba15d/bronze/veiculos/,veiculos/,0,0


###Gerando um dataframe dos delta lake no container bronze do Azure Data Lake Storage

In [0]:
df_agendamentos = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/agendamentos")
df_cargas = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/cargas")
df_clientes = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/clientes")
df_motoristas = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/motoristas")
df_rotas = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/rotas")
df_veiculos = spark.read.format('delta').load(f"/mnt/{storageAccountName}/bronze/veiculos")

### Adicionando metadados de data e hora de processamento e nome do arquivo de origem

In [0]:
from pyspark.sql.functions import current_timestamp, lit

df_agendamentos   = df_agendamentos.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("agendamentos"))
df_cargas     = df_cargas.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("cargas"))
df_clientes   = df_clientes.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("clientes"))
df_motoristas  = df_motoristas.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("motoristas"))
df_rotas    = df_rotas.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("rotas"))
df_veiculos     = df_veiculos.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_arquivo", lit("veiculos"))


###Mudando as colunas para conter somente letras minúsculas e retirando abreviações

In [0]:
import re

def atualizar_colunas(df):
    colunas = df.columns
    for coluna in colunas:
        if coluna.lower() == coluna:
            continue
        nova_coluna = coluna
        if "ID" in nova_coluna:
            position = nova_coluna.find("ID")
            nova_coluna = "Codigo"+nova_coluna[:position]
        nova_coluna = "_".join(re.findall('[A-Z][^A-Z]*', nova_coluna)).lower()
        df = df.withColumnRenamed(coluna, nova_coluna)
    return df

df_agendamentos = atualizar_colunas(df_agendamentos)
df_cargas = atualizar_colunas(df_cargas)
df_clientes = atualizar_colunas(df_clientes)
df_motoristas = atualizar_colunas(df_motoristas)
df_rotas = atualizar_colunas(df_rotas)
df_veiculos = atualizar_colunas(df_veiculos)

### Separando endereço do cliente

In [0]:
import pyspark.pandas as ps
ps.set_option('compute.ops_on_diff_frames', True)

pd_clientes = ps.DataFrame(df_clientes)

def separar_endereco(r):
    endereco = r['endereco_cliente']
    endereco = endereco.split(',')
    if len(endereco) == 4:
        logradouro = endereco[0].strip()
        numero_residencia = endereco[1].strip()
        bairro = endereco[2].strip()
        cep_cidade_uf = endereco[3].strip()
    else:
        logradouro = endereco[0].strip()
        numero_residencia = "S/N"
        bairro = endereco[1].strip()
        cep_cidade_uf = endereco[2].strip()
    
    cep_cidade_uf = cep_cidade_uf.split('/')
    cep_cidade = cep_cidade_uf[0].strip()
    uf = cep_cidade_uf[1].strip()
    cep = cep_cidade.split(' ')[0].strip()
    cep = ''.join(cep.split('-'))
    cidade = cep_cidade[len(cep) + 1:].strip()

    return [logradouro, numero_residencia, bairro, cep, cidade, uf]

endereco = pd_clientes.apply(separar_endereco, axis=1)

endereco_df = ps.DataFrame(endereco.tolist(), columns=['logradouro_cliente', 'numero_residencia_cliente', 'bairro_cliente', 'cep_cliente', 'cidade_cliente', 'uf_cliente'])
pd_clientes = ps.concat([pd_clientes, endereco_df], axis=1)
pd_clientes = pd_clientes.drop(columns="endereco_cliente")
pd_clientes.columns

Index(['codigo_cliente', 'nome_cliente', 'contato_cliente', 'tipo_cliente',
       'data_hora_bronze', 'nome_arquivo', 'data_hora_silver',
       'logradouro_cliente', 'numero_residencia_cliente', 'bairro_cliente',
       'cep_cliente', 'cidade_cliente', 'uf_cliente'],
      dtype='object')

### Padronizando colunas de contato para somente número

In [0]:
def padronizar_contato(r, coluna):
    contato = r[coluna]
    contato = re.sub('[^0-9]', '', contato)
    return contato

pd_clientes["contato_cliente"] = pd_clientes.apply(lambda row: padronizar_contato(row, "contato_cliente"), axis=1)

pd_motoristas = ps.DataFrame(df_motoristas)
pd_motoristas["telefone"] = pd_motoristas.apply(lambda row: padronizar_contato(row, "telefone"), axis=1)


In [0]:

df_clientes = pd_clientes.to_spark()
df_clientes.select(["contato_cliente", "cidade_cliente"]).show(5)

+---------------+----------------+
|contato_cliente|  cidade_cliente|
+---------------+----------------+
|    07178156593|  Brito de Minas|
|     3171484185|Borges de Barros|
|     4133969477| Santos do Galho|
|  5506198910139|         Sampaio|
|    03156208709|Azevedo de Goiás|
+---------------+----------------+
only showing top 5 rows



In [0]:
df_motoristas = pd_motoristas.to_spark()
df_motoristas.select("telefone").show(5)

+-------------+
|     telefone|
+-------------+
|  02104651614|
|5508120031041|
| 552135315362|
|   7123723313|
| 555183607614|
+-------------+
only showing top 5 rows



### Mudando tempo_estimado para minutos

In [0]:
def tempo_em_minutos(r):
    tempo_estimado = r["tempo_estimado"]
    tempo_estimado = tempo_estimado.split(":")
    horas = int(tempo_estimado[0])
    minutos = int(tempo_estimado[1])
    minutos += horas * 60
    return minutos

pd_rotas = ps.DataFrame(df_rotas)
pd_rotas["tempo_estimado"] = pd_rotas.apply(tempo_em_minutos, axis=1)
df_rotas = pd_rotas.to_spark()
df_rotas.select("tempo_estimado").show(5)

+--------------+
|tempo_estimado|
+--------------+
|          1138|
|           338|
|           336|
|           289|
|          1083|
+--------------+
only showing top 5 rows



### Ajustando tipo dos dados

#### Visualizando tipos

In [0]:
print("agendamentos: ", df_agendamentos.describe(), "\n")
print("cargas: ", df_cargas.describe(), "\n") 
print("clientes: ", df_clientes.describe(), "\n") 
print("motoristas: ", df_motoristas.describe(), "\n") 
print("rotas: ", df_rotas.describe(), "\n") 
print("veiculos: ", df_veiculos.describe(), "\n")

agendamentos:  DataFrame[summary: string, codigo_agendamento: string, data_hora_coleta: string, data_hora_entrega: string, codigo_veiculo: string, codigo_carga: string, codigo_rota: string, codigo_cliente: string, nome_arquivo: string] 

cargas:  DataFrame[summary: string, codigo_carga: string, tipo_carga: string, peso_carga: string, comprimento: string, largura: string, altura: string, data_entrega_prevista: string, nome_arquivo: string] 

clientes:  DataFrame[summary: string, codigo_cliente: string, nome_cliente: string, contato_cliente: string, tipo_cliente: string, nome_arquivo: string, logradouro_cliente: string, numero_residencia_cliente: string, bairro_cliente: string, cep_cliente: string, cidade_cliente: string, uf_cliente: string] 

motoristas:  DataFrame[summary: string, codigo_motorista: string, nome: string, telefone: string, numero_carteira: string, data_contratacao: string, categoria_carteira: string, status: string, nome_arquivo: string] 

rotas:  DataFrame[summary: stri

#### Tipos Cargas

In [0]:
from pyspark.sql.types import FloatType
df_cargas = df_cargas \
  .withColumn("peso_carga" ,
              df_cargas["peso_carga"]
              .cast(FloatType()))   \
  .withColumn("comprimento",
              df_cargas["comprimento"]
              .cast(FloatType()))    \
  .withColumn("largura"  ,
              df_cargas["largura"]
              .cast(FloatType())) \
  .withColumn("altura"  ,
              df_cargas["altura"]
              .cast(FloatType())) 

#### Tipos Rotas

In [0]:
df_rotas = df_rotas \
  .withColumn("distancia" ,
              df_rotas["distancia"]
              .cast(FloatType()))   

### Apagando coluna nula do DataFrame Rotas

In [0]:
df_rotas = df_rotas.drop("restricoes_trafego")
df_rotas.columns

['codigo_rota',
 'origem',
 'destino',
 'distancia',
 'tempo_estimado',
 'data_hora_bronze',
 'nome_arquivo',
 'data_hora_silver']

###Salvando os dataframes em delta lake (formato de arquivo) no data lake (repositorio cloud)

In [0]:
df_agendamentos.write.format('delta').mode("overwrite").option("overwriteSchema", "true").save(f"/mnt/{storageAccountName}/silver/agendamentos"),
df_cargas.write.format('delta').mode("overwrite").option("overwriteSchema", "true").save(f"/mnt/{storageAccountName}/silver/cargas"),
df_clientes.write.format('delta').mode("overwrite").option("overwriteSchema", "true").save(f"/mnt/{storageAccountName}/silver/clientes"),
df_motoristas.write.format('delta').mode("overwrite").option("overwriteSchema", "true").save(f"/mnt/{storageAccountName}/silver/motoristas"),
df_rotas.write.format('delta').mode("overwrite").option("overwriteSchema", "true").save(f"/mnt/{storageAccountName}/silver/rotas"),
df_veiculos.write.format('delta').mode("overwrite").option("overwriteSchema", "true").save(f"/mnt/{storageAccountName}/silver/veiculos")

###Verificando os dados gravados em delta na camada silver

In [0]:
display(dbutils.fs.ls(f"/mnt/{storageAccountName}/silver/"))

path,name,size,modificationTime
dbfs:/mnt/datalake7a68c04c876ba15d/silver/agendamentos/,agendamentos/,0,0
dbfs:/mnt/datalake7a68c04c876ba15d/silver/cargas/,cargas/,0,0
dbfs:/mnt/datalake7a68c04c876ba15d/silver/clientes/,clientes/,0,0
dbfs:/mnt/datalake7a68c04c876ba15d/silver/motoristas/,motoristas/,0,0
dbfs:/mnt/datalake7a68c04c876ba15d/silver/rotas/,rotas/,0,0
dbfs:/mnt/datalake7a68c04c876ba15d/silver/veiculos/,veiculos/,0,0


### Lendo um exemplo de um delta lake para validar a existencia dos dados e das colunas do metadados

In [0]:
spark.read.format('delta').load(f'/mnt/{storageAccountName}/silver/agendamentos').limit(10).display()

codigo_agendamento,data_hora_coleta,data_hora_entrega,codigo_veiculo,codigo_carga,codigo_rota,codigo_cliente,data_hora_bronze,nome_arquivo,data_hora_silver
1,2019-10-22 09:24:33.884663,2019-11-14 09:24:33.884663,631,3042,1728,1471,2024-06-28T16:55:13.033Z,agendamentos,2024-06-28T17:01:52.05Z
2,2024-01-31 17:27:20.243187,2024-02-24 17:27:20.243187,6028,1649,977,8136,2024-06-28T16:55:13.033Z,agendamentos,2024-06-28T17:01:52.05Z
3,2023-08-01 10:19:28.431359,2023-08-07 10:19:28.431359,1227,4382,524,6560,2024-06-28T16:55:13.033Z,agendamentos,2024-06-28T17:01:52.05Z
4,2020-07-12 20:32:58.738134,2020-07-18 20:32:58.738134,131,6561,8786,781,2024-06-28T16:55:13.033Z,agendamentos,2024-06-28T17:01:52.05Z
5,2021-12-21 11:57:31.030714,2021-12-23 11:57:31.030714,2260,5315,5068,1075,2024-06-28T16:55:13.033Z,agendamentos,2024-06-28T17:01:52.05Z
6,2021-09-11 20:02:17.343064,2021-09-15 20:02:17.343064,8750,4505,1019,346,2024-06-28T16:55:13.033Z,agendamentos,2024-06-28T17:01:52.05Z
7,2022-11-28 15:21:53.559554,2022-12-20 15:21:53.559554,4400,9879,1682,9891,2024-06-28T16:55:13.033Z,agendamentos,2024-06-28T17:01:52.05Z
8,2023-09-25 02:13:43.305995,2023-10-08 02:13:43.305995,6369,2956,9396,3791,2024-06-28T16:55:13.033Z,agendamentos,2024-06-28T17:01:52.05Z
9,2019-07-25 15:30:45.039371,2019-08-02 15:30:45.039371,847,313,469,7102,2024-06-28T16:55:13.033Z,agendamentos,2024-06-28T17:01:52.05Z
10,2019-03-04 02:37:25.339560,2019-03-27 02:37:25.339560,8200,6688,9044,3516,2024-06-28T16:55:13.033Z,agendamentos,2024-06-28T17:01:52.05Z
