### Pipeline to download Chandrayaan-2 (CH2) CLASS data and plot its light curve (counts vs. time)

In this pipeline, we first download the data, then write a text file listing all the zip file names so that each archive can be processed in turn, then generate a light-curve CSV file from each zip file, then combine the CSV files, sort the combined data by time, and finally plot it.

Update your PRADAN username and password below:

In [6]:
# PRADAN login credentials; the download cell types these into the login form
USER, PASS = "username", "password"

In [None]:
Now edit the section below to suit your requirements:

In [1]:
# Observation window to search for, as "YYYY-MM-DD HH:MM:SS" strings
start, end = "2023-10-01 00:00:00", "2023-10-31 23:59:59"

# Total number of files
no_files = 51732

# Starting index for the download. If a run breaks part-way, set this to the last value
# printed after "downloaded files from index no" plus 500 before running again.
count = 0

# Directory for downloaded files; it must already exist before the download starts
DOWNLOAD_DIR = '/path/to/downloaded/files'


In [2]:
#necessary libraries and modules:
#!/usr/bin/env python3
import os
import time
from selenium import webdriver
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait, Select
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import ElementClickInterceptedException

import sys
import glob
import shutil
import zipfile
import numpy as np
import pandas as pd
from astropy.io import fits
from datetime import datetime, timedelta

import matplotlib.pyplot as plt
from matplotlib.dates import DayLocator, DateFormatter

In [None]:
# Download geckodriver and point the path below at the geckodriver executable
service = Service(executable_path=r'/path/to/geckodriver')

In [None]:
# Firefox preferences: save zip downloads into DOWNLOAD_DIR without showing prompts
profile = Options()
for pref_key, pref_value in (
    ('browser.download.folderList', 2),  # 2 = custom location (set on the next line)
    ('browser.download.dir', DOWNLOAD_DIR),
    ('browser.download.manager.showWhenStarting', False),
    ("browser.download.manager.showAlertOnComplete", False),
    ("browser.helperApps.neverAsk.saveToDisk", "application/zip"),
):
    profile.set_preference(pref_key, pref_value)

In [None]:
# Script to automatically download files from PRADAN and log out of the website upon completion
# Function to check status of download
def did_you_download(download_dir=None, poll_interval=2):
    """Wait until the newest file in the download directory is a finished download.

    The most recently modified entry of the directory is inspected repeatedly.
    While its name ends in ``.zip.part`` it is treated as a download still in
    progress and the function keeps polling.

    Unlike the earlier version, this does not ``os.chdir`` into the download
    directory, so the working directory of the process is left untouched.

    Args:
        download_dir: Directory to watch. Defaults to the module-level
            ``DOWNLOAD_DIR`` when omitted.
        poll_interval: Seconds to sleep between directory checks.

    Returns:
        The string ``"Downloaded: <name>"`` for the newest file found.

    Raises:
        FileNotFoundError: If the directory to watch does not exist.
    """
    dpath = DOWNLOAD_DIR if download_dir is None else download_dir
    if not os.path.isdir(dpath):
        raise FileNotFoundError(f"Download directory does not exist: {dpath}")

    partial_suffix = ".zip.part"

    while True:
        try:
            files = sorted(
                os.listdir(dpath),
                key=lambda name: os.path.getmtime(os.path.join(dpath, name)),
            )
        except FileNotFoundError:
            # A file (typically the ".part" file) was renamed or removed between
            # listing the directory and reading its mtime; just look again.
            files = []

        # Sleep before deciding, matching the pacing of the original polling loop
        time.sleep(poll_interval)

        if files and not files[-1].endswith(partial_suffix):
            return "Downloaded: " + files[-1]

# Drive Firefox through PRADAN: log in, filter the CLASS files by observation time,
# download them in batches, then log out and close the browser.

BATCH_SIZE = 500  # number of files requested per download batch
OVERLAY_LOCATOR = (By.CSS_SELECTOR, ".ui-widget-overlay, .ui-dialog-mask")

# Initialize a set to keep track of downloaded indexes
downloaded_indexes = set()
logged_out = False

# Initialize the WebDriver with the download preferences prepared in `profile`.
# A fresh Options() here would discard those preferences, so the zip files would
# not be directed to DOWNLOAD_DIR, which is the directory did_you_download() watches.
driver = webdriver.Firefox(service=service, options=profile)

