In [19]:
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from sqlalchemy import create_engine
#from delta import *
import pyspark

from pyspark.sql.types import DateType, IntegerType, LongType, FloatType, BooleanType

from pyspark.sql.functions import col, sum

from pyspark.sql.functions import regexp_replace

import pandas as pd 
import json

import great_expectations as gx

#from delta import DeltaTable  # Necessário apenas se for usar operações específicas

In [2]:
context = gx.get_context()

print(gx.__version__)

1.2.2


### 0. Inicia Sessão Spark

In [3]:
import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

## 1. Great Expectation

### 1.1 Reviews

In [28]:
# Carregar dados Delta
reviews = spark.read.format("delta").load("deltalake/silver/silver_reviews")

In [87]:
reviews.show(n=5)

+----------+------------------+----------+-----------+-------------+--------------------+
|listing_id|                id|      date|reviewer_id|reviewer_name|            comments|
+----------+------------------+----------+-----------+-------------+--------------------+
|    854496|          95807013|2016-08-21|   64850524|      Ricardo| Localização nota 10|
|  25728359|944196639692565570|2023-07-26|  453609373| Joanna Paula|Apartamento espaç...|
|  38820649|         562094445|2019-11-10|  221445646|       Cibele|Hospitalidade inc...|
|  45329462|561497635891415917|2022-02-13|  106063766|        Heric|Fiel ao anúncio. ...|
|  52507271|910910423101689718|2023-06-10|  236708134|       Daniel|Tudo certo. O anf...|
+----------+------------------+----------+-----------+-------------+--------------------+
only showing top 5 rows



In [89]:
data_source = context.data_sources.add_spark("teste_spark")

In [90]:
data_asset = data_source.add_dataframe_asset(name="spark dataframe asset")

In [91]:
batch_definition = data_asset.add_batch_definition_whole_dataframe("batch definition")

In [92]:
batch = batch_definition.get_batch(batch_parameters={"dataframe": reviews})

In [93]:
suite = gx.ExpectationSuite(name="suite_teste")
suite = context.suites.add(suite)

In [94]:
lista_col_data = ["date"]
lista_col_int = ["listing_id","id","reviewer_id"]

In [95]:
for coluna in lista_col_data:
  expectation = gx.expectations.ExpectColumnValuesToBeOfType(column=coluna, type_="DateType")
  suite.add_expectation(expectation)
for coluna in lista_col_int:
  expectation = gx.expectations.ExpectColumnValuesToBeOfType(column=coluna, type_="LongType")
  suite.add_expectation(expectation)

In [96]:
#expectation = gx.expectations.ExpectColumnValuesToBeBetween(column="price", min_value=0, strict_min=True)

In [97]:
validation_definition = gx.ValidationDefinition(data=batch_definition, suite=suite, name="validacao dados auto")

#### 1.1.1 Great Expectations

In [98]:
reviews_ge = validation_definition.run(batch_parameters={"dataframe":reviews})

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

In [99]:
reviews_dict = reviews_ge.to_json_dict()

In [100]:
reviews_ge_df = pd.json_normalize(reviews_dict)
reviews_ge_df = reviews_ge_df.explode('results')

In [101]:
# Expandir os dicionários dentro de 'results' para colunas
results_reviews_df = pd.json_normalize(df['results'])

In [104]:
# Redefinir o índice dos DataFrames antes de concatená-los
df_reset = reviews_ge_df.drop(columns=['results']).reset_index(drop=True)
results_df_reset = results_reviews_df.reset_index(drop=True)

# Concatenar os DataFrames
reviews_final_df = pd.concat([df_reset, results_df_reset], axis=1)

In [105]:
final_df

