In [1]:
import os
import json
import boto3
import requests
import pandas as pd
import pyarrow as pa
import awswrangler as wr
import pyarrow.parquet as pq
from datetime import datetime

In [2]:
# Show full cell contents: None removes the column-width truncation limit.
pd.options.display.max_colwidth = None

In [3]:
# Read the OpenWeather API token from AWS Secrets Manager (secret 'env/openweather')
# through the named AWS profile, then build the request URL used by the next cell.
client = boto3.session.Session(profile_name='CRISTIAN_AWS').client('secretsmanager', region_name='us-east-1')
response = client.get_secret_value(SecretId='env/openweather')
secret_data = response['SecretString']
secrets_dict = json.loads(secret_data)  # the secret string is parsed as a JSON object

open_weather_token = secrets_dict.get('OPEN_WEATHER_SECRET', '')
if not open_weather_token:
    # Fail here instead of sending a request with an empty APPID and only
    # discovering the problem from the API response later.
    raise KeyError("Secret 'env/openweather' has no non-empty 'OPEN_WEATHER_SECRET' entry")

# NOTE: this URL embeds the token -- avoid printing or displaying API_LINK.
API_LINK = f"https://api.openweathermap.org/data/2.5/weather?q=novo hamburgo,br&APPID={open_weather_token}"

In [4]:
# Call the OpenWeather endpoint. The timeout keeps the cell from blocking
# indefinitely, and raise_for_status() aborts on an HTTP error so an error
# payload is never serialized and stored as if it were weather data.
requisicao = requests.get(API_LINK, timeout=30)
requisicao.raise_for_status()

wheater_json = requisicao.json()  # variable name (sic) kept so existing references still work
# Serialize once (the original repeated this line); this string is the upload body.
weather_json_string = json.dumps(wheater_json)

Save raw JSON to the data lake

In [5]:
# Named AWS profile and destination bucket for the raw API payload.
profile_name = 'CRISTIAN_AWS'
bucket_name = 'data-integration-projects'
s3 = boto3.session.Session(profile_name=profile_name).client('s3')

# A single timestamp drives both the date partition and the file name.
current_date = datetime.now()
dia_da_execucao = current_date.strftime('%Y-%m-%d-%H-%M-%S')

# Zero-padded year / month / day components used for the partition folders.
year, month, day = current_date.strftime('%Y-%m-%d').split('-')

# Object key layout: <prefix>/<year>/<month>/<day>/weather_data_<timestamp>.json
directory_path = 'openweather-data-lake/raw-data/' + f'{year}/{month}/{day}/'
file_name = f'weather_data_{dia_da_execucao}.json'
object_key = directory_path + file_name

# Upload the serialized JSON string as the object body.
s3.put_object(Bucket=bucket_name, Key=object_key, Body=weather_json_string)

print(f'Arquivo salvo em: {bucket_name}/{object_key}')

Arquivo salvo em: data-integration-projects/openweather-data-lake/raw-data/2024/01/13/weather_data_2024-01-13-23-58-17.json


Import data from the data lake

In [6]:
profile_name = 'CRISTIAN_AWS'
# Despite its name, `s3` here is a boto3 Session (not an S3 client); it is
# handed to awswrangler through the `boto3_session` argument below.
s3 = boto3.session.Session(profile_name=profile_name)

# Bucket name plus the raw-data prefix, joined with the partition below.
bucket_name = 'data-integration-projects/openweather-data-lake/raw-data'

# Hard-coded partition to read (not derived from the current date).
year, month, day = "2024", "01", "05"

# Read every object under the selected day as line-delimited JSON.
df = wr.s3.read_json(
    f's3://{bucket_name}/{year}/{month}/{day}/*',
    lines=True,
    boto3_session=s3,
)

Transform data

In [7]:
def _pick(column, key):
    """Build an ``assign`` callable that reads ``key`` from each dict held in ``column``."""
    return lambda frame: frame[column].apply(lambda cell: cell[key])


def _first_description(entries):
    """Return the ``description`` of the first entry of a ``weather`` list, or None when absent."""
    if entries and isinstance(entries, list) and 'description' in entries[0]:
        return entries[0]['description']
    return None


