In [8]:
import pandas as pd
from datetime import datetime, timedelta
import random
import os

#
# SECTION 1: GENERATE SAMPLE DATA
#

# Only try to load the CSV if it exists and has the required columns
if os.path.exists("school_attendance.csv"):
    temp_df = pd.read_csv("school_attendance.csv")
    if "last_updated" in temp_df.columns:
        df = pd.read_csv(
            "school_attendance.csv",
            parse_dates=["last_updated"],
            converters={"Date": lambda x: datetime.strptime(x, "%Y%m%d")}
        )
        print(f" Full extraction: Loaded ALL {len(df)} records.")
    else:
        print("school_attendance.csv exists but is missing 'last_updated' column. Please generate sample data first.")
        df = None
else:
    print("school_attendance.csv not found. Please generate sample data first.")
    df = None
# You can assign to a variable if needed, e.g. df_full = df
# except Exception as e:
# print(f" # school_attendance_etl.py


def generate_sample_data():
    database_file = 'school_attendance.csv'
    schools = ['01M015', '02M394', '03K403', '04M409', '05M280']
    data = []
    start_date = datetime(2018, 9, 1)

    for i in range(1, 91):  # 90 days of data
            date = start_date + timedelta(days=i)
            if date.weekday() >= 5:  # Skip weekends
                continue
                
            for school in schools:
                enrolled = random.randint(150, 200)
                absent = random.randint(5, 30)
                present = enrolled - absent
                data.append({
                    'School DBN': school,
                    'Date': date.date().strftime('%Y%m%d'),
                    'Enrolled': enrolled,
                    'Absent': absent,
                    'Present': present,
                    'Released': 0,
                    'last_updated': (date + timedelta(hours=random.randint(0, 23))).isoformat()
                })

    df = pd.DataFrame(data)
    df.to_csv('school_attendance.csv', index=False)
print("Sample data generated in school_attendance.csv")

#
# SECTION 2 FULL EXTRACTION AND TRANSFORMATION
# try:Full extraction failed: {str(e)}")
df = None  # or df_full = None

#INCREMENTAL EXTRACTION
def incremental_extraction(max_rows=50):
    """Perform incremental extraction with row limit"""
    try:
        # 1. Handle last_extraction.txt
        if not os.path.exists("last_extraction.txt"):
            with open("last_extraction.txt", "w") as f:
                default_date = "2018-01-01T00:00:00"
                f.write(default_date)
            print(f"Created last_extraction.txt with default date {default_date}")
        
        with open("last_extraction.txt", "r") as f:
            last_extraction = f.read().strip()

        # 2. Load and process data
        df = pd.read_csv("school_attendance.csv", 
                        parse_dates=["last_updated"],
                        converters={'Date': lambda x: datetime.strptime(x, '%Y%m%d')})
        
        last_extraction_time = pd.to_datetime(last_extraction)
        df_incremental = df[df['last_updated'] > last_extraction_time]
        
        # 3. Apply row limit
        df_limited = df_incremental.head(max_rows)
        
        print(f"\nFound {len(df_incremental)} new records since {last_extraction}")
        print(f"Returning {len(df_limited)} records (max {max_rows})")
        
        if len(df_limited) > 0:
            print("\nSample of returned records:")
            print(df_limited.head())
            
            # 4. Update timestamp
            new_checkpoint = df_limited['last_updated'].max()
            with open("last_extraction.txt", "w") as f:
                f.write(new_checkpoint.isoformat())
            print(f"\nUpdated last_extraction.txt to {new_checkpoint}")
            
            return df_limited
        else:
            print("\nNo new records found.")
            return pd.DataFrame()  # Return empty DataFrame if no new records
            
    except Exception as e:
        print(f"\nError: {str(e)}")
        return None

if __name__ == "__main__":
    # Generate sample data if needed
    generate_sample_data()
    
    # Run incremental extraction (gets max 50 rows)
    result = incremental_extraction(max_rows=50)
    
    # Optional: Save the extracted data to a new file
    if result is not None and not result.empty:
        result.to_csv("latest_extracted_records.csv", index=False)
        print("\nSaved extracted records to latest_extracted_records.csv")

 Full extraction: Loaded ALL 325 records.
Sample data generated in school_attendance.csv

Found 0 new records since 2018-11-30T22:00:00
Returning 0 records (max 50)

No new records found.


In [10]:
# Run incremental extraction and print results
df_incremental = incremental_extraction(max_rows=50)

if df_incremental is not None and not df_incremental.empty:
    print(f"\nFound {len(df_incremental)} new records.")
    print("\nSample of returned records:")
    print(df_incremental.head())
else:
    print("\nNo new records found.")



Found 0 new records since 2018-11-30T22:00:00
Returning 0 records (max 50)

No new records found.

No new records found.


In [11]:
try:
    df_full = pd.read_csv(
        "school_attendance.csv",
        parse_dates=["last_updated"],
        converters={"Date": lambda x: datetime.strptime(x, "%Y%m%d")}
    )
    print(f"Full extraction: Loaded ALL {len(df_full)} records.")
    transform_data(df_full, "transformed_full.csv")
except Exception as e:
    print(f"Full extraction failed: {str(e)}")


Full extraction: Loaded ALL 325 records.
Full extraction failed: name 'transform_data' is not defined


In [12]:
if result is not None and not result.empty:
    result.to_csv("latest_extracted_records.csv", index=False)
    print("Saved extracted records to latest_extracted_records.csv")

    # Transforming incremental data
    transform_data(result, "transformed_incremental.csv")


In [13]:
# --- Transforming Full Data ---

import pandas as pd
from datetime import datetime

def transform_data(df, output_path):
    """
    Apply cleaning, enrichment, and structural transformation to the dataset.
    Saves the transformed data to output_path.
    """
    if df is None or df.empty:
        print("No data to transform.")
        return pd.DataFrame()

    # 1. Cleaning: Remove duplicates and fill missing values
    df.drop_duplicates(inplace=True)
    df.fillna(0, inplace=True)

    # 2. Enrichment: Add attendance rate as a percentage
    df['Attendance_Rate'] = (df['Present'] / df['Enrolled']) * 100

    # 3. Structural: Standardize 'Date' format and school codes
    df['Date'] = pd.to_datetime(df['Date'], format='%Y%m%d')
    df['School DBN'] = df['School DBN'].str.upper()

    # Save transformed data
    df.to_csv(output_path, index=False)
    print(f"Transformed full dataset saved to {output_path}")
    return df

# Load full extracted data
try:
    df_full = pd.read_csv(
        "school_attendance.csv",
        parse_dates=["last_updated"],
        converters={"Date": lambda x: datetime.strptime(x, "%Y%m%d")}
    )
    print(f"Loaded full dataset with {len(df_full)} records.")

    # Transform full dataset
    df_transformed_full = transform_data(df_full, "transformed_full.csv")
except Exception as e:
    print(f"Error in full data transformation: {str(e)}")


Loaded full dataset with 325 records.
Transformed full dataset saved to transformed_full.csv


In [15]:
# --- Transform Incremental Data ---

try:
    # Load incremental data (no need for converters if date is already in ISO format)
    df_incremental = pd.read_csv(
        "latest_extracted_records.csv",
        parse_dates=["Date", "last_updated"]
    )
    print(f"Loaded incremental dataset with {len(df_incremental)} records.")

    # Transform incremental dataset
    df_transformed_incremental = transform_data(df_incremental, "transformed_incremental.csv")
except Exception as e:
    print(f"Error in incremental data transformation: {str(e)}")



Loaded incremental dataset with 3 records.
Transformed full dataset saved to transformed_incremental.csv