Unnamed: 0,success,suite_name,id,statistics.evaluated_expectations,statistics.successful_expectations,statistics.unsuccessful_expectations,statistics.success_percent,meta.great_expectations_version,meta.batch_spec.batch_data,meta.batch_markers.ge_load_time,...,success.1,expectation_config.type,expectation_config.kwargs.batch_id,expectation_config.kwargs.column,expectation_config.kwargs.type_,expectation_config.id,result.observed_value,exception_info.raised_exception,exception_info.exception_traceback,exception_info.exception_message
0,True,suite_teste,,4,4,0,100.0,1.2.2,SparkDataFrame,20241111T201710.728421Z,...,True,expect_column_values_to_be_of_type,teste_spark-spark dataframe asset,date,DateType,55220df2-f2e7-4108-adc8-99433bc3613a,DateType,False,,
1,True,suite_teste,,4,4,0,100.0,1.2.2,SparkDataFrame,20241111T201710.728421Z,...,True,expect_column_values_to_be_of_type,teste_spark-spark dataframe asset,listing_id,LongType,9eaca797-a0ea-47cd-95c3-fdb8845c16bc,LongType,False,,
2,True,suite_teste,,4,4,0,100.0,1.2.2,SparkDataFrame,20241111T201710.728421Z,...,True,expect_column_values_to_be_of_type,teste_spark-spark dataframe asset,id,LongType,f43a0d54-ce39-4521-b49d-e18e7c8dec30,LongType,False,,
3,True,suite_teste,,4,4,0,100.0,1.2.2,SparkDataFrame,20241111T201710.728421Z,...,True,expect_column_values_to_be_of_type,teste_spark-spark dataframe asset,reviewer_id,LongType,0d51b3e2-cba5-47c4-9b38-6267930dc90e,LongType,False,,


### 1.2 Calendar

In [27]:
# Carregar dados Delta
calendar = spark.read.format("delta").load("deltalake/silver/silver_calendar")

In [108]:
calendar.show(n=5)

+----------+----------+---------+-----+--------------+--------------+--------------+
|listing_id|      date|available|price|adjusted_price|minimum_nights|maximum_nights|
+----------+----------+---------+-----+--------------+--------------+--------------+
|    297908|2024-06-27|    false|250.0|          NULL|             2|          1125|
|     17878|2024-06-28|    false|350.0|          NULL|             5|            28|
|     17878|2024-06-29|    false|350.0|          NULL|             5|            28|
|     17878|2024-06-30|    false|350.0|          NULL|             5|            28|
|     17878|2024-07-01|    false|350.0|          NULL|             5|            28|
+----------+----------+---------+-----+--------------+--------------+--------------+
only showing top 5 rows



In [89]:
#data_source = context.data_sources.add_spark("teste_spark")

In [90]:
#data_asset = data_source.add_dataframe_asset(name="spark dataframe asset")

In [91]:
#batch_definition = data_asset.add_batch_definition_whole_dataframe("batch definition")

In [109]:
batch = batch_definition.get_batch(batch_parameters={"dataframe": calendar})

In [110]:
suite = gx.ExpectationSuite(name="suite_teste_calendar")
suite = context.suites.add(suite)

In [111]:
lista_col_data = ["date"]
lista_col_long_int = ["listing_id"]
lista_col_float = ["price"]
lista_col_bool = ["available"]
lista_col_int = ["adjusted_price","minimum_nights","maximum_nights"]

In [112]:
for coluna in lista_col_data:
  expectation = gx.expectations.ExpectColumnValuesToBeOfType(column=coluna, type_="DateType")
  suite.add_expectation(expectation)
for coluna in lista_col_long_int:
  expectation = gx.expectations.ExpectColumnValuesToBeOfType(column=coluna, type_="LongType")
  suite.add_expectation(expectation)
for coluna in lista_col_float:
  expectation = gx.expectations.ExpectColumnValuesToBeOfType(column=coluna, type_="IntegerType")
  suite.add_expectation(expectation)
for coluna in lista_col_bool:
  expectation = gx.expectations.ExpectColumnValuesToBeOfType(column=coluna, type_="BooleanType")
  suite.add_expectation(expectation)
for coluna in lista_col_int:
  expectation = gx.expectations.ExpectColumnValuesToBeOfType(column=coluna, type_="IntegerType")
  suite.add_expectation(expectation)


In [96]:
#expectation = gx.expectations.ExpectColumnValuesToBeBetween(column="price", min_value=0, strict_min=True)

