In [1]:
import requests
import pandas as pd 
from pandas import json_normalize
import os
from dotenv import load_dotenv
from datetime import datetime, timedelta
from sqlalchemy import create_engine , inspect
import psycopg2
import re

In [2]:
load_dotenv()

api_key = os.getenv('weather_api_key')
base_url = 'http://api.weatherapi.com/v1'
history_url = base_url + "/history.json"

db_name = os.getenv('db_name')
user = os.getenv('user')
password = os.getenv('password')
host = os.getenv('host')
port = os.getenv('port')

engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{db_name}')

capitals = [
    "Johor Bahru", 
    "Alor Setar", 
    "Kota Bharu", 
    "Melaka", 
    "Seremban", 
    "Kuantan", 
    "George Town", 
    "Ipoh", 
    "Kangar", 
    "Kota Kinabalu", 
    "Kuching", 
    "Shah Alam", 
    "Kuala Terengganu",
    "Kuala Lumpur" 
]
dates_string = [(datetime.now() - timedelta(day)).strftime("%Y-%m-%d") for day in range(1,9)]
dates = [(datetime.now() - timedelta(day)) for day in range(1,9)]

In [None]:
def get_hourly_history():
    for capital in capitals:    
        for date in dates_string:
            params = {'key': api_key, 'q': capital , 'dt': date}
            r = requests.get(history_url , params=params).json()
            df = pd.json_normalize(r)
            hours_df = json_normalize(df['forecast.forecastday'][0][0]['hour'])

            hours_df['location'] = df['location.name']
            hours_df['region'] = df['location.region']
            hours_df['country'] = df['location.country']
            hours_df['date'] = hours_df['time'].dt.date
            hours_df = hours_df.ffill(axis=0)
            capital = re.sub(r'\s+', '_', capital)
            return hours_df
            
            
            # hours_df.to_sql(f"{capital}_hourly" , if_exists='append' , index=False , con=engine)

hourly = get_hourly_history()
hourly.head()

In [None]:
dates_string = [(datetime.now() - timedelta(day)).strftime("%Y-%m-%d") for day in range(0,9)]
dict_empty = {}
df = pd.DataFrame(dict_empty)

for capital in capitals:    
    for date in dates_string:
                params = {'key': api_key, 'q': capital , 'dt': date}
                r = requests.get(history_url , params=params)
                response = r.json()
                raw_data = pd.json_normalize(response)
                hours_df = pd.json_normalize(raw_data['forecast.forecastday'][0][0]['hour'])
                hours_df['location'] = raw_data['location.name']
                hours_df = hours_df.ffill(axis=0)
                df = pd.concat([df , hours_df] , axis=0)
df.head(3)


        # hours_df['location'] = df['location.name']
        # hours_df['region'] = df['location.region']
        # hours_df['country'] = df['location.country']
        # hours_df['date'] = pd.to_datetime(hours_df['time']).dt.date
        # hours_df = hours_df.ffill(axis=0)

# hours_df

In [None]:
dates_string = [(datetime.now() - timedelta(day)).strftime("%Y-%m-%d") for day in range(1,9)]
dict_empty = {}
daily_df = pd.DataFrame(dict_empty)

for capital in capitals:
    for date in dates_string:
        params = {'key': api_key, 'q': capital , 'dt': date}
        response = requests.get(history_url , params=params)
        day_history_data = response.json()
        day_raw_data = pd.json_normalize(day_history_data)

        days_df = pd.json_normalize(day_raw_data['forecast.forecastday'][0][0]['day'])
        days_df['date'] = day_raw_data['forecast.forecastday'][0][0]['date']
        days_df['location'] = day_raw_data['location.name']
        days_df = days_df.rename(columns={"condition.text":"condition"} , inplace=False).drop(columns=['condition.icon','condition.code'] , axis=1)
        capital = re.sub(r'\s+', '_', capital)
        with engine.begin() as connection:
            days_df.to_sql(f"{capital}_daily" , if_exists='append' , index=False , con=connection)


In [57]:
dates_string = [(datetime.now() - timedelta(day)).strftime("%Y-%m-%d") for day in range(0,9)]
dates_string

