### 1. Import lib

In [1]:
import yaml
import pandas as pd

from sqlalchemy import create_engine
from sqlalchemy.engine.base import Connection
from datetime import datetime, timedelta
from pytz import timezone

### 2. Help functions

In [2]:

# Function load data
def read_data (url:str) -> pd.DataFrame:
    return pd.read_csv(url, sep='|')

# Function transform data
def clean_data(df: pd.DataFrame, last_report_dt: str) -> pd.DataFrame:
    """
    Clean the DataFrame by removing unnecessary columns and renaming others.
    """
    # Rename columns
    df = df.rename(columns={'ReportDt':'report_dt', 'Unit':'unit', 'Power':'power'})

    # Alter types of columns
    df['power'] = df['power'].astype(int)

    # Remoce sapce and lower case
    df['unit'] = df['unit'].apply(lambda x: x.strip().lower().replace(' ','_') )

    # Format date of column report_dt
    df['report_dt'] = pd.to_datetime( df['report_dt'].apply(lambda x: x.split(' ')[0]), format='%m/%d/%Y' )

    return df[ df['report_dt'] > last_report_dt ].reset_index(drop=True)


# Get the last ingestion date
def get_ingestion_date(con:Connection) -> str:
    last_report_dt = con.exec_driver_sql('select max(report_dt) from reactor_status').fetchone()[0]
    last_report_dt = last_report_dt.strftime("%Y-%m-%d")

    return last_report_dt

### 3. DB Conection

In [3]:
# Load the YAML configuration file
with open('/home/las/Documentos/repos/projeto_airflow/config.yaml', "r") as file:
    config = yaml.safe_load(file)

# Extract database connection details from the config
host = config["database"]["host"]
db = config["database"]["db"]
user = config["database"]["user"]
password = config["database"]["password"]

# conection string
conn = f'postgresql://{user}:{password}@{host}.oregon-postgres.render.com/{db}'

# create engine
engine = create_engine(conn)

# create connection
con = engine.connect()


### 4. Loading data

In [None]:
url = 'https://www.nrc.gov/reading-rm/doc-collections/event-status/reactor-status/powerreactorstatusforlast365days.txt'
df = read_data(url=url)
df.head()

Unnamed: 0,ReportDt,Unit,Power
0,5/23/2025 12:00:00 AM,Arkansas Nuclear 1,100
1,5/23/2025 12:00:00 AM,Arkansas Nuclear 2,100
2,5/23/2025 12:00:00 AM,Beaver Valley 1,100
3,5/23/2025 12:00:00 AM,Beaver Valley 2,99
4,5/23/2025 12:00:00 AM,Braidwood 1,100


### 5. Transform data

In [5]:
last_report_dt = get_ingestion_date(con=con)
df = clean_data(df=df, last_report_dt=last_report_dt)
df.head()

Unnamed: 0,report_dt,unit,power
0,2025-05-23,arkansas_nuclear_1,100
1,2025-05-23,arkansas_nuclear_2,100
2,2025-05-23,beaver_valley_1,100
3,2025-05-23,beaver_valley_2,99
4,2025-05-23,braidwood_1,100


In [8]:
df.shape

(282, 3)

In [7]:
df.report_dt.max()

Timestamp('2025-05-23 00:00:00')

In [8]:
df2 = df[ df['report_dt'] <= '2025-05-20' ].reset_index(drop=True)
df2.shape

(34028, 3)

In [13]:
df2.to_sql(
    'reactor_status',
    schema='public',
    con=con,
    if_exists='replace',
    index=False,
)

28

In [14]:
con.exec_driver_sql('select count(*) from reactor_status').fetchone()[0]

34028

In [None]:


get_ingestion_date(con=con)

'2025-05-23'

In [16]:
df3 = df[ df['report_dt'] > last_report_dt ].reset_index(drop=True)
df3.shape

(282, 3)

In [17]:
df3.to_sql(
    'reactor_status',
    schema='public',
    con=con,
    if_exists='append',
    index=False,
)

282

In [18]:
34028 + 282

34310

In [19]:
con.exec_driver_sql('select count(*) from reactor_status').fetchone()[0]

34310

In [None]:
con.exec_driver_sql(
    """
    CREATE TABLE IF NOT EXISTS reactor_status (
        report_dt DATE,
        unit VARCHAR(50),
        power INT
    )
    """
)
# Insert the DataFrame into the database    
df.to_sql('reactor_status', con, if_exists='append', index=False)
# Close the connection
con.close()
# Close the engine
engine.dispose()


In [35]:
con.close()
engine.dispose()