In [113]:
validation_definition = gx.ValidationDefinition(data=batch_definition, suite=suite, name="validacao dados calendar")

#### 1.2.1 Great Expectations

In [114]:
calendar_ge = validation_definition.run(batch_parameters={"dataframe":reviews})

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

In [116]:
calendar_dict = calendar_ge.to_json_dict()

In [117]:
calendar_ge_df = pd.json_normalize(calendar_dict)
calendar_ge_df = calendar_ge_df.explode('results')

In [118]:
# Expandir os dicionários dentro de 'results' para colunas
results_calendar_df = pd.json_normalize(calendar_ge_df['results'])

In [119]:
# Redefinir o índice dos DataFrames antes de concatená-los
df_reset = calendar_ge_df.drop(columns=['results']).reset_index(drop=True)
results_df_reset = results_calendar_df.reset_index(drop=True)

# Concatenar os DataFrames
calendar_final_df = pd.concat([df_reset, results_df_reset], axis=1)

In [120]:
calendar_final_df

Unnamed: 0,success,suite_name,id,statistics.evaluated_expectations,statistics.successful_expectations,statistics.unsuccessful_expectations,statistics.success_percent,meta.great_expectations_version,meta.batch_spec.batch_data,meta.batch_markers.ge_load_time,...,success.1,expectation_config.type,expectation_config.kwargs.batch_id,expectation_config.kwargs.column,expectation_config.kwargs.type_,expectation_config.id,result.observed_value,exception_info.raised_exception,exception_info.exception_traceback,exception_info.exception_message
0,False,suite_teste_calendar,,7,2,5,28.571429,1.2.2,SparkDataFrame,20241111T204059.098993Z,...,True,expect_column_values_to_be_of_type,teste_spark-spark dataframe asset,date,DateType,e70e48a4-312b-46a5-8559-967a517de5cd,DateType,False,,
1,False,suite_teste_calendar,,7,2,5,28.571429,1.2.2,SparkDataFrame,20241111T204059.098993Z,...,True,expect_column_values_to_be_of_type,teste_spark-spark dataframe asset,listing_id,LongType,83d16f54-c771-4f19-a748-ee744af5e0a1,LongType,False,,
2,False,suite_teste_calendar,,7,2,5,28.571429,1.2.2,SparkDataFrame,20241111T204059.098993Z,...,False,expect_column_values_to_be_of_type,teste_spark-spark dataframe asset,price,IntegerType,c109f34e-bbc3-4d05-b143-158592bcff59,,True,"Traceback (most recent call last):\n File ""/o...",list index out of range
3,False,suite_teste_calendar,,7,2,5,28.571429,1.2.2,SparkDataFrame,20241111T204059.098993Z,...,False,expect_column_values_to_be_of_type,teste_spark-spark dataframe asset,available,BooleanType,14489ed9-2251-414c-812b-8d260587b1cf,,True,"Traceback (most recent call last):\n File ""/o...",list index out of range
4,False,suite_teste_calendar,,7,2,5,28.571429,1.2.2,SparkDataFrame,20241111T204059.098993Z,...,False,expect_column_values_to_be_of_type,teste_spark-spark dataframe asset,adjusted_price,IntegerType,b425b757-721a-4462-b447-8854bf87c3cd,,True,"Traceback (most recent call last):\n File ""/o...",list index out of range
5,False,suite_teste_calendar,,7,2,5,28.571429,1.2.2,SparkDataFrame,20241111T204059.098993Z,...,False,expect_column_values_to_be_of_type,teste_spark-spark dataframe asset,minimum_nights,IntegerType,60eebc16-ae1f-4f05-84dc-cf84e9e71da3,,True,"Traceback (most recent call last):\n File ""/o...",list index out of range
6,False,suite_teste_calendar,,7,2,5,28.571429,1.2.2,SparkDataFrame,20241111T204059.098993Z,...,False,expect_column_values_to_be_of_type,teste_spark-spark dataframe asset,maximum_nights,IntegerType,70635cf1-da94-4970-9663-4bc498005cb5,,True,"Traceback (most recent call last):\n File ""/o...",list index out of range


### 1.3 Listings

