In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
spark.version

'3.5.2'

In [3]:
from pyspark.sql.functions import col, explode, to_timestamp, lit, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, DecimalType

# 1. Cities table

In [138]:
bronze_cities_df = spark.read.json(r"C:\Users\karee\Downloads\file_0_20250320_112647.json")

#### Step 1: Explode JSON fields to flatten nested structures
##### Extract the "data" array from the JSON

In [139]:
cities_data_df = bronze_cities_df.select(explode(col("data")).alias("city"))

#### Step 2: Schema enforcement

##### Select relevant fields and apply the schema 

In [140]:
cities_data_df_enforced = cities_data_df.select(
    col("city.city_name").cast(StringType()).alias("city_name"),
    col("city.iata_code").cast(StringType()).alias("iata_code"),
    col("city.country_iso2").cast(StringType()).alias("country_iso2"),
    col("city.latitude").cast(DoubleType()).alias("latitude"),
    col("city.longitude").cast(DoubleType()).alias("longitude"),
    col("city.timezone").cast(StringType()).alias("timezone"),
    col("city.gmt").cast(DecimalType(4,2)).alias("gmt"),
    col("city.geoname_id").cast(IntegerType()).alias("geoname_id")
)

In [141]:
cities_data_df_enforced.show()

+------------+---------+------------+----------+----------+-------------------+------+----------+
|   city_name|iata_code|country_iso2|  latitude| longitude|           timezone|   gmt|geoname_id|
+------------+---------+------------+----------+----------+-------------------+------+----------+
|        Anaa|      AAA|          PF|    -17.05|-145.41667|     Pacific/Tahiti|-10.00|      NULL|
|    Arrabury|      AAB|          AU|     -26.7| 141.04167| Australia/Brisbane| 10.00|      NULL|
|    El Arish|      AAC|          EG| 31.133333|     33.75|       Africa/Cairo|  2.00|    361546|
|      Annaba|      AAE|          DZ| 36.821392|  7.811857|     Africa/Algiers|  1.00|   2506999|
|Apalachicola|      AAF|          US| 29.733334| -84.98333|    America/Chicago| -6.00|   4167695|
|     Arapoti|      AAG|          BR|-24.103611|    -49.79|  America/Sao_Paulo| -3.00|   3471795|
|      Aachen|      AAH|          DE| 50.775438|   6.08151|      Europe/Berlin|  1.00|   3247449|
|     Arraias|      

#### Step 3: Handle null values and standardize formats
##### Replace empty strings with corresponding appropriate values and get rid of the records with a null primary key 


In [142]:
cities_data_df_enforced.createOrReplaceTempView("silver_cities")

In [143]:

silver_cities_df = spark.sql("""
    SELECT
        COALESCE(city_name, 'Unkown') AS city_name,
        COALESCE(iata_code, 'N/A') AS iata_code,
        COALESCE(country_iso2, 'N/A') AS country_iso2,
        COALESCE(latitude, 0.0) AS latitude,
        COALESCE(longitude, 0.0) AS longitude,
        COALESCE(timezone, 'Unkown') AS timezone,
        COALESCE(gmt, 'N/A') AS gmt,
        COALESCE(geoname_id, 0) AS geoname_id
    FROM silver_cities
    WHERE city_name IS NOT NULL
""")

In [144]:
silver_cities_df.schema

StructType([StructField('city_name', StringType(), False), StructField('iata_code', StringType(), False), StructField('country_iso2', StringType(), False), StructField('latitude', DoubleType(), False), StructField('longitude', DoubleType(), False), StructField('timezone', StringType(), False), StructField('gmt', StringType(), False), StructField('geoname_id', IntegerType(), False)])

#### Step 4: Deduplication (if needed)
##### Drop duplicates based on unique identifiers

In [134]:
# print(silver_cities_df.count())
silver_cities_df = silver_cities_df.dropDuplicates(["city_name", "iata_code"])
# print(silver_cities_df.count())


#### Step5: Write into Silver layer as parquet

In [None]:
silver_cities_df.write.mode("overwrite").parquet(r"K:\file.parquet")


# 2. Countries Table

