In [None]:
pip install -r requirements.txt

In [4]:
## 1 Problem 1: NASA APOD Data Retrieval and JSON File Processing (33 marks)

In [2]:
# use the dotenv package to load the environment variables
from dotenv import load_dotenv
import os

# Pull the settings stored in the .env file into the process environment.
load_dotenv()

# The API credentials and endpoint are read from the environment, not hard-coded.
api_key = os.environ.get('API_KEY')
base_url = os.environ.get('BASE_URL')

# First and last day (YYYY-MM-DD) of the historical window to download.
start_date, end_date = '2020-01-01', '2020-12-31'

In [3]:
import requests
from datetime import date , timedelta

def get_apod_data(api_key, date, timeout=30):
    """Fetch the APOD entry for a single day.

    Args:
        api_key: API key, sent as the ``api_key`` query parameter.
        date: Day to fetch, as a ``YYYY-MM-DD`` string or an object whose
            ``str()`` has that form (e.g. ``datetime.date``).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A dict with the keys ``date``, ``title``, ``url``, ``explanation`` and
        ``media_type`` (a field missing from the response is ``None``), or
        ``None`` when the request fails or the response is not a JSON object.
        Failures are reported with ``print`` rather than raised.
    """
    fields = ("date", "title", "url", "explanation", "media_type")
    # Let requests build and escape the query string instead of formatting it by hand.
    url = f"{base_url}/planetary/apod"
    params = {"api_key": api_key, "date": str(date)}
    try:
        # A timeout keeps a stalled connection from hanging the notebook forever.
        response = requests.get(url, params=params, timeout=timeout)
        # Turn 4xx/5xx answers into exceptions handled below.
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        # The exception text can embed the full URL; keep the key out of the output.
        message = str(e)
        if api_key:
            message = message.replace(str(api_key), "***")
        print(f"Error fetching data for {date}: {message}")
        return None
    except ValueError:
        # Body was not valid JSON.
        print("Unexpected response format")
        return None

    if not isinstance(data, dict):
        print("Unexpected response format")
        return None

    return {field: data.get(field) for field in fields}

# Smoke-test the helper against the live API, using the current local date.
today = date.today()
today_data = get_apod_data(api_key=api_key, date=today)

print(today_data)

