#### Libraries

In [None]:
#!pip install clickhouse-connect

In [None]:
import requests
import pandas as pd
import clickhouse_connect
import logging
import os

from config import ch_conn_string
from dotenv import load_dotenv

import warnings
# Suppress every warning from this point on: no category, message, or module
# filter is given, so the 'ignore' action applies to all of them.
# NOTE(review): presumably meant to keep notebook output uncluttered, but it
# also hides deprecation and dtype warnings from pandas — confirm this is intended.
warnings.filterwarnings('ignore')

# Configure the root logger so records at INFO level and above are emitted.
logging.basicConfig(level=logging.INFO)

#### Check Connection to ClickHouse

In [None]:
# Open the ClickHouse client that the later cells use through the name `client`.
# Connection settings are read from the `ch_conn_string` mapping imported from config.
try:
    client = clickhouse_connect.get_client(
        host=ch_conn_string["host"],
        port=ch_conn_string["port"],
        username=ch_conn_string["username"],
        password=ch_conn_string["password"],
        database=ch_conn_string["database"],
    )
except Exception as e:
    logging.error(f"An error occurred: {str(e)}", exc_info=True)
    # Re-raise instead of continuing: with `client` left unbound, every later
    # cell would fail with a NameError that hides the real connection problem.
    raise
else:
    # Only reached when get_client() returned without raising.
    logging.info("Connected to ClickHouse successfully.")

### Data Source = Weatherstack API

#### 1. Create Table to Store Weather Data

In [None]:
print("Create Table for Store Weather Data")

table_name = "historical_weather"

# (column name, ClickHouse type) pairs, listed in table order.
weather_schema = (
    ("country", "String"),
    ("city", "String"),
    ("date", "Date"),
    ("time", "Int64"),
    ("temperature", "Float64"),
    ("humidity", "Float64"),
    ("wind_speed", "Float64"),
    ("weather_description", "String"),
)

# One indented "name type" line per column, comma-separated.
column_ddl = ",\n".join(
    f"    {col_name} {col_type}" for col_name, col_type in weather_schema
)

# Assemble the DDL; rows are sorted by (city, date, time) in a MergeTree table.
create_table_query = (
    f"\nCREATE TABLE IF NOT EXISTS {table_name} (\n"
    f"{column_ddl}\n"
    ") ENGINE = MergeTree()\n"
    "ORDER BY (city, date, time);\n"
)

# IF NOT EXISTS makes this statement safe to re-run.
client.command(create_table_query)
logging.info(f"'{table_name}' created successfully.")

#### 2. Get Historical Weather using API

In [None]:
print("Load API Key")
# Load variables from a local .env file (if present) into the process environment.
load_dotenv()
API_KEY = os.getenv("WEATHERSTACK_API_KEY")
if not API_KEY:
    # Fail here with a clear message; otherwise the literal text "None" would
    # be sent to the API as the access key and only fail later, per request.
    raise RuntimeError(
        "WEATHERSTACK_API_KEY is not set; define it in the environment or in the .env file."
    )
# Only the presence of the key is checked here; the API itself decides validity.
logging.info("API key loaded.")

# Setup Parameters
# Cities queried against the historical endpoint.
cities = ["Bandung", "Jakarta", "Surabaya", "Makassar", "Medan"]
# Inclusive (start_date, end_date) ranges in YYYY-MM-DD format.
periods = [
    ("2021-01-01", "2021-02-28"),  # The first peak of COVID
    ("2022-01-01", "2022-02-28"),  # The second peak of COVID
]

#### 3. Fetch Data

In [None]:
from datetime import datetime

# Historical endpoint; query parameters are passed separately so that
# `requests` URL-encodes them.
# NOTE(review): plain HTTP sends the access key unencrypted — switch to
# https:// if the Weatherstack plan in use supports it.
WEATHERSTACK_URL = "http://api.weatherstack.com/historical"
# Seconds to wait for the API before giving up on one request.
REQUEST_TIMEOUT = 30
# Column order of every row appended to `weather_data`.
WEATHER_COLUMN_NAMES = [
    "country", "city", "date", "time",
    "temperature", "humidity", "wind_speed", "weather_description",
]


def _hourly_rows(city, payload):
    """Flatten one API response into rows ordered like WEATHER_COLUMN_NAMES.

    Args:
        city: City name that was queried; stored as-is in each row.
        payload: Decoded JSON response containing a "historical" mapping of
            "YYYY-MM-DD" -> {"hourly": [...]}.

    Returns:
        A list of row lists, one per hourly entry.
    """
    country = payload.get("location", {}).get("country", "Unknown")
    rows = []
    for date_str, details in payload["historical"].items():
        date_obj = datetime.strptime(date_str, "%Y-%m-%d").date()
        for hour in details.get("hourly", []):
            # Guard against a missing or empty description list.
            descriptions = hour.get("weather_descriptions") or [""]
            rows.append([
                country,
                city,
                date_obj,
                int(hour["time"]),
                hour["temperature"],
                hour["humidity"],
                hour["wind_speed"],
                descriptions[0],
            ])
    return rows


weather_data = []