['2025-01-09',
 '2025-01-08',
 '2025-01-07',
 '2025-01-06',
 '2025-01-05',
 '2025-01-04',
 '2025-01-03',
 '2025-01-02',
 '2025-01-01']

In [56]:
dates_string = [(datetime.now() - timedelta(day)).strftime("%Y-%m-%d") for day in range(0,9)]
dict_empty = {}
df = pd.DataFrame(dict_empty)

for capital in capitals:    
    for date in dates_string:
                params = {'key': api_key, 'q': capital , 'dt': date}
                r = requests.get(history_url , params=params)
                response = r.json()
                raw_data = pd.json_normalize(response)
                hours_df = pd.json_normalize(raw_data['forecast.forecastday'][0][0]['hour'])
                hours_df['location'] = raw_data['location.name']
                hours_df = hours_df.ffill(axis=0)
                df = pd.concat([df , hours_df] , axis=0)
df

Unnamed: 0,time_epoch,time,temp_c,temp_f,is_day,wind_mph,wind_kph,wind_degree,wind_dir,pressure_mb,...,chance_of_snow,vis_km,vis_miles,gust_mph,gust_kph,uv,condition.text,condition.icon,condition.code,location
0,1736352000,2025-01-09 00:00,25.2,77.3,0,8.7,14.0,7,N,1011.0,...,0,10.0,6.0,14.1,22.7,0.0,Partly cloudy,//cdn.weatherapi.com/weather/64x64/night/116.png,1003,Johor Bahru
1,1736355600,2025-01-09 01:00,25.1,77.1,0,8.9,14.4,9,N,1011.0,...,0,10.0,6.0,14.0,22.5,0.0,Clear,//cdn.weatherapi.com/weather/64x64/night/113.png,1000,Johor Bahru
2,1736359200,2025-01-09 02:00,25.0,77.0,0,8.5,13.7,10,N,1011.0,...,0,10.0,6.0,13.1,21.1,0.0,Clear,//cdn.weatherapi.com/weather/64x64/night/113.png,1000,Johor Bahru
3,1736362800,2025-01-09 03:00,24.9,76.8,0,8.9,14.4,10,N,1010.0,...,0,10.0,6.0,13.8,22.1,0.0,Partly cloudy,//cdn.weatherapi.com/weather/64x64/night/116.png,1003,Johor Bahru
4,1736366400,2025-01-09 04:00,24.7,76.5,0,8.5,13.7,11,NNE,1009.0,...,0,10.0,6.0,13.2,21.2,0.0,Clear,//cdn.weatherapi.com/weather/64x64/night/113.png,1000,Johor Bahru
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
19,1735729200,2025-01-01 19:00,25.1,77.1,1,2.7,4.3,290,WNW,1008.0,...,0,10.0,6.0,4.9,7.8,6.0,Light rain shower,//cdn.weatherapi.com/weather/64x64/day/353.png,1240,Kuala Lumpur
20,1735732800,2025-01-01 20:00,24.0,75.3,0,1.8,2.9,322,NW,1009.0,...,0,10.0,6.0,3.5,5.6,0.0,Patchy rain possible,//cdn.weatherapi.com/weather/64x64/night/176.png,1063,Kuala Lumpur
21,1735736400,2025-01-01 21:00,23.8,74.9,0,1.6,2.5,348,NNW,1010.0,...,0,10.0,6.0,3.1,4.9,0.0,Patchy rain possible,//cdn.weatherapi.com/weather/64x64/night/176.png,1063,Kuala Lumpur
22,1735740000,2025-01-01 22:00,23.6,74.4,0,2.0,3.2,356,N,1011.0,...,0,10.0,6.0,4.0,6.5,0.0,Partly cloudy,//cdn.weatherapi.com/weather/64x64/night/116.png,1003,Kuala Lumpur


In [52]:
import pandas as pd

# DataFrame pertama
df1 = pd.DataFrame({})

# DataFrame kedua
df2 = pd.DataFrame({
    'A': [7, 8],
    'B': [9, 10]
})

# Menambahkan df2 ke df1
df_combined = pd.concat([df1, df2], axis=0)