In [4]:
# Carregar dados Delta
listings = spark.read.format("delta").load("deltalake/silver/silver_listings")

In [5]:
data_source = context.data_sources.add_spark("teste_spark")

In [6]:
data_asset = data_source.add_dataframe_asset(name="spark dataframe asset")

In [7]:
batch_definition = data_asset.add_batch_definition_whole_dataframe("batch definition")

In [8]:
batch = batch_definition.get_batch(batch_parameters={"dataframe": listings})

In [9]:
suite = gx.ExpectationSuite(name="suite_teste_listings")
suite = context.suites.add(suite)

In [10]:
#listings.columns

In [11]:
expectation = gx.expectations.ExpectColumnPairValuesAToBeGreaterThanB(
    column_A="maximum_nights",
    column_B="minimum_nights",
    or_equal=False
)
suite.add_expectation(expectation)

ExpectColumnPairValuesAToBeGreaterThanB(id='93d0fdf7-5502-4133-8f46-9b5aa3da62e0', meta=None, notes=None, result_format=<ResultFormat.BASIC: 'BASIC'>, description=None, catch_exceptions=True, rendered_content=None, windows=None, batch_id=None, column_A='maximum_nights', column_B='minimum_nights', mostly=1, row_condition=None, condition_parser=None, or_equal=False, ignore_row_if='both_values_are_missing')

In [12]:
lista_number_min_zero = ['bathrooms','accommodates','beds']
for coluna in lista_number_min_zero:
    expectation = gx.expectations.ExpectColumnValuesToBeBetween(
        column=coluna,
        min_value=0,
        max_value=20
    )
    suite.add_expectation(expectation)

In [13]:
#expectation = gx.expectations.ExpectColumnValuesToBeBetween(column="price", min_value=0, strict_min=True)

In [14]:
validation_definition = gx.ValidationDefinition(data=batch_definition, suite=suite, name="validacao dados listings")

#### 1.3.1 Great Expectations

In [16]:
listings_ge = validation_definition.run(batch_parameters={"dataframe":listings})

Calculating Metrics:   0%|          | 0/37 [00:00<?, ?it/s]

In [21]:
listings_dict = listings_ge.to_json_dict()

In [22]:
listings_ge_df = pd.json_normalize(listings_dict)
listings_ge_df = listings_ge_df.explode('results')

In [23]:
# Expandir os dicionários dentro de 'results' para colunas
results_listings_df = pd.json_normalize(listings_ge_df['results'])

In [24]:
# Redefinir o índice dos DataFrames antes de concatená-los
df_reset = listings_ge_df.drop(columns=['results']).reset_index(drop=True)
results_df_reset = results_listings_df.reset_index(drop=True)

# Concatenar os DataFrames
listings_final_df = pd.concat([df_reset, results_df_reset], axis=1)

In [25]:
listings_final_df

Unnamed: 0,success,suite_name,id,statistics.evaluated_expectations,statistics.successful_expectations,statistics.unsuccessful_expectations,statistics.success_percent,meta.great_expectations_version,meta.batch_spec.batch_data,meta.batch_markers.ge_load_time,...,result.missing_percent,result.unexpected_percent_total,result.unexpected_percent_nonmissing,result.partial_unexpected_counts,exception_info.raised_exception,exception_info.exception_traceback,exception_info.exception_message,expectation_config.kwargs.column,expectation_config.kwargs.min_value,expectation_config.kwargs.max_value
0,False,suite_teste_listings,,4,1,3,25.0,1.2.2,SparkDataFrame,20241111T210745.436714Z,...,0.0,0.62601,0.62601,"[{'value': [7, 7], 'count': 5}, {'value': [365...",False,,,,,
1,False,suite_teste_listings,,4,1,3,25.0,1.2.2,SparkDataFrame,20241111T210745.436714Z,...,2.423263,0.002885,0.002956,"[{'value': 29, 'count': 1}]",False,,,bathrooms,0.0,20.0
2,False,suite_teste_listings,,4,1,3,25.0,1.2.2,SparkDataFrame,20241111T210745.436714Z,...,0.0,0.0,0.0,[],False,,,accommodates,0.0,20.0
3,False,suite_teste_listings,,4,1,3,25.0,1.2.2,SparkDataFrame,20241111T210745.436714Z,...,2.429033,0.100969,0.103483,"[{'value': 23, 'count': 3}, {'value': 26, 'cou...",False,,,beds,0.0,20.0