try:
    # Everything after the browser starts lives inside the try block so that the
    # finally clause always gets a chance to close the browser.
    driver.get("https://pradan.issdc.gov.in/ch2/")

    # Find and click on "Browse and Download"
    BandD = driver.find_element(By.PARTIAL_LINK_TEXT, "Browse and Download")
    BandD.click()

    # Logging into Pradan
    username = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, "username"))
    )
    username.clear()
    username.send_keys(USER)
    time.sleep(2)

    passwd = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, "password"))
    )
    passwd.clear()
    passwd.send_keys(PASS)

    time.sleep(2)
    passwd.submit()
    print("\nLogged in.")

    # Selecting the section for CLASS on the website
    print("Selecting CLASS.")
    time.sleep(10)
    CLASS_data = WebDriverWait(driver, 20).until(
        EC.presence_of_element_located((By.ID, "tableForm:payloads:0:j_idt42"))
    )
    CLASS_data.click()

    # Filtering the data according to START and END time of observation
    print("Filtering data according to START and END time of observation.")
    From = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, "filterForm:filterTable:0:datetime1_input"))
    )
    From.clear()
    From.send_keys(start)

    To = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, "filterForm:filterTable:0:datetime2_input"))
    )
    To.clear()
    To.send_keys(end)

    time.sleep(2)
    Filter = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, "filterForm:filterButton"))
    )
    Filter.click()
    print("Ready to download.")

    time.sleep(10)

    # Locate the dropdown menu and select "DOWNLOAD"
    dropdown = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.ID, "filterForm:j_idt53"))
    )
    select = Select(dropdown)
    select.select_by_visible_text("DOWNLOAD")  # Select the DOWNLOAD option

    # Choosing the FITS files and downloading them in batches of BATCH_SIZE files
    while count < no_files:
        # Wait for any modal overlay to disappear before proceeding
        print("Waiting for overlays to disappear...")
        WebDriverWait(driver, 60).until(
            EC.invisibility_of_element_located(OVERLAY_LOCATOR)
        )
        print("Overlays have disappeared, continuing...")

        # Entering the starting index of the batch (the form field is 1-based)
        print("waiting for selecting index")
        Start_index = WebDriverWait(driver, 60).until(
            EC.presence_of_element_located((By.ID, "tableForm:startIndex"))
        )
        Start_index.clear()
        Start_index.send_keys(str(count + 1))

        # Wait until the "Select" button is clickable before interacting
        print("waiting for select button")
        Select_button = WebDriverWait(driver, 15).until(
            EC.element_to_be_clickable((By.ID, "tableForm:selectButton"))
        )

        # Retry once if the click is intercepted by an overlay
        try:
            Select_button.click()
        except ElementClickInterceptedException:
            print("Select button click intercepted. Retrying after waiting for overlay.")
            WebDriverWait(driver, 20).until(
                EC.invisibility_of_element_located(OVERLAY_LOCATOR)
            )
            Select_button.click()

        # Downloading the batch
        time.sleep(30)
        Download = WebDriverWait(driver, 30).until(
            EC.presence_of_element_located((By.ID, "tableForm:download"))
        )
        Download.click()

        # Checking the status of download
        time.sleep(1)
        print(did_you_download())

        # Marking the current batch as downloaded
        downloaded_indexes.update(range(count, min(count + BATCH_SIZE, no_files)))

        print("downloaded files from index no: ", count)
        count += BATCH_SIZE
        time.sleep(8)

    # Final log of downloaded indexes
    print(f"Downloaded indexes: {downloaded_indexes}")

finally:
    # Best-effort logout: a failure here must not prevent the browser from closing
    try:
        # Locate the user menu dropdown using the div ID
        user_menu = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.ID, "j_idt30"))
        )
        user_menu.click()  # Click to open the user menu

        # Wait for the dropdown items to become visible
        logout_menu = WebDriverWait(driver, 10).until(
            EC.visibility_of_element_located((By.CSS_SELECTOR, ".ui-menu-list.ui-helper-reset"))
        )

        # Now find the Logout link within the dropdown
        logout_link = WebDriverWait(logout_menu, 10).until(
            EC.element_to_be_clickable((By.XPATH, '//a[contains(@href, "logout") and contains(@class, "headerMenu")]'))
        )

        # Scroll the logout link into view
        driver.execute_script("arguments[0].scrollIntoView();", logout_link)

        # Click the logout link
        logout_link.click()
        logged_out = True
        print("Logout link clicked.")

    except Exception as e:
        # Report what went wrong instead of hiding it, but keep going
        print(f"An error occurred during logout but do not worry! ({type(e).__name__}: {e})")

    # Close the browser whether or not the logout succeeded
    try:
        driver.quit()
        if logged_out:
            print("Logged out successfully!")
        else:
            print("Browser closed without a confirmed logout.")
    except Exception as e:
        print(f"An error occurred while closing the browser. ({type(e).__name__}: {e})")


