# Load Data From DBFS and Save it to Hive

I couldn't figure out how to save the data as Gold Delta Live Tables, but I didn't want to let that hold me back, so this notebook saves the data as regular Delta tables in the Hive metastore instead.

In [0]:
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

## SAVE FUSER TRAINING DATA

In [0]:
# FUSER data-set names. Each name appears in the raw CSV file names and is
# reused as part of the saved table names.
fuser_data_types = [
    'configs_data_set', 'runways_data_set',
    'first_position_data_set', 'TBFM_data_set',
    'TFM_track_data_set', 'ETD_data_set',
    'LAMP_data_set', 'MFS_data_set',
]
# Airport identifiers. Each one has its own FUSER directory under the raw mount.
list_of_airport_centers = [
    'KATL', 'KCLT', 'KDEN',
    'KDFW', 'KJFK', 'KMEM',
    'KORD', 'KPHX', 'KSEA',
]
# airport -> {data-set name -> DataFrame}. Every airport starts with its own
# (unshared) empty dict that the read loop fills in.
fuser_df = dict((center, {}) for center in list_of_airport_centers)

In [0]:
# Fill fuser_df with one DataFrame per (airport, FUSER data set) pair, read
# from the raw training CSVs. The glob matches every file of that data set in
# the airport's directory; column types are inferred by Spark.
for airport_center in list_of_airport_centers:
    frames_for_airport = fuser_df[airport_center]
    for fuser_data_type in fuser_data_types:
        train_csv_glob = f"dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/FUSER_train_{airport_center}/{airport_center}/*.{fuser_data_type}.csv"
        frames_for_airport[fuser_data_type] = spark.read.csv(
            train_csv_glob,
            header=True,
            inferSchema=True,
        )

## Test with one airport

In [0]:
# Pick the KATL runways DataFrame out of the loaded training data for a trial save.
katl_train_frames = fuser_df['KATL']
atl_runways_df = katl_train_frames['runways_data_set']


In [0]:
# Trial run: save the KATL runways DataFrame as a Delta table registered in the
# metastore (a regular Delta table, not a Delta Live Table).
fuser_data_type = 'runways_data_set'
airport_name = 'KATL'
table_name = f"train_{airport_name}__{fuser_data_type}_table"
print(f"\t Processing {table_name}")
# Use "overwrite" rather than "append": the DataFrame holds the complete data
# set, so appending would add every row again each time this cell is re-run.
# overwriteSchema lets the replacement succeed even if the inferred CSV schema
# differs from what an earlier run stored.
(
    atl_runways_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(table_name)
)

In [0]:
# Persist every training DataFrame as a Delta table in the metastore, one table
# per (airport, data set) pair, replacing any table of the same name.
for airport_center in list_of_airport_centers:
    frames_for_airport = fuser_df[airport_center]
    for fuser_data_type in fuser_data_types:
        train_table_name = f"fuser_train_{airport_center}_{fuser_data_type}"
        delta_writer = frames_for_airport[fuser_data_type].write.format("delta").mode("overwrite")
        delta_writer.saveAsTable(train_table_name)

It seems to have worked.
How do I read the table back now?


In [0]:
# Re-save the KATL runways DataFrame under its own table name.
# The name is spelled out on purpose: by the time this cell runs, the save loop
# above has rebound `fuser_data_type` to its last value ('MFS_data_set'), so
# building the name from that variable would overwrite the KATL MFS table with
# runways data.
atl_runways_table = "fuser_train_KATL_runways_data_set"
atl_runways_df.write.format("delta").mode("overwrite").saveAsTable(atl_runways_table)

## Read Hive metastore tables

In [0]:
# Obtain a Hive-enabled SparkSession for reading the saved tables back.
# `spark` is already used by earlier cells, so getOrCreate() is expected to
# return that running session rather than build a new one; the appName and
# Hive settings only matter when no session exists yet.
# SparkSession must be imported (from pyspark.sql) for this cell to run on a
# fresh kernel.
spark = (
    SparkSession.builder
    .appName("HiveTest")
    .enableHiveSupport()
    .getOrCreate()
)

In [0]:
# Print the databases visible to this session to confirm the metastore is reachable.
databases_df = spark.sql("show databases")
databases_df.show()

In [0]:

# Read the KATL runways training table back from the metastore.
# NOTE(review): the save loop wrote this table as "fuser_train_KATL_runways_data_set";
# the lowercase name here presumably resolves because table names are
# case-insensitive in the metastore — confirm.
atl_runway_table_name = "fuser_train_katl_runways_data_set"
atl_runway_data_df = spark.table(atl_runway_table_name)

In [0]:
# Read the KATL configs training table back from the metastore.
atl_config_table_name = "fuser_train_katl_configs_data_set"
atl_config_data_df = spark.table(atl_config_table_name)

# Save FUSER TEST DATA

In [0]:
# List the top level of the raw unzipped area to see which data sets are present.
raw_root_path = "dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/"
dbutils.fs.ls(raw_root_path)

In [0]:
# List the FUSER test directory.
fuser_test_path = "dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/FUSER_test/"
dbutils.fs.ls(fuser_test_path)

In [0]:
# The archive unpacked into a nested folder of the same name; list that inner folder.
fuser_test_nested_path = "dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/FUSER_test/FUSER_test"
dbutils.fs.ls(fuser_test_nested_path)

In [0]:
# List the KATL test files to check the file-name pattern used by the read glob.
fuser_test_katl_path = "dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/FUSER_test/FUSER_test/KATL"
dbutils.fs.ls(fuser_test_katl_path)

In [0]:
# Build airport -> {data-set name -> DataFrame} for the FUSER test split.
# Test files sit in one directory per airport and are prefixed with the airport
# identifier; column types are inferred by Spark.
fuser_test_df = {}
for airport_center in list_of_airport_centers:
    fuser_test_df[airport_center] = {}
    for fuser_data_type in fuser_data_types:
        test_csv_glob = f"dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/FUSER_test/FUSER_test/{airport_center}/{airport_center}_*.{fuser_data_type}.csv"
        fuser_test_df[airport_center][fuser_data_type] = spark.read.csv(
            test_csv_glob,
            header=True,
            inferSchema=True,
        )

In [0]:
# Persist every test DataFrame as a Delta table in the metastore, one table per
# (airport, data set) pair, replacing any table of the same name.
for airport_center in list_of_airport_centers:
    test_frames_for_airport = fuser_test_df[airport_center]
    for fuser_data_type in fuser_data_types:
        test_table_name = f"fuser_test_{airport_center}_{fuser_data_type}"
        delta_writer = test_frames_for_airport[fuser_data_type].write.format("delta").mode("overwrite")
        delta_writer.saveAsTable(test_table_name)

# Save METAR DATA

- METAR_test
- METAR_train

METAR (Meteorological Aerodrome Report) is a type of aviation weather observation that provides detailed information about the current weather conditions at airports around the world. These reports are critical for flight operations, providing real-time data on factors such as wind speed and direction, visibility, cloud cover, temperature, dew point, and atmospheric pressure. METAR reports are typically issued hourly, but can be updated more frequently if conditions change rapidly.


The METAR data is provided in text files (.txt), organized by hour, with each file representing the weather conditions observed at various airports around a specific hour. The directory structure is as follows:

    data/
        └── METAR/ 
          ├── metar.20220901.00Z.txt 
          ├── metar.20220901.01Z.txt 
          ├── metar.20220901.02Z.txt
          └── ... 

In [0]:
# List the METAR test files.
metar_test_path = "dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/METAR_test/METAR_test/"
dbutils.fs.ls(metar_test_path)

In [0]:
# Load all METAR test text files through the CSV reader (no header row, column
# types inferred).
metar_test_glob = "dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/METAR_test/METAR_test/metar.*.txt"
metar_test_df = spark.read.csv(metar_test_glob, header=False, inferSchema=True)

In [0]:
display(metar_test_df)
# Observation from the output: there is only one column of data, holding free-form text.
# Technically the first row is the date and the time, and the second row contains
# information about the weather.
# Read https://en.wikipedia.org/wiki/METAR for more info on the format

Databricks data profile. Run in Databricks to view.

In [0]:
# Give the single raw text column a meaningful name before saving.
metar_test_df = metar_test_df.toDF('weather_comments')
# The text will probably need cleaning later, to parse out the airport identifier,
# time of observation, wind direction and speed, visibility, current weather
# phenomena such as precipitation, cloud cover and heights, temperature, dew
# point, and barometric pressure.
metar_test_writer = metar_test_df.write.format("delta").mode("overwrite")
metar_test_writer.saveAsTable("metar_test")

### Read Metar Train data

In [0]:
# List the first METAR training part to check its layout.
metar_train_part_1_path = "dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/METAR_train/METAR_train_part_1/METAR_train_part_1/"
dbutils.fs.ls(metar_train_part_1_path)

In [0]:
from pyspark.sql.types import StringType, StructType, StructField
# Load every METAR training part with an explicit one-column string schema, so
# the column is already named "weather_comments" and no schema inference runs.
weather_comments_field = StructField("weather_comments", StringType(), True)
schema = StructType([weather_comments_field])
metar_train_glob = "dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/METAR_train/METAR_train_part_*/METAR_train_part_*/metar.*.txt"
metar_train_df = spark.read.csv(metar_train_glob, header=False, schema=schema)

In [0]:
# Persist the METAR training text as the Delta table "metar_train", replacing any existing one.
metar_train_writer = metar_train_df.write.format("delta").mode("overwrite")
metar_train_writer.saveAsTable("metar_train")

## Read and Save TAF data

TAF (Terminal Aerodrome Forecast) is a format used to provide weather forecasts specifically for aviation. TAF reports give a detailed forecast for a 24 to 30-hour period and are updated every 6 hours. They provide predictions about wind, visibility, weather phenomena, and cloud cover that are crucial for planning flights.

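A TAF report includes the forecasted weather conditions for an airport and is typically longer and more complex than a METAR. An example row from a TAF file might look like the following (illustrative, in standard TAF notation; check the raw files for the exact layout):

    TAF KATL 011120Z 0112/0218 20006KT P6SM SCT040 BKN250 FM011700 24010KT P6SM SCT050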

TAF data is provided in text files (.txt), organized by issuance time, with each file representing the forecasts issued for a 6-hour period. The directory structure is as follows:

     data/
       └── TAF/
          ├── taf.20220901.00Z.txt
          ├── taf.20220901.06Z.txt
          ├── taf.20220901.12Z.txt
          ├── taf.20220901.18Z.txt
          ├── taf.20220902.00Z.txt
          └── ...    



*The files will likely have to be reprocessed after we get a better understanding of the format.*

### TAF train

In [0]:
# List the TAF training files.
taf_train_path = "dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/TAF_train/TAF_train/"
dbutils.fs.ls(taf_train_path)

In [0]:
# Load all TAF training text files through the CSV reader (no header row,
# column types inferred).
taf_train_glob = "dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/TAF_train/TAF_train/taf.*.txt"
taf_train_df = spark.read.csv(taf_train_glob, header=False, inferSchema=True)

In [0]:
# Persist the TAF training data as the Delta table "taf_train", replacing any existing one.
taf_train_writer = taf_train_df.write.format("delta").mode("overwrite")
taf_train_writer.saveAsTable("taf_train")

### TAF Test

In [0]:
# List the TAF test files.
taf_test_path = "dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/TAF_test/TAF_test"
dbutils.fs.ls(taf_test_path)

In [0]:
# Load all TAF test text files through the CSV reader (no header row, column
# types inferred).
taf_test_glob = "dbfs:/mnt/nasa_challenge/1-raw-unzipped-files/TAF_test/TAF_test/taf.*.txt"
taf_test_df = spark.read.csv(taf_test_glob, header=False, inferSchema=True)

In [0]:
# Persist the TAF test data as the Delta table "taf_test", replacing any existing one.
taf_test_writer = taf_test_df.write.format("delta").mode("overwrite")
taf_test_writer.saveAsTable("taf_test")