In [2]:
import requests
from datetime import datetime, timedelta
import os
import pandas as pd
import json
from time import sleep
import psycopg2
import psycopg2.extras
from dotenv import load_dotenv
from tenacity import retry, stop_after_attempt, wait_exponential



In [3]:
class Flares():
    """Download FLR records from NASA's DONKI API for one day and tabulate them.

    The request is issued when the object is created. The decoded JSON payload
    is kept in ``self._flares`` (``None`` when the request or decoding failed),
    and ``flares_data()`` turns it into a DataFrame with snake_case columns.
    """

    # Day queried when the caller does not pass one (value kept from the original code).
    # NOTE(review): a commented-out expression in the original suggests "yesterday"
    # was the intended default - confirm before changing it.
    DEFAULT_DATE_REF = '2024-11-25'

    def __init__(self, date_ref=None):
        """Fetch the flare list for ``date_ref`` ('YYYY-MM-DD'); defaults to DEFAULT_DATE_REF."""
        self._initialize(date_ref)

    def _initialize(self, date_ref=None):
        """Read the API key from the environment and perform the HTTP request."""
        self.__nasa_api_key = os.environ.get("NASA_API_KEY")
        self.__date_ref = date_ref or self.DEFAULT_DATE_REF

        try:
            self.response = requests.get(f"https://api.nasa.gov/DONKI/FLR?startDate={self.__date_ref}&endDate={self.__date_ref}&api_key={self.__nasa_api_key}", timeout = 50)
            self.response.raise_for_status()
            self._flares = self.response.json()

        except requests.exceptions.RequestException as req_err:
            # The error text can include the request URL, and the URL carries the
            # API key, so mask the key before it reaches the notebook output.
            message = str(req_err)
            if self.__nasa_api_key:
                message = message.replace(self.__nasa_api_key, "***")
            print(f"Request error: {message}")
            self._flares = None

        except ValueError as json_err:
            print(f"JSON Error: {json_err}")
            self._flares = None

    @staticmethod
    def _to_json(value):
        """Serialize one cell to JSON text; missing values (None/NaN) become None.

        Lists and dicts are serialized directly: passing them to ``pd.notnull``
        yields an array whose truth value is ambiguous for multi-item lists.
        """
        if isinstance(value, (list, dict)):
            return json.dumps(value)
        if value is None or pd.isna(value):
            return None
        return json.dumps(value)

    def flares_data(self):
        """Return the fetched flares as a DataFrame with renamed columns.

        Returns:
            pandas.DataFrame: one row per flare. An empty DataFrame is returned
            when nothing was fetched (failed request or no events) or when the
            payload cannot be converted.
        """
        self.rename_columns = {
            'flrID': 'flr_id',
            'instruments': 'instruments',
            'beginTime': 'begin_time',
            'peakTime': 'peak_time',
            'endTime': 'end_time',
            'classType': 'class_type',
            'sourceLocation': 'source_location',
            'activeRegionNum': 'active_region_num',
            'note': 'note',
            'submissionTime': 'submission_time',
            'versionId': 'version_id',
            'link': 'link',
            'linkedEvents': 'linked_events'}

        # None (failed fetch) and [] (no events) both mean "no rows".
        if not self._flares:
            self.df = pd.DataFrame()
            return self.df

        try:
            frame = pd.DataFrame(self._flares)
            # 'catalog' is not kept; do not fail if the API omits it.
            frame = frame.drop(columns='catalog', errors='ignore')
            frame = frame.astype({'versionId': str, 'activeRegionNum': str})

            # Nested structures are stored as JSON text.
            for column in ('instruments', 'linkedEvents'):
                frame[column] = frame[column].apply(self._to_json)

            self.df = frame.rename(columns=self.rename_columns)
            return self.df

        except Exception as err:
            # Keep the "empty frame on failure" contract, but say why it is empty.
            print(f"Flares parsing error: {err}")
            self.df = pd.DataFrame()
            return self.df