{'date': '2024-11-19', 'title': 'Undulatus Clouds over Las Campanas Observatory', 'url': 'https://apod.nasa.gov/apod/image/2411/ParallelClouds_Beletsky_960.jpg', 'explanation': "What's happening with these clouds?  While it may seem that these long and thin clouds are pointing toward the top of a hill, and that maybe a world-famous observatory is located there, only part of that is true. In terms of clouds, the formation is a chance superposition of impressively periodic undulating air currents in Earth's lower atmosphere. Undulatus, a type of Asperitas cloud, form at the peaks where the air is cool enough to cause the condensation of opaque water droplets.  The wide-angle nature of the panorama creates the illusion that the clouds converge over the hill.  In terms of land, there really is a world-famous observatory at the top of that peak: the Carnegie Science's Las Campanas Observatory in the Atacama Desert of Chile.  The two telescope domes visible are the 6.5-meter Magellan Telesco

In [4]:
# validate the response data by checking if the response is a dictionary and contains the expected keys and values 
import unittest

class TestGetApodData(unittest.TestCase):
    def test_get_apod_data(self):
        data = get_apod_data(api_key, date.today())
        self.assertIsInstance(data, dict)
        self.assertIn("date", data)
        self.assertIn("title", data)
        self.assertIn("url", data)
        self.assertIn("explanation", data)
        self.assertIn("media_type", data)
    
# Run the tests inside the notebook: the dummy argv stops unittest from parsing
# the kernel's own command line, and exit=False keeps it from calling sys.exit().
# The trailing semicolon hides the TestProgram repr from the cell output.
unittest.main(argv=[''], exit=False);


.
----------------------------------------------------------------------
Ran 1 test in 0.774s

OK


<unittest.main.TestProgram at 0x10e9f8c90>

In [5]:
from datetime import datetime
import time

# Function to fetch APOD data for multiple dates within a range
def fetch_multiple_apod_data(api_key, start_date, end_date, delay_seconds=1.0):
    """Fetch APOD entries one day at a time for every date in an inclusive range.

    Args:
        api_key: API key, passed through to ``get_apod_data``.
        start_date: First day of the range, as ``YYYY-MM-DD``.
        end_date: Last day of the range (inclusive), as ``YYYY-MM-DD``.
        delay_seconds: Pause between consecutive requests; ``0`` disables it.

    Returns:
        The entries returned by ``get_apod_data`` in date order. Days for which
        it returned nothing are skipped. The list is empty when ``start_date``
        is after ``end_date``.

    Raises:
        ValueError: If either date string does not match ``%Y-%m-%d``.
    """
    current_day = datetime.strptime(start_date, "%Y-%m-%d")
    last_day = datetime.strptime(end_date, "%Y-%m-%d")

    apod_data = []
    while current_day <= last_day:
        date_str = current_day.strftime("%Y-%m-%d")
        data = get_apod_data(api_key, date_str)
        if data:
            print(f"Fetched data for {date_str}")
            apod_data.append(data)

        current_day += timedelta(days=1)
        # Throttle only between requests; there is nothing to wait for after the last day.
        if delay_seconds > 0 and current_day <= last_day:
            time.sleep(delay_seconds)

    return apod_data

# apod_data = fetch_multiple_apod_data(api_key, start_date, end_date)

In [6]:
# Better approach: fetch the data for multiple dates in a single request by passing start_date and end_date as query parameters in the API URL

def get_range_apod_data(api_key, start_date, end_date, timeout=60):
    """Fetch the APOD entries for a whole date range with a single request.

    Args:
        api_key: API key, sent as the ``api_key`` query parameter.
        start_date: First day of the range, sent as ``start_date``.
        end_date: Last day of the range, sent as ``end_date``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        A list with one dict per returned entry, each holding the keys
        ``date``, ``title``, ``url``, ``explanation`` and ``media_type``
        (a missing field is ``None``). Returns ``None`` when the request fails
        or the response is not a JSON list of objects. Failures are reported
        with ``print`` rather than raised.
    """
    fields = ("date", "title", "url", "explanation", "media_type")
    # Let requests build and escape the query string instead of formatting it by hand.
    url = f"{base_url}/planetary/apod"
    params = {"api_key": api_key, "start_date": start_date, "end_date": end_date}
    try:
        # A timeout keeps a stalled connection from hanging the notebook forever.
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.RequestException as e:
        # The exception text can embed the full URL; keep the key out of the output.
        message = str(e)
        if api_key:
            message = message.replace(str(api_key), "***")
        print(f"Error fetching data for the range {start_date} to {end_date}: {message}")
        return None
    except ValueError:
        # Body was not valid JSON.
        print("Unexpected response format")
        return None

    # Anything other than a list of objects cannot be mapped onto the fields below.
    if not isinstance(data, list) or not all(isinstance(item, dict) for item in data):
        print("Unexpected response format")
        return None

    return [{field: item.get(field) for field in fields} for item in data]

In [7]:
import os
import json

def save_to_json(data, filename='apod_data.json'):
    """Append a list of records to a JSON file that stores one top-level list.

    When the file is missing or empty it is created with ``data`` as its
    content. Otherwise the stored list is loaded and ``data`` is added to its
    end. Nothing is de-duplicated, so saving the same records twice stores
    them twice.

    Args:
        data: List of JSON-serialisable records.
        filename: Path of the JSON file to create or extend.

    Returns:
        None. Problems are reported with ``print`` rather than raised;
        validation, decoding and serialisation problems are detected before
        the file is opened for writing, so they leave it untouched.
    """
    try:
        if not isinstance(data, list):
            raise ValueError("Data should be a list")

        existing_data = []
        # A missing or zero-length file simply means there is nothing to extend yet.
        if os.path.exists(filename) and os.path.getsize(filename) > 0:
            with open(filename, 'r', encoding='utf-8') as file:
                existing_data = json.load(file)
            if not isinstance(existing_data, list):
                raise ValueError("Existing file data is not in list format")

        # Serialise first: if this fails the file has not been truncated yet.
        payload = json.dumps(existing_data + data, indent=4) + "\n"

        with open(filename, 'w', encoding='utf-8') as file:
            file.write(payload)

    # json.JSONDecodeError subclasses ValueError, so it has to be caught first.
    except json.JSONDecodeError:
        print(f"Error decoding JSON from file {filename}. Ensure the file format is correct.")
    except ValueError as ve:
        print(f"Data validation error: {ve}")
    except TypeError as te:
        # Raised by json.dumps for values that cannot be serialised.
        print(f"Data validation error: {te}")
    except IOError as e:
        print(f"Error writing to file {filename}: {e}")


In [11]:
def fetch_and_save_apod_data(api_key, start_date, end_date):
    """Download the APOD entries for a date range and hand them to save_to_json.

    The single-request helper ``get_range_apod_data`` is used for speed and to
    stay clear of rate limits. ``fetch_multiple_apod_data`` takes the same
    arguments and can be swapped in to exercise the day-by-day loop instead.
    Nothing is printed or saved when the fetch yields no entries.
    """
    entries = get_range_apod_data(api_key, start_date, end_date)
    if not entries:
        return

    print(f"Fetched data for {len(entries)} dates")
    # No filename is passed, so save_to_json uses its own default.
    save_to_json(entries)


# Download the configured date range and store it in the default JSON file.
# NOTE: save_to_json adds new records to whatever the file already holds, so
# re-running this cell stores the same entries again.
fetch_and_save_apod_data(api_key, start_date, end_date)

Fetched data for 365 dates


In [None]:
## 2 Problem 2: JSON Data Reading, Looping, and Processing (27 marks)

In [None]:
def read_apod_data(filename='apod_data.json'):
    """Load APOD entries from a JSON file, echoing each entry's date and title.

    Returns the parsed JSON content. Returns ``None``, after printing an
    explanation, when the file is missing, cannot be accessed, or does not
    contain valid JSON.
    """
    try:
        with open(filename, 'r') as handle:
            entries = json.loads(handle.read())
        for record in entries:
            print(f"Date: {record['date']}, Title: {record['title']}")
        return entries
    except FileNotFoundError:
        problem = f"Error: The file {filename} was not found."
    except PermissionError:
        problem = f"Error: Permission denied when accessing {filename}."
    except json.JSONDecodeError:
        problem = f"Error: The file {filename} is empty or contains invalid JSON."

    # Only reached when one of the handlers above ran.
    print(problem)
    return None

# Read the data back from the default JSON file. saved_data is None when the
# file is missing, cannot be accessed, or does not contain valid JSON.
saved_data = read_apod_data()

In [13]:
def analyze_apod_media(data):
    """Print media-type counts and the date of the entry with the longest explanation.

    Args:
        data: Iterable of APOD entry dicts. ``None`` is treated as no entries.

    An entry whose ``media_type`` is missing or is neither ``'image'`` nor
    ``'video'`` is counted as neither. A missing or ``None`` explanation counts
    as length 0. On a tie the earliest entry wins.
    """
    img_count = 0  # image count
    video_count = 0  # video count
    # Date and length of the longest explanation seen so far.
    detail_explanation = {"date": None, "length": 0}

    for entry in data or []:
        media_type = entry.get("media_type")
        if media_type == 'image':
            img_count += 1
        elif media_type == 'video':
            video_count += 1

        # `or ""` also covers a key that is present but holds None (JSON null),
        # which a plain .get() default would not replace.
        explanation_length = len(entry.get("explanation") or "")
        if explanation_length > detail_explanation['length']:
            detail_explanation['date'] = entry.get('date')
            detail_explanation['length'] = explanation_length

    print(f"Number of images: {img_count}")
    print(f"Number of videos: {video_count}")
    print(f"Date with the longest explanation: {detail_explanation['date']} (Length: {detail_explanation['length']} characters)")


# read_apod_data returns None when the file could not be read; skip the
# analysis in that case instead of failing on a None input.
if saved_data is not None:
    analyze_apod_media(saved_data)

Number of images: 329
Number of videos: 36
Date with the longest explanation: 2020-08-31 (Length: 1572 characters)


In [14]:
# Extract data into csv file
import csv

def write_to_csv(data, filename='apod_summary.csv'):
    """Append one ``date, title, media_type, url`` row per entry to a CSV file.

    The header row is written only when the file is empty or newly created.
    The file is opened in append mode, so calling this twice with the same
    entries stores their rows twice.

    Args:
        data: Iterable of entry dicts; a missing field is written as an empty cell.
        filename: Path of the CSV file to create or extend.

    Returns:
        None. I/O failures are reported with ``print`` rather than raised.
    """
    columns = ("date", "title", "media_type", "url")
    try:
        # Explicit UTF-8: titles may contain non-ASCII characters that the
        # platform's default encoding cannot represent.
        with open(filename, 'a', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            # In append mode the position is 0 only when the file has no content yet.
            if csvfile.tell() == 0:
                writer.writerow(columns)

            writer.writerows(
                [entry.get(column, "") for column in columns] for entry in data
            )
        print(f"Data successfully written to {filename}")
    except IOError as e:
        print(f"Error writing to CSV file {filename}: {e}")


# Export only when the JSON file was read successfully and holds entries.
# NOTE: write_to_csv opens the file in append mode, so re-running this cell
# adds the same rows again.
if saved_data:
    write_to_csv(saved_data)

Data successfully written to apod_summary.csv


In [40]:
import numpy as np

def create_array(rows=20, cols=5, seed=None):
    """Build a ``rows x cols`` integer array of random values in [10, 100), row by row.

    A candidate row is kept only when both conditions hold:
      1. the sum of the row is even;
      2. the sum of all kept values, including the candidate, is a multiple of 5.
    Rejected candidates are discarded and a new one is drawn.

    Args:
        rows: Number of rows to produce.
        cols: Number of columns in every row.
        seed: Optional seed. When given, values are drawn from a private
            ``np.random.RandomState(seed)`` so the result is reproducible.
            When ``None``, numpy's global generator is used.

    Returns:
        An integer ndarray of shape ``(rows, cols)``. The shape is ``(0, cols)``
        when ``rows`` is not positive.
    """
    rng = np.random if seed is None else np.random.RandomState(seed)

    accepted_rows = []
    running_total = 0  # sum of every value accepted so far
    while len(accepted_rows) < rows:
        candidate = rng.randint(10, 100, size=(1, cols))
        candidate_sum = int(candidate.sum())

        # Condition 1: even row sum. Condition 2: the running total stays a multiple of 5.
        if candidate_sum % 2 == 0 and (running_total + candidate_sum) % 5 == 0:
            accepted_rows.append(candidate)
            running_total += candidate_sum

    if not accepted_rows:
        # Width follows `cols`, so the empty result has a consistent shape.
        return np.empty((0, cols), dtype=int)
    # Stack once at the end instead of rebuilding the array for every candidate.
    return np.vstack(accepted_rows)


# Build the array with the default size (rows=20, cols=5). No seed is set in
# this cell, so the values presumably differ from run to run.
array = create_array()

In [None]:
def process_array(array):
    """Report elements divisible by 3 and 5, then overwrite values above 75 with the mean.

    The mean is computed before any value is replaced. ``array`` is modified
    in place and nothing is returned.
    NOTE(review): for an integer array the float mean is cast to the array's
    dtype on assignment, presumably dropping the fraction. Confirm this is intended.
    """
    # Elements divisible by both 3 and 5
    multiple_of_3 = array % 3 == 0
    multiple_of_5 = array % 5 == 0
    print("Elements divisible by both 3 and 5:", array[multiple_of_3 & multiple_of_5])

    # Mean of the untouched array, used as the replacement value
    average = array.mean()
    print("Mean value:", average)

    above_threshold = array > 75
    array[above_threshold] = average
    print("Modified Array:\n", array)

# NOTE: process_array overwrites values in `array` in place, so the statistics
# computed below see the modified array, and re-running this cell changes it again.
process_array(array)

In [None]:
# Statistical operations on `array` as it stands when this cell runs
mean = np.mean(array)
std_dev = np.std(array)
median = np.median(array)
column_variance = np.var(array, axis=0)  # one variance per column

print(
    f"Mean of array: {mean}",
    f"Standard deviation of array: {std_dev}",
    f"Median of array: {median}",
    f"Variance of each column: {column_variance}",
    sep="\n",
)
