### Flights data

In [0]:

# Raw flights CSVs from the bronze layer: the first row is a header and Spark infers the column types.
df = spark.read.csv("/mnt/bronzelayer/sampleDatasets/flights2/", header=True, inferSchema=True)


In [0]:
# Inspect the inferred schema and the data, then persist the flights table to silver as Delta,
# replacing whatever the previous run wrote there.
df.printSchema()
df.display()
silver_flights_path = "/mnt/silverlayer/sampleDatasets/flights2/"
df.write.mode("overwrite").format("delta").save(silver_flights_path)

### Airport data

In [0]:
%python
from datetime import datetime
from pyspark.sql.functions import col, split

# Get the current date in 'YYYY-MM-DD' format
current_date = datetime.now().strftime("%Y-%m-%d")

try:
    files = dbutils.fs.ls(f'dbfs:/mnt/bronzelayer/sampleDatasets/airport/')
    if any(file.name.endswith('.csv') for file in files):
        df2 = spark.read.format("csv") \
                        .option("inferSchema", "true") \
                        .option("header", "true") \
                        .load(f"/mnt/bronzelayer/sampleDatasets/airport/*.csv")
        # Split Description column to get the City and State
        df2 = df2.withColumn("City", split(col('Description'), ', ').getItem(0)) \
         .withColumn("State", split(split(col('Description'), ',').getItem(1), ':').getItem(0))\
         .withColumn("name", split(col('Description'), ':').getItem(1))
        # df2.printSchema()
        # display(df2)
        df2.write.format("delta").mode("overwrite").save(f"/mnt/silverlayer/sampleDatasets/airport/{current_date}/")
    else:
        display("No CSV files found")
except Exception as e:
    display(f"No data found: {e}")

### JSON flight data
Define the schema of the raw API-response JSON, then read that day's files with it.

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, TimestampType


def _nullable(name, data_type):
    """Return a nullable StructField named `name` with type `data_type`."""
    return StructField(name, data_type, True)


def _string_fields(*names):
    """Return nullable StringType fields for `names`, in the given order."""
    return [_nullable(name, StringType()) for name in names]


def _endpoint_struct(with_baggage):
    """Build the departure/arrival sub-record.

    Both records share one layout in which `delay` is the only integer field; the arrival
    record additionally carries a `baggage` string placed right after `gate`.
    """
    location = _string_fields("airport", "timezone", "iata", "icao", "terminal", "gate")
    if with_baggage:
        location += _string_fields("baggage")
    timing = _string_fields("scheduled", "estimated", "actual", "estimated_runway", "actual_runway")
    return StructType(location + [_nullable("delay", IntegerType())] + timing)


# Schema of one raw API response: a pagination header plus an array of flight records.
pagination_struct = StructType([_nullable(name, IntegerType()) for name in ("limit", "offset", "count", "total")])

codeshared_struct = StructType(_string_fields(
    "airline_name", "airline_iata", "airline_icao",
    "flight_number", "flight_iata", "flight_icao",
))

flight_record_struct = StructType(
    _string_fields("flight_date", "flight_status")
    + [
        _nullable("departure", _endpoint_struct(with_baggage=False)),
        _nullable("arrival", _endpoint_struct(with_baggage=True)),
        _nullable("airline", StructType(_string_fields("name", "iata", "icao"))),
        _nullable("flight", StructType(
            _string_fields("number", "iata", "icao") + [_nullable("codeshared", codeshared_struct)]
        )),
    ]
    + _string_fields("aircraft", "live")
)

schema = StructType([
    _nullable("pagination", pagination_struct),
    # `data` is an array; every element is one flight record.
    _nullable("data", ArrayType(flight_record_struct)),
])

# Flag read by the flatten cell: it becomes True only after the day's JSON was read and displayed.
tmp = False
try:
    json_file_path = f"/mnt/bronzelayer/Api-response-raw/{current_date}/"
    df3 = spark.read.schema(schema).json(json_file_path)
    df3.display()
    tmp = True
except Exception as e:
    display(f"No data found: {e}")

Flatten the JSON data and clean it

