IMPORTANDO E CRIANDO AS SESSÕES DO SPARK

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('tratando-dados-com-spark-databricks_silver').getOrCreate()
import pyspark.sql.functions as F
#importei caso necessitasse inferir o schema
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import * 

path_silver = '/FileStore/tables/silver/dummy_data_silver.csv'


TRADUZINDO OS NOMES DAS COLUNAS

In [0]:
header = [
              "idade",
              "genero",
              "tempo_gasto",
              "plataforma",
              "interesses",
              "localizacao",
              'dados_demograficos',
              'profissao',
              'renda',
              'divida',
              'possui_casa',
              'Possui_carro',      
]

#### CARREGANDO OS DADOS COM AS NOVAS COLUNAS.

In [0]:
df = (spark
            .read
            .format('csv')
            .option('header','true')
            .option('inferSchema','true')
            .option('index','false')             
            .option('delimiter',',')
            .load(path_silver)).toDF(*header)


In [0]:
# Verificando a estrutura dos dados
df.printSchema()

root
 |-- idade: integer (nullable = true)
 |-- genero: string (nullable = true)
 |-- tempo_gasto: integer (nullable = true)
 |-- plataforma: string (nullable = true)
 |-- interesses: string (nullable = true)
 |-- localizacao: string (nullable = true)
 |-- dados_demograficos: string (nullable = true)
 |-- profissao: string (nullable = true)
 |-- renda: integer (nullable = true)
 |-- divida: boolean (nullable = true)
 |-- possui_casa: boolean (nullable = true)
 |-- Possui_carro: boolean (nullable = true)



In [0]:
#Verificando se existe valores nulos na coluna 'idade'
df_nulos = df.filter("idade is Null").show()


+-----+------+-----------+----------+----------+-----------+------------------+---------+-----+------+-----------+------------+
|idade|genero|tempo_gasto|plataforma|interesses|localizacao|dados_demograficos|profissao|renda|divida|possui_casa|Possui_carro|
+-----+------+-----------+----------+----------+-----------+------------------+---------+-----+------+-----------+------------+
+-----+------+-----------+----------+----------+-----------+------------------+---------+-----+------+-----------+------------+



In [0]:
# Verificando valores duplicados

distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))
distinctDF.show()
# (Não existe valores duplicados!)

Distinct count: 999
+-----+----------+-----------+----------+----------+--------------+------------------+-----------------+-----+------+-----------+------------+
|idade|    genero|tempo_gasto|plataforma|interesses|   localizacao|dados_demograficos|        profissao|renda|divida|possui_casa|Possui_carro|
+-----+----------+-----------+----------+----------+--------------+------------------+-----------------+-----+------+-----------+------------+
|   46|    female|          2|  Facebook|    Travel|United Kingdom|             Urban|          Student|10564|  true|       true|        true|
|   56|      male|          8|   YouTube|    Sports| United States|             Urban|          Student|16881|  true|       true|        true|
|   57|    female|          6| Instagram| Lifestlye|United Kingdom|             Urban|Software Engineer|11928| false|       true|       false|
|   41|non-binary|          9|   YouTube|    Travel| United States|             Rural|          Student|17743|  true|     

CRIANDO UMA FUNÇÃO PARA OBTER A DATA EM TEMPO REAL

In [0]:
# Criando a função data atual para adcionarmos na coluna de carga.
from datetime import datetime

def get_current_date():
    # Obter a data atual 
    current_date = datetime.now().date()
    return current_date
# Testando a função
time = get_current_date()
# Adcionando uma coluna de carga no dataframe.
df = df.withColumn("date", lit(time))
df.show()


+-----+----------+-----------+----------+----------+--------------+------------------+-----------------+-----+------+-----------+------------+----------+
|idade|    genero|tempo_gasto|plataforma|interesses|   localizacao|dados_demograficos|        profissao|renda|divida|possui_casa|Possui_carro|      date|
+-----+----------+-----------+----------+----------+--------------+------------------+-----------------+-----+------+-----------+------------+----------+
|   46|    female|          2|  Facebook|    Travel|United Kingdom|             Urban|          Student|10564|  true|       true|        true|2024-04-16|
|   32|      male|          8| Instagram|    Sports|     Australia|         Sub_Urban| Marketer Manager|13258| false|      false|       false|2024-04-16|
|   60|non-binary|          5| Instagram|    Travel|United Kingdom|             Urban|          Student|12500| false|       true|       false|2024-04-16|
|   25|      male|          1| Instagram| Lifestlye|     Australia|         

In [0]:
# caminho do diretório da camada GOLD
path_gold = '/FileStore/tables/gold/dummy_data_gold.csv'

In [0]:
# Salvando o DataFrame no caminho especificado com o cabeçalho incluso
df.write.csv(path_gold, mode="overwrite", header=True)

###Verificando se os dados foram transferidos com sucesso.

In [0]:
dbutils.fs.rm("/FileStore/tables/gold/dummy_test_gold.csv/", True)


Out[92]: True

In [0]:
%fs ls /FileStore/tables/gold/

path,name,size,modificationTime
dbfs:/FileStore/tables/gold/dummy_data_gold.csv/,dummy_data_gold.csv/,0,0