In [26]:
#df.select("*").toPandas()

# 2. SQL para camada GOLD

## 2.1 Reviews

In [31]:
reviews_silver = spark.read.format("delta").load("deltalake/silver/silver_reviews/")

# Registrar o DataFrame como uma tabela temporária para usar SQL
reviews_silver.createOrReplaceTempView("silver_reviews_table")

In [32]:
# SQL 
query = """
SELECT 
    *
FROM silver_reviews_table
WHERE listing_id = 1280058  -- Filtro de exemplo
"""

In [34]:
# Executar a query SQL e armazenar o resultado em um DataFrame
gold_df = spark.sql(query)

In [35]:
# Passo 3: Salvar o resultado em formato Delta na camada gold
gold_df.write.format("delta").mode("overwrite").save("deltalake/gold/gold_reviews/")

In [76]:
teste = spark.read.format("delta").load("deltalake/gold/gold_reviews/")

In [77]:
teste.show()

+----------+-------------------+----------+-----------+--------------------+--------------------+
|listing_id|                 id|      date|reviewer_id|       reviewer_name|            comments|
+----------+-------------------+----------+-----------+--------------------+--------------------+
|   1280058|          195213424|2017-09-18|   60674508|            Caroline|A localização é ó...|
|   1280058|          407526648|2019-02-02|  239367709|            Giovanna|Apartamento incrí...|
|   1280058|          231287303|2018-01-31|  100685150|            Fernando|El departamento e...|
|   1280058| 557130762134654314|2022-02-07|  396432495|               Bruno|Apartamento sensa...|
|   1280058|           19765713|2014-09-18|   11898253|          Juan Pablo|Lindo apartamento...|
|   1280058| 496255180049167643|2021-11-15|  231940891|         Maria Júlia|O apartamento é p...|
|   1280058|           25986716|2015-02-01|   12534709|            Macarena|We have just arri...|
|   1280058|        

In [78]:
teste.count()

404

## 2.2 Calendar

In [43]:
calendar_silver = spark.read.format("delta") \
                        .load("deltalake/silver/silver_calendar/") \

# Registrar o DataFrame como uma tabela temporária para usar SQL
calendar_silver.createOrReplaceTempView("silver_calendar_table")

In [45]:
calendar_silver.show(n=5)

+----------+----------+---------+-----+--------------+--------------+--------------+
|listing_id|      date|available|price|adjusted_price|minimum_nights|maximum_nights|
+----------+----------+---------+-----+--------------+--------------+--------------+
|    297908|2024-06-27|    false|250.0|          NULL|             2|          1125|
|     17878|2024-06-28|    false|350.0|          NULL|             5|            28|
|     17878|2024-06-29|    false|350.0|          NULL|             5|            28|
|     17878|2024-06-30|    false|350.0|          NULL|             5|            28|
|     17878|2024-07-01|    false|350.0|          NULL|             5|            28|
+----------+----------+---------+-----+--------------+--------------+--------------+
only showing top 5 rows



In [51]:
# SQL 
query = """
SELECT 
    listing_id, date, price, minimum_nights, maximum_nights
FROM silver_calendar_table
WHERE date > '2024-05-01' and available=True
"""

In [52]:
# Executar a query SQL e armazenar o resultado em um DataFrame
gold_df = spark.sql(query)

In [53]:
gold_df.show(n=5)

+----------+----------+-----+--------------+--------------+
|listing_id|      date|price|minimum_nights|maximum_nights|
+----------+----------+-----+--------------+--------------+
|     17878|2024-07-10|350.0|             5|            28|
|     17878|2024-07-11|350.0|             5|            28|
|     17878|2024-07-12|350.0|             5|            28|
|     17878|2024-07-13|350.0|             5|            28|
|     17878|2024-07-14|350.0|             5|            28|
+----------+----------+-----+--------------+--------------+
only showing top 5 rows



