In [1]:
# Install the notebook's dependencies with the %pip magic.
# Only pyspark is version-pinned here; python-dotenv, requests and pandas are unpinned.
%pip install pyspark==3.5.0 python-dotenv requests pandas

Note: you may need to restart the kernel to use updated packages.


In [4]:
import os
import sys
from pathlib import Path
from dotenv import load_dotenv
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 1. LOAD CREDENTIALS (identical to the Silver script)
# The notebook lives in /notebooks, so the .env file is one directory up.
env_path = Path.cwd().parent / '.env'
load_dotenv(dotenv_path=env_path)

STORAGE_ACCOUNT = os.getenv("STORAGE_ACCOUNT")
CLIENT_ID       = os.getenv("CLIENT_ID")
TENANT_ID       = os.getenv("TENANT_ID")
CLIENT_SECRET   = os.getenv("CLIENT_SECRET")

# Fail fast when a credential is absent or empty. Without this check a missing
# value would be interpolated as None into the config keys, the token endpoint
# and the abfss path below, and the problem would only show up later as an
# unclear authentication/read failure. Only the variable NAMES are reported,
# never their values.
_required_settings = {
    "STORAGE_ACCOUNT": STORAGE_ACCOUNT,
    "CLIENT_ID": CLIENT_ID,
    "TENANT_ID": TENANT_ID,
    "CLIENT_SECRET": CLIENT_SECRET,
}
_missing_settings = [name for name, value in _required_settings.items() if not value]
if _missing_settings:
    raise RuntimeError(
        f"Missing required environment variable(s): {', '.join(_missing_settings)}. "
        f"Expected them in the process environment or in {env_path}."
    )

# 2. BUILD THE SPARK SESSION
# Every ABFS OAuth setting is scoped to this storage account's DFS endpoint,
# so the host name is built once and reused in each key.
adls_host = f"{STORAGE_ACCOUNT}.dfs.core.windows.net"