In [4]:
class Iss():
    """Query the Open Notify API for the people in space and the ISS position.

    Both HTTP requests are sent when the object is created. ``crew_members()``
    and ``iss_position()`` decode the stored responses; after the first call
    the ``response_crew`` / ``response_iss`` attributes hold the decoded JSON
    (or ``None`` when the response could not be used).
    """

    def __init__(self):
        # A timeout keeps a stalled connection from blocking the notebook forever.
        # Connection errors are left to propagate out of the constructor.
        self.response_crew = requests.get("http://api.open-notify.org/astros.json", timeout=50)
        self.response_iss = requests.get("http://api.open-notify.org/iss-now.json", timeout=50)

    @staticmethod
    def _load_payload(response):
        """Return the decoded JSON of ``response``, or None when it is unusable.

        A value that is already decoded (dict) or already marked as failed
        (None) is returned unchanged, so the public methods can be called more
        than once.
        """
        if response is None or isinstance(response, dict):
            return response

        try:
            response.raise_for_status()
            return response.json()

        except requests.exceptions.RequestException as req_err:
            print(f"Request Error: {req_err}")

        except ValueError as json_err:
            print(f"JSON Error: {json_err}")

        return None

    def crew_members(self):
        """Return the people currently aboard the ISS.

        Returns:
            pandas.DataFrame with columns ``name`` and ``date_ref`` (today's
            date), or the string ``'Failed to connect.'`` when the response is
            missing or does not report success.
        """
        self.response_crew = self._load_payload(self.response_crew)
        payload = self.response_crew

        if not isinstance(payload, dict) or payload.get("message") != "success":
            return 'Failed to connect.'

        self.crew = [person['name'] for person in payload['people'] if person['craft'] == 'ISS']
        self.df = pd.DataFrame({"name": self.crew})
        self.df["date_ref"] = datetime.today().strftime("%Y-%m-%d")

        return self.df

    def iss_position(self):
        """Return the ISS position with hemisphere labels.

        Returns:
            dict holding the entries of ``iss_position`` from the response plus
            ``NS`` ("North"/"South"), ``WE`` ("East"/"West") and ``timestamp``,
            or the string ``'Failed to connect.'`` when the response is missing
            or does not report success.
        """
        self.response_iss = self._load_payload(self.response_iss)
        payload = self.response_iss

        if not isinstance(payload, dict) or payload.get("message") != "success":
            return 'Failed to connect.'

        coordinates = payload['iss_position']
        # Work on a copy so the stored response is not modified.
        self.position = dict(coordinates)
        self.position['NS'] = "North" if float(coordinates['latitude']) > 0 else "South"
        self.position['WE'] = "East" if float(coordinates['longitude']) > 0 else "West"
        self.position['timestamp'] = payload['timestamp']

        return self.position

