In [32]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import logging
import os
import great_expectations as gx
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

In [2]:
def create_spark_session():
    
    """
    Create the spark session with the passed configs.
    """
    
    spark = SparkSession \
        .builder \
        .appName("How-Desafio-02")\
        .getOrCreate()

    return spark

In [3]:
def process_order(spark, input_data, output_data):

    """
    Perform ETL on orders to create the orders_silver:
    - Extract the match result data and insert in the match_results table.
      
    Parameters:
    - spark: spark session
    - input_data : path to input files
    - output_data : path to output files
    """


    #reading json files
    order_file_Path = input_data

    orders_df = (spark.read
                  .option("inferSchema", True)
                  .json(order_file_Path))
    
    orders_df_partition = orders_df.withColumn('date_partition', date_format(col('order_created_at'), "yyyy-MM-dd"))

    data_quality(orders_df_partition)

    orders_df_partition.write.parquet(os.path.join(output_data, 'orders.parquet'), 'overwrite')

    
    print("--- orders.parquet completed ---")

In [4]:
def data_quality(input_dataset):
    
    gx_context = gx.get_context()
    datasource = gx_context.sources.add_spark("my_spark_datasource")

    data_asset = datasource.add_dataframe_asset(name="my_df_asset", dataframe=input_dataset).build_batch_request()
    
    gx_context.add_or_update_expectation_suite("my_expectation_suite")
    
    #my_batch_request = data_asset
    
    validator = gx_context.get_validator(
    batch_request=data_asset,
    expectation_suite_name="my_expectation_suite"
                                        )
    
    order_null = validator.expect_column_values_to_not_be_null(column="order_id")
    order_unique = validator.expect_column_values_to_be_unique(column="order_id")
    date_format = validator.expect_column_values_to_match_strftime_format("date_partition", "%Y-%m-%d")
    rows_number = validator.expect_table_row_count_to_be_between(400,600)

    
    if order_null.success == False :
      raise ValueError(f"Data quality check failed {order_null.expectation_config.kwargs['column']} is null.")
    
    elif order_unique.success == False :
      raise ValueError(f"Data quality check failed {order_unique.expectation_config.kwargs['column']} is not unique.")
    
    elif date_format.success == False :
      raise ValueError(f"Data quality check failed {date_format.expectation_config.kwargs['column']} is not in {date_format.expectation_config.kwargs['strftime_format']} format.")
    
    #elif rows_number.success == False :
    #  raise ValueError(f"Data quality check failed number of rows is not between {rows_number.expectation_config.kwargs['min_value']} and {rows_number.expectation_config.kwargs['max_value']}.")
    
    else: logger.info(f"All validators passed with success!")
    

In [5]:
order_file_Path = "./order-data/*/*.json"

spark = create_spark_session()

orders_df = (spark.read
                  .option("inferSchema", True)
                  .json(order_file_Path))

23/09/02 18:15:19 WARN Utils: Your hostname, 14111-NB resolves to a loopback address: 127.0.1.1; using 172.26.45.45 instead (on interface eth0)
23/09/02 18:15:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/02 18:15:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/02 18:15:22 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

In [8]:
orders_df_partition = orders_df.withColumn('date_partition', date_format(col('order_created_at'), "yyyy-MM-dd"))

In [29]:
def data_quality(input_dataset):
    
    gx_context = gx.get_context()
    datasource = gx_context.sources.add_spark("my_spark_datasource")

    data_asset = datasource.add_dataframe_asset(name="my_df_asset", dataframe=input_dataset).build_batch_request()
    
    gx_context.add_or_update_expectation_suite("my_expectation_suite")
    
    #my_batch_request = data_asset
    
    validator = gx_context.get_validator(
    batch_request=data_asset,
    expectation_suite_name="my_expectation_suite"
                                        )
    
    order_null = validator.expect_column_values_to_not_be_null(column="customer_id")
    order_unique = validator.expect_column_values_to_be_unique(column="customer_id")
    #date_format = validator.expect_column_values_to_match_strftime_format("date_partition", "%Y-%m-%d")
    #rows_number = validator.expect_table_row_count_to_be_between(400,600)

    
    if order_null.success == False :
      raise ValueError(f"Data quality check failed {order_null.expectation_config.kwargs['column']} is null.")
    
    elif order_unique.success == False :
      raise ValueError(f"Data quality check failed {order_unique.expectation_config.kwargs['column']} is not unique.")
    
    else: logger.info(f"All validators passed with success!")

In [30]:
def process_cutomers(spark, input_data, output_data):

    """
    Perform ETL on orders to create the orders_silver:
    - Extract the match result data and insert in the customers table.
      
    Parameters:
    - spark: spark session
    - input_data : path to input files
    - output_data : path to output files
    """


    #reading json files
    order_file_Path = input_data
    
    orders_trusted=spark.read.parquet(order_file_Path)
    
    customer_data = orders_trusted.select('customer_info.customer_id', 'customer_info.customer_name', 'customer_info.customer_phone_number').distinct()

    data_quality(customer_data)

    customer_data.write.parquet(os.path.join(output_data, 'customers'), 'overwrite')

    
    print("--- customers.parquet completed ---")

In [33]:
raw = "order-data/*/*.json"
trusted = "trusted/"
business = "business/"
    
process_cutomers(spark, trusted, business)

