# plan
- define province names
- create entrypoint for main flow
- create subtasks
  - get api results and parse to dataframe
  - write the dataframe to lakefs


In [35]:
import requests
import pandas as pd
from datetime import datetime
import pytz
from prefect import flow, task # Prefect flow and task decorators

@task
def get_weather_data(province_context={'province':None, 'lat':None, 'lon':None}):
    # API endpoint and parameters
    WEATHER_ENDPOINT = "https://api.openweathermap.org/data/2.5/weather"
    API_KEY = "70e208d9d8ba1534136297fb1f3fe396"  # Replace with your actual API key
    province=province_context['province']
    
    params = {
        "lat": province_context['lat'],
        "lon": province_context['lon'],
        "appid": API_KEY,
        "units": "metric",
        "lang": "th"
    }
    try:
        # Make API request
        response = requests.get(WEATHER_ENDPOINT, params=params)
        response.raise_for_status()  # Raise an exception for bad status codes
        data = response.json()
        
        # Convert timestamp to datetime
        # created_at = datetime.fromtimestamp(data['dt'])

        dt = datetime.now()
        thai_tz = pytz.timezone('Asia/Bangkok')
        created_at = dt.replace(tzinfo=thai_tz)


        timestamp = datetime.now()
        
        # Create dictionary with required fields
        weather_dict = {
            'timestamp': timestamp,
            'year': timestamp.year,
            'month': timestamp.month,
            'day': timestamp.day,
            'hour': timestamp.hour,
            'minute': timestamp.minute,
            'created_at': created_at,
            'location': location_name,
            'temperature': data['main']['temp'],
            'feels_like': data['main']['feels_like'],
            'humidity': data['main']['humidity'],
            'pressure': data['main']['pressure'],
            'wind_speed': data['wind']['speed'],
            'visibility': data.get('visibility'),
            'weather_main': data['weather'][0]['main'],
            'weather_description': data['weather'][0]['description']
        }
        
        # Create DataFrame
        # df = pd.DataFrame([weather_dict])
        
        # return df
        return weather_dict

    
    except requests.exceptions.RequestException as e:
        print(f"Error fetching data: {e}")
        return None
    except KeyError as e:
        print(f"Error processing data: Missing key {e}")
        return None

In [36]:
provinces = {
    "Satitram Alumni": {
        "lat": 13.752916,
        "lon": 100.618616
    },
    "Siam Paragon": {
        "lat": 13.746239,
        "lon": 100.534345
    },
    "MBK Center": {
        "lat": 13.746141,
        "lon": 100.530547
    },
    "Chulalongkorn University": {
        "lat": 13.746724,
        "lon": 100.530898
    },
    "Erawan Shrine": {
        "lat": 13.746656,
        "lon": 100.541333
    },
    "CentralWorld": {
        "lat": 13.746774,
        "lon": 100.539462
    },
    "Pratunam Market": {
        "lat": 13.749609,
        "lon": 100.539115
    },
    "Jim Thompson House": {
        "lat": 13.746603,
        "lon": 100.529531
    },
    "Bangkok Art and Culture Centre": {
        "lat": 13.746599,
        "lon": 100.531101
    },
    "Lumphini Park": {
        "lat": 13.727924,
        "lon": 100.542287
    },
    "Wat Arun": {
        "lat": 13.743682,
        "lon": 100.488146
    },
    "Wat Pho": {
        "lat": 13.746697,
        "lon": 100.493469
    },
    "Grand Palace": {
        "lat": 13.750046,
        "lon": 100.491346
    },
    "Asiatique The Riverfront": {
        "lat": 13.703379,
        "lon": 100.509145
    },
    "ICONSIAM": {
        "lat": 13.723621,
        "lon": 100.517333
    },
    "Khao San Road": {
        "lat": 13.749136,
        "lon": 100.495136
    },
    "Terminal 21": {
        "lat": 13.736657,
        "lon": 100.561172
    },
    "The Mall Bangkapi": {
        "lat": 13.767090,
        "lon": 100.640134
    },
    "Dusit Zoo": {
        "lat": 13.766410,
        "lon": 100.525019
    },
    "Sukhumvit Road": {
        "lat": 13.731731,
        "lon": 100.571259
    }
}
province='Satitram Alumni'
province_context={
    'province':province,
    'lat':provinces[province]['lat'],
    'lon':provinces[province]['lon'],
}
get_weather_data(province_context)

