In [28]:
# Parquet I/O needs pyarrow; to install it into this kernel, run:
# %pip install pyarrow
import sys
from pathlib import Path
import io

# Make the project's src/ directory importable. It is resolved relative to the
# directory the kernel was started in, i.e. <cwd>/../src.
_src_dir = Path().absolute().parent / "src"
sys.path.append(str(_src_dir))

from IPython.display import display, HTML

# Keep each rendered DataFrame row on one line and let wide tables scroll horizontally.
_table_css = """
<style>
    .rendered_html table {
        display: block;
        overflow-x: auto;
        white-space: nowrap !important;
    }
</style>
"""
display(HTML(_table_css))



In [2]:
import numpy as np
import pandas as pd
import io
from openweather_pipeline.s3_operations import S3Operations
from openweather_pipeline.config_manager import get_config


In [3]:
# Resolve the S3 locations and AWS region used by this notebook from the project config.
config = get_config().config
s3_buckets = config.get("s3", {}).get("buckets", {})
source_bucket = s3_buckets.get("source_bucket")
processed_prefix = s3_buckets.get("processed_prefix")
processed_file_name = s3_buckets.get("processed_file_name")
# The cleaned output needs its own object name. Reusing "processed_file_name" here would
# make the save step write the cleaned data over the processed input it was loaded from.
# NOTE(review): assumes the config defines s3.buckets.cleaned_file_name — confirm.
cleaned_file_name = s3_buckets.get("cleaned_file_name")
region = config.get("aws", {}).get("region", "us-east-1")


In [5]:
# Download the processed parquet object from the source bucket, load it into a
# DataFrame, and preview the first rows.
s3_operations = S3Operations(bucket=source_bucket, region=region)
process_file_key = f"{processed_prefix}/{processed_file_name}"
raw_bytes = s3_operations.read_file_as_bytes(process_file_key)
df = pd.read_parquet(io.BytesIO(raw_bytes))
display(df.head())

Unnamed: 0,date,cloud_cover,humidity,precipitation,pressure,temperature,temperature_min,temperature_max,wind_speed,wind_direction,zip_code,country_code
0,2020-01-01,75.0,41.0,0.0,1008.0,38.32,30.72,39.78,18.34,270.0,10001,10001
1,2020-01-01,75.0,41.0,0.0,1008.0,38.35,30.74,39.87,14.99,270.0,10002,10002
2,2020-01-01,75.0,41.0,0.0,1008.0,38.35,30.74,39.87,14.99,270.0,10003,10003
3,2020-01-01,75.0,41.0,0.25,1008.0,38.37,30.83,39.83,18.34,270.0,11101,11101
4,2020-01-01,75.0,41.0,0.0,1008.0,38.35,30.74,39.87,14.99,270.0,11201,11201


In [6]:
# Count missing values per column of the freshly loaded frame.
df.isna().sum()

date               0
cloud_cover        0
humidity           0
precipitation      0
pressure           0
temperature        0
temperature_min    0
temperature_max    0
wind_speed         0
wind_direction     0
zip_code           0
country_code       0
dtype: int64

In [13]:
# Plausible bounds for each weather column. Readings outside a column's [min, max]
# are counted as outliers by the outlier check below.
# NOTE(review): units are not recorded. The temperature values in the sample rows look
# like °F. The precipitation cap of 10 flags 636 readings in the outlier check, which
# may mean it is too tight for the source units — confirm.
weather_ranges = {
    column: {'max': upper, 'min': lower}
    for column, (lower, upper) in (
        ('cloud_cover', (0, 100)),
        ('humidity', (0, 100)),
        ('precipitation', (0, 10)),
        ('pressure', (900, 1100)),
        ('temperature', (-20, 120)),
        ('temperature_min', (-20, 120)),
        ('temperature_max', (-20, 120)),
        ('wind_speed', (0, 100)),
        ('wind_direction', (0, 360)),
    )
}
weather_ranges

{'cloud_cover': {'max': 100, 'min': 0},
 'humidity': {'max': 100, 'min': 0},
 'precipitation': {'max': 10, 'min': 0},
 'pressure': {'max': 1100, 'min': 900},
 'temperature': {'max': 120, 'min': -20},
 'temperature_min': {'max': 120, 'min': -20},
 'temperature_max': {'max': 120, 'min': -20},
 'wind_speed': {'max': 100, 'min': 0},
 'wind_direction': {'max': 360, 'min': 0}}