INFO:great_expectations.util:Could not find local context root directory
INFO:great_expectations.data_context.types.base:Created temporary directory '/tmp/tmprdb2qz9f' for ephemeral docs site
INFO:great_expectations.data_context.data_context.abstract_data_context:EphemeralDataContext has not implemented `_load_fluent_config()` returning empty `GxConfig`
INFO:great_expectations.datasource.fluent.config:Loading 'datasources' ->
[]
INFO:great_expectations.datasource.fluent.fluent_base_model:SparkDatasource.dict() - substituting config values
23/09/02 18:45:23 WARN CacheManager: Asked to cache already cached data.


Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/10 [00:00<?, ?it/s]

INFO:__main__:All validators passed with success!
23/09/02 18:45:25 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
23/09/02 18:45:25 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 84.47% for 8 writers
23/09/02 18:45:25 WARN MemoryManager: Total allocation exceeds 95.00% (906,992,014 bytes) of heap memory
Scaling row group sizes to 96.54% for 7 writers
                                                                                

--- customers.parquet completed ---


In [12]:
orders_trusted=spark.read.parquet("./trusted/*")

['customer_info',
 'delivery_address',
 'order_created_at',
 'order_id',
 'order_total',
 'payment_info',
 'product_info',
 'date_partition']

In [21]:
orders_trusted.select('customer_info').collect()

                                                                                

[Row(customer_info=Row(customer_id='366743c3-ed75-493b-b845-54c2dbe98263', customer_name='Marcos Vinicius Ribeiro', customer_phone_number='+55 71 7042-8010')),
 Row(customer_info=Row(customer_id='fd647a8d-99a3-4377-a8e4-c694537d89cf', customer_name='Dr. Luiz Fernando Peixoto', customer_phone_number='(041) 3452-3466')),
 Row(customer_info=Row(customer_id='b6753e9d-1b11-4a76-875a-3db171c10b85', customer_name='Gustavo Henrique Melo', customer_phone_number='(031) 4284 4589')),
 Row(customer_info=Row(customer_id='d8e7bc3c-170e-4e72-8152-8413dd79809f', customer_name='Yuri Farias', customer_phone_number='61 9321-8257')),
 Row(customer_info=Row(customer_id='fff26d33-3192-4918-8440-00f9843aef1d', customer_name='Sr. Luiz Henrique Vieira', customer_phone_number='(021) 7678-6374')),
 Row(customer_info=Row(customer_id='83a1bdab-4302-4075-9463-5d5cd0189026', customer_name='Nicolas Costa', customer_phone_number='0900 654 9045')),
 Row(customer_info=Row(customer_id='481d0626-a375-43ca-9fd5-c2fc5107ecf

In [24]:
orders_trusted.select('customer_info.customer_id', 'customer_info.customer_name', 'customer_info.customer_phone_number').distinct().show(truncate=False)



+------------------------------------+------------------------+---------------------+
|customer_id                         |customer_name           |customer_phone_number|
+------------------------------------+------------------------+---------------------+
|d8e7bc3c-170e-4e72-8152-8413dd79809f|Yuri Farias             |61 9321-8257         |
|3a9c2f35-7797-4ad9-a8b6-7e83affb2292|Melissa da Conceição    |71 8040-4937         |
|1a07e52c-a86f-4924-8e4f-c9c1be45cf3b|Clarice das Neves       |0900 020 2008        |
|2db4ac2e-2e80-43d0-bcb5-1f1e761be24f|Pedro Miguel Moreira    |+55 (021) 2071-6513  |
|3894e622-50dd-44e1-bcfc-726f728413eb|Dr. Leonardo Barros     |+55 (061) 6589-8832  |
|5478572e-dbf3-4914-8891-a655e0c9f838|Kevin da Conceição      |+55 (041) 2230 3495  |
|2d6799ef-9d4f-4710-8413-7985eb8328ec|Srta. Ana Júlia Ribeiro |(081) 9134-6228      |
|15c57b51-b6b6-4b78-a45d-3a7a3f320b02|Bernardo Souza          |11 2033-6330         |
|3ad005b2-f746-4ef8-8153-80925551b69e|Gabriela Monteir



In [28]:
orders_trusted.groupBy('customer_info.customer_id').agg(count_distinct('customer_info.customer_name').alias('dist_customer_name'), count_distinct('customer_info.customer_phone_number').alias('dist_phone')).orderBy(col('dist_phone').desc()).show(truncate=False)



+------------------------------------+------------------+----------+
|customer_id                         |dist_customer_name|dist_phone|
+------------------------------------+------------------+----------+
|366743c3-ed75-493b-b845-54c2dbe98263|1                 |1         |
|c3d5ce37-eb4c-4714-9526-62ff33827db2|1                 |1         |
|efec20fb-87cf-4cbc-9b19-ae31d9f98dc4|1                 |1         |
|28772edb-7c2b-4881-9ef4-4f1a1b8574e9|1                 |1         |
|29bbda41-8331-44ee-8e16-c14a1abeb655|1                 |1         |
|d38e0fb8-69ee-4760-ad61-54e1e6e46456|1                 |1         |
|9a886157-a067-4566-8410-c552b3094819|1                 |1         |
|10cd686f-dbe4-455b-8e07-02913fad50fe|1                 |1         |
|65f7a18a-2b32-409f-86ec-9c2dcc25fbf9|1                 |1         |
|05e16191-8a97-40af-8b91-a7b6b5a77b72|1                 |1         |
|6d8867f8-26fd-4135-ad48-8e9a6fe1f54b|1                 |1         |
|deb3a694-7d38-4827-ba9a-1f1d6d0ef

                                                                                