
Blue Onion Labs Take Home Test
Hey! We are stoked that you are interested in joining the team at Blue Onion Labs.

We have crafted the following test to see how you approach pulling and manipulating data. We want to get a general idea of how you approach some common types of problems that we encounter here at Blue Onion (we are really proficient at integrations!)

Background
spacexdata.com provides an API to query attributes about SpaceX launches (https://github.com/r-spacex/SpaceX-API/tree/master/docs#rspacex-api-docs). For this exercise we are going to be working with one resource in particular:

The Starlink Schema
For this exercise, there is no need to pull directly from the API, as a pull of historical data is included in this repo in starlink_historical_data.json.

The Problem:
We want to achieve a few goals:

To import the SpaceX Satellite data as a time series into a database
To be able to query the data to determine the last known latitude/longitude of the satellite for a given time
The Task (Part 1):
Stand up your favorite kind of database (and ideally it would be in a form that would be runnable by us, via something like docker-compose).

The Task (Part 2):
Write a script (in whatever language you prefer, though Ruby, Python, or Javascript would be ideal for us) to import the relevant fields in starlink_historical_data.json as a time series. The relevant fields are:
- spaceTrack.creation_date (represents the time that the lat/lon records were recorded)
- longitude
- latitude
- id (this is the starlink satellite id)

Again, the goal is that we want to be able to query the database for the last known position for a given starlink satellite. Don't hesitate to use any tools/tricks you know to load data quickly and easily!
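
A minimal sketch of such an import, assuming the file holds a JSON array of records shaped like the API response: top-level `id`, `latitude` and `longitude`, plus a nested `spaceTrack` object whose creation timestamp is stored under the upper-case key `CREATION_DATE`. The table name `starlink_positions`, the helper `import_positions`, the sample records and the in-memory SQLite database are illustrative assumptions, not part of the challenge.

```python
import json
import sqlite3


def import_positions(conn, records):
    """Insert (id, creation_date, latitude, longitude) rows and return the row count."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS starlink_positions (
            id TEXT NOT NULL,
            creation_date TEXT NOT NULL,  -- ISO 8601 text, so text order is time order
            latitude REAL,
            longitude REAL
        )
        """
    )
    rows = [
        (
            rec["id"],
            rec["spaceTrack"]["CREATION_DATE"],
            rec.get("latitude"),
            rec.get("longitude"),
        )
        for rec in records
    ]
    # executemany sends all rows in one batch instead of one statement per row
    conn.executemany("INSERT INTO starlink_positions VALUES (?, ?, ?, ?)", rows)
    conn.commit()
    return len(rows)


# Hypothetical sample standing in for the contents of starlink_historical_data.json.
sample_json = """
[
  {"id": "sat-a", "latitude": 10.5, "longitude": -20.25,
   "spaceTrack": {"CREATION_DATE": "2021-01-26T06:26:10"}},
  {"id": "sat-a", "latitude": null, "longitude": null,
   "spaceTrack": {"CREATION_DATE": "2021-01-26T14:26:10"}}
]
"""

conn = sqlite3.connect(":memory:")
inserted = import_positions(conn, json.loads(sample_json))
print(inserted)
```

With the real file, the records would come from `json.load` on an open handle to starlink_historical_data.json rather than from the inline sample.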

The Task (Part 3):
Write a query to fetch the last known position of a satellite (by id), given a time T. Include this query in your README or somewhere in the project submission.

Bonus Task (Part 4):
Write some logic (via a combination of query + application logic, most likely) to fetch from the database the closest satellite at a given time T, given a position on the globe as a (latitude, longitude) coordinate.

No need to derive any fancy math for distances from a point on the globe to a position above the earth. You can just use the Haversine formula. Example libraries to help here:

For Python: https://github.com/mapado/haversine

For Ruby: https://github.com/kristianmandrup/haversine
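
One way the bonus task could be approached is sketched below: a query picks each satellite's last known position at or before T, and application code applies the Haversine formula to find the nearest one. This is a sketch under stated assumptions, not a reference solution. It assumes a `starlink` table with `id`, `creation_date` (ISO 8601 text in a uniform format, so text comparison matches time order), `latitude` and `longitude` columns. The helper names, the sample rows and the 6371 km mean Earth radius are illustrative choices, and the formula is written out by hand instead of using the libraries listed above.

```python
import math
import sqlite3

EARTH_RADIUS_KM = 6371.0  # mean Earth radius, an approximation


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two (lat, lon) points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def closest_satellite(conn, t, lat, lon):
    """Return (id, distance_km) of the satellite nearest to (lat, lon) at time t, or None."""
    rows = conn.execute(
        """
        SELECT s.id, s.latitude, s.longitude
        FROM starlink AS s
        JOIN (
            -- newest record with a known position per satellite, at or before t
            SELECT id, MAX(creation_date) AS latest
            FROM starlink
            WHERE creation_date <= ?
              AND latitude IS NOT NULL AND longitude IS NOT NULL
            GROUP BY id
        ) AS m ON m.id = s.id AND m.latest = s.creation_date
        WHERE s.latitude IS NOT NULL AND s.longitude IS NOT NULL
        """,
        (t,),
    ).fetchall()
    if not rows:
        return None
    sid, slat, slon = min(rows, key=lambda r: haversine_km(lat, lon, r[1], r[2]))
    return sid, haversine_km(lat, lon, slat, slon)


# Hypothetical data: three satellites, one of them without a known position.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE starlink (id TEXT, creation_date TEXT, latitude REAL, longitude REAL)")
conn.executemany(
    "INSERT INTO starlink VALUES (?, ?, ?, ?)",
    [
        ("sat-a", "2021-01-26T06:00:00", 51.0, 0.0),
        ("sat-a", "2021-01-26T18:00:00", -30.0, 100.0),
        ("sat-b", "2021-01-26T07:00:00", 40.0, -74.0),
        ("sat-c", "2021-01-26T08:00:00", None, None),
    ],
)
nearest_noon = closest_satellite(conn, "2021-01-26T12:00:00", 48.85, 2.35)
nearest_later = closest_satellite(conn, "2021-01-27T00:00:00", 48.85, 2.35)
print(nearest_noon, nearest_later)
```

Computing the distances in Python keeps the SQL portable; for a table much larger than this one, filtering candidates in the query first would likely be worth considering.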

How to Submit
Run through it one last time to make sure it works!
Push the code up to your repo one last time (or save your working directory to a 'zip')
Reach out to us with your solution
Questions
If you have any questions at all during the challenge do not hesitate to reach out! Whether it be a question about the requirements, submitting, anything, just send us a note!

In [1]:
#Pyspark installation
!pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
#SqlAlchemy Installation
!pip install sqlalchemy

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [3]:
#loading sql extension on sqlite
%load_ext sql

In [4]:
#Lib Imports
from sqlalchemy import create_engine
import sqlite3
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, map_keys
import requests
import json
import pandas as pd


#spark setup
spark = SparkSession.builder.appName('starlink_data_ingestion').getOrCreate()


QUESTIONS 1 AND 2 - DATA INGESTION TO DATABASE

In [5]:
# CREATING DATA OUTPUT TABLE starlink ON test.db
import sqlite3

conn = sqlite3.connect('test.db')
print("Opened database successfully")

conn.execute('''
CREATE TABLE  IF NOT EXISTS starlink (
  "index" integer
  ,height_km real
  ,id varchar(200) 
  ,latitude real
  ,launch varchar(200) 
  ,longitude real
  ,velocity_kms real
  ,version varchar(200) 
  ,inclination varchar(200) 
  ,arg_of_pericenter varchar(200) 
  ,mean_motion varchar(200) 
  ,decay_date varchar(200) 
  ,periapsis varchar(200) 
  ,ra_of_asc_node varchar(200) 
  ,ephemeris_type varchar(200) 
  ,site varchar(200) 
  ,rcs_size varchar(200) 
  ,norad_cat_id varchar(200) 
  ,mean_motion_ddot varchar(200) 
  ,apoapsis varchar(200) 
  ,comment varchar(200) 
  ,mean_anomaly varchar(200) 
  ,tle_line0 varchar(200) 
  ,mean_element_theory varchar(200) 
  ,object_name varchar(200) 
  ,mean_motion_dot varchar(200) 
  ,eccentricity varchar(200) 
  ,classification_type varchar(200) 
  ,ccsds_omm_vers varchar(200) 
  ,time_system varchar(200) 
  ,element_set_no varchar(200) 
  ,ref_frame varchar(200) 
  ,country_code varchar(200) 
  ,epoch varchar(200) 
  ,period varchar(200) 
  ,tle_line1 varchar(200) 
  ,file varchar(200) 
  ,bstar varchar(200) 
  ,tle_line2 varchar(200) 
  ,rev_at_epoch varchar(200) 
  ,decayed varchar(200) 
  ,semimajor_axis varchar(200) 
  ,object_type varchar(200) 
  ,object_id varchar(200) 
  ,gp_id varchar(200) 
  ,creation_date varchar(200) 
  ,launch_date varchar(200) 
  ,center_name varchar(200) 
  ,originator varchar(200)
);
''')

conn.commit()

print("Table created successfully")

conn.close()

Opened database successfully
Table created successfully
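
Since the goal is to look up the last known position by satellite id and time, a composite index on those two columns may help once the table grows. The sketch below is an illustration under assumptions, not part of the notebook's pipeline: it uses a reduced, hypothetical version of the `starlink` table, a made-up index name, and a query that compares `creation_date` as stored ISO 8601 text. A query that wraps the column in `datetime(...)` would not be able to use this plain column index.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Reduced, hypothetical version of the starlink table with only the queried columns.
conn.execute(
    "CREATE TABLE starlink (id TEXT, creation_date TEXT, latitude REAL, longitude REAL)"
)
conn.execute(
    "CREATE INDEX IF NOT EXISTS idx_starlink_id_creation ON starlink (id, creation_date)"
)

# Ask SQLite how it would run a last-known-position lookup.
plan = conn.execute(
    """
    EXPLAIN QUERY PLAN
    SELECT latitude, longitude
    FROM starlink
    WHERE id = ? AND creation_date <= ?
    ORDER BY creation_date DESC
    LIMIT 1
    """,
    ("sat-a", "2021-01-26T06:26:10"),
).fetchall()
# The last field of each plan row is the human-readable description of the step.
plan_text = " ".join(row[-1] for row in plan)
print(plan_text)
```

The printed plan should mention the index by name when SQLite decides to search through it instead of scanning the whole table.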


In [6]:
#Class responsible for calling the Starlink API and collecting the results in a list (lst_json)

class DataIngestion():

    def api_request(self):
        #Call the API; return the parsed JSON on success, otherwise the HTTP status code
        loaded_data = requests.get('https://api.spacexdata.com/v4/starlink')
        if loaded_data.status_code == 200:
            return loaded_data.json()
        else:
            return loaded_data.status_code

    def data_ingestion(self):
        #Copy the API records into a list; on failure print the status code and return an empty list
        lst_json = []

        api_data = self.api_request()
        if not isinstance(api_data, int):
            lst_json.extend(api_data)
        else:
            print(api_data)
        return lst_json

path = DataIngestion()
json_list = path.data_ingestion()
print('list created with API data')

#Pyspark Dataframe creation from the json list
df = spark.createDataFrame(json_list)
print('Pyspark Dataframe Created')


list created with API data
Pyspark Dataframe Created


In [7]:
#DATA TRANSFORMATION

class DataTransformation():

  def dataframeCreation(self,json_list):
    #Pyspark Dataframe creation from the json list passed in
    df = spark.createDataFrame(json_list)
    return df

  def explodeSpaceTrack(self,df):
    #The data is structured in columns in the Pyspark dataframe, but at this point the spaceTrack column is still a single map; below is an example record from that column
    #{TIME_SYSTEM -> UTC, TLE_LINE1 -> 1 44244U 19029K   20287.12291165  .47180237  12426-4  22139-2 0  9995, TLE_LINE0 -> 0 STARLINK-30, TLE_LINE2 -> 2 44244  52.9708 332.0356 0003711 120.7278 242.0157 16.43170483 77756, RA_OF_ASC_NODE -> 332.0356, ELEMENT_SET_NO -> 999, MEAN_ANOMALY -> 242.0157, CENTER_NAME -> EARTH, APOAPSIS -> 159.809, DECAYED -> 1, OBJECT_TYPE -> PAYLOAD, COMMENT -> GENERATED VIA SPACE-TRACK.ORG API, MEAN_MOTION_DOT -> 0.47180237, OBJECT_ID -> 2019-029K, MEAN_MOTION -> 16.43170483, CLASSIFICATION_TYPE -> U, INCLINATION -> 52.9708, EPOCH -> 2020-10-13T02:56:59.566560, FILE -> 2850561, PERIAPSIS -> 154.958, BSTAR -> 0.0022139, ORIGINATOR -> 18 SPCS, MEAN_MOTION_DDOT -> 1.2426E-5, SITE -> AFETR, DECAY_DATE -> 2020-10-13, LAUNCH_DATE -> 2019-05-24, NORAD_CAT_ID -> 44244, CREATION_DATE -> 2020-10-13T04:16:08, REF_FRAME -> TEME, MEAN_ELEMENT_THEORY -> SGP4, SEMIMAJOR_AXIS -> 6535.519, EPHEMERIS_TYPE -> 0, ARG_OF_PERICENTER -> 120.7278, CCSDS_OMM_VERS -> 2.0, PERIOD -> 87.635, ECCENTRICITY -> 3.711E-4, OBJECT_NAME -> STARLINK-30, RCS_SIZE -> LARGE, REV_AT_EPOCH -> 7775, GP_ID -> 163365918, COUNTRY_CODE -> US}
    #So this method keeps the columns listed in the variable cols and splits the entries of the spaceTrack column into new columns
    cols = ['height_km','id','latitude','launch','longitude','velocity_kms','version']

    #Collect every distinct key that appears in the spaceTrack map
    keys_df = df.select(
        explode(map_keys(df.spaceTrack))).distinct()
    keys = list(map(lambda row: row[0], keys_df.collect()))
    key_cols = list(map(lambda f: df.spaceTrack.getItem(f).alias(str(f)), keys))
    final_cols = cols + key_cols

    print('spaceTrack field splitted')
    return df.select(final_cols)
  
  def columnsToLower(self,df):
    #The keys of the spaceTrack column are in upper case, so this method renames every column of the dataframe to lower case
    for name in df.columns:
      df = df.withColumnRenamed(name, name.lower())

    print('columns renamed')  
    return df
  
  def castToPandas(self,df):
    #A Pandas Dataframe is created from the Pyspark dataframe
    df1 = df.toPandas() 
    print('pandas dataframe created')

    return df1

transform = DataTransformation()
#There is no need to keep several dataframes in memory, so each step below overwrites the previous dataframe
df = transform.dataframeCreation(json_list)
df = transform.explodeSpaceTrack(df)
df = transform.columnsToLower(df)
pd_df = transform.castToPandas(df)


spaceTrack field splitted
columns renamed
pandas dataframe created
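
The flattening done by `explodeSpaceTrack` and `columnsToLower` can also be illustrated without Spark. The following is a minimal sketch, assuming each record is a plain dict with a nested `spaceTrack` dict. `flatten_record` and the sample record are hypothetical and not part of the notebook.

```python
def flatten_record(record, base_cols):
    """Copy the base columns, then lift each spaceTrack key to a lower-cased top-level column."""
    flat = {col: record.get(col) for col in base_cols}
    for key, value in (record.get("spaceTrack") or {}).items():
        flat[key.lower()] = value
    return flat


base_cols = ["height_km", "id", "latitude", "launch", "longitude", "velocity_kms", "version"]

# Hypothetical record shaped like one element of the API response.
record = {
    "id": "sat-a",
    "latitude": 10.5,
    "longitude": -20.25,
    "version": "v0.9",
    "spaceTrack": {"CREATION_DATE": "2020-10-13T04:16:08", "OBJECT_NAME": "STARLINK-30"},
}
flat = flatten_record(record, base_cols)
print(flat)
```

Base columns missing from a record come out as `None`, which mirrors how absent values end up as nulls in the dataframe.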


In [8]:
class DataLoad():

  @staticmethod
  def createEngine():
    #SQLite, which is available out of the box in Google Colab, is the database used in this challenge; this creates the engine for test.db
    engine = create_engine('sqlite:///test.db', echo=False)
    print('engine created')
    return engine

  @staticmethod
  def insertSqliteDB(pd_df,engine):
    #Append the rows of the pandas dataframe to the starlink table
    pd_df.to_sql('starlink', con=engine, if_exists='append')
    print('data insertion completed')

engine = DataLoad.createEngine()
DataLoad.insertSqliteDB(pd_df,engine)


engine created
data insertion completed


**The Task (Part 3): Write a query to fetch the last known position of a satellite (by id), given a time T. Include this query in your README or somewhere in the project submission**

In [9]:
#Function responsible for printing the result for question 3

def satellite_position(id, t=None):
  #Prints the last known position of satellite id at or before time t; when t is None the latest record overall is used
  conn = sqlite3.connect('test.db')

  cursor = conn.execute(''' SELECT 
                            id,
                            datetime(epoch),
                            COALESCE(latitude, 'unavailable') latitude,
                            COALESCE(longitude, 'unavailable') longitude
                            FROM 
                              starlink
                            WHERE
                              id = ?
                              AND (? IS NULL OR datetime(epoch) <= datetime(?))
                            ORDER BY
                              datetime(epoch) desc
                            LIMIT 
                              1
                          ;''',[id, t, t])

  for row in cursor:
    print(row)
  conn.close()
  

In [10]:
#QUESTION 3 TEST - PROVIDE THE SATELLITE ID TO GET ITS LAST KNOWN POSITION AS LATITUDE AND LONGITUDE
#Set the variable satellite_id to an id string; some example ids are listed below
#5eed7714096e59000698562b
#5eed7714096e5900069856b0
#5eed7715096e59000698573d
#5eed7716096e5900069857ec
#5fc7ec8be4130000069e2c02
#5eed7714096e59000698567d
#5eed7714096e590006985680
#5eed7715096e59000698575d
#5eed7715096e590006985760
#5eed7716096e5900069857d6


satellite_id = '5eed7714096e590006985648'

schema = "('id','epoch','latitude','longitude')"
print('Question 3 resolution')
print(schema)
satellite_position(satellite_id)


Question 3 resolution
('id','epoch','latitude','longitude')
('5eed7714096e590006985648', '2022-05-26 12:17:16', 51.125722079524266, 149.29128965731803)
