https://aws-data-wrangler.readthedocs.io/en/stable/stubs/awswrangler.s3.read_csv.html

The script below performs the following:
1. Read all ADS-B JSON files as a single dataframe
2. Compute NOAA RAP pressure level (hPa) for each flight track record in the dataframe
3. Filter the records in the dataframe for CONUS
4. Save the dataframe to AWS S3 Bucket

In [None]:
pip install awswrangler

In [None]:
pip install haversine

In [None]:
import sagemaker
import boto3
import awswrangler as wr

import pandas as pd
import numpy as np
from datetime import datetime

import json
from haversine import haversine, Unit
from scipy.spatial import distance

In [None]:
# S3 locations used by this notebook, as (bucket name, subfolder) pairs.
# The subfolder values are later passed as the key Prefix when listing a bucket.
input_bucket, input_subfolder = 'adsb-data', ''
output_bucket, output_subfolder = 'partly-cloudy-common-area', ''

In [None]:
# Fetch the execution role through the SageMaker SDK and keep it in `role`.
# NOTE(review): `role` is not referenced by any other cell visible in this notebook.
from sagemaker import get_execution_role
role = get_execution_role()

In [None]:
# Examine the files in the 'adsb-data' S3 bucket.
# A single list_objects call returns at most 1000 keys and leaves out the
# 'Contents' entry altogether when nothing matches, so page through the
# listing and tolerate empty pages.
conn = boto3.client('s3')
contents = []
for page in conn.get_paginator('list_objects').paginate(Bucket=input_bucket, Prefix=input_subfolder):
    contents.extend(page.get('Contents', []))
for f in contents:
    print(f['Key'])

In [None]:
# Examine the contents in the 'adsb-data' s3 Bucket
contents

In [None]:
# Collect the object key of every entry listed for the 'adsb-data' bucket.
# Original author's note: using the S3 listing ('contents') for this did not work.
# Example of the expected names: ['2016_06_20_0002Z.json', '2016_06_20_0003Z.json']
myFileNames = [entry['Key'] for entry in contents]

In [None]:
# Generate all file names assumed to be in 'adsb-data': one name per minute of
# 2016-06-20, in the form 'YYYY_MM_DD_HHMMZ.json'.
# TODO (original author's note): redo by iteration.
# 'min' is used because the 'T' frequency alias is deprecated since pandas 2.2.
dateTimes = pd.date_range('2016-06-20 00:00:00', '2016-06-20 23:59:00', freq='min')
dateTimesSer = pd.Series([str(dateTime) for dateTime in dateTimes], name='temp')
# Format directly from the timestamps rather than slicing their string form.
myFileNames = list(dateTimes.strftime('%Y_%m_%d_%H%MZ.json'))

In [None]:
myFileNames

In [None]:
%%time
# Obtain necessary data items from all json files in 'adsb-data' in a single dataframe.
# Each entry of a file's 'acList' contributes one value to each of the five
# parallel lists below; a missing field is recorded as NaN.

dt = []
eyedee = []
alt = []
lat = []
long = []

# One client serves every download; creating a new one per file is wasted work.
s3_obj = boto3.client('s3')

for myFileName in myFileNames:
    s3_myobj = s3_obj.get_object(Bucket=input_bucket, Key=myFileName)
    s3_mydata = s3_myobj['Body'].read().decode('utf-8')
    js = json.loads(s3_mydata)

    # The snapshot time is encoded in the first 15 characters of the key:
    # 'YYYY_MM_DD_HHMM' (seconds are taken as 0).
    dattim = datetime.strptime(myFileName[:15], '%Y_%m_%d_%H%M')
    stamp = str(dattim)

    for aircraft in js['acList']:
        dt.append(stamp)
        eyedee.append(aircraft.get('Id', np.nan))
        alt.append(aircraft.get('Alt', np.nan))
        # Keep a latitude only when it is a float; anything else becomes NaN.
        # The former `type(...) == np.float` test fails on NumPy >= 1.24, where
        # the np.float alias of the builtin float was removed.
        lat_value = aircraft.get('Lat', np.nan)
        lat.append(lat_value if isinstance(lat_value, float) else np.nan)
        long.append(aircraft.get('Long', np.nan))