The download process above may take a while. Pay attention to the messages it prints. It may stop partway because of, for example, a slow internet connection. In that case, run the script again after changing the value of the `count` variable (see the comment beside that variable).

Let's process the downloaded data now.

In [None]:
#Code to extract and make csv files:

# Output file path for the text file
# Text file that will list every downloaded zip archive, one name per line
output_file_path = DOWNLOAD_DIR + '/zipfiles.txt'

# Everything currently present in the download directory
all_files = os.listdir(DOWNLOAD_DIR)
print("Total number of files:", len(all_files))

# Keep only the zip archives
zip_files = []
for entry in all_files:
    if entry.endswith('.zip'):
        zip_files.append(entry)
print("Total number of zip_files:", len(zip_files))

# Write the archive names to the text file
with open(output_file_path, 'w') as manifest:
    manifest.writelines(name + '\n' for name in zip_files)

print(f"File names written to {output_file_path}")

# Read the file back and count its lines; this should match the number of zip files
with open(output_file_path, 'r') as manifest:
    line_count = len(manifest.readlines())

print(f'The number of lines/file names in the file is: {line_count}')

#For debugging purposes:
#for zip_file in zip_files:
   #print(zip_file)


In [None]:
#Code to generate lightCurve.csv files for each zip file data

# Specify the path to your input file
# Build one LightCurve<i>.csv per zip archive listed in zipfiles.txt.


def read_FITS(fits_file_path):
    """Read one FITS file and return its summed counts and timing keywords.

    Args:
        fits_file_path: Path of the FITS file to open.

    Returns:
        A tuple ``(total_counts, start, end)``. ``total_counts`` is the sum of
        rows 519-1186 of the 'counts' column of HDU 1. ``start`` and ``end``
        are the 'startime' and 'endtime' header values of HDU 1, or the string
        'Not found' when the keyword is absent.
    """
    with fits.open(fits_file_path) as hdul:
        # HDU 1 (the first extension, not the primary HDU) supplies both the
        # header keywords and the counts column.
        header = hdul[1].header
        # The original author selected this slice for the 7-16 keV band.
        data = hdul[1].data['counts'][519:1187]
        total_counts = np.sum(data)

        start = header['startime'] if 'startime' in header else 'Not found'
        end = header['endtime'] if 'endtime' in header else 'Not found'

    return total_counts, start, end


def calculate_mid_time(start_time_str, end_time_str):
    """Return the midpoint between two timestamps.

    Args:
        start_time_str: Start time formatted as "%Y-%m-%dT%H:%M:%S.%f",
            or 'Not found'.
        end_time_str: End time in the same format, or 'Not found'.

    Returns:
        A ``datetime`` halfway between start and end, or the string
        'Not found in FITS' when either input is 'Not found'.

    Raises:
        ValueError: If a timestamp does not match the expected format.
    """
    if start_time_str == 'Not found' or end_time_str == 'Not found':
        return 'Not found in FITS'

    time_format = "%Y-%m-%dT%H:%M:%S.%f"
    start_time = datetime.strptime(start_time_str, time_format)
    end_time = datetime.strptime(end_time_str, time_format)

    # Midpoint = start + half of the elapsed interval
    return start_time + (end_time - start_time) / 2


def _clear_scratch_dir(path):
    """Delete the temporary extraction directory, warning instead of failing."""
    if os.path.exists(path):
        try:
            shutil.rmtree(path)
        except OSError as e:
            print(f'Warning: could not remove {path}: {e}')


# Specify the path to your input file
input_file_path = DOWNLOAD_DIR + '/zipfiles.txt'

# Scratch directory that each archive is unpacked into
extract_to_path = DOWNLOAD_DIR + '/fits_files/'