In [0]:
if tmp:
    from pyspark.sql.functions import explode, col, split

    # Step 1: Explode the 'data' array so each flight record becomes its own row.
    df_exploded = df3.withColumn("flight", explode(col("data")))

    # Step 2: Flatten the nested fields of the 'flight' struct into prefixed top-level columns.
    # The timezone strings are split on '/' into arrays (presumably IANA names such as
    # "Region/City" — TODO confirm); their parts are extracted in Step 3.
    df_flattened = df_exploded.select(
        col("flight.flight_date"),
        col("flight.flight_status"),
        split(col("flight.departure.timezone"), "/").alias("departure_timezone_parts"),
        col("flight.departure.airport").alias("departure_airport"),
        col("flight.departure.iata").alias("departure_iata"),
        col("flight.departure.icao").alias("departure_icao"),
        col("flight.departure.terminal").alias("departure_terminal"),
        col("flight.departure.gate").alias("departure_gate"),
        col("flight.departure.delay").alias("departure_delay"),
        col("flight.departure.scheduled").alias("departure_scheduled"),
        col("flight.departure.estimated").alias("departure_estimated"),
        col("flight.departure.actual").alias("departure_actual"),
        col("flight.departure.estimated_runway").alias("departure_estimated_runway"),
        col("flight.departure.actual_runway").alias("departure_actual_runway"),
        col("flight.arrival.airport").alias("arrival_airport"),
        split(col("flight.arrival.timezone"), '/').alias("arrival_timezone_parts"),
        col("flight.arrival.iata").alias("arrival_iata"),
        col("flight.arrival.icao").alias("arrival_icao"),
        col("flight.arrival.terminal").alias("arrival_terminal"),
        col("flight.arrival.gate").alias("arrival_gate"),
        col("flight.arrival.baggage").alias("arrival_baggage"),
        col("flight.arrival.delay").alias("arrival_delay"),
        col("flight.arrival.scheduled").alias("arrival_scheduled"),
        col("flight.arrival.estimated").alias("arrival_estimated"),
        col("flight.arrival.actual").alias("arrival_actual"),
        col("flight.arrival.estimated_runway").alias("arrival_estimated_runway"),
        col("flight.arrival.actual_runway").alias("arrival_actual_runway"),
        col("flight.airline.name").alias("airline_name"),
        col("flight.airline.iata").alias("airline_iata"),
        col("flight.airline.icao").alias("airline_icao"),
        col("flight.flight.number").alias("flight_number"),
        col("flight.flight.iata").alias("flight_iata"),
        col("flight.flight.icao").alias("flight_icao"),
        col("flight.flight.codeshared.airline_name").alias("codeshared_airline_name"),
        col("flight.flight.codeshared.airline_iata").alias("codeshared_airline_iata"),
        col("flight.flight.codeshared.airline_icao").alias("codeshared_airline_icao"),
        col("flight.flight.codeshared.flight_number").alias("codeshared_flight_number"),
        col("flight.flight.codeshared.flight_iata").alias("codeshared_flight_iata"),
        col("flight.flight.codeshared.flight_icao").alias("codeshared_flight_icao")
    )

    # Step 3: Clean types and derived columns.
    df_transformed = (
        df_flattened
        .withColumn("flight_date", col("flight_date").cast("date"))
        .withColumn("departure_timezone_part1", col("departure_timezone_parts").getItem(0))
        .withColumn("departure_timezone_part2", col("departure_timezone_parts").getItem(1))
        .withColumn("arrival_timezone_part1", col("arrival_timezone_parts").getItem(0))
        .withColumn("arrival_timezone_part2", col("arrival_timezone_parts").getItem(1))
        .withColumn("departure_delay", col("departure_delay").cast("integer"))
        .withColumn("arrival_delay", col("arrival_delay").cast("integer"))
    )

    # Drop everything from the first '+' (the UTC offset) in every timestamp string. The pattern
    # is a regex, so '+' must be escaped; every timestamp column goes through the same rule.
    # NOTE(review): a negative offset ('-HH:MM') would not be stripped — confirm the API only emits '+'.
    timestamp_columns = [
        "departure_scheduled", "departure_estimated", "departure_actual",
        "departure_estimated_runway", "departure_actual_runway",
        "arrival_scheduled", "arrival_estimated", "arrival_actual",
        "arrival_estimated_runway", "arrival_actual_runway",
    ]
    for ts_column in timestamp_columns:
        df_transformed = df_transformed.withColumn(ts_column, split(col(ts_column), "\\+").getItem(0))

    # Non-numeric flight numbers become null under this cast.
    df_transformed = df_transformed.withColumn("flight_number", col("flight_number").cast("integer"))

    df_transformed.display()
    df_transformed.printSchema()
    df_transformed.write.format("delta").mode("overwrite").save(f"/mnt/silverlayer/Api-response-raw/{current_date}/")



### PostgreSQL data

In [0]:
from pyspark.sql.functions import col

# Load the ratings CSV from the dated postgreSQL-raw bronze folder, normalise column types and
# persist it to silver. Deliberately not named `df3`: that name holds the API-response DataFrame
# that the flatten cell explodes, and overwriting it breaks a re-run of that cell.
try:
    df_postgres = (
        spark.read.format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load(f"/mnt/bronzelayer/postgreSQL-raw/{current_date}/")
        .withColumn("movieId", col("movieId").cast("string"))
        .withColumn("num_ratings", col("num_ratings").cast("integer"))
        .withColumn("avg_rating", col("avg_rating").cast("double"))
        .withColumn("std_rating", col("std_rating").cast("double"))
    )
    df_postgres.printSchema()
    df_postgres.write.format("delta").mode("overwrite").save(f"/mnt/silverlayer/postgreSQL-raw/{current_date}/")
