In [None]:
%load_ext autoreload
%autoreload 2

from modules.auth import *
from modules.assessments_endpoints import *
from modules.frame_transformations import *
import logging
import os
import sys
from pyspark import RDD
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("API Request Parallelization")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("INFO")


# Send Python log records to stdout so they show up in the cell output.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],  # Direct logs to stdout
    level=logging.INFO,  # Adjust as needed (e.g., DEBUG, WARNING)
    format="%(asctime)s - %(message)s",  # Log format
    datefmt="%d-%b-%y %H:%M:%S",  # Date format
    force=True,  # Ensures existing handlers are replaced
)


def get_assessment_results(spark, save_path, view_path, years_data, start_date, end_date_override=None):
    """Fetch Illuminate assessment results and hand them to ``send_to_local``.

    The function requests an access token, loads the assessment metadata,
    fetches scores for the 'Group', 'Standard' and 'No_Standard' views,
    combines the standard / no-standard results, builds the results view and
    passes every frame to ``send_to_local`` together with a target directory
    and a file name.

    Args:
        spark: Spark session forwarded to ``parallel_get_assessment_scores``.
        save_path: Directory for the group / combined / metadata files.
            Created if it does not exist.
        view_path: Directory for the results-view file. Created if it does
            not exist.
        years_data: School year label. ``'23-24'`` writes the
            ``*_historical.csv`` file names, ``'24-25'`` writes the
            un-suffixed names. Any other value is rejected.
        start_date: Forwarded unchanged to ``parallel_get_assessment_scores``.
        end_date_override: Forwarded unchanged to every
            ``parallel_get_assessment_scores`` call (defaults to ``None``).

    Returns:
        None.

    Raises:
        AirflowException: If anything fails, including an unsupported
            ``years_data``. The original exception is attached as
            ``__cause__``.
    """
    # School year -> suffix inserted before ".csv" in the result file names.
    file_suffix_by_year = {'23-24': '_historical', '24-25': ''}

    logging.info('\n\n-------------New Illuminate Operations Logging Instance')

    try:
        # Validate up front so a bad label fails before any API request is made.
        if years_data not in file_suffix_by_year:
            raise ValueError(f'Unexpected value for years variable data {years_data}')
        suffix = file_suffix_by_year[years_data]

        access_token, _expires_in = get_access_token()

        assessments_df, assessment_id_list = get_all_assessments_metadata(access_token)
        # assessment_id_list = assessment_id_list[:100] #for testing
        missing_ids_from_metadata = ['114845', '141498']  # Assessments that are not present in the assessments metadata
        # dict.fromkeys de-duplicates like set() but keeps a deterministic order.
        assessment_id_list = list(dict.fromkeys(list(assessment_id_list) + missing_ids_from_metadata))
        logging.info(f'Here is the length of the assessment_id_list variable {len(assessment_id_list)}')

        # All three views must honour the caller's end_date_override; the
        # 'Group' call previously hard-coded None and ignored it.
        test_results_group, _ = parallel_get_assessment_scores(spark, access_token, assessment_id_list, 'Group', start_date, end_date_override)
        test_results_standard, _ = parallel_get_assessment_scores(spark, access_token, assessment_id_list, 'Standard', start_date, end_date_override)
        test_results_no_standard, _ = parallel_get_assessment_scores(spark, access_token, assessment_id_list, 'No_Standard', start_date, end_date_override)

        test_results_combined = bring_together_test_results(test_results_no_standard, test_results_standard)
        test_results_view = create_test_results_view(test_results_combined, years_data)  # add in grade level col, string matching
        logging.info("Assessment results fetched and processed.")

        # Both output directories are written to below, so make sure both exist.
        os.makedirs(save_path, exist_ok=True)
        os.makedirs(view_path, exist_ok=True)

        logging.info(f'Sending data for {years_data} school year')
        send_to_local(save_path, test_results_group, f'assessment_results_group{suffix}.csv')
        send_to_local(save_path, test_results_combined, f'assessment_results_combined{suffix}.csv')
        send_to_local(view_path, test_results_view, f'illuminate_assessment_results{suffix}.csv')

        # No matter which year, update assessments_metadata file to display available assessments
        send_to_local(save_path, assessments_df, 'assessments_metadata.csv')

    except Exception as e:
        logging.error(f"Error fetching assessment results: {e}")
        # NOTE(review): AirflowException is not imported in this cell; presumably
        # it arrives through one of the star imports. Confirm.
        raise AirflowException("Failed to fetch and process assessment results") from e


# Run the pipeline for the 23-24 school year with an explicit end date.
get_assessment_results(
    spark,
    save_path='/home/g2015samtaylor/illuminate',
    view_path='/home/g2015samtaylor/views',
    years_data='23-24',
    start_date='2023-07-01',
    end_date_override='2024-07-01',
)

# end_date_override='2024-07-01' # should default to today's date


# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/sam/icef-437920.json"


#Add to requirements.txt
#Re-initiate Docker with a new Spark tag, in case we need to roll back
#Update changes in docker file
#Make sure changes flow through to airflow
#Run locally for string matching


In [None]:
# Traceback (most recent call last):
#   File "/app/illuminate_pipeline.py", line 43, in get_assessment_results
#     test_results_view = create_test_results_view(test_results_combined, years_data)
#   File "/app/modules/frame_transformations.py", line 180, in create_test_results_view
#     test_results_view = apply_manual_changes(test_results)
# TypeError: apply_manual_changes() missing 1 required positional argument: 'changes_file_path'

# During handling of the above exception, another exception occurred:

# Traceback (most recent call last):
#   File "/app/illuminate_pipeline.py", line 69, in <module>
#     get_assessment_results(spark,
#   File "/app/illuminate_pipeline.py", line 66, in get_assessment_results
#     raise AirflowException("Failed to fetch and process assessment results")