i = 0
with open(input_file_path, 'r') as input_file:
    # Each non-empty line names one zip archive inside DOWNLOAD_DIR
    for line in input_file:
        line = line.strip()
        if not line:
            continue
        i = i + 1

        zip_file_path = DOWNLOAD_DIR + '/' + line
        print(zip_file_path)

        # Start from an empty scratch directory so FITS files left behind by an
        # earlier archive are never counted again.
        _clear_scratch_dir(extract_to_path)

        try:
            # ### Unzipping the CLASS L1 zip file and finding fits files within
            try:
                with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                    zip_ref.extractall(extract_to_path)
            except zipfile.BadZipFile as e:
                print(f'Skipping {zip_file_path}: not a valid zip file ({e})')
                continue

            fits_files = []
            # Recursive search for all files with a .fits extension
            for root, _dirs, _files in os.walk(extract_to_path):
                fits_files.extend(glob.glob(os.path.join(root, '*.fits')))

            # ### Generating CLASS light curve
            # NOTE: the list of times is deliberately not called `time`, so the
            # imported `time` module stays usable by the other cells.
            Counts = []
            mid_times = []

            for f in fits_files:
                c, s, e = read_FITS(f)
                m = calculate_mid_time(s, e)
                if not isinstance(m, datetime):
                    # Header lacked the start/end keywords; no timestamp to plot
                    print(f'Skipping {f}: start/end time {m}')
                    continue
                Counts.append(float(c))
                mid_times.append(m)

            if mid_times:
                # Build and write the table once per archive, after all FITS
                # files have been read.
                lc = pd.DataFrame({'time': pd.to_datetime(mid_times), 'total_counts': Counts})
                lc_sorted = lc.sort_values(by='time', ascending=True)
                lc_sorted.to_csv(os.path.join(DOWNLOAD_DIR, 'LightCurve' + str(i) + '.csv'), index=False)
                print(f'LightCurve{i}.csv saved in {DOWNLOAD_DIR}.')
            else:
                print(f'No usable FITS data in {zip_file_path}; LightCurve{i}.csv not written.')
        finally:
            # ### Removing temporary files
            _clear_scratch_dir(extract_to_path)


print("DONE!")



Wait until a light-curve file has been generated for each zip file.

In [None]:
#Code to combine LightCurve csv files

# Directory path where your CSV files are located
#DOWNLOAD_DIR

# Extract the month_year from the DOWNLOAD_DIR
# Label used in the output file names: the name of the directory that contains
# DOWNLOAD_DIR (assuming DOWNLOAD_DIR is written without a trailing slash)
parent_dir = os.path.dirname(DOWNLOAD_DIR)
month_year = os.path.basename(parent_dir)

# Collect every per-archive light-curve CSV in the download directory
files = glob.glob(os.path.join(DOWNLOAD_DIR, "LightCurve*.csv"))

# Stack them into one table with a fresh running index
df = pd.concat([pd.read_csv(csv_path) for csv_path in files], ignore_index=True)
print(df)

# Save the combined table next to (not inside) the download directory
output_file_path = os.path.join(parent_dir, f'CombinedLightCurve{month_year}.csv')
df.to_csv(output_file_path, index=False)
print(f'Done!\nCombinedLightCurve{month_year}.csv saved at {output_file_path}')

In [None]:
#Code to sort the data in csv file

# Directory path where your CSV files are located
# Sort the combined light curve chronologically and save it as CLASS_LC_<label>_sorted.csv.

# Directory path where the combined CSV file is located
directory_path = os.path.dirname(DOWNLOAD_DIR)

# Read the combined CSV. Its first line is the 'time,total_counts' header, so pandas
# consumes it as the header. Reading it as data (header=None) and then re-attaching the
# first row used to cost one real measurement: after the unparseable header row was
# dropped, `iloc[1:]` skipped the first genuine data row.
file_path = os.path.join(directory_path, f'CombinedLightCurve{month_year}.csv')
df = pd.read_csv(file_path)