In [15]:
# Count readings outside each column's [min, max] bounds from weather_ranges.
# NaN compares False against both bounds, so missing values are never counted.
# Only columns with at least one outlier are reported, one {column: count} dict each.
# NOTE(review): outliers are only reported here; none of the later cells shown
# remove, clip or flag them before the data is saved.
outlier_counts = []
for col in df.columns:
    limits = weather_ranges.get(col)
    if limits is None:
        continue
    out_of_range = (df[col] > limits['max']) | (df[col] < limits['min'])
    n_outliers = int(out_of_range.sum())
    if n_outliers:
        outlier_counts.append({col: n_outliers})
display(outlier_counts)

[{'precipitation': 636}]

In [17]:
# check for duplicates
# Rows sharing (date, zip_code, country_code) describe the same observation.
dedup_keys = ["date", "zip_code", "country_code"]
# keep=False marks every copy in a duplicate group. display() is needed because Jupyter
# only renders a cell's last expression automatically.
duplicate_rows = df[df.duplicated(subset=dedup_keys, keep=False)]
print(f"{len(duplicate_rows)} rows share a (date, zip_code, country_code) key with another row")
display(duplicate_rows.sort_values(dedup_keys).head(20))
# remove duplicates, keeping the first copy of each key in current row order.
# NOTE(review): in the sample rows shown after loading, country_code equals zip_code,
# so it adds nothing to the key — looks like an upstream mapping issue; confirm.
df = df.drop_duplicates(subset=dedup_keys, keep='first')
            
            
            

In [18]:
# Verify dups removed: an empty frame below means every
# (date, zip_code, country_code) key is now unique.
duplicate_mask = df.duplicated(subset=["date", "zip_code", "country_code"], keep=False)
check_dups = df.loc[duplicate_mask]
check_dups

Unnamed: 0,date,cloud_cover,humidity,precipitation,pressure,temperature,temperature_min,temperature_max,wind_speed,wind_direction,zip_code,country_code


In [21]:
# verify data types
# Coerce columns to real datetime / numeric dtypes, so that later steps (.dt accessors,
# range checks, the parquet schema) see typed columns.
# Plain column assignment is used on purpose. With pandas >= 2.0, `df.loc[:, col] = values`
# writes into the existing column in place and keeps its old dtype (e.g. object), so the
# conversion would silently not take effect.
date_cols = ['date']
# NOTE(review): zip_code is an identifier; numeric coercion drops leading zeros
# (e.g. "02134" -> 2134). It is kept here to leave the saved schema unchanged — confirm.
numeric_cols = ['cloud_cover', 'humidity', 'precipitation', 'pressure', 'temperature',
                'temperature_min', 'temperature_max', 'wind_speed', 'wind_direction', 'zip_code']
for col in date_cols:
    # Any datetime64 resolution or timezone already counts as converted.
    if not pd.api.types.is_datetime64_any_dtype(df[col]):
        df[col] = pd.to_datetime(df[col])
for col in numeric_cols:
    if not pd.api.types.is_numeric_dtype(df[col]):
        # errors='coerce' turns unparseable values into NaN; the next null check surfaces them.
        df[col] = pd.to_numeric(df[col], errors='coerce')
            
    

In [22]:
# check for missing values again. The dtype step uses pd.to_numeric(errors='coerce'),
# which silently turns unparseable values into NaN, so fail loudly here rather than
# carry them into the saved output.
null_counts = df.isna().sum()
assert not null_counts.any(), f"Missing values after type coercion:\n{null_counts[null_counts > 0]}"
null_counts

date               0
cloud_cover        0
humidity           0
precipitation      0
pressure           0
temperature        0
temperature_min    0
temperature_max    0
wind_speed         0
wind_direction     0
zip_code           0
country_code       0
dtype: int64

In [24]:
# Check for date gaps per zipcode. For each ZIP code, find the calendar days missing
# between its first and last observation, and print the count and up to five of them.
# NOTE(review): groups on zip_code alone; country_code is not part of this check.
for zipcode in df['zip_code'].unique():
    observed_days = df.loc[df['zip_code'] == zipcode, 'date']
    full_calendar = pd.date_range(start=observed_days.min(), end=observed_days.max(), freq='D')
    missing_dates = set(full_calendar.date).difference(observed_days.dt.date)
    if not missing_dates:
        continue
    print(f"\nZipcode {zipcode}: {len(missing_dates)} missing dates")
    print(f"Sample missing dates: {sorted(missing_dates)[:5]}")


