In [10]:
##############
### Test 1 ###
##############

# import sys
# print(sys.executable)
# import numpy
# import duckdb
# print("Everything works!")


##############
### Test 2 ###
##############

# import duckdb

# # Connect to an in-memory database (does not create a file on disk)
# con = duckdb.connect(database=':memory:', read_only=False)

# # Execute a query
# con.execute("CREATE TABLE my_table (id INTEGER, name VARCHAR);")
# con.execute("INSERT INTO my_table VALUES (1, 'Alice'), (2, 'Bob');")

# # Fetch results
# result = con.execute("SELECT * FROM my_table;").fetchdf()
# print(result)

# # Close the connection (optional for in-memory)
# con.close()


###########
### ETL ###
###########

###########
### Step 0: Load requried packages and raw data from CSV file into DuckDB
###########

import duckdb
import matplotlib.pyplot as plt



# Connect/create a DuckDB database file
con = duckdb.connect(database='etl_database.duckdb')

# Load raw data from CSV files into staging tables (temporary tables storing raw/cleaned data)
con.execute("""
            CREATE OR REPLACE TABLE stg_county_raw AS
            SELECT *
            FROM read_csv_auto('file1_county_population_center.csv', header=True);
            """)

con.execute("""
            CREATE OR REPLACE TABLE stg_eq_raw AS
            SELECT *
            FROM read_csv_auto('file2_eq_query.csv', header=True);
            """)

# desc1 = con.execute("DESCRIBE stg_county_raw;").fetchdf()
# desc2 = con.execute("DESCRIBE stg_eq_raw;").fetchdf()
# print(desc1)
# print(desc2)

###########
### Step 1: Create clean tables for ETL (counties, and eq_events)
########### 

#####
# 1.1 Create "counties" table
#####

# # Count the rows of raw staging tables
# n_raw_county_rows = con.execute("Select COUNT(*) FROM stg_county_raw;").fetchone()[0]
# print(f"Number of raw county rows: {n_raw_county_rows}")

con.execute("""
            CREATE OR REPLACE TABLE counties AS
            WITH base AS (
                SELECT
                    -- build 5-digit county FIPS code (ssccc)
                    LPAD(CAST(STATEFP AS VARCHAR), 2, '0') || LPAD(CAST(COUNTYFP AS VARCHAR), 3, '0') as fips,
                    -- select and rename other columns
                    TRIM(CAST(COUNAME AS VARCHAR)) AS county_name,
                    TRIM(CAST(STNAME AS VARCHAR)) AS state_name,
                    TRY_CAST(POPULATION AS BIGINT) AS population,
                    TRY_CAST(LATITUDE AS DOUBLE) AS lat_pop_cent,
                    TRY_CAST(LONGITUDE AS DOUBLE) AS lon_pop_cent
                FROM stg_county_raw
            ),
            qc AS (
                SELECT *
                FROM base
                WHERE fips IS NOT NULL
                    AND population IS NOT NULL
                    AND lat_pop_cent BETWEEN -90 AND 90
                    AND lon_pop_cent BETWEEN -180 AND 180
                    AND population >= 0
            )
            SELECT *
            FROM qc;
            """)

# # Count the rows of cleaned counties table
# n_final_county_rows = con.execute("Select COUNT(*) FROM counties;").fetchone()[0]
# print(f"Number of raw county rows: {n_final_county_rows}")

#####
# 1.2 Create "eq_events" table
#####

# # Count the rows of raw staging tables
# n_raw_eq_rows = con.execute("Select COUNT(*) FROM stg_eq_raw;").fetchone()[0]
# print(f"Number of raw eq rows: {n_raw_eq_rows}")

con.execute("""
            CREATE OR REPLACE TABLE eq_events AS
            WITH base AS (
                SELECT
                    -- Choose and standardize names and types
                    CAST(time AS TIMESTAMP) AS event_time,
                    CAST(id AS VARCHAR) AS event_id,
                    TRY_CAST(latitude AS DOUBLE) AS lat_event,
                    TRY_CAST(longitude AS DOUBLE) AS lon_event,
                    TRY_CAST(depth AS DOUBLE) AS depth_km,
                    TRY_CAST(mag AS DOUBLE) AS mag
                FROM stg_eq_raw
            ),
            qc AS (
                -- Basic quality control / outlier handling
                SELECT *
                FROM base
                WHERE event_time IS NOT NULL
                    AND lat_event BETWEEN -90 AND 90
                    AND lon_event BETWEEN -180 AND 180
                    AND depth_km >= 0
                    AND mag >= 0
            )
            SELECT *
            FROM qc;
            """)