NameError: name 'location_name' is not defined

In [28]:
locations = {
    "Satitram Alumni": {
        "lat": 13.752916,
        "lon": 100.618616
    }
}

location = "Satitram Alumni"
location_context = {
    'location': location,
    'lat': locations[location]['lat'],
    'lon': locations[location]['lon'],
}

get_weather_data("Satitram Alumni")

{'timestamp': datetime.datetime(2025, 5, 9, 16, 1, 8, 102102, tzinfo=<DstTzInfo 'Asia/Bangkok' +07+7:00:00 STD>),
 'year': 2025,
 'month': 5,
 'day': 9,
 'hour': 16,
 'minute': 1,
 'created_at': datetime.datetime(2025, 5, 9, 15, 59, 31, tzinfo=<DstTzInfo 'Asia/Bangkok' +07+7:00:00 STD>),
 'location': 'Satitram Alumni',
 'temperature': 35.3,
 'feels_like': 42.3,
 'humidity': 58,
 'pressure': 1003,
 'wind_speed': 4.12,
 'visibility': 10000,
 'weather_main': 'Thunderstorm',
 'weather_description': 'พายุฟ้าคะนอง'}

In [26]:
from prefect import flow
import pandas as pd
from datetime import datetime

# Define the locations dictionary
locations = {
    "Satitram Alumni": {
        "lat": 13.752916,
        "lon": 100.618616
    }
}

# Define the function that fetches weather data
def get_weather_data(location_name):
    lat = locations[location_name]['lat']
    lon = locations[location_name]['lon']

    # Your API call logic here (Make sure the API is properly set up)

    # Just for illustration, let's return a mock weather data dict
    return {
        "location": location_name,
        "lat": lat,
        "lon": lon,
        "temperature": 30,  # Mock data
        "humidity": 70,     # Mock data
        "timestamp": datetime.now()  # Add timestamp for partitioning
    }

@flow(name="main-flow", log_prints=True)
def main_flow():
    # Fetch the weather data for Satitram Alumni only
    weather_data = get_weather_data('Satitram Alumni')

    # Convert to DataFrame
    df = pd.DataFrame([weather_data])

    # Add partition columns from the timestamp
    df['year'] = df['timestamp'].dt.year
    df['month'] = df['timestamp'].dt.month
    df['day'] = df['timestamp'].dt.day
    df['hour'] = df['timestamp'].dt.hour

    # lakeFS credentials from your docker-compose.yml
    ACCESS_KEY = "access_key"
    SECRET_KEY = "secret_key"
    
    # lakeFS endpoint (running locally)
    lakefs_endpoint = "http://lakefs-dev:8000/"
    
    # lakeFS repository, branch, and file path
    repo = "weather"
    branch = "main"
    path = "weather.parquet"
    
    # Construct the full lakeFS S3-compatible path
    lakefs_s3_path = f"s3a://{repo}/{branch}/{path}"
    
    # Configure storage_options for lakeFS (S3-compatible)
    storage_options = {
        "key": ACCESS_KEY,
        "secret": SECRET_KEY,
        "client_kwargs": {
            "endpoint_url": lakefs_endpoint
        }
    }
    
    # Save to lakeFS
    df.to_parquet(
        lakefs_s3_path,
        storage_options=storage_options,
        partition_cols=['year', 'month', 'day', 'hour'],
    )

# Run the main flow
main_flow()