In [12]:
# %% [1] Imports
import json
import logging
import os
import sys

from IPython.display import display

In [13]:
# ANSI escape sequences: GREEN switches the terminal colour to bright green,
# RESET restores the default colour.
GREEN = "\033[92m"
RESET = "\033[0m"


class CustomFormatter(logging.Formatter):
    """Formatter that wraps every rendered log line in green ANSI colour codes."""

    def format(self, record):
        """Render ``record`` with the base formatter and colour the result green.

        Args:
            record: The ``logging.LogRecord`` to render.

        Returns:
            The text produced by ``logging.Formatter.format`` prefixed with
            ``GREEN`` and suffixed with ``RESET``.
        """
        log_msg = super().format(record)
        return f"{GREEN}{log_msg}{RESET}"


# Send root-logger output through a stream handler that uses the green formatter.
handler = logging.StreamHandler()
handler.setFormatter(CustomFormatter("%(asctime)s [%(levelname)s] %(message)s"))
# force=True (Python 3.8+) removes handlers already attached to the root logger
# before installing this one. Without it, basicConfig silently does nothing
# whenever the root logger already has a handler, so the formatter above would
# never take effect in that situation.
logging.basicConfig(level=logging.INFO, handlers=[handler], force=True)

In [14]:
# Make the project code importable from the notebook.
# A later cell imports `source.data_ingestion`; that form resolves only when
# the directory that CONTAINS `source` (the parent of the working directory)
# is on sys.path. "../source" itself is still added, as before, in case
# modules inside it import their siblings by bare name (not verifiable from
# this notebook).
for _import_root in (os.path.abspath(".."), os.path.abspath("../source")):
    # Skip entries that are already present so re-running this cell does not
    # keep appending duplicates to sys.path.
    if _import_root not in sys.path:
        sys.path.append(_import_root)


In [6]:
#  Import the ingest_data function from the data_ingestion module
from source.data_ingestion import ingest_data
from source.utils.config_loader import load_config

In [7]:
# %% [2]  Load Config
# Read the project settings and resolve every configured directory to an
# absolute path, so later cells do not depend on the working directory.
config_path = os.path.join(os.path.abspath("../config"), "settings.yml")
config = load_config(config_path)

paths_cfg = config["paths"]

RAW_DIR = os.path.abspath(paths_cfg["raw_dir"])
# "interim_dir" is the only optional key. The fallback points at ../data so it
# sits next to the other data directories used in this notebook; a bare
# "data/interim" would resolve inside the notebook's own directory instead.
INTERIM_DIR = os.path.abspath(paths_cfg.get("interim_dir", "../data/interim"))
PROCESSED_DIR = os.path.abspath(paths_cfg["processed_dir"])
ARCHIVE_DIR = os.path.abspath(paths_cfg["archive_dir"])
METADATA_DIR = os.path.abspath(paths_cfg["metadata_dir"])
# METADATA_DIR is already absolute and normalised, so a plain join is enough.
METADATA_PATH = os.path.join(METADATA_DIR, "processed_files.json")


In [8]:
# Create the output directories resolved from the config file.
# The directory constants are deliberately NOT reassigned here: replacing the
# config-derived absolute paths with hard-coded relative literals would make
# the config file ineffective and would leave METADATA_PATH (built from the
# configured METADATA_DIR) pointing at a different directory than METADATA_DIR.
for _output_dir in (INTERIM_DIR, PROCESSED_DIR, ARCHIVE_DIR, METADATA_DIR):
    os.makedirs(_output_dir, exist_ok=True)


In [9]:
# %% [3] Ingest Data
# Pass the shared RAW_DIR constant instead of repeating the path literal, so
# ingestion always reads from the directory the notebook has configured.
ingest_data(raw_dir=RAW_DIR)