In [None]:
# Assemble the five parallel lists into one dataframe, one column per list.
df = pd.DataFrame(
    data={
        "DateTime": dt,
        "id": eyedee,
        'Altitude': alt,
        'Latitude': lat,
        'Longitude': long,
    }
)

In [None]:
df

In [None]:
# Examine number of records with NULL
df.isnull().sum()

In [None]:
df['DateTime'].value_counts()

In [None]:
# Keep only complete records: a row survives when none of its columns is null
# (per Dr. Cherry on 6/17/2021). The result is an independent copy of those rows.
df_select = df[df.notnull().all(axis=1)].copy()

In [None]:
df_select

In [None]:
# Read the RAP lat/lon table (a CSV object in S3) into a dataframe.
latlong_bucket, latlongFileName = 'partly-cloudy-common-area', 'latlonRAP.csv'
latlongRAP = wr.s3.read_csv(path="s3://" + latlong_bucket + "/" + latlongFileName)

In [None]:
latlongRAP

In [None]:
# Bounding box of the RAP grid points, widened on every side.
# Series.min()/max() are vectorised and skip NaN; the builtin min()/max()
# iterate element by element and give order-dependent results if a NaN is present.
# NOTE(review): the 0.04 (latitude) and 0.2 (longitude) margins are carried over
# unchanged from the original cell - confirm where they come from.
minLatRAP = latlongRAP['Lat'].min() - 0.04
maxLatRAP = latlongRAP['Lat'].max() + 0.04
minLonRAP = latlongRAP['Lon'].min() - 0.2
maxLonRAP = latlongRAP['Lon'].max() + 0.2

In [None]:
# Keep the records whose position falls inside the RAP bounding box
# (both bounds inclusive), as an independent copy.
in_lat_band = df_select['Latitude'].between(minLatRAP, maxLatRAP)
in_lon_band = df_select['Longitude'].between(minLonRAP, maxLonRAP)
adsb_conus = df_select[in_lat_band & in_lon_band].copy()

In [None]:
# Renumber the rows 0..n-1 in place after filtering. With the default
# drop=False the previous row labels are kept as a new 'index' column.
# NOTE(review): that 'index' column stays in adsb_conus from here on - confirm it is wanted.
adsb_conus.reset_index(inplace= True)

In [None]:
adsb_conus

In [None]:
# Obtain NOAA RAP hPa (to the nearest 25 hPa) from altitude value in feet
def alt_ft_to_nearest_hPa(alt):
    """Convert an altitude in feet to a pressure level in hPa, as a multiple of 25.

    Altitudes above 51805 ft (the altitude of 100 hPa) return 100, and
    altitudes below 364 ft (the altitude of 1000 hPa) return 1000. Anything
    in between is converted with
    ``1013.25 * (1 - alt / 145366.45) ** (1 / 0.190284)`` and rounded to the
    nearest multiple of 25. The result is always an ``int``.
    """
    if alt > 51805:  # above the 100 hPa level: clamp
        return 100
    if alt < 364:  # below the 1000 hPa level: clamp
        return 1000

    exponent = 1 / 0.190284
    pressure_hpa = 1013.25 * (1 - alt / 145366.45) ** exponent
    return int(round(pressure_hpa / 25) * 25)

In [None]:
# Derive the pressure level of every record from its altitude and store it
# in a new 'hPa' column.
ls = list(adsb_conus['Altitude'])
nearest_hpa = list(map(alt_ft_to_nearest_hPa, ls))
adsb_conus['hPa'] = nearest_hpa

In [None]:
adsb_conus