# Flatten the nested columns into scalar columns, derive a UTC timestamp from the
# epoch-seconds `dt` column, and drop the nested/raw columns.
# Changes from the previous version of this cell:
#   * one to_datetime call with utc=True replaces the redundant two-step conversion;
#   * no in-place drop -- the result is built in one chain and assigned once;
#   * the reset_index result is kept (it used to be displayed but then discarded).
# NOTE: the cell still rebinds `df`, so re-running it without re-running the load
# cell fails because the nested columns are already gone.
df = (
    df.assign(
        lon=_pick('coord', 'lon'),
        lat=_pick('coord', 'lat'),
        clouds_description=lambda frame: frame['weather'].apply(_first_description),
        temp=_pick('main', 'temp'),
        feels_like=_pick('main', 'feels_like'),
        temp_min=_pick('main', 'temp_min'),
        temp_max=_pick('main', 'temp_max'),
        pressure=_pick('main', 'pressure'),
        humidity=_pick('main', 'humidity'),
        clouds_number=_pick('clouds', 'all'),
        country=_pick('sys', 'country'),
        dt_utc=lambda frame: pd.to_datetime(pd.to_numeric(frame['dt']), unit='s', utc=True),
    )
    .drop(columns=["coord", "weather", "main", "wind", "clouds", "sys", "dt", "timezone"])
    .reset_index(drop=True)
)

# Display the flattened frame.
df

Unnamed: 0,base,visibility,id,name,cod,lon,lat,clouds_description,temp,feels_like,temp_min,temp_max,pressure,humidity,clouds_number,country,dt_utc
0,stations,10000,3456068,Novo Hamburgo,200,-51.1306,-29.6783,few clouds,302.48,305.45,300.41,305.23,1010,64,22,BR,2024-01-05 18:39:42+00:00
1,stations,10000,3456068,Novo Hamburgo,200,-51.1306,-29.6783,few clouds,302.48,305.45,300.41,305.23,1010,64,22,BR,2024-01-05 18:39:42+00:00
2,stations,10000,3456068,Novo Hamburgo,200,-51.1306,-29.6783,few clouds,302.3,305.28,299.86,305.23,1010,65,22,BR,2024-01-05 19:05:49+00:00
3,stations,10000,3456068,Novo Hamburgo,200,-51.1306,-29.6783,scattered clouds,301.03,303.22,299.3,302.65,1010,67,25,BR,2024-01-05 20:46:50+00:00


Save processed data to S3

In [8]:
# Upload the transformed DataFrame (`df`) to the processed-data area as parquet.
profile_name = 'CRISTIAN_AWS'
s3 = boto3.session.Session(profile_name=profile_name).client('s3')
bucket_name = 'data-integration-projects'
directory_path = 'openweather-data-lake/processed-data/'

# Timestamp of this run; used for the date partition and the file name.
current_date = datetime.now()
dia_da_execucao = current_date.strftime('%Y-%m-%d-%H-%M-%S')

# Zero-padded partition components.
year = str(current_date.year)
month = str(current_date.month).zfill(2)
day = str(current_date.day).zfill(2)

# Object key layout: <prefix>/<year>/<month>/<day>/df_<timestamp>.parquet
directory_path = f'{directory_path}{year}/{month}/{day}/'
file_name = f'df_{dia_da_execucao}.parquet'

# FIX: the previous version rebound `df` to a one-row placeholder frame right
# here, so the placeholder -- not the transformed weather data -- was uploaded.
# `df` is now written as it stands when this cell runs.
try:
    # upload_file needs a path on disk, so write a temporary local parquet file.
    df.to_parquet(file_name)
    s3.upload_file(file_name, bucket_name, directory_path + file_name)
finally:
    # Remove the local file so repeated runs do not pile up timestamped files
    # in the working directory.
    if os.path.exists(file_name):
        os.remove(file_name)

print(f'Arquivo salvo em: {bucket_name}/{directory_path}{file_name}')


Arquivo salvo em: data-integration-projects/openweather-data-lake/processed-data/2024/01/13/df_2024-01-13-23-58-20.parquet


In [9]:
## Disabled variant of the parquet upload: it targets
## 'openweather-data-lake/processed-data/' directly, with no year/month/day partition.
## Original note: set your credentials explicitly (not recommended).
#profile_name = 'CRISTIAN_AWS'
#s3 = boto3.session.Session(profile_name=profile_name).client('s3')
#bucket_name = 'data-integration-projects'
#directory_path = 'openweather-data-lake/processed-data/'
#dia_da_exucucao = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')
#file_name = f'df{dia_da_exucucao}.parquet'
#df.to_parquet(file_name)
#s3.upload_file(file_name, bucket_name, directory_path + file_name)