2025-01-04 16:38:55,354 - INFO - Starting data ingestion process.
[92m2025-01-04 16:38:55,354 [INFO] Starting data ingestion process.[0m
2025-01-04 16:38:55,360 - INFO - Number of CSV files found: 515
[92m2025-01-04 16:38:55,360 [INFO] Number of CSV files found: 515[0m
2025-01-04 16:38:55,362 - INFO - Processing: epa_so2_virginia_2023.csv
[92m2025-01-04 16:38:55,362 [INFO] Processing: epa_so2_virginia_2023.csv[0m
2025-01-04 16:38:55,366 - INFO - File hash: 6a3de812a864fd09ee3039be92ae3709
[92m2025-01-04 16:38:55,366 [INFO] File hash: 6a3de812a864fd09ee3039be92ae3709[0m
2025-01-04 16:38:55,379 - INFO - Deduplication: (2877, 21) -> (2877, 21)
[92m2025-01-04 16:38:55,379 [INFO] Deduplication: (2877, 21) -> (2877, 21)[0m
2025-01-04 16:38:55,398 - INFO - Processed data saved to interim: /Users/dogaaydin/PycharmProjects/Geo_Sentiment_Climate/data/interim/interim_epa_so2_virginia_2023.csv
[92m2025-01-04 16:38:55,398 [INFO] Processed data saved to interim: /Users/dogaaydin/PycharmPr

In [10]:
# %% [2] Inspect the metadata file
# `os` and `json` are provided by the notebook's import cell.
if os.path.exists(METADATA_PATH):
    print(f"Metadata file found: {METADATA_PATH}")
    # Read with an explicit encoding so parsing does not depend on the
    # platform's default text encoding.
    with open(METADATA_PATH, "r", encoding="utf-8") as f:
        metadata = json.load(f)
    # The file is closed by now; only the parsed content is printed.
    print("Metadata içeriği:")
    print(json.dumps(metadata, indent=4))
else:
    print(f": Metadata file not found: {METADATA_PATH}")


Metadata file found: /Users/dogaaydin/PycharmProjects/Geo_Sentiment_Climate/data/metadata/processed_files.json
Metadata içeriği:
{
    "processed_files": [
        {
            "file_name": "epa_so2_virginia_2023.csv",
            "file_hash": "6a3de812a864fd09ee3039be92ae3709",
            "processed_at": "2025-01-04T16:38:55.401380",
            "rows_count": 2877
        },
        {
            "file_name": "epa_so2_kansas_2023.csv",
            "file_hash": "2895f5203520d743fd97def898f35874",
            "processed_at": "2025-01-04T16:38:55.423019",
            "rows_count": 1350
        },
        {
            "file_name": "epa_so2_iowa_2023.csv",
            "file_hash": "5af77f61f2d95f707673dd844610b960",
            "processed_at": "2025-01-04T16:38:55.452177",
            "rows_count": 2327
        },
        {
            "file_name": "epa_so2_arkansas_2023.csv",
            "file_hash": "e0ba9f49cc942abdda171e51798e0859",
            "processed_at": "2025-01-04T16:38:55.4

In [11]:
# %% [3] Ingestion results inspection

def load_processed_files(metadata_path):
    """Load the processed-files metadata from a JSON file.

    Args:
        metadata_path: Path of the metadata JSON file.

    Returns:
        The parsed JSON content, or ``{"processed_files": []}`` when no file
        exists at ``metadata_path``.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    # Open directly and handle the "missing file" case, instead of checking
    # existence first: the file can no longer vanish between the check and
    # the open. The encoding is explicit so the platform default is not used.
    try:
        with open(metadata_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return {"processed_files": []}


# Reload the metadata and extract the list of processed-file records
# (an empty list when the key is absent).
metadata = load_processed_files(METADATA_PATH)
processed_files = metadata.get("processed_files", [])

# Plain-text summary: the total count, then the first five records one per line.
preview = processed_files[:5]
print(f"processed file count: {len(processed_files)}")
print("first 5 processed files:")
for record in preview:
    print(record)

# Rich display of the same five records.
display(preview)


processed file count: 514
first 5 processed files:
{'file_name': 'epa_so2_virginia_2023.csv', 'file_hash': '6a3de812a864fd09ee3039be92ae3709', 'processed_at': '2025-01-04T16:38:55.401380', 'rows_count': 2877}
{'file_name': 'epa_so2_kansas_2023.csv', 'file_hash': '2895f5203520d743fd97def898f35874', 'processed_at': '2025-01-04T16:38:55.423019', 'rows_count': 1350}
{'file_name': 'epa_so2_iowa_2023.csv', 'file_hash': '5af77f61f2d95f707673dd844610b960', 'processed_at': '2025-01-04T16:38:55.452177', 'rows_count': 2327}
{'file_name': 'epa_so2_arkansas_2023.csv', 'file_hash': 'e0ba9f49cc942abdda171e51798e0859', 'processed_at': '2025-01-04T16:38:55.468022', 'rows_count': 360}
{'file_name': 'epa_so2_new_hampshire_2023.csv', 'file_hash': 'e64b587165a381c2640c2bb84508dc70', 'processed_at': '2025-01-04T16:38:55.491792', 'rows_count': 1398}


[{'file_name': 'epa_so2_virginia_2023.csv',
  'file_hash': '6a3de812a864fd09ee3039be92ae3709',
  'processed_at': '2025-01-04T16:38:55.401380',
  'rows_count': 2877},
 {'file_name': 'epa_so2_kansas_2023.csv',
  'file_hash': '2895f5203520d743fd97def898f35874',
  'processed_at': '2025-01-04T16:38:55.423019',
  'rows_count': 1350},
 {'file_name': 'epa_so2_iowa_2023.csv',
  'file_hash': '5af77f61f2d95f707673dd844610b960',
  'processed_at': '2025-01-04T16:38:55.452177',
  'rows_count': 2327},
 {'file_name': 'epa_so2_arkansas_2023.csv',
  'file_hash': 'e0ba9f49cc942abdda171e51798e0859',
  'processed_at': '2025-01-04T16:38:55.468022',
  'rows_count': 360},
 {'file_name': 'epa_so2_new_hampshire_2023.csv',
  'file_hash': 'e64b587165a381c2640c2bb84508dc70',
  'processed_at': '2025-01-04T16:38:55.491792',
  'rows_count': 1398}]

In [None]:
# %% [4] Ingestion results inspection

# `load_processed_files` is defined once, in the "[3] Ingestion results
# inspection" cell above; it is reused here rather than redefined, so there is
# a single definition to maintain.
metadata = load_processed_files(METADATA_PATH)
processed_files = metadata.get("processed_files", [])

# Show the first 5 processed files
display(processed_files[:5])


In [None]:
# %% [4] Ingestion results inspection

# `load_processed_files` is defined once, in the "[3] Ingestion results
# inspection" cell above; it is reused here rather than redefined, so there is
# a single definition to maintain.
metadata = load_processed_files(METADATA_PATH)
processed_files = metadata.get("processed_files", [])

# Plain-text summary followed by a rich display of the first five records.
print(f"processed file count: {len(processed_files)}")
print("first 5 processed files:")
for file in processed_files[:5]:
    print(file)

display(processed_files[:5])


In [None]:
# %% [4] Ingestion results inspection

# `load_processed_files` is defined once, in the "[3] Ingestion results
# inspection" cell above; it is reused here rather than redefined, so there is
# a single definition to maintain.
metadata = load_processed_files(METADATA_PATH)
processed_files = metadata.get("processed_files", [])

# Plain-text summary followed by a rich display of the first five records.
print(f"processed file count: {len(processed_files)}")
print("first 5 processed files:")
for file in processed_files[:5]:
    print(file)

display(processed_files[:5])