In [None]:
# Build the coordinate lists used for the nearest-point search:
#   adsb_coord      - one (Latitude, Longitude) tuple per row of "adsb_conus"
#   latlonRAP_coord - one (Lat, Lon) tuple per row of "latlongRAP"
adsb_coord = [
    (lat_deg, lon_deg)
    for lat_deg, lon_deg in zip(adsb_conus['Latitude'], adsb_conus['Longitude'])
]
latlonRAP_coord = [
    (lat_deg, lon_deg)
    for lat_deg, lon_deg in zip(latlongRAP['Lat'], latlongRAP['Lon'])
]

In [None]:
%%time
from datetime import datetime

# For each (Latitude, Longitude) tuple in "adsb_coord", find the index of the
# (Lat, Lon) tuple in "latlonRAP_coord" that lies at the minimum distance.
#
# The haversine distance is 2 * R * asin(sqrt(h)) with
#   h = sin^2(dlat / 2) + cos(lat1) * cos(lat2) * sin^2(dlon / 2),
# and it grows monotonically with h, so the argmin of h is the argmin of the
# distance. Evaluating h with NumPy against all grid points at once replaces
# one Python-level haversine() call per (record, grid point) pair.
rap_rad = np.radians(np.asarray(latlonRAP_coord, dtype=float).reshape(-1, 2))
rap_lat_rad = rap_rad[:, 0]
rap_lon_rad = rap_rad[:, 1]
rap_cos_lat = np.cos(rap_lat_rad)  # loop-invariant, computed once

progress_every = 10000  # report progress periodically instead of on every record

idx = []
for i, (lat_deg, lon_deg) in enumerate(adsb_coord):
    lat_rad = np.radians(lat_deg)
    lon_rad = np.radians(lon_deg)
    h = (np.sin((rap_lat_rad - lat_rad) / 2) ** 2
         + np.cos(lat_rad) * rap_cos_lat * np.sin((rap_lon_rad - lon_rad) / 2) ** 2)
    idx.append(np.argmin(h))
    if i % progress_every == 0:
        print(i, datetime.now().time())

In [None]:
# Copy the "Nx" / "Ny" values of the nearest grid point onto each record.
x = np.array(latlongRAP['Nx'])  # every "Nx" value of 'latlongRAP'
y = np.array(latlongRAP['Ny'])  # every "Ny" value of 'latlongRAP'

# idx holds, per record, the row of 'latlongRAP' at minimum distance.
for column, grid_values in (('Nx', x), ('Ny', y)):
    adsb_conus[column] = list(grid_values[idx])

In [None]:
# Examine the list of files already in the output_bucket (i.e., "partly-cloudy-common-area").
# A single list_objects call returns at most 1000 keys and leaves out the
# 'Contents' entry altogether when the bucket/prefix is empty, so page through
# the listing and tolerate empty pages.
conn = boto3.client('s3')
contents = []
for page in conn.get_paginator('list_objects').paginate(Bucket=output_bucket, Prefix=output_subfolder):
    contents.extend(page.get('Contents', []))
for f in contents:
    print(f['Key'])

In [None]:
outputFileName = 'adsb_conus.parquet' # <--------------------------------------------- Specify
csvFileName = 'adsb_select.csv' # <--------------------------------------------------- Specify

# The CSV export gets its own '.csv' key. Writing it to outputFileName stored
# CSV text under a '.parquet' key, which the later Parquet export of adsb_conus
# to that same key then overwrote.
# NOTE(review): this exports df_select (the records before the CONUS filter) -
# confirm that this, rather than adsb_conus, is the frame wanted as CSV.
wr.s3.to_csv(df_select, f"s3://{output_bucket}/{csvFileName}", index=False)

In [None]:
# Write the CONUS dataframe to the output bucket as a Parquet object.
wr.s3.to_parquet(
    df=adsb_conus,
    path="s3://" + output_bucket + "/" + outputFileName,
)