In [0]:
# Carregar o arquivo parquet
df_calendar_silver = spark.read.parquet("dbfs:/mnt/bronze/calendar")

In [0]:
# Confirmando que o carregamento do arquivo foi executado com sucesso

df_calendar_silver.display(5)

listing_id,date,available,price,adjusted_price,minimum_nights,maximum_nights
297908,2024-06-27,f,$250.00,,2,1125
17878,2024-06-28,f,$350.00,,5,28
17878,2024-06-29,f,$350.00,,5,28
17878,2024-06-30,f,$350.00,,5,28
17878,2024-07-01,f,$350.00,,5,28
17878,2024-07-02,f,$350.00,,5,28
17878,2024-07-03,f,$350.00,,5,28
17878,2024-07-04,f,$350.00,,5,28
17878,2024-07-05,f,$350.00,,5,28
17878,2024-07-06,f,$350.00,,5,28


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType, DateType


In [0]:
# Verificando o numero de linhas e numero de colunas do DF

numero_linhas = df_calendar_silver.count()

numero_colunas = len(df_calendar_silver.columns)

print(f"numero de colunas {numero_colunas}")
print(f"numero de linhas {numero_linhas}")

numero de colunas 7
numero de linhas 12652371


In [0]:
# Verificando se temos alguma coluna todos os valores NULL

null_columns = []

for column in df_calendar_silver.columns:
    all_null = df_calendar_silver.filter(df_calendar_silver[column].isNotNull()).count() == 0
    if all_null:
        null_columns.append(column)

print("Colunas com todos os valores NULL:", null_columns)

Colunas com todos os valores NULL: ['adjusted_price']


In [0]:
# Tratamento das colunas

df_calendar_silver = (df_calendar_silver.withColumn("listing_id", df_calendar_silver["listing_id"].cast(IntegerType()))
                      .withColumn("minimum_nights", df_calendar_silver["minimum_nights"].cast(IntegerType()))
                      .withColumn("maximum_nights", df_calendar_silver["maximum_nights"].cast(IntegerType()))
                      .withColumn("price", F.regexp_replace("price", r"[$,]", "").cast(FloatType()))
                      .withColumn("date", df_calendar_silver["date"].cast(DateType()))
                      .drop("adjusted_price")
                      .dropDuplicates())


df_calendar_silver.display()





listing_id,date,available,price,minimum_nights,maximum_nights
17878,2024-09-20,f,350.0,5,28
297908,2024-09-23,f,250.0,2,1125
297908,2025-05-24,t,250.0,2,1125
223073,2024-12-06,t,300.0,5,1125
223073,2025-01-02,t,300.0,9,1125
298395,2024-10-10,t,411.0,3,1125
35764,2024-10-19,f,129.0,3,15
230989,2025-06-19,t,200.0,3,89
48305,2024-10-09,t,4024.0,15,89
303587,2024-10-16,t,102.0,4,15


In [0]:
# Salvando o DF ja tratado em arquivo DELTA

df_calendar_silver.write.format("delta").mode("overwrite").saveAsTable("calendar_silver")

In [0]:
# INICIANDO O GREAT EXPECTATIONS

!pip install great_expectations


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import great_expectations as gx
from great_expectations.expectations.core import ExpectColumnValuesToBeOfType, ExpectColumnValuesToNotBeNull, ExpectColumnValuesToMatchRegex
from great_expectations.core import ExpectationSuite


In [0]:
context = gx.get_context()

gx_df_calendar = df_calendar_silver

In [0]:
# Criando data source e data asset
data_source = context.data_sources.add_spark('gx_df_calendar_silver')
data_asset = data_source.add_dataframe_asset (name = 'gx_df_calendar_silver_asset')

# Criando batch
batch_definition = data_asset.add_batch_definition_whole_dataframe("Batch Definition")
batch = batch_definition.get_batch(batch_parameters = {"dataframe": df_calendar_silver})

# Criando Suite
suite = gx.ExpectationSuite(name="suite_calendar") 
suite = context.suites.add(suite)




In [0]:
# Criando Expectativas

expectation_listing_id_type = ExpectColumnValuesToBeOfType(
    column="listing_id",
    type_="IntegerType"
)
expectation_listing_id_not_null = ExpectColumnValuesToNotBeNull(
    column="listing_id"
)
expectation_date_type = ExpectColumnValuesToBeOfType(
    column="date",
    type_= "DateType"
)
expectation_date_not_null = ExpectColumnValuesToNotBeNull(
    column="date"
)


expectation_suite = ExpectationSuite("df_calendar_silver_suite")

# Adicionando as expectativas ao ExpectationSuite
expectation_suite.add_expectation(expectation_listing_id_type)
expectation_suite.add_expectation(expectation_listing_id_not_null)
expectation_suite.add_expectation(expectation_date_type)
expectation_suite.add_expectation(expectation_date_not_null)


validation_result = batch.validate(expectation_suite)


Calculating Metrics:   0%|          | 0/17 [00:00<?, ?it/s]

In [0]:
validation_result 


{
  "success": false,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_be_of_type",
        "kwargs": {
          "batch_id": "gx_df_calendar_silver-gx_df_calendar_silver_asset",
          "column": "listing_id",
          "type_": "IntegerType"
        },
        "meta": {}
      },
      "result": {
        "observed_value": "IntegerType"
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_traceback": null,
        "exception_message": null
      }
    },
    {
      "success": false,
      "expectation_config": {
        "type": "expect_column_values_to_not_be_null",
        "kwargs": {
          "batch_id": "gx_df_calendar_silver-gx_df_calendar_silver_asset",
          "column": "listing_id"
        },
        "meta": {}
      },
      "result": {
        "element_count": 8445341,
        "unexpected_count": 3348841,
        "unexpected_percent": 39.6531176183412