In [4]:
bronze_countries_df = spark.read.json(r"K:\New folder (2)\*.json")

#### Step 1: Explode JSON fields to flatten nested structures
##### Extract the "data" array from the JSON

In [5]:
countries_data_df = bronze_countries_df.select(explode(col("data")).alias("country"))

In [6]:
countries_data_df.count()

250

#### Step 2: Schema enforcement

##### select relevant fields and apply the schema 


In [None]:
countries_data_df_enforced = countries_data_df.select(
    col("country.country_name").cast(StringType()).alias("country_name"),
    col("country.country_iso2").cast(StringType()).alias("country_iso2"),
    col("country.country_iso3").cast(StringType()).alias("country_iso3"),
    col("country.country_iso_numeric").cast(IntegerType()).alias("country_iso_numeric"),
    col("country.population").cast(IntegerType()).alias("population"),
    col("country.capital").cast(StringType()).alias("capital"),
    col("country.continent").cast(StringType()).alias("continent"),
    col("country.currency_name").cast(StringType()).alias("currency_name"),
    col("country.currency_code").cast(StringType()).alias("currency_code"),
    col("country.fips_code").cast(StringType()).alias("fips_code"),
    col("country.phone_prefix").cast(StringType()).alias("phone_prefix")
)

In [156]:
countries_data_df_enforced.show()

+--------------------+------------+------------+-------------------+----------+--------------------+---------+-------------+-------------+---------+------------+
|        country_name|country_iso2|country_iso3|country_iso_numeric|population|             capital|continent|currency_name|currency_code|fips_code|phone_prefix|
+--------------------+------------+------------+-------------------+----------+--------------------+---------+-------------+-------------+---------+------------+
|             Hungary|          HU|         HUN|                348|   9982000|            Budapest|       EU|       Forint|          HUF|       HU|          36|
|           Indonesia|          ID|         IDN|                360| 242968342|             Jakarta|       AS|       Rupiah|          IDR|       ID|          62|
|             Ireland|          IE|         IRL|                372|   4622917|              Dublin|       EU|         Euro|          EUR|       EI|         353|
|              Israel|      

#### Step 3: Handle null values and standardize formats
##### Replace empty strings with corresponding appropriate values and get rid of the records with a null primary key 


In [157]:
countries_data_df_enforced.createOrReplaceTempView("silver_countries")

In [158]:
silver_countries_df = spark.sql("""
    SELECT
        COALESCE(country_name, 'Unknown') AS country_name,
        COALESCE(country_iso2, 'N/A') AS country_iso2,
        COALESCE(country_iso3, 'N/A') AS country_iso3,
        COALESCE(country_iso_numeric, 0) AS country_iso_numeric,
        COALESCE(population, 0) AS population,
        COALESCE(capital, 'Unknown') AS capital,
        COALESCE(continent, 'Unknown') AS continent,
        COALESCE(currency_name, 'Unknown') AS currency_name,
        COALESCE(currency_code, 'N/A') AS currency_code,
        COALESCE(fips_code, 'N/A') AS fips_code,
        COALESCE(phone_prefix, 'N/A') AS phone_prefix
    FROM silver_countries
    WHERE country_name IS NOT NULL
""")

In [160]:
silver_countries_df.schema

StructType([StructField('country_name', StringType(), False), StructField('country_iso2', StringType(), False), StructField('country_iso3', StringType(), False), StructField('country_iso_numeric', IntegerType(), False), StructField('population', IntegerType(), False), StructField('capital', StringType(), False), StructField('continent', StringType(), False), StructField('currency_name', StringType(), False), StructField('currency_code', StringType(), False), StructField('fips_code', StringType(), False), StructField('phone_prefix', StringType(), False)])

#### Step 4: Deduplication (if needed)
##### Drop duplicates based on unique identifiers

In [161]:
# print(silver_countries_df.count())
silver_countries_df = silver_countries_df.dropDuplicates(["country_name"])
# print(silver_countries_df.count())


#### Step5: Write into Silver layer as parquet

In [None]:
silver_countries_df.write.mode("overwrite").parquet(r"K:\file.parquet")


# 3. Airplanes Table:


In [162]:
bronze_airplanes_df  = spark.read.json(r"C:\Users\karee\Downloads\file_1010_20250320_124911.json")

#### Step 1: Explode JSON fields to flatten nested structures
##### Extract the "data" array from the JSON

In [163]:
airplanes_data_df = bronze_airplanes_df.select(explode(col("data")).alias("airplane"))


#### Step 2: Schema enforcement

##### select relevant fields and apply the schema 


In [164]:
airplanes_data_df_enforced = airplanes_data_df.select(
    col("airplane.registration_number").cast(StringType()).alias("registration_number"),
    col("airplane.production_line").cast(StringType()).alias("production_line"),
    col("airplane.iata_type").cast(StringType()).alias("iata_type"),
    col("airplane.model_name").cast(StringType()).alias("model_name"),
    col("airplane.model_code").cast(StringType()).alias("model_code"),
    col("airplane.icao_code_hex").cast(StringType()).alias("icao_code_hex"),
    col("airplane.iata_code_short").cast(StringType()).alias("iata_code_short"),
    col("airplane.construction_number").cast(StringType()).alias("construction_number"),
    col("airplane.test_registration_number").cast(StringType()).alias("test_registration_number"),
    col("airplane.rollout_date").cast(StringType()).alias("rollout_date"),
    col("airplane.first_flight_date").cast(StringType()).alias("first_flight_date"),
    col("airplane.delivery_date").cast(StringType()).alias("delivery_date"),
    col("airplane.registration_date").cast(StringType()).alias("registration_date"),
    col("airplane.line_number").cast(StringType()).alias("line_number"),
    col("airplane.plane_series").cast(StringType()).alias("plane_series"),
    col("airplane.airline_iata_code").cast(StringType()).alias("airline_iata_code"),
    col("airplane.airline_icao_code").cast(StringType()).alias("airline_icao_code"),
    col("airplane.plane_owner").cast(StringType()).alias("plane_owner"),
    col("airplane.engines_count").cast(IntegerType()).alias("engines_count"),
    col("airplane.engines_type").cast(StringType()).alias("engines_type"),
    col("airplane.plane_age").cast(IntegerType()).alias("plane_age"),
    col("airplane.plane_status").cast(StringType()).alias("plane_status"),
    col("airplane.plane_class").cast(StringType()).alias("plane_class")
)

In [166]:
airplanes_data_df_enforced.show()

+-------------------+----------------+---------+----------+-------------+-------------+---------------+-------------------+------------------------+--------------------+--------------------+--------------------+--------------------+-----------+------------+-----------------+-----------------+--------------------+-------------+------------+---------+------------+-----------+
|registration_number| production_line|iata_type|model_name|   model_code|icao_code_hex|iata_code_short|construction_number|test_registration_number|        rollout_date|   first_flight_date|       delivery_date|   registration_date|line_number|plane_series|airline_iata_code|airline_icao_code|         plane_owner|engines_count|engines_type|plane_age|plane_status|plane_class|
+-------------------+----------------+---------+----------+-------------+-------------+---------------+-------------------+------------------------+--------------------+--------------------+--------------------+--------------------+-----------+--

#### Step 3: Handle null values and standardize formats
##### Replace empty strings with corresponding appropriate values and get rid of the records with a null primary key 


In [167]:
airplanes_data_df_enforced.createOrReplaceTempView("silver_airplanes")