print(df_combined)


   A   B
0  7   9
1  8  10


In [None]:
def get_hourly_history():
    history_url = base_url + "/history.json"
    dates = [(datetime.now() - timedelta(day)).strftime("%Y-%m-%d") for day in range(1,9)]
    hourly_dict = {}
    
    for date in dates:
        for capital in capitals:
            params = {"key": api_key, "q": capital, "dt": date}
            try:
                response = requests.get(history_url, params=params)
                if response.status_code == 200:
                    history_data = response.json()
                    df = pd.json_normalize(history_data['forecast']['forecastday'][0]['hour'])
                    # hourly = history_data['forecast']['forecastday'][0]['hour']
                    
                    # hourly_dict = {}
                    
                    # for d in hourly:
                    #     for key, value in d.items():
                    #         if key in hourly_dict:
                    #             hourly_dict[key].append(value)
                    #         else:
                    #             hourly_dict[key] = [value]
                    # df = pd.DataFrame(hourly_dict)
                    # print(df.head())
                    table_name = f"{capital}_{date}"
                    df.to_sql(table_name, engine, if_exists='replace', index=False)

                    # inspector = inspect(engine)
                    # if inspector.has_table(table_name):
                    #     print(f"{table_name} is already existed, skipping...")
                    #     continue
                    # else:
                    #     df.to_sql(table_name, engine, if_exists='replace', index=False)

                else:
                    print(f"Error: Received unexpected status code {response.status_code} on {capital}")

            except requests.exceptions.RequestException as e:
                print(f"An error occurred: {e}")

get_hourly_history()

In [78]:
def get_hourly_history():
    folder_path = 'hourly_data'
    os.makedirs(folder_path, exist_ok=True)
    history_url = base_url + "/history.json"

    dates = [(datetime.now() - timedelta(day)).strftime("%Y-%m-%d") for day in range(1,9)]
    hourly_dict = {}
    
    for date in dates[:1]:
        for capital in capitals[:1]:
            params = {"key": api_key, "q": capital, "dt": date}
            try:
                response = requests.get(history_url, params=params)
                if response.status_code == 200:
                    history_data = response.json()

                    hourly = history_data['forecast']['forecastday'][0]['hour']
                    
                    # hourly_dict = {}
                    
                    for d in hourly:
                        for key, value in d.items():
                            if key in hourly_dict:
                                hourly_dict[key].append(value)
                            else:
                                hourly_dict[key] = [value]
                    df = pd.DataFrame(hourly_dict)
                    df = pd.json_normalize(df)
                    print(df.head())

                    # file_name = f"{capital}_{date}"
                    # file_path = os.path.join(folder_path , file_name)

                    # if os.path.exists(file_path):
                    #     print(f"{file_path} is already existed, skipping...")
                    #     continue
                    # else:
                    #     df.to_csv(file_path , index=False, header=True, encoding=None)
                else:
                    print(f"Error: Received unexpected status code {response.status_code} on {capital}")

            except requests.exceptions.RequestException as e:
                print(f"An error occurred: {e}")

get_hourly_history()

Empty DataFrame
Columns: []
Index: [0, 1, 2, 3, 4]


In [19]:
datetime.today().date()

datetime.date(2025, 1, 22)

In [None]:
from airflow.operators.bash_operator import BashOperator


In [None]:
import psycopg2
from dotenv import load_dotenv


load_dotenv()

# Database connection parameters
conn = psycopg2.connect(
    dbname = os.getenv('db_name'), 
    user = os.getenv('user'), 
    password = os.getenv('password'), 
    host = os.getenv('host'), 
    port = os.getenv('port')
)

# Create a cursor object
cursor = conn.cursor()

# Execute a query to list tables
cursor.execute("""
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'public'
""")

# Fetch all table names
tables = cursor.fetchall()

# Print the table names
for table in tables:
    print(table[0])

# Close the cursor and connection
cursor.close()
conn.close()


In [11]:
import requests
import pandas as pd 
from pandas import json_normalize
import os
from dotenv import load_dotenv
from datetime import datetime, timedelta
from sqlalchemy import create_engine , inspect
import psycopg2
import re