# Query 

In [None]:
from google.cloud import bigquery
import os
# Point the Google client libraries at the service-account key file.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/sam/icef-437920.json"

# Initialize the BigQuery client
client = bigquery.Client(project='icef-437920')


# Submit the query. client.query() returns a QueryJob, not a DataFrame, so it
# gets its own name; reusing `changes` for both is what leads to
# "'QueryJob' object has no attribute 'set_index'".
changes_job = client.query('''
SELECT 
CAST(assessment_id AS STRING) AS assessment_id, 
* EXCEPT(assessment_id) 
FROM `icef-437920.illuminate.illuminate_checkpoint_title_issues`
''')


# Convert the query results to a Pandas DataFrame
changes = changes_job.result().to_dataframe()

# set_index() removes 'assessment_id' from the columns, so selecting
# ['assessment_id'] afterwards raises KeyError. Build one
# {assessment_id: value} mapping per remaining column instead:
#     update_dict[column][assessment_id] -> replacement value
update_dict = changes.set_index('assessment_id').to_dict()



In [11]:
import pandas as pd
import os
import logging
from google.cloud import storage

# Point the Google client libraries at a service-account key file on this machine.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/sam/icef-437920.json"
# Sample frame loaded from a local CSV; it is the frame passed to send_to_gcs at the end of this cell.
test = pd.read_csv('/home/sam/git_directory/ICEF/Illuminate/test.csv')

def send_to_gcs(bucket_name, save_path, frame, frame_name):
    """
    Uploads a DataFrame as a CSV file to a GCS bucket.

    The frame is written (without its index) to a file inside a private
    temporary directory, uploaded, and the directory is removed afterwards
    whether or not the upload succeeded. An empty frame is not uploaded; an
    info message is logged instead. Upload failures are logged and not
    re-raised.

    Args:
        bucket_name (str): The name of the GCS bucket.
        save_path (str): The path within the bucket where the file will be saved.
            May be empty to store the object at the bucket root.
        frame (pd.DataFrame): The DataFrame to upload.
        frame_name (str): The name of the file to save.

    Returns:
        None.
    """
    # Function-scope imports keep this cell runnable on its own.
    import posixpath
    import tempfile

    if frame.empty:
        logging.info(f"No data present in {frame_name} file")
        return

    # Initialize the GCS client
    client = storage.Client()

    # Object names in GCS always use "/", independent of the local OS.
    blob_name = posixpath.join(save_path, frame_name)

    # A private temp directory avoids clashes between concurrent runs that use
    # the same frame_name, and is cleaned up even if writing the CSV fails.
    with tempfile.TemporaryDirectory() as temp_dir:
        # Only the final path component is used locally, so a frame_name that
        # contains "/" does not require sub-directories on disk.
        local_name = os.path.basename(frame_name) or "frame.csv"
        temp_file_path = os.path.join(temp_dir, local_name)
        frame.to_csv(temp_file_path, index=False)

        try:
            # Get the bucket
            bucket = client.bucket(bucket_name)

            # Define the blob (file path in the bucket)
            blob = bucket.blob(blob_name)

            # Upload the file to GCS
            blob.upload_from_filename(temp_file_path)
            logging.info(f"{frame_name} uploaded to GCS bucket {bucket_name} at {blob_name}")
        except Exception as e:
            logging.error(f"Failed to upload {frame_name} to GCS bucket {bucket_name}: {e}")

# Upload the sample frame to the bucket root (empty save_path) as "test.csv".
send_to_gcs("illuminatebucket-icefschools-1", "", test, "test.csv")

In [8]:
changes

Unnamed: 0,assessment_id,test_type,curriculum,unit,title
0,62474,checkpoint,Math,6.NS.A.1,6.NS.A.1 Checkpoint
1,142420,checkpoint,Math,6.NS.C.5-7,Grade 6 NS.C.5-7 Checkpoint
2,62479,checkpoint,Math,6.RP.A.2,6.RP.A.2 Checkpoint
3,62525,checkpoint,Math,6.RP.A.3a,6.RP.A.3a Checkpoint
4,62477,checkpoint,Math,6.RP.A.3b,6.RP.A.3b Checkpoint
...,...,...,...,...,...
76,107735,checkpoint,Math,Unit 5 Section B,Grade K Math Unit 5 Section B Checkpoint
77,107670,checkpoint,Math,Unit 5 Section C,Grade 3 Math Unit 5 Section C Checkpoint
78,107526,checkpoint,Math,Unit 6 Section A,Grade 2 Math Unit 6 Section A Checkpoint
79,115933,checkpoint,Math,Unit 6 Section A,Grade K Math Unit 6 Section A Checkpoint


In [4]:
update_dict

NameError: name 'update_dict' is not defined

In [None]:
#   File "/app/illuminate_pipeline.py", line 43, in get_assessment_results
#     test_results_view = create_test_results_view(test_results_combined, years_data)
#   File "/app/modules/frame_transformations.py", line 180, in create_test_results_view
#     test_results_view = apply_manual_changes(test_results)
#   File "/app/modules/frame_transformations.py", line 168, in apply_manual_changes
#     update_dict = changes.set_index('assessment_id')[column].to_dict()
# AttributeError: 'QueryJob' object has no attribute 'set_index'

# During handling of the above exception, another exception occurred:

# Traceback (most recent call last):
#   File "/app/illuminate_pipeline.py", line 69, in <module>
#     get_assessment_results(spark,
#   File "/app/illuminate_pipeline.py", line 66, in get_assessment_results
#     raise AirflowException("Failed to fetch and process assessment results")