spark = (
    SparkSession.builder
    .appName("Aviation_EDA")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-azure:3.3.4,com.microsoft.azure:azure-storage:8.6.6")
    .config(f"fs.azure.account.auth.type.{adls_host}", "OAuth")
    .config(f"fs.azure.account.oauth.provider.type.{adls_host}", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    .config(f"fs.azure.account.oauth2.client.id.{adls_host}", CLIENT_ID)
    .config(f"fs.azure.account.oauth2.client.secret.{adls_host}", CLIENT_SECRET)
    .config(f"fs.azure.account.oauth2.client.endpoint.{adls_host}", f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token")
    .getOrCreate()
)

# 3. LOAD THE DATA
# The path points at the "bronze" container, so it is named accordingly.
bronze_path = f"abfss://bronze@{adls_host}/raw_flights"
silver_path = bronze_path  # previous (misleading) name, kept so any cell still referring to it keeps working
df_bronze = spark.read.parquet(bronze_path)

# 4. PREVIEW
df_bronze.limit(5).toPandas()  # .toPandas() makes it look much nicer in Notebooks than .show()

Unnamed: 0,icao24,callsign,origin_country,time_position,last_contact,long,lat,baro_altitude,on_ground,velocity,true_track,vertical_rate,sensors,geo_altitude,squawk,spi,position_source
0,ac491d,DAL1509,United States,1770995000.0,1770995155,-91.1856,31.6974,9753.6,False,202.92,248.9,0.0,,10119.36,2064.0,False,0
1,ab1614,SCU42,United States,,1770995156,,,,False,47.85,179.38,-0.33,,,,False,0
2,a114f4,AAL1644,United States,1770995000.0,1770995156,-96.8763,32.7597,3619.5,False,125.88,315.99,0.0,,3749.04,2654.0,False,0
3,a5958c,SWA1462,United States,1770995000.0,1770995155,-84.043,30.3229,11887.2,False,275.82,148.27,0.0,,12245.34,6567.0,False,0
4,a5f87d,N484LP,United States,1770995000.0,1770995155,-112.1851,34.2046,1965.96,False,34.87,338.36,0.33,,1981.2,,False,0


In [9]:
# Summary statistics (count / mean / stddev / min / max) for the bronze data.
# NOTE(review): the string column icao24 also shows a mean/stddev in the output;
# those figures are not meaningful for an identifier and should be ignored.
bronze_summary = df_bronze.describe()
bronze_summary.toPandas()

26/02/14 07:47:51 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,summary,icao24,callsign,origin_country,time_position,last_contact,long,lat,baro_altitude,velocity,true_track,vertical_rate,sensors,geo_altitude,squawk,position_source
0,count,200,200,200,195.0,200.0,195.0,195.0,178.0,199.0,200.0,184.0,0.0,178.0,104.0,200.0
1,mean,514514.23529411765,,,1770995121.4358974,1770995136.2,-26.998386153846155,36.86246974358975,7255.267415730337,162.53170854271357,191.2316,0.6518478260869567,,7272.433820224719,3139.9326923076924,0.0
2,stddev,321084.1704387919,,,189.749671627738,59.16062793881947,66.52661104865352,12.720917448177948,4220.833129486218,81.9411298969163,103.40150221219562,5.134742464178498,,4250.631861129824,2321.8043923212804,0.0
3,min,00b22b,,Austria,1770992637.0,1770994842.0,-135.1395,-33.115,-53.34,0.0,0.0,-15.28,,7.62,22.0,0.0
4,max,e8026f,WZZ9GN,United States,1770995156.0,1770995156.0,100.8535,66.9388,13716.0,290.21,359.61,20.48,,13860.78,7660.0,0.0


In [11]:
# List every column together with its Spark SQL type as (name, type) pairs
df_bronze.dtypes

[('icao24', 'string'),
 ('callsign', 'string'),
 ('origin_country', 'string'),
 ('time_position', 'bigint'),
 ('last_contact', 'bigint'),
 ('long', 'double'),
 ('lat', 'double'),
 ('baro_altitude', 'double'),
 ('on_ground', 'boolean'),
 ('velocity', 'double'),
 ('true_track', 'double'),
 ('vertical_rate', 'double'),
 ('sensors', 'string'),
 ('geo_altitude', 'double'),
 ('squawk', 'string'),
 ('spi', 'boolean'),
 ('position_source', 'bigint')]

In [None]:
from pyspark.sql.functions import col, count, when

# Count nulls per column: when() without an otherwise() yields a value only for
# null cells, and count() ignores nulls, so each aggregate is that column's null total.
null_count_exprs = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in df_bronze.columns
]
df_bronze.select(null_count_exprs).toPandas()

                                                                                

Unnamed: 0,icao24,callsign,origin_country,time_position,last_contact,long,lat,baro_altitude,on_ground,velocity,true_track,vertical_rate,sensors,geo_altitude,squawk,spi,position_source
0,0,0,0,5,0,5,5,22,0,1,0,16,200,22,96,0,0


In [12]:
# Check for duplicates, keyed on (aircraft id, position timestamp) rather than on the full row.
# NOTE(review): time_position contains nulls; rows of one aircraft with a null
# timestamp presumably collapse together under dropDuplicates — confirm this is intended.
dedup_keys = ["icao24", "time_position"]

total_count = df_bronze.count()
deduplicated = df_bronze.dropDuplicates(dedup_keys)
distinct_count = deduplicated.count()

print(f"Duplicate rows found: {total_count - distinct_count}")



Duplicate rows found: 0


                                                                                

In [13]:
# Peek at the raw time_position values to see how the timestamps are stored
time_position_only = df_bronze.select("time_position")
time_position_only.show(5)

+-------------+
|time_position|
+-------------+
|   1770995155|
|         NULL|
|   1770995156|
|   1770995155|
|   1770995155|
+-------------+
only showing top 5 rows