# # Count the rows of cleaned eq_events table
# n_final_eq_rows = con.execute("Select COUNT(*) FROM eq_events;").fetchone()[0]
# print(f"Number of final eq rows: {n_final_eq_rows}")

#####
# 1.3 Quick sanity checks
#####
n_counties = con.execute("SELECT COUNT(*) FROM counties;").fetchone()[0]
n_eq = con.execute("SELECT COUNT(*) FROM eq_events;").fetchone()[0]
print(f"\nClean counties rows: {n_counties}")
print(f"Clean eq_events rows: {n_eq}") # 26 rows had depth_km < 0 and were removed

# Compute the number of earthquake events with magnitude >=7.0
n_eqGE7 = con.execute("SELECT COUNT(*) FROM eq_events WHERE mag >= 7.0;").fetchone()[0]
print((f"\nNumber of clean earthquake events with magnitude >= 7.0: {n_eqGE7}"))

print("\nETL Step 1 completed: 'counties' and 'eq_events' tables -> created.")


###########
### Step 2: Link event to counties based on proximity
########### 

#####
# 2.1 Create event-county link table with Haversine distance
#     Radius rule: (1) mag <  7.0 -> 200 km
#                  (2) mag >= 7.0 -> 250 km
#####

con.execute("""
            CREATE OR REPLACE TABLE event_county_links AS
            WITH pairs AS (
                SELECT
                    e.event_id,
                    e.event_time,
                    e.mag,
                    e.depth_km,
                    c.fips,
                    c.population,
                    -- Haversine distance in km (6371 km = Earth's radius)
                    6371.0 * 2 * ASIN(SQRT(POWER(SIN(((e.lat_event - c.lat_pop_cent)*PI()/180.0)/2.0),2)
                                           +
                                           COS(e.lat_event*PI()/180.0)*
                                           COS(c.lat_pop_cent*PI()/180.0)*
                                           POWER(SIN(((e.lon_event-c.lon_pop_cent)*PI()/180.0)/2.0),2)
                                        )
                                    ) AS distance_km
                FROM eq_events e
                CROSS JOIN counties c
            ),
            filtered AS (
                -- Filter to keep only the pairs with (mag<7 & distance_km<=200) OR (mag>=7 & distance_km<=250)
                SELECT *
                FROM pairs
                WHERE distance_km <= CASE 
                                        WHEN mag < 7.0 THEN 200.0
                                        ELSE 250.0
                                     END
            )
            SELECT
                fips,
                event_id,
                distance_km,
                mag,
                depth_km,
                event_time,
                population AS county_pop
            FROM filtered;
            """)

n_links = con.execute("SELECT COUNT(*) FROM event_county_links;").fetchone()[0]
print(f"\nNumber of event-county links within the two mag-dependent radius: {n_links}")


#####
# 2.2 QC: percentage of events linked and states exposed to events
#####

con.execute("""
            CREATE OR REPLACE TABLE qc_link_summary AS
            WITH total_e AS (
                SELECT COUNT(*) AS total_events FROM eq_events
            ),
            total_c AS (
                SELECT COUNT(*) AS total_counties FROM counties
            ),
            linked AS (
                SELECT COUNT(DISTINCT event_id) AS linked_events FROM event_county_links
            ),
            affected AS (
                SELECT COUNT(DISTINCT fips) AS affected_counties FROM event_county_links
            )
            SELECT
                total_events,
                linked_events,
                100.0*linked_events/total_events AS pct_events_linked,
                total_counties,
                affected_counties,
                100.0*affected_counties/total_counties AS pct_affected_counties
            FROM total_e, total_c, linked, affected;
""")