Zipcode 10001: 25 missing dates
Sample missing dates: [datetime.date(2022, 6, 5), datetime.date(2023, 2, 4), datetime.date(2023, 2, 5), datetime.date(2023, 2, 9), datetime.date(2023, 2, 10)]

Zipcode 10002: 24 missing dates
Sample missing dates: [datetime.date(2022, 6, 4), datetime.date(2023, 2, 4), datetime.date(2023, 2, 5), datetime.date(2023, 2, 9), datetime.date(2023, 2, 10)]

Zipcode 10003: 30 missing dates
Sample missing dates: [datetime.date(2022, 2, 1), datetime.date(2022, 6, 2), datetime.date(2022, 6, 4), datetime.date(2022, 6, 5), datetime.date(2023, 2, 4)]

Zipcode 11101: 32 missing dates
Sample missing dates: [datetime.date(2020, 5, 22), datetime.date(2021, 8, 20), datetime.date(2022, 2, 8), datetime.date(2022, 2, 11), datetime.date(2022, 6, 2)]

Zipcode 11201: 29 missing dates
Sample missing dates: [datetime.date(2022, 4, 17), datetime.date(2022, 6, 3), datetime.date(2022, 6, 4), datetime.date(2022, 6, 5), datetime.date(2022, 6, 6)]

Zipcode 12084: 24 missing dates
Sample

In [26]:
# Order rows by location, then chronologically within each location, for the saved output.
df_sorted = df.sort_values(by=['zip_code', 'country_code', 'date'])

In [27]:
# Preview the sorted frame; pandas truncates long frames in the rendered output.
display(df_sorted)

Unnamed: 0,date,cloud_cover,humidity,precipitation,pressure,temperature,temperature_min,temperature_max,wind_speed,wind_direction,zip_code,country_code
0,2020-01-01,75.0,41.0,0.00,1008.0,38.32,30.72,39.78,18.34,270.0,10001,10001
6,2020-01-02,1.0,45.0,1.01,1015.0,45.82,27.99,47.03,13.96,247.0,10001,10001
20,2020-01-03,90.0,93.0,4.72,1009.0,45.00,42.13,46.36,9.93,249.0,10001,10001
26,2020-01-04,90.0,93.0,8.33,1003.0,47.39,41.74,48.96,9.17,320.0,10001,10001
32,2020-01-05,90.0,48.0,1.00,1011.0,39.94,32.67,40.51,27.07,321.0,10001,10001
...,...,...,...,...,...,...,...,...,...,...,...,...
7253,2023-05-19,75.0,59.0,0.00,1024.0,65.50,46.67,71.42,19.57,150.0,12084,12084
7259,2023-05-20,100.0,73.0,3.22,1017.0,66.58,53.17,67.24,14.97,150.0,12084,12084
7265,2023-05-21,75.0,72.0,0.00,1013.0,60.60,50.38,72.03,18.41,260.0,12084,12084
7271,2023-05-22,75.0,51.0,0.00,1023.0,66.72,49.78,72.72,10.36,360.0,12084,12084


In [None]:
# Save the cleaned, sorted frame to S3 as parquet.
cleaned_file_key = f"{processed_prefix}/{cleaned_file_name}"
input_file_key = f"{processed_prefix}/{processed_file_name}"
# Refuse to overwrite the processed input this notebook loaded, or to write to ".../None".
if not cleaned_file_name or cleaned_file_key == input_file_key:
    raise ValueError(
        f"Refusing to upload cleaned data to {cleaned_file_key!r}: configure a "
        "cleaned_file_name distinct from processed_file_name"
    )
parquet_buffer = io.BytesIO()
# index=False: after de-duplication and sorting the index is no longer a RangeIndex,
# and pandas would otherwise store it as an extra column in the parquet file.
df_sorted.to_parquet(parquet_buffer, index=False)
parquet_buffer.seek(0)
# No explicit `self` argument: `self` is not defined at notebook scope (NameError).
s3_operations.store_object_in_s3(key=cleaned_file_key, body=parquet_buffer, transfer='upload_fileobj')