In [54]:
gold_df.count()

7045924

In [55]:
# Passo 3: Salvar o resultado em formato Delta na camada gold
gold_df.write.format("delta").mode("overwrite").save("deltalake/gold/gold_calendar/")

In [56]:
teste = spark.read.format("delta").load("deltalake/gold/gold_calendar/")

In [57]:
teste.show()

+----------+----------+-----+--------------+--------------+
|listing_id|      date|price|minimum_nights|maximum_nights|
+----------+----------+-----+--------------+--------------+
|     17878|2024-07-10|350.0|             5|            28|
|     17878|2024-07-11|350.0|             5|            28|
|     17878|2024-07-12|350.0|             5|            28|
|     17878|2024-07-13|350.0|             5|            28|
|     17878|2024-07-14|350.0|             5|            28|
|     17878|2024-07-15|350.0|             5|            28|
|     17878|2024-08-01|350.0|             5|            28|
|     17878|2024-08-02|350.0|             5|            28|
|     17878|2024-08-03|350.0|             5|            28|
|     17878|2024-08-04|350.0|             5|            28|
|     17878|2024-08-05|350.0|             5|            28|
|     17878|2024-08-06|350.0|             5|            28|
|     17878|2024-08-07|350.0|             5|            28|
|     17878|2024-08-08|350.0|           

In [58]:
teste.count()

7045924

## 2.3 Listings

In [28]:
#listing = drop neighbourhood_group_cleansed,license

In [59]:
listings_silver = spark.read.format("delta") \
                        .load("deltalake/silver/silver_listings/") \

# Registrar o DataFrame como uma tabela temporária para usar SQL
listings_silver.createOrReplaceTempView("silver_listings_silver_table")

In [60]:
listings_silver.show(n=1, vertical=True)

-RECORD 0------------------------------------------------------------
 id                                           | 96478                
 listing_url                                  | https://www.airbn... 
 scrape_id                                    | 20240627045056       
 last_scraped                                 | 2024-06-28           
 source                                       | city scrape          
 name                                         | APARTAMENT IN BOT... 
 description                                  | LINDO APARTAMENTO... 
 neighborhood_overview                        | bRes,resturantes,... 
 picture_url                                  | https://a0.muscac... 
 host_id                                      | 530471               
 host_url                                     | https://www.airbn... 
 host_name                                    | Hugo                 
 host_since                                   | 2011-04-23           
 host_location      

In [77]:
# SQL 
query = """
SELECT 
    id, name, host_response_rate, property_type, review_scores_rating
FROM silver_listings_silver_table
WHERE review_scores_rating > 2.5
"""

In [78]:
# Executar a query SQL e armazenar o resultado em um DataFrame
gold_df = spark.sql(query)

In [79]:
gold_df.show()

+--------+--------------------+------------------+--------------------+--------------------+
|      id|                name|host_response_rate|       property_type|review_scores_rating|
+--------+--------------------+------------------+--------------------+--------------------+
|  107469|Quarto confortáve...|             100.0|Private room in home|                 5.0|
| 1912695|Oceanfront pentho...|             100.0|  Entire rental unit|                4.58|
| 4582104|Apartment 300 met...|             100.0|  Entire rental unit|                4.65|
| 4997179|Descobrindo praia...|             100.0|  Entire rental unit|                4.94|
| 5251487|Flat incrível sui...|             100.0|Entire serviced a...|                4.92|
| 5606630|Apartamento 3 qua...|              64.0|  Entire rental unit|                4.77|
| 7053762|Apartamento Next ...|             100.0|  Entire rental unit|                 4.7|
| 8444563|Ótima Suite Hotel...|             100.0|        Entire place

In [80]:
# Passo 3: Salvar o resultado em formato Delta na camada gold
gold_df.write.format("delta").mode("overwrite").save("deltalake/gold/gold_listings/")

In [81]:
teste = spark.read.format("delta").load("deltalake/gold/gold_listings/")

In [82]:
teste.count()

26098