# Parse the timestamps. pandas omits the fractional part (or even the time of day) when
# writing a column in which it is zero everywhere, so fall back to shorter formats
# instead of discarding those rows.
raw_time = df['time'].astype(str)
parsed_time = pd.to_datetime(raw_time, format='%Y-%m-%d %H:%M:%S.%f', errors='coerce')
for fallback_format in ('%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
    unparsed = parsed_time.isna()
    if not unparsed.any():
        break
    parsed_time.loc[unparsed] = pd.to_datetime(
        raw_time[unparsed], format=fallback_format, errors='coerce'
    )
df['time'] = parsed_time

# Drop rows whose time could not be parsed at all, and say so rather than hide it
dropped_rows = int(df['time'].isna().sum())
if dropped_rows:
    print(f'Dropped {dropped_rows} row(s) with an unparseable time value')
df = df.dropna(subset=['time'])

# Sort the DataFrame by the 'time' column
df_sorted = df.sort_values(by='time')

# Save the sorted data; the header line is kept because the plotting step reads the
# 'time' and 'total_counts' columns by name.
output_path = os.path.join(os.path.dirname(DOWNLOAD_DIR), f'CLASS_LC_{month_year}_sorted.csv')
df_sorted.to_csv(output_path, index=False)

print(f'Data sorted and saved to {output_path}')


### Code to plot CLASS data in the 7-16 keV range

In [None]:
# Construct the full file path
# Plot the sorted light curve (counts vs. time) and save it as CLASS_LC_<label>.png.

# Directory holding the sorted CSV; the plot is written to the same place
plot_dir = os.path.dirname(DOWNLOAD_DIR)
file_path = os.path.join(plot_dir, f'CLASS_LC_{month_year}_sorted.csv')
plot_path = os.path.join(plot_dir, f'CLASS_LC_{month_year}.png')

# Read the data from the constructed file path into a DataFrame
df2 = pd.read_csv(file_path)

# Convert 'time' to datetime format
df2['time'] = pd.to_datetime(df2['time'])

# Create the figure and the primary (left-hand) axis
fig, ax1 = plt.subplots(figsize=(10, 6))

# Plot the data
ax1.plot(df2['time'], df2['total_counts'], marker='o', linestyle='dashdot', color='#0073bc', label='CLASS protons')
ax1.set_xlabel('Time(UTC)')
ax1.set_ylabel('Counts/8s in 7-16 keV', color='#0073bc')
#ax1.legend(loc="upper right", bbox_to_anchor=(1.0, 1.0))

# Create a twin Axes sharing the x-axis (nothing is plotted on it in this cell)
ax2 = ax1.twinx()

# Restrict the x-axis to the searched observation window
ax1.set_xlim(pd.to_datetime(start), pd.to_datetime(end))

# Set ticks at each day
ax1.xaxis.set_major_locator(DayLocator())
ax1.xaxis.set_major_formatter(DateFormatter('%Y-%m-%d'))

#Uncomment for making fullmoon date ±3 days range
# Mark the full moon on Aug 1 and the ±3 days range
#full_moon_dates = pd.to_datetime(['2023-08-01'])
#for date in full_moon_dates:
#    plt.axvline(x=date, color='g', linestyle='--', label='Full Moon')
#    plt.axvspan(date - pd.Timedelta(days=3), date + pd.Timedelta(days=3), color='g', alpha=0.2)

#Uncomment for bluemoon
# Mark the full moon on the date and the ±3 days range
#full_moon_dates_bluemoon = pd.to_datetime(['2023-08-30'])
#for date in full_moon_dates_bluemoon:
#    plt.axvline(x=date, color='g', linestyle='--', label='Full Moon(BM)')
#    plt.axvspan(date - pd.Timedelta(days=3), date + pd.Timedelta(days=3), color='g', alpha=0.2)

# Rotate the x-axis tick labels. tick_params changes only the rotation; calling
# set_xticklabels(get_xticklabels()) would replace the DateFormatter with fixed labels.
ax1.tick_params(axis='x', labelrotation=90)

# Adjust layout to prevent clipping of labels
plt.tight_layout()

# Legends: left for the primary axis, right for the twin axis (only if it has artists,
# otherwise matplotlib warns that there is nothing to put in the legend)
ax1.legend(loc="upper left")
if ax2.get_legend_handles_labels()[0]:
    ax2.legend(loc="upper right")

# Save the plot
plt.savefig(plot_path)
# Show the plot
plt.show()

# Print confirmation of save
print(f"Plot saved as: {plot_path}")

Code Credits: [Vaishnav Sankar K](https://github.com/Vai838)