In [None]:
load_dotenv()

api_key = os.getenv('weather_api_key')
base_url = 'http://api.weatherapi.com/v1'
history_url = base_url + "/history.json"

db_name = os.getenv('db_name')
user = os.getenv('user')
password = os.getenv('password')
host = os.getenv('host')
port = os.getenv('port')

engine = create_engine(f'postgresql+psycopg2://{user}:{password}@{host}:{port}/{db_name}')

capitals = [
    "johor bahru", 
    "alor setar", 
    "kota bharu", 
    "melaka", 
    "seremban", 
    "kuantan", 
    "george town", 
    "ipoh", 
    "kangar", 
    "kota kinabalu", 
    "kuching", 
    "shah alam", 
    "kuala terengganu",
    "kuala lumpur" 
]

dates = [(datetime.now() - timedelta(day)).strftime("%Y-%m-%d") for day in range(1,9)]

def get_daily_history():
    for capital in capitals:
        for date in dates:
            params = {'key': api_key, 'q': capital , 'dt': date}
            response = requests.get(history_url , params=params)
            day_history_data = response.json()
            day_raw_data = pd.json_normalize(day_history_data)

            days_df = pd.json_normalize(day_raw_data['forecast.forecastday'][0][0]['day'])
            days_df['date'] = day_raw_data['forecast.forecastday'][0][0]['date']
            days_df['location'] = day_raw_data['location.name']
            days_df = days_df.rename(columns={"condition.text":"condition"} , inplace=False).drop(columns=['condition.icon','condition.code'] , axis=1)
            # display(days_df)

            capital = re.sub(r'\s+', '_', capital)
            with engine.begin() as connection:
                
                # days_df.to_sql(f"{capital}_daily" , if_exists='fail' , index=False , con=connection)

get_daily_history()

: 

In [14]:
import requests
import pandas as pd
from bs4 import BeautifulSoup

base_url = 'http://api.weatherapi.com/v1'
history_url = base_url + "/history.json"
params = {'key': '98078c8de2274791b03161315240410', 'q': 'kuching' , 'dt': '2025-01-15'}
r = requests.get(history_url, params=params)



# r.json()['forecast.forecastday'][0][0]['hour']

In [19]:
r = r.json()
r.keys()

dict_keys(['location', 'forecast'])

In [22]:
r['forecast'].keys()

dict_keys(['forecastday'])

In [34]:
r['forecast']['forecastday'][0].keys()

dict_keys(['date', 'date_epoch', 'day', 'astro', 'hour'])

In [33]:
r['forecast']['forecastday'][0]['date']

'2025-01-15'

In [35]:
r['forecast']['forecastday'][0]['day']

{'maxtemp_c': 31.1,
 'maxtemp_f': 88.1,
 'mintemp_c': 22.0,
 'mintemp_f': 71.7,
 'avgtemp_c': 25.3,
 'avgtemp_f': 77.5,
 'maxwind_mph': 4.0,
 'maxwind_kph': 6.5,
 'totalprecip_mm': 2.38,
 'totalprecip_in': 0.09,
 'totalsnow_cm': 0.0,
 'avgvis_km': 6.2,
 'avgvis_miles': 3.0,
 'avghumidity': 86,
 'daily_will_it_rain': 1,
 'daily_chance_of_rain': 100,
 'daily_will_it_snow': 0,
 'daily_chance_of_snow': 0,
 'condition': {'text': 'Light rain shower',
  'icon': '//cdn.weatherapi.com/weather/64x64/day/353.png',
  'code': 1240},
 'uv': 7.0}

In [None]:
drop table ipoh_daily_staging;

CREATE TABLE ipoh_daily_staging AS
WITH 
duplicate_cte AS (SELECT * , ROW_NUMBER() OVER(PARTITION BY date) AS row_num FROM ipoh_daily),
corrected_cte AS (SELECT * FROM duplicate_cte where row_num = 1),
selected_columns AS (SELECT location , date ,maxtemp_c , mintemp_c , avgtemp_c , maxwind_kph , avghumidity , condition FROM corrected_cte)
SELECT * FROM selected_columns;