In [168]:
silver_airplanes_df = spark.sql("""
    SELECT
        COALESCE(registration_number, 'N/A') AS registration_number,
        COALESCE(production_line, 'Unknown') AS production_line,
        COALESCE(iata_type, 'Unknown') AS iata_type,
        COALESCE(model_name, 'Unknown') AS model_name,
        COALESCE(model_code, 'Unknown') AS model_code,
        COALESCE(icao_code_hex, 'N/A') AS icao_code_hex,
        COALESCE(iata_code_short, 'N/A') AS iata_code_short,
        COALESCE(construction_number, 'N/A') AS construction_number,
        COALESCE(test_registration_number, 'N/A') AS test_registration_number,
        COALESCE(rollout_date, 'N/A') AS rollout_date,
        COALESCE(first_flight_date, 'N/A') AS first_flight_date,
        COALESCE(delivery_date, 'N/A') AS delivery_date,
        COALESCE(registration_date, 'N/A') AS registration_date,
        COALESCE(line_number, 'N/A') AS line_number,
        COALESCE(plane_series, 'N/A') AS plane_series,
        COALESCE(airline_iata_code, 'N/A') AS airline_iata_code,
        COALESCE(airline_icao_code, 'N/A') AS airline_icao_code,
        COALESCE(plane_owner, 'Unknown') AS plane_owner,
        COALESCE(engines_count, 0) AS engines_count,
        COALESCE(engines_type, 'Unknown') AS engines_type,
        COALESCE(plane_age, 0) AS plane_age,
        COALESCE(plane_status, 'Unknown') AS plane_status,
        COALESCE(plane_class, 'Unknown') AS plane_class
    FROM silver_airplanes
    WHERE registration_number IS NOT NULL
""")

In [169]:
silver_airplanes_df.schema

