In [14]:
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import logging
import os
import great_expectations as gx
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

In [2]:
def create_spark_session():
    
    """
    Create the spark session with the passed configs.
    """
    
    spark = SparkSession \
        .builder \
        .appName("How-Desafio-03")\
        .getOrCreate()

    return spark

In [3]:
spark = create_spark_session()

24/02/06 20:35:14 WARN Utils: Your hostname, 14111-NB resolves to a loopback address: 127.0.1.1; using 172.18.58.55 instead (on interface eth0)
24/02/06 20:35:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/06 20:35:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
spark = create_spark_session()

weather_df = (spark.read
                  .option("inferSchema", True)
                  .json("./*.json"))

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [10]:
weather_df.collect()

[Row(calctime=0.003620315, city='belem', city_id=1, cnt=1, cod='200', latitude='-1.45056', list=[Row(clouds=Row(all=0), dt=1706230800, main=Row(feels_like=299.17, humidity=94, pressure=1012, temp=299.17, temp_max=299.17, temp_min=299.17), weather=[Row(description='clear sky', icon='01n', id=800, main='Clear')], wind=Row(deg=40, speed=1.54))], longitude='-48.4682453', message='Count: 1')]

In [18]:
def data_quality(input_dataset):
    
    gx_context = gx.get_context()
    datasource = gx_context.sources.add_spark("my_spark_datasource")

    data_asset = datasource.add_dataframe_asset(name="my_df_asset", dataframe=input_dataset).build_batch_request()
    
    gx_context.add_or_update_expectation_suite("my_expectation_suite")
    
    #my_batch_request = data_asset
    
    validator = gx_context.get_validator(
    batch_request=data_asset,
    expectation_suite_name="my_expectation_suite"
                                        )
    
    weather_null = validator.expect_column_values_to_not_be_null(column="city")
    date_format = validator.expect_column_values_to_match_strftime_format("date_partition", "%Y-%m-%d")
    rows_number = validator.expect_table_row_count_to_be_between(27,27)

    
    if weather_null.success == False :
      raise ValueError(f"Data quality check failed {weather_null.expectation_config.kwargs['column']} is null.")
    else : logger.info(f"Data quality check success {weather_null.expectation_config.kwargs['column']} is not null.")
    
    if date_format.success == False :
      raise ValueError(f"Data quality check failed {date_format.expectation_config.kwargs['column']} is in the wrong format.")
    else: logger.info(f"Data quality check success {date_format.expectation_config.kwargs['column']}  is in the right format.")
        
    if rows_number.success == False :
      raise ValueError(f"Data quality check failed, dataset has unexpected number of rows.")
    else: logger.info(f"Data quality check success, dataset has the expected number of rows.")
       
    logger.info(f"All validators passed with success!")

In [6]:
weather_df_partition = weather_df.withColumn('date_partition', from_unixtime(col("list.dt")[0],"yyyy-MM-dd"))
weather_df_partition.collect()

#data_quality(weather_df_partition)

weather_df_partition.write.partitionBy('date_partition').parquet(os.path.join('', 'weather'), 'overwrite')

print("--- weather.parquet completed ---")

                                                                                

--- weather.parquet completed ---


In [19]:
weather_business = (weather_trusted.select(col('list')[0].alias('list_0'), 'city', 'latitude', 'longitude', 'date_partition')
                                   .select('city',
                                           'latitude',
                                           'longitude',                                           
                                           col('list_0.weather.main')[0].alias('main_weather'),
                                           col('list_0.weather.description')[0].alias('main_weather_description'),
                                           'list_0.main.temp',
                                           'list_0.main.feels_like',
                                           'list_0.main.pressure',
                                           'list_0.main.humidity',
                                           'list_0.main.temp_min',
                                           'list_0.main.temp_max',
                                           'list_0.wind.speed',
                                           'list_0.wind.deg',
                                           col('list_0.clouds.all').alias('clouds'),
                                           col('list_0.dt').alias('collect_timestamp'),
                                          col('date_partition').cast(StringType()))
                   )

data_quality(weather_business)

INFO:great_expectations.util:Could not find local context root directory
INFO:great_expectations.data_context.types.base:Created temporary directory '/tmp/tmp012cn0lk' for ephemeral docs site
INFO:great_expectations.data_context.data_context.abstract_data_context:EphemeralDataContext has not implemented `_load_fluent_config()` returning empty `GxConfig`
INFO:great_expectations.datasource.fluent.config:Loading 'datasources' ->
[]
INFO:great_expectations.datasource.fluent.fluent_base_model:SparkDatasource.dict() - substituting config values
24/02/06 20:41:52 WARN CacheManager: Asked to cache already cached data.


Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/11 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

INFO:__main__:Data quality check success city is not null.
INFO:__main__:Data quality check success date_partition  is in the right format.
INFO:__main__:Data quality check success, dataset has the expected number of rows.
INFO:__main__:All validators passed with success!


In [1]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
import logging
import os
from datetime import datetime
import boto3
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

In [3]:
def create_spark_session():
    
    """
    Create the spark session with the passed configs.
    """
    
    spark = SparkSession \
        .builder \
        .appName("How-Desafio-3")\
        .getOrCreate()

    return spark

In [5]:
def list_files(bucket):
    files = []
    s3 = boto3.client('s3')
    result = s3.list_objects(Bucket=bucket)
    for obj in result['Contents']:
        files.append(obj['Key'])
    return files

In [6]:
spark = create_spark_session()
bucket = "how-desafio-3"

files = list_files(bucket)
#recent_file = recent_date_files(files)

INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials


ClientError: An error occurred (InvalidToken) when calling the ListObjects operation: The provided token is malformed or otherwise invalid.