qc_link = con.execute("SELECT * FROM qc_link_summary;").fetchone()
print("\nQC - Event-Link Summary:")
print(qc_link)


#####
# 2.3 QC: Histogram of distances for linked event-county pairs
#####

df_links = con.execute("SELECT distance_km FROM event_county_links;").fetchdf()

plt.figure(figsize=(8,6))
df_links["distance_km"].hist(bins=30, color='skyblue', edgecolor='black')
plt.xlabel("Event-county distance (km)", fontsize=12, fontweight='bold')
plt.ylabel("Number of links", fontsize=12, fontweight='bold')
plt.title("Histogram of distances for linked event-county pairs", fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig("qc_distance_histogram.png", dpi=300)
plt.close()
print("\nQC - Histogram of event-county distances saved to 'qc_distance_histogram.png'.")  


#####
# 2.4 QC: Spot check a few big events (top 10 by to nearest county)
#####

con.execute("""
            CREATE OR REPLACE TABLE qc_top_events_nearest_county AS
            WITH ranked AS (
                SELECT
                    e.event_id,
                    e.event_time,
                    e.mag,
                    e.depth_km,
                    l.fips,
                    l.distance_km,
                    l.county_pop,
                    -- Assigning a consecutive row number (1,2,3,...) to each row within the group based on some ordering.
                    ROW_NUMBER() OVER (
                        -- Starting a new row number separately for each event_id
                        PARTITION BY e.event_id
                        -- Ordering the rows within each event_id by distance_km
                        ORDER BY l.distance_km
                ) AS row_num
            FROM eq_events e
            LEFT JOIN event_county_links l
                ON e.event_id = l.event_id
            )
            SELECT *
            FROM ranked
            WHERE row_num = 1 
                AND fips IS NOT NULL
            ORDER BY mag DESC
            LIMIT 10;
""")

qc_top_nearest_events = con.execute("SELECT * FROM qc_top_events_nearest_county;").fetchdf()
print("\nQC- Top 10 largest counties and their nearest counties:")
print(qc_top_nearest_events)

print("\nETL Step 2 completed: Link events to counties based on proximity.")


###########
### Step 3: Building hazard and risk metrics at county level
########### 

#####
# 3.1 For each county, compute raw hazard and exposure, standardized hazard and exposure, and risk index
#####

con.execute("""
            CREATE OR REPLACE TABLE county_hazard_risk AS
            WITH 
            -- (1) Compute raw hazard per county
            -- (NOTE: SQL first groups the rows, then applies the agg. functions, then produces one output)
            hazard AS (
                SELECT
                    fips,
                    SUM(POWER(mag,1.5)/(1+distance_km)) AS raw_hazard
                FROM event_county_links
                GROUP BY fips
            ),

            -- (2) Compute raw exposure per county
            exposure AS (
                SELECT
                    fips,
                    LOG(1+population) AS raw_exposure
                FROM counties
            ),

            -- (3) Join counties, hazard and exposure
            joined AS (
                SELECT
                    c.fips,
                    c.county_name,
                    c.state_name,
                    c.population,
                    h.raw_hazard,
                    e.raw_exposure
                FROM counties c
                LEFT JOIN hazard h ON c.fips = h.fips
                LEFT JOIN exposure e on c.fips = e.fips
            ),

            -- (4) Compute mean and stddev for hazard and exposure
            z_scores AS (
                SELECT
                    AVG(raw_hazard) AS mean_haz,
                    STDDEV(raw_hazard) AS stddev_haz,
                    AVG(raw_exposure) AS mean_expo,
                    STDDEV(raw_exposure) AS stddev_expo
                FROM joined
            ),

            -- (5) Compute standardized hazard, exposure, and risk index
            final AS (
                SELECT
                    j.*,
                    (j.raw_hazard - z.mean_haz) / z.stddev_haz AS z_hazard,
                    (j.raw_exposure - z.mean_expo) / z.stddev_expo AS z_exposure,
                    ((j.raw_hazard - z.mean_haz) / z.stddev_haz) * 
                    ((j.raw_exposure - z.mean_expo) / z.stddev_expo) AS risk_index
                FROM joined j, z_scores z
            )

            SELECT *
            FROM final
            ORDER BY risk_index DESC;
""")

final_table = con.execute("SELECT * FROM county_hazard_risk;").fetchdf()
print("\nQC- Top 10 largest risk index counties:")
print(final_table.head(10))


#####
# 3.2 Forming bar charts for the top 30 counties by risk index
#####

# Select the data
top_risk_df = con.execute("""
                        SELECT
                          fips,
                          county_name,
                          state_name,
                          risk_index
                        FROM county_hazard_risk
                        ORDER BY risk_index DESC
                        LIMIT 30;
""").fetchdf()

# Add a label 'county, state' for x axis
top_risk_df['label'] = top_risk_df['county_name'] + ',' + top_risk_df['state_name']

# Plot the figure
plt.figure(figsize=(12,6))
plt.bar(top_risk_df['label'], top_risk_df['risk_index'])
plt.xticks(rotation=70, ha="right", fontsize=12)
plt.ylabel('Risk index (z_hazard * z_exposure)', fontsize=12, fontweight='bold')
plt.title('Top 30 counties by earthquake risk index', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig("top30_counties_risk_bar.png", dpi=300)
plt.close()


#####
# 3.3 Forming scatter plot of hazard vs exposure (colored by risk index)
#####

# Select the data
scatter_df = con.execute("""
                        SELECT
                         fips,
                         county_name,
                         state_name,
                         raw_hazard,
                         raw_exposure,
                         risk_index
                        FROM county_hazard_risk
""").fetchdf()

# Plot the figure
plt.figure(figsize=(8,6))
sc = plt.scatter(scatter_df['raw_exposure'], scatter_df['raw_hazard'], c=scatter_df['risk_index'])
plt.xlabel('Raw exposure', fontsize=12, fontweight='bold')
plt.ylabel('Raw hazard', fontsize=12, fontweight='bold')
plt.title('County hazard vs exposure (colored by risk index)', fontsize=14, fontweight='bold')
cb = plt.colorbar(sc)
cb.set_label('Risk_index')
plt.savefig('haz_vs_expo_scatter.png', dpi = 300)
plt.close()


#####
# 3.4 Forming state-level risk summary
#####

con.execute("""
        CREATE OR REPLACE TABLE state_risk_summary AS
        SELECT
            state_name,
            COUNT(*) AS n_counties,
            SUM(population) AS tot_pop,
            AVG(raw_hazard) AS avg_raw_haz,
            AVG(raw_exposure) AS avg_raw_expo,
            AVG(risk_index) AS avg_risk_index,
            MAX(risk_index) AS max_risk_index
        FROM county_hazard_risk
        GROUP BY state_name
        ORDER BY avg_risk_index DESC;
""")

state_summary = con.execute("""
                            SELECT *
                            FROM state_risk_summary
                            ORDER BY avg_risk_index DESC
                            LIMIT 10;
""").fetchdf()

print("\nTop 10 states by average risk index:")
print(state_summary)


# Close the connection (optional for in-memory)
con.close()





Clean counties rows: 3221
Clean eq_events rows: 5646

Number of clean earthquake events with magnitude >= 7.0: 72

ETL Step 1 completed: 'counties' and 'eq_events' tables -> created.

Number of event-county links within the two mag-dependent radius: 11773

QC - Event-Link Summary:
(5646, 1364, 24.15869642224584, 3221, 1173, 36.417261719962745)

QC - Histogram of event-county distances saved to 'qc_distance_histogram.png'.

QC- Top 10 largest counties and their nearest counties:
       event_id              event_time  mag  depth_km   fips  distance_km  \
0  ak002e435qpj 2002-11-03 17:12:41.518  7.9     4.200  02068    86.824765   
1    usp0003aq8 1987-11-30 14:23:19.590  7.9    10.000  02282   199.989547   
2    hv19755025 1975-11-29 09:47:40.100  7.7     8.636  15001    58.533079   
3   ak0138esnzr 2013-01-05 03:58:14.957  7.5     8.700  02198   144.021159   
4    us7000qd1y 2025-07-16 16:37:41.667  7.3    38.000  02013   212.449042   
5    ci14607652 2010-04-04 18:40:42.360  7.2    