SELECT * FROM ipoh_daily_staging;



SELECT * FROM ipoh_daily;

# ==================================

-- staging table
DROP TABLE ipoh_hourly_staging;

CREATE TABLE ipoh_hourly_staging AS
WITH 
rownum_cte AS (SELECT * , ROW_NUMBER() OVER(PARTITION BY time) AS duplicate_row FROM ipoh_hourly),
removeduplicate_cte AS (SELECT * FROM rownum_cte WHERE duplicate_row = 1),
selectedcolumn_cte AS (SELECT location , time , temp_c , feelslike_c , is_day , wind_kph , humidity , cloud , heatindex_c , dewpoint_c , uv , "condition.text" AS condition FROM removeduplicate_cte)
SELECT *
FROM selectedcolumn_cte;

select * from ipoh_hourly_staging

-- split date and time columns
ALTER TABLE ipoh_hourly_staging
ADD COLUMN date_part date,
ADD COLUMN time_part time;


UPDATE ipoh_hourly_staging
SET date_part = (time::timestamp)::date , time_part = (time::timestamp)::time;

-- time when it was the hottest of the day
CREATE TABLE ipoh_hourly_hottest AS
WITH time_cte AS(
SELECT 
    t1.location,
	t1.date_part AS date, 
    t1.time_part AS time, 
    t1.temp_c AS max_temp
FROM ipoh_hourly_staging t1
WHERE t1.temp_c = (SELECT MAX(t2.temp_c) FROM ipoh_hourly_staging t2 WHERE t2.date_part = t1.date_part))
SELECT *
FROM time_cte;

SELECT * FROM ipoh_hourly_hottest ORDER BY date;

# --=========================================================================================================================================================

# drop table ipoh_hourly_hottest


# drop table ipoh_hourly_hottest









In [1]:
import requests
import pandas as pd 
from pandas import json_normalize
import os
from dotenv import load_dotenv
from datetime import datetime, timedelta
from sqlalchemy import create_engine , inspect
import psycopg2
import re

In [2]:
load_dotenv()

db_name = os.getenv('db_name')
user = os.getenv('user')
password = os.getenv('password')
host = os.getenv('host')
port = os.getenv('port')

print(db_name , ':' , user , ':' , password , ':' , host , ':' , port )

postgres : airflow : airflow : localhost : 5432


In [3]:
import psycopg2
from psycopg2 import OperationalError
from dotenv import load_dotenv 
import os

load_dotenv()

def test_connection():
    # Replace these values with your actual database credentials
    db_name = os.getenv('db_name')
    user = os.getenv('user')
    password = os.getenv('password')
    host = os.getenv('host')
    port = os.getenv('port')

    try:
        # Establish the connection
        connection = psycopg2.connect(
            host=host,
            port=port,
            database=db_name,
            user=user,
            password=password
        )
        
        # If the connection is successful
        print("Connection to PostgreSQL DB successful")
        
        # Close the connection
        connection.close()
    except OperationalError as e:
        # If the connection fails
        print(f"The error '{e}' occurred")

# Call the function to test the connection
test_connection()


Connection to PostgreSQL DB successful


In [None]:
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError

def test_sqlalchemy_connection():
    # Replace with your actual database credentials
    # username = 'your_username'
    # password = 'your_password'
    # host = 'your_host'
    # port = '5432'  # Default PostgreSQL port
    # database = 'your_database'

    # Create the connection string
    connection_string = f'postgresql+psycopg2://airflow:airflow@localhost:5432/postgres'

    # Create the SQLAlchemy engine
    engine = create_engine(connection_string)

    try:
        # Attempt to connect
        with engine.connect() as connection:
            print("Connection to PostgreSQL DB successful")
    except OperationalError as e:
        print(f"Error occurred: {e}")

# Run the test
test_sqlalchemy_connection()


Connection to PostgreSQL DB successful


In [9]:
connection_string = f'postgresql://airflow:airflow@localhost:5432/postgres'

engine = create_engine(connection_string)

print(type(engine.begin()))

<class 'sqlalchemy.engine.base.Engine._trans_ctx'>