In [5]:
class NEOs():
    """Fetch the near-earth-object feed from neowsapp.com for one day and flatten it.

    The request is issued when the object is created. The decoded payload is
    stored in ``self._neos`` (``None`` when the request or decoding failed).
    """

    def __init__(self, date_ref=None):
        """Fetch the feed for ``date_ref`` ('YYYY-MM-DD'); defaults to yesterday."""
        self._date_ref = date_ref or (datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d")

        try:
            self.response = requests.get(f'https://www.neowsapp.com/rest/v1/feed?start_date={self._date_ref}&end_date={self._date_ref}&detailed=false', timeout=50)
            self.response.raise_for_status()
            self._neos = self.response.json()

        except requests.exceptions.RequestException as req_err:
            print(f"Request error: {req_err}")
            self._neos = None

        except ValueError as json_err:
            print(f"JSON Error: {json_err}")
            self._neos = None

    def neos_data(self):
        """Flatten the feed into one row per object.

        Only the first entry of each object's ``close_approach_data`` is used.

        Returns:
            pandas.DataFrame with columns ``ref_date``, ``neo_id``, ``name``,
            ``is_potentially_hazardous_asteroid``, ``orbiting_body``,
            ``close_approach_date``, ``kilometers_per_hour``,
            ``kilometers_per_second``, ``estimated_diameter_min_meters`` and
            ``estimated_diameter_max_meters`` (empty when the feed has no
            objects for the reference date), or ``None`` when the download failed.
        """
        self.tuple_infos1 = ('id','name','is_potentially_hazardous_asteroid')
        self.tuple_infos2 = ('orbiting_body','close_approach_date', 'relative_velocity') #close approach
        self.tuple_infos3 = ('estimated_diameter_min','estimated_diameter_max') #estimated diameter
        self.data_l = []

        # Failed download: report it as None instead of crashing on a subscript.
        if self._neos is None:
            return None

        velocity_units = ('kilometers_per_hour', 'kilometers_per_second')
        diameter_columns = [f'{key}_meters' for key in self.tuple_infos3]

        # A missing date bucket means "no objects", not an error.
        self._neos_data = self._neos.get('near_earth_objects', {}).get(self._date_ref, [])

        # Iterate over the records themselves rather than trusting element_count.
        for neo in self._neos_data:
            approach = neo['close_approach_data'][0]
            diameter = neo['estimated_diameter']['meters']

            row = {'ref_date': self._date_ref}
            for key in self.tuple_infos1:
                row[key] = neo[key]

            for key in self.tuple_infos2:
                if key == 'relative_velocity':
                    for unit in velocity_units:
                        row[unit] = approach[key][unit]
                else:
                    row[key] = approach[key]

            for key, column in zip(self.tuple_infos3, diameter_columns):
                row[column] = diameter[key]

            self.data_l.append(row)

        columns = (
            ['ref_date', *self.tuple_infos1]
            + [key for key in self.tuple_infos2 if key != 'relative_velocity']
            + list(velocity_units)
            + diameter_columns
        )
        # Passing the column list keeps the schema stable even with zero rows.
        self.df = pd.DataFrame(self.data_l, columns=columns)

        # Cast by name (velocities, diameters and the hazard flag) instead of by position.
        text_columns = [*velocity_units, *diameter_columns, 'is_potentially_hazardous_asteroid']
        self.df = self.df.astype({column: str for column in text_columns})
        self.df = self.df.rename(columns={'id': 'neo_id'})

        return self.df


In [None]:
def connection_db (dataframe,table_name):
    """Insert every row of ``dataframe`` into the PostgreSQL table ``table_name``.

    Connection settings are read from the PSQL_HOST, PSQL_PORT, PSQL_DATABASE,
    PSQL_USER and PSQL_PASSWORD environment variables after ``load_dotenv()``.
    Errors are printed rather than raised; a failed insert is rolled back.

    Args:
        dataframe: pandas DataFrame whose column names match the target columns.
        table_name: destination table. It is interpolated into the SQL text
            as-is, so it must come from trusted code, never from user input.

    Returns:
        None.
    """
    # Nothing to insert: do not load credentials or open a connection.
    if dataframe.empty:
        print('Empty Dataframe!')
        return

    load_dotenv()

    try:
        conn = psycopg2.connect(
            host=os.getenv("PSQL_HOST"),
            port=os.getenv("PSQL_PORT"),
            database=os.getenv("PSQL_DATABASE"),
            user=os.getenv("PSQL_USER"),
            password=os.getenv("PSQL_PASSWORD")
        )

    except psycopg2.Error as error:
        print(f"database connection error: \n-- {error}\nplease try again!")
        # Without a connection there is nothing left to do.
        return

    columns = ', '.join(dataframe.columns)
    # Plain tuples, one per row, in column order.
    values = list(dataframe.itertuples(index=False, name=None))
    sql_insert = f"INSERT INTO {table_name} ({columns}) VALUES %s"

    try:
        with conn.cursor() as cursor:
            psycopg2.extras.execute_values(cursor, sql_insert, values)
        conn.commit()

    except Exception as e:
        conn.rollback()
        print(e)

    finally:
        # Release the connection whether the insert succeeded or not.
        conn.close()
    

In [7]:
@retry(
    stop=stop_after_attempt(10),
    wait=wait_exponential(multiplier=2, min=2, max=12)
)
def flares():
    """Fetch and tabulate the flares, retrying when the download failed.

    Returns:
        pandas.DataFrame produced by ``Flares.flares_data()``.

    Raises:
        ValueError: when the download failed, so the ``retry`` decorator can
            attempt it again.
    """
    source = Flares()

    # Flares stores None in _flares when the request or JSON decoding failed.
    # flares_data() would turn that into an empty frame, which looks the same as
    # "no flares" and never triggers a retry, so check the raw payload here.
    if source._flares is None:
        raise ValueError("Failed to fetch Flares data.")

    data_flares = source.flares_data()

    if data_flares is None:
        raise ValueError("Failed to fetch Flares data.")

    return data_flares

flares_data = flares()

# Placeholder for the load step: the bare expression below does not send the
# frame anywhere yet.
if not flares_data.empty:
    # send the data
    flares_data

In [8]:
@retry(
    stop=stop_after_attempt(10),
    wait=wait_exponential(multiplier=2, min=2, max=12)
)
def crew():
    """Fetch the ISS crew list, retrying when the request did not succeed.

    Returns:
        pandas.DataFrame returned by ``Iss.crew_members()``.

    Raises:
        ValueError: when no crew table was obtained, so the ``retry`` decorator
            can attempt it again.
    """
    members = Iss().crew_members()

    # crew_members() reports failure with the string 'Failed to connect.', not
    # None, so accept only a real DataFrame. Comparing a DataFrame to the string
    # with == would be element-wise and cannot be used in a plain `if`.
    if not isinstance(members, pd.DataFrame):
        raise ValueError("Failed to fetch crew members.")

    return members

crew_data = crew()

In [9]:
@retry(
    stop=stop_after_attempt(10),
    wait=wait_exponential(multiplier=2, min=2, max=12)
)
def neos():
    """Fetch and flatten the near-earth-object feed, retrying on failure.

    Returns:
        The value returned by ``NEOs.neos_data()``.

    Raises:
        ValueError: when ``neos_data()`` yields None, so the ``retry``
            decorator can attempt it again.
    """
    # Local name differs from the function name to avoid shadowing it.
    neos_frame = NEOs().neos_data()

    if neos_frame is None:
        # The original message mentioned "crew", copied from the crew fetcher.
        raise ValueError("Failed to fetch NEO(s) data.")

    return neos_frame

In [None]:
@retry(
    wait=wait_exponential(multiplier=2, min=2, max=12),
    stop=stop_after_attempt(10),
)
def position():
    """Return one ISS position reading, raising so ``retry`` can try again on failure.

    Raises:
        ValueError: when ``Iss.iss_position()`` yields None or the string
            'Failed to connect.'.
    """
    reading = Iss().iss_position()
    fetch_failed = reading is None or reading == 'Failed to connect.'

    if not fetch_failed:
        return reading

    raise ValueError("Failed to fetch ISS Position.")


def pos_sender(samples=3, interval=10):
    """Collect several ISS position readings and return them as a DataFrame.

    Args:
        samples: number of readings to take (default 3).
        interval: seconds to wait between two consecutive readings (default 10).

    Returns:
        pandas.DataFrame with one row per reading. ``latitude``, ``longitude``
        and the timestamp are stored as text, and ``timestamp`` is renamed to
        ``date_hour_ref``. An empty DataFrame is returned when ``samples`` is 0.
        If a reading raises, the exception object is returned instead of a
        DataFrame.
    """
    data_list = []

    for index in range(samples):

        try:
            data_list.append(position())

        except Exception as e:
            # NOTE(review): returning the exception (rather than raising it) is
            # the original contract; callers must check the result type.
            return e

        # Wait only between readings; a pause after the last one is wasted time.
        if index < samples - 1:
            sleep(interval)

    if not data_list:
        return pd.DataFrame()

    iss_df = pd.DataFrame(data_list)

    # Seconds since the epoch -> datetime -> text.
    iss_df['timestamp'] = pd.to_datetime(iss_df['timestamp'], unit = 's').astype(str)
    iss_df = iss_df.astype({'latitude': str, 'longitude': str})
    iss_df = iss_df.rename(columns={'timestamp': 'date_hour_ref'})

    return iss_df
