In [7]:
import sqlite3
import os
import time

# Define database file and data directory
DB_WTHR_FILE = "new_weather_data.db"
DATA_DIR = "../wx_data"

# Function to Create table if not exists Using SQLite 3 Connection Object
def create_table():
    conn = sqlite3.connect(DB_WTHR_FILE)
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS new_weather_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            station_id TEXT NOT NULL,
            date TEXT CHECK (length(date) = 8) NOT NULL,
            max_temp INTEGER CHECK (max_temp >= -9999),
            min_temp INTEGER CHECK (min_temp >= -9999),
            precipitation INTEGER CHECK (precipitation >= -9999),
            UNIQUE (station_id, date)
        );
    """)
    conn.commit()
    conn.close()
    print("Table created successfully (if not already exists).")

# Run this function
create_table()


Table created successfully (if not already exists).


In [8]:
def insert_data(file_path):
    conn = sqlite3.connect(DB_WTHR_FILE)
    cursor = conn.cursor()
    
    # Extract station ID from filename
    station_id = os.path.basename(file_path).split('.')[0]  
    inserted_count = 0
    
    with open(file_path, "r") as file:
        for line in file:
            parts = line.strip().split("\t")
            if len(parts) != 4:
                continue  # Skip invalid lines
            
            date, max_temp, min_temp, precipitation = parts
            
            # Handle malformed data
            try:
                max_temp, min_temp, precipitation = int(max_temp), int(min_temp), int(precipitation)
            except ValueError:
                continue
            
            # Insert data with exception handling for duplicates
            try:
                cursor.execute("""
                    INSERT INTO new_weather_data (station_id, date, max_temp, min_temp, precipitation)
                    VALUES (?, ?, ?, ?, ?)
                """, (station_id, date, max_temp, min_temp, precipitation))
                inserted_count += 1
            except sqlite3.IntegrityError:
                continue  # Skip duplicates

    conn.commit()
    conn.close()
    print(f"{inserted_count} records inserted from {os.path.basename(file_path)}")
    return inserted_count


In [9]:
def process_all_files():
    start_time = time.time()
    total_inserted = 0

    print("Data ingestion started...\n")
    
    if not os.path.exists(DATA_DIR):
        print(f"Data directory '{DATA_DIR}' not found. Please check the path.")
        return

    files = [f for f in os.listdir(DATA_DIR) if os.path.isfile(os.path.join(DATA_DIR, f))]
    
    if not files:
        print("No files found in the data directory.")
        return
    
    for filename in files:
        file_path = os.path.join(DATA_DIR, filename)
        print(f"Processing {filename}...")
        inserted_count = insert_data(file_path)
        total_inserted += inserted_count
    
    end_time = time.time()
    print("\n Data ingestion completed.")
    print(f"Total time taken: {end_time - start_time:.2f} seconds.")
    print(f"Total records inserted: {total_inserted}")

# Run the data ingestion process
process_all_files()


Data ingestion started...

Processing USC00257715.txt...
10957 records inserted from USC00257715.txt
Processing USC00113879.txt...
9074 records inserted from USC00113879.txt
Processing USC00127935.txt...
10380 records inserted from USC00127935.txt
Processing USC00112193.txt...
10653 records inserted from USC00112193.txt
Processing USC00250640.txt...
10957 records inserted from USC00250640.txt
Processing USC00257070.txt...
10926 records inserted from USC00257070.txt
Processing USC00114442.txt...
10957 records inserted from USC00114442.txt
Processing USC00115833.txt...
10866 records inserted from USC00115833.txt
Processing USC00130133.txt...
10957 records inserted from USC00130133.txt
Processing USC00132724.txt...
10836 records inserted from USC00132724.txt
Processing USC00250130.txt...
9587 records inserted from USC00250130.txt
Processing USC00256970.txt...
10017 records inserted from USC00256970.txt
Processing USC00116446.txt...
10654 records inserted from USC00116446.txt
Processing US

In [10]:
def fetch_sample_records(limit=10):
    conn = sqlite3.connect(DB_WTHR_FILE)
    cursor = conn.cursor()
    cursor.execute(f"SELECT * FROM new_weather_data LIMIT {limit}")
    records = cursor.fetchall()
    conn.close()
    
    print(f"Showing {limit} sample records:\n")
    for row in records:
        print(row)

# Run this to check the inserted data
fetch_sample_records()


Showing 10 sample records:

(1, 'USC00257715', '19850101', -83, -144, 0)
(2, 'USC00257715', '19850102', 0, -133, 0)
(3, 'USC00257715', '19850103', 22, -111, 0)
(4, 'USC00257715', '19850104', 61, -50, 0)
(5, 'USC00257715', '19850105', 78, -67, 0)
(6, 'USC00257715', '19850106', 94, -11, 0)
(7, 'USC00257715', '19850107', 56, -11, 0)
(8, 'USC00257715', '19850108', 17, -89, 0)
(9, 'USC00257715', '19850109', -50, -94, 33)
(10, 'USC00257715', '19850110', -61, -106, 41)


In [11]:
def count_total_records():
    conn = sqlite3.connect(DB_WTHR_FILE)
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM new_weather_data")
    total_records = cursor.fetchone()[0]
    conn.close()
    print(f"Total records in database: {total_records}")

# Run this function
count_total_records()


Total records in database: 1729957