except Exception as e:
    # Best-effort: a missing day folder (or any other failure) is reported, not raised.
    print(e)

### Cancellation SQL file

In [0]:
import re

# Step 1: Read the SQL script; spark.read.text yields one row per line in a 'value' column.
df4 = spark.read.text("/mnt/bronzelayer/sampleDatasets/cancellation/")  # Path to your SQL file

# Step 2: Re-assemble the script as one string. Double quotes are removed — presumably so quoted
# identifiers become bare names Spark SQL accepts (TODO confirm against the source dialect).
queries = "\n".join(row[0].replace('"', '') for row in df4.select("value").collect()) + "\n"

# Step 3: Split into statements on ';'.
# NOTE: naive split — a ';' inside a string literal would cut that statement in two.
queries_list = queries.split(';')

# Step 4: Execute each statement. Without a guard, every re-run fails because CREATE TABLE hits the
# table created by the previous run. A CREATE for an existing table is therefore skipped, and so
# are the INSERTs into that table, so the existing rows are not duplicated.
create_table_re = re.compile(r"\bCREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?([`\w.]+)", re.IGNORECASE)
insert_into_re = re.compile(r"\bINSERT\s+INTO\s+(?:TABLE\s+)?([`\w.]+)", re.IGNORECASE)
skipped_tables = set()  # lower-cased names of tables that already existed

for query in queries_list:
    query = query.strip()
    if not query:  # blank fragments, e.g. after the final ';'
        continue
    create_match = create_table_re.search(query)
    if create_match:
        table_name = create_match.group(1).strip("`")
        if spark.catalog.tableExists(table_name):
            print(f"Table {table_name} already exists, skipping creation and its INSERT statements.")
            skipped_tables.add(table_name.lower())
            continue
    else:
        insert_match = insert_into_re.search(query)
        if insert_match and insert_match.group(1).strip("`").lower() in skipped_tables:
            continue
    spark.sql(query)

# Step 5: Verify the table creation
result = spark.sql("SELECT * FROM Cancellation")
display(result)


In [0]:
from datetime import date, datetime

# Today's date as an ISO calendar date ('YYYY-MM-DD'), used as the dated silver sub-folder.
current_date = date.today().isoformat()
cancellation_silver_path = f"/mnt/silverlayer/sampleDatasets/cancellation/{current_date}/"
result.write.mode("overwrite").format("delta").save(cancellation_silver_path)

### uniqueCarriers SQL file

In [0]:
import re

# Step 1: Read the SQL script; spark.read.text yields one row per line in a 'value' column.
df5 = spark.read.text("/mnt/bronzelayer/sampleDatasets/uniqueCarriers/")  # Path to your SQL file

# Step 2: Re-assemble the script as one string. Double quotes are removed — presumably so quoted
# identifiers become bare names Spark SQL accepts (TODO confirm against the source dialect).
queries = "\n".join(row[0].replace('"', '') for row in df5.select("value").collect()) + "\n"

# Step 3: Split into statements on ';'.
# NOTE: naive split — a ';' inside a string literal would cut that statement in two.
queries_list = queries.split(';')

# Step 4: Execute each statement. The table name is taken with a case-insensitive regex, which copes
# with "IF NOT EXISTS", newlines/multiple spaces and a '(' glued to the name. When the table already
# exists, its CREATE is skipped and so are the INSERTs into it — otherwise each re-run would append
# duplicate rows to the existing table.
create_table_re = re.compile(r"\bCREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?([`\w.]+)", re.IGNORECASE)
insert_into_re = re.compile(r"\bINSERT\s+INTO\s+(?:TABLE\s+)?([`\w.]+)", re.IGNORECASE)
skipped_tables = set()  # lower-cased names of tables that already existed

for query in queries_list:
    query = query.strip()
    if not query:  # blank fragments, e.g. after the final ';'
        continue
    create_match = create_table_re.search(query)
    if create_match:
        table_name = create_match.group(1).strip("`")
        if spark.catalog.tableExists(table_name):
            print(f"Table {table_name} already exists, skipping creation and its INSERT statements.")
            skipped_tables.add(table_name.lower())
            continue
    else:
        insert_match = insert_into_re.search(query)
        if insert_match and insert_match.group(1).strip("`").lower() in skipped_tables:
            continue
    spark.sql(query)

# Step 5: Verify the table creation
result = spark.sql("SELECT * FROM UNIQUE_CARRIERS")
display(result)


In [0]:
# Persist the UNIQUE_CARRIERS query result to the dated silver folder, replacing any earlier output.
(
    result.write
    .mode("overwrite")
    .format("delta")
    .save(f"/mnt/silverlayer/sampleDatasets/uniqueCarriers/{current_date}/")
)

> **<--THE END-->**