# Loop for each city and period
for city in cities:
    for start_date, end_date in periods:
        params = {
            "access_key": API_KEY,
            "query": city,
            "historical_date_start": start_date,
            "historical_date_end": end_date,
            "hourly": 1,
            "units": "m",
        }

        try:
            response = requests.get(
                WEATHERSTACK_URL, params=params, timeout=REQUEST_TIMEOUT
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as exc:
            # Print only the exception type: the exception text can include
            # the full request URL, which contains the access key.
            print(f"Error fetching data for {city}: {type(exc).__name__}")
            continue

        if not data.get("success", True):
            print(f"Error fetching data for {city}: {data.get('error', {})}")
            continue

        if "historical" not in data:
            print(f"Error fetching data for {city}: response has no 'historical' section")
            continue

        weather_data.extend(_hourly_rows(city, data))

if weather_data:
    # Truncate only once replacement rows are in hand, so a failed or empty
    # fetch no longer wipes the data already stored in the table.
    truncate_query = f"TRUNCATE TABLE {table_name};"
    client.command(truncate_query)
    logging.info(f"Table {table_name} cleaned successfully.")

    client.insert(table_name, weather_data, column_names=WEATHER_COLUMN_NAMES)
    print(f"Inserted {len(weather_data)} records into '{table_name}'")
else:
    print("No data to insert.")

### Data Source = OWID

In [None]:
# List CSV URLs (target table name -> source CSV)
datasets = {
    "cases_deaths": "https://catalog.ourworldindata.org/garden/covid/latest/cases_deaths/cases_deaths.csv",
    "hospital": "https://catalog.ourworldindata.org/garden/covid/latest/hospital/hospital.csv",
    "vaccination": "https://catalog.ourworldindata.org/garden/covid/latest/vaccinations_global/vaccinations_global.csv",
    "testing": "https://catalog.ourworldindata.org/garden/covid/latest/testing/testing.csv"
}

# Define ClickHouse data types mapping, keyed by str(pandas dtype)
dtype_mapping = {
    "object": "String",
    "int64": "Int64",
    "float64": "Float64",
    "datetime64": "DateTime",
    # pandas reports datetime columns as "datetime64[ns]", so the bare
    # "datetime64" key above never matched on its own.
    "datetime64[ns]": "DateTime",
}

# dtypes whose missing values are replaced with 0 instead of "".
numeric_dtypes = ("int64", "float64")

# Create Tables & Insert Data
# The loop variable is deliberately NOT called `table_name`: that global holds
# the weather table's name, and overwriting it would make a re-run of the
# weather cells truncate an OWID table instead.
for owid_table, url in datasets.items():
    print(f"Processing {owid_table}...")

    # Load the full CSV once. Inferring the schema from the complete data
    # (rather than a 5-row sample) keeps column types consistent with the rows
    # that are inserted, and avoids downloading the file twice.
    df = pd.read_csv(url)

    # Define columns with proper types
    columns = []
    for col, dtype in df.dtypes.items():
        # Default to String if unknown
        clickhouse_type = dtype_mapping.get(str(dtype), "String")
        # Backtick-quote the identifier so reserved words or unusual
        # characters in a CSV header cannot break the DDL.
        columns.append(f"`{col}` {clickhouse_type}")

    # Create Table Query
    create_table_query = f"""
    CREATE TABLE IF NOT EXISTS {owid_table} (
        {", ".join(columns)}
    ) ENGINE = MergeTree()
    ORDER BY tuple();
    """
    client.command(create_table_query)
    print(f"Table '{owid_table}' created successfully.")

    # Truncate table before inserting new data
    truncate_query = f"""TRUNCATE TABLE {owid_table};"""
    client.command(truncate_query)
    logging.info(f"Table {owid_table} cleaned successfully.")

    # Insert Data
    print(f"Inserting data into {owid_table}...")

    # Replace NaN/NULL values: 0 for numeric columns, "" for everything else.
    # NOTE(review): filling numeric gaps with 0 makes "missing" look like
    # "zero" in later analytics — consider Nullable columns instead.
    fill_values = {
        col: 0 if str(dtype) in numeric_dtypes else ""
        for col, dtype in df.dtypes.items()
    }
    df = df.fillna(fill_values)

    # Insert into ClickHouse using the driver's DataFrame insert.
    client.insert_df(owid_table, df)
    print(f"Inserted {len(df)} rows into '{owid_table}'.")

#### (Optional) Load Data Result

In [None]:
import configparser

# Read INI file
config_path = "query_analytics.ini"
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so an empty result means the INI was not loaded.
if not config.read(config_path):
    raise FileNotFoundError(f"Could not read configuration file '{config_path}'.")

# Extract Materialized View details
section = "materialized_view_covide_weather"
view_name = config[section]["name"]
# `engine` and `order_by` are read but not used below.
# NOTE(review): presumably they are already embedded in `query` — confirm
# before removing them.
engine = config[section]["engine"]
order_by = config[section]["order_by"]
query = config[section]["query"]

# Drop existing view if needed
client.command(f"DROP VIEW IF EXISTS {view_name}")

# Execute the query from INI
client.command(query)

print(f"Materialized View '{view_name}' created successfully!")