StructType([StructField('registration_number', StringType(), False), StructField('production_line', StringType(), False), StructField('iata_type', StringType(), False), StructField('model_name', StringType(), False), StructField('model_code', StringType(), False), StructField('icao_code_hex', StringType(), False), StructField('iata_code_short', StringType(), False), StructField('construction_number', StringType(), False), StructField('test_registration_number', StringType(), False), StructField('rollout_date', StringType(), False), StructField('first_flight_date', StringType(), False), StructField('delivery_date', StringType(), False), StructField('registration_date', StringType(), False), StructField('line_number', StringType(), False), StructField('plane_series', StringType(), False), StructField('airline_iata_code', StringType(), False), StructField('airline_icao_code', StringType(), False), StructField('plane_owner', StringType(), False), StructField('engines_count', IntegerType(),

#### Step 4: Deduplication (if needed)
##### Drop duplicates based on unique identifiers

In [171]:
# print(silver_airplanes_df.count())
silver_airplanes_df = silver_airplanes_df.dropDuplicates(["registration_number"])
# print(silver_airplanes_df.count())


#### Step5: Write into Silver layer as parquet

In [172]:
silver_airplanes_df.write.mode("overwrite").parquet(r"K:\file.parquet")


# 4. Airports Table

In [173]:
bronze_airports_df  = spark.read.json(r"C:\Users\karee\Downloads\file_1111_20250320_121457.json")

#### Step 1: Explode JSON fields to flatten nested structures
##### Extract the "data" array from the JSON

In [176]:
airports_data_df = bronze_airports_df.select(explode(col("data")).alias("airport"))


#### Step 2: Schema enforcement

##### select relevant fields and apply the schema 


In [177]:
airports_data_df_enforced = airports_data_df.select(
    col("airport.airport_name").cast(StringType()).alias("airport_name"),
    col("airport.iata_code").cast(StringType()).alias("iata_code"),
    col("airport.icao_code").cast(StringType()).alias("icao_code"),
    col("airport.latitude").cast(DoubleType()).alias("latitude"),
    col("airport.longitude").cast(DoubleType()).alias("longitude"),
    col("airport.geoname_id").cast(IntegerType()).alias("geoname_id"),
    col("airport.timezone").cast(StringType()).alias("timezone"),
    col("airport.gmt").cast(StringType()).alias("gmt"),
    col("airport.phone_number").cast(StringType()).alias("phone_number"),
    col("airport.country_name").cast(StringType()).alias("country_name"),
    col("airport.country_iso2").cast(StringType()).alias("country_iso2"),
    col("airport.city_iata_code").cast(StringType()).alias("city_iata_code")
)


#### Step 3: Handle null values and standardize formats
##### Replace empty strings with corresponding appropriate values and get rid of the records with a null primary key 


In [178]:
airports_data_df_enforced.createOrReplaceTempView("silver_airports")


In [179]:
silver_airports_df = spark.sql("""
    SELECT
        COALESCE(airport_name, 'Unknown') AS airport_name,
        COALESCE(iata_code, 'N/A') AS iata_code,
        COALESCE(icao_code, 'N/A') AS icao_code,
        COALESCE(latitude, 0.0) AS latitude,
        COALESCE(longitude, 0.0) AS longitude,
        COALESCE(geoname_id, 0) AS geoname_id,
        COALESCE(timezone, 'Unknown') AS timezone,
        COALESCE(gmt, 'N/A') AS gmt,
        COALESCE(phone_number, 'N/A') AS phone_number,
        COALESCE(country_name, 'Unknown') AS country_name,
        COALESCE(country_iso2, 'N/A') AS country_iso2,
        COALESCE(city_iata_code, 'N/A') AS city_iata_code
    FROM silver_airports
    WHERE airport_name IS NOT NULL
""")

#### Step 4: Deduplication (if needed)
##### Drop duplicates based on unique identifiers

In [None]:
# print(silver_airplanes_df.count())
silver_airports_df = silver_airports_df.dropDuplicates(["iata_code"])
# print(silver_airplanes_df.count())


100
100


#### Step5: Write into Silver layer as parquet

In [None]:
silver_airports_df.write.mode("overwrite").parquet(r"K:\file.parquet")


# 5. Airlines Table

In [182]:
bronze_airlines_df  = spark.read.json(r"C:\Users\karee\Downloads\file_10504_20250320_122519.json")

#### Step 1: Explode JSON fields to flatten nested structures
##### Extract the "data" array from the JSON

In [183]:
airlines_data_df = bronze_airlines_df.select(explode(col("data")).alias("airline"))


#### Step 2: Schema enforcement

##### select relevant fields and apply the schema 


In [None]:
airlines_data_df_enforced = airlines_data_df.select(
    col("airline.airline_name").cast(StringType()).alias("airline_name"),
    col("airline.iata_code").cast(StringType()).alias("iata_code"),
    col("airline.iata_prefix_accounting").cast(StringType()).alias("iata_prefix_accounting"),
    col("airline.icao_code").cast(StringType()).alias("icao_code"),
    col("airline.callsign").cast(StringType()).alias("callsign"),
    col("airline.type").cast(StringType()).alias("type"),
    col("airline.status").cast(StringType()).alias("status"),
    col("airline.fleet_size").cast(IntegerType()).alias("fleet_size"),
    col("airline.fleet_average_age").cast(DoubleType()).alias("fleet_average_age"),
    col("airline.date_founded").cast(IntegerType()).alias("date_founded"),
    col("airline.hub_code").cast(StringType()).alias("hub_code"),
    col("airline.country_name").cast(StringType()).alias("country_name"),
    col("airline.country_iso2").cast(StringType()).alias("country_iso2")
)

#### Step 3: Handle null values and standardize formats
##### Replace empty strings with corresponding appropriate values and get rid of the records with a null primary key 


In [185]:
airlines_data_df_enforced.createOrReplaceTempView("silver_airlines")


In [None]:
silver_airlines_df = spark.sql("""
    SELECT
        COALESCE(airline_name, 'Unknown') AS airline_name,
        COALESCE(icao_code, 'N/A') AS icao_code,
        COALESCE(iata_code, 'N/A') AS iata_code, --there are nulls here
        COALESCE(iata_prefix_accounting, 'N/A') AS iata_prefix_accounting,
        COALESCE(callsign, 'N/A') AS callsign,
        COALESCE(type, 'Unknown') AS type,
        COALESCE(status, 'Unknown') AS status,
        COALESCE(fleet_size, 0) AS fleet_size,
        COALESCE(fleet_average_age, 0.0) AS fleet_average_age,
        COALESCE(date_founded, 0) AS date_founded,
        COALESCE(hub_code, 'N/A') AS hub_code,
        COALESCE(country_name, 'Unknown') AS country_name,
        COALESCE(country_iso2, 'N/A') AS country_iso2
    FROM silver_airlines
    WHERE airline_name IS NOT NULL
""")

#### Step 4: Deduplication (if needed)
##### Drop duplicates based on unique identifiers

In [195]:
# print(silver_airplanes_df.count())
silver_airlines_df = silver_airlines_df.dropDuplicates(["icao_code"])
# print(silver_airplanes_df.count())


#### Step5: Write into Silver layer as parquet

In [None]:
silver_airlines_df.write.mode("overwrite").parquet(r"K:\file.parquet")
