Creating the flights schema and loading the raw data from the CSV file

In [3]:
from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Reuse the notebook's active SparkSession, or start one named "Airline Project".
spark = (
    SparkSession.builder
    .appName("Airline Project")
    .getOrCreate()
)

# Column layout of Files/bronze/flights.csv, in file order. The CSV is read
# positionally (header=false), so the order below must match the file exactly.
#
# All fields are nullable. Spark's file-based sources (CSV included) read with a
# nullable copy of any user-supplied schema, so the former `nullable=False`
# flags were never enforced and only misdescribed the data. In the default
# PERMISSIVE parse mode, a value that cannot be parsed as the declared type is
# read as null.
#
# NOTE(review): in the public US DOT 2015 flights dataset, CANCELLATION_REASON
# is a letter code (A-D), and a TAXI_IN column sits between WHEELS_ON and
# SCHEDULED_ARRIVAL. If this file is that dataset, Cancellation_reason should be
# StringType, and a Taxi_in field is missing here, which would shift every
# column after Wheel_on. Confirm against the file before relying on those columns.
_flight_columns = [
    ("Year", IntegerType()),
    ("Month", IntegerType()),
    ("Day", IntegerType()),
    ("Day_of_week", IntegerType()),
    ("Airline", StringType()),
    ("Flight_number", IntegerType()),
    ("Tail_number", StringType()),
    ("Origin_airport", StringType()),
    ("Destination_airport", StringType()),
    ("Scheduled_departure", IntegerType()),
    ("Departure_time", IntegerType()),
    ("Departure_delay", IntegerType()),
    ("Taxi_out", IntegerType()),
    ("Wheels_off", IntegerType()),
    ("Scheduled_time", IntegerType()),
    ("Elapsed_time", IntegerType()),
    ("Air_time", IntegerType()),
    ("Distance", IntegerType()),
    ("Wheel_on", IntegerType()),
    ("Scheduled_Arrival", IntegerType()),
    ("Arrival_time", IntegerType()),
    ("Arrival_delay", IntegerType()),
    ("Diverted", IntegerType()),
    ("Cancelled", IntegerType()),
    ("Cancellation_reason", IntegerType()),
    ("Air_system_delay", IntegerType()),
    ("Security_delay", IntegerType()),
    ("Airline_delay", IntegerType()),
    ("Late_aircraft_delay", IntegerType()),
    ("Weather_delay", IntegerType()),
]

Flights_schema = StructType(
    [StructField(name, data_type, nullable=True) for name, data_type in _flight_columns]
)


StatementMeta(, 278f9f56-108f-425b-9684-1d3eedf30653, 5, Finished, Available, Finished)

In [4]:
# Load the raw flight rows into Flights_schema, mapping CSV fields by position.
# NOTE(review): header=false treats the first line as data. If flights.csv
# starts with a header line, it becomes a row whose integer columns are null.
# Confirm the file has no header.
df = (
    spark.read
    .schema(Flights_schema)
    .format("csv")
    .option("header", "false")
    .load("Files/bronze/flights.csv")
)

df.show()


StatementMeta(, 278f9f56-108f-425b-9684-1d3eedf30653, 6, Finished, Available, Finished)

+----+-----+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+--------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|Year|Month| Day|Day_of_week|Airline|Flight_number|Tail_number|Origin_airport|Destination_airport|Scheduled_departure|Departure_time|Departure_delay|Taxi_out|Wheels_off|Scheduled_time|Elapsed_time|Air_time|Distance|Wheel_on|Scheduled_Arrival|Arrival_time|Arrival_delay|Diverted|Cancelled|Cancellation_reason|Air_system_delay|Security_delay|Airline_delay|Late_aircraft_delay|Weather_delay|
+----+-----+----+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+-------

In [5]:
# Render the loaded flights DataFrame in the notebook's interactive table widget.
display(df)

StatementMeta(, 278f9f56-108f-425b-9684-1d3eedf30653, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 47093506-e80c-4093-9547-27728902cd7c)

In [7]:
# Persist the raw flights as a Delta table in the lakehouse schema named
# "Flights_schema" (a catalog schema, unrelated to the Python variable of the
# same name).
# mode("overwrite") makes this cell safe to re-run. The default save mode,
# "errorifexists", fails on every run after the first because the table already
# exists. Nothing in this notebook appends to this table; it is rebuilt entirely
# from `df` on each run.
df.write.format("delta").mode("overwrite").saveAsTable("Flights_schema.flights")


StatementMeta(, 278f9f56-108f-425b-9684-1d3eedf30653, 9, Finished, Available, Finished)

The following cells show how to add new columns and rows to an existing or new table.

Adding new columns and a row

In [None]:
# Build a one-row DataFrame with IATA_CODE / AIRLINE columns to append.
# NOTE(review): ("VV", "MM") looks like placeholder data. Confirm before
# appending it to a shared table.
columns = ["IATA_CODE", "AIRLINE"]
new_row = spark.createDataFrame(data=[("VV", "MM")], schema=columns)
new_row.show()


In [None]:
# Append the single new row to the airline.airlines Delta table.
new_row.write.saveAsTable("airline.airlines", format="delta", mode="append")

In [None]:
# Preview up to 1000 rows of the airlines table after the append.
# Bound to its own name instead of `df`. Reusing `df` would silently replace the
# flights DataFrame, so re-running the earlier display/saveAsTable cells would
# then act on airline rows instead of flights.
airlines_preview = spark.sql("SELECT * FROM PowerLakehouse.Airline.airlines LIMIT 1000")
display(airlines_preview)

Batch processing: adding multiple rows to an existing or new table

In [None]:
# Batch-load airline rows from Files/airlines.csv.
# The schema is declared here because no `Airline` schema exists in the
# notebook. It mirrors the IATA_CODE / AIRLINE string columns used for the
# single-row append, so saveAsTable(mode="append") matches the table's columns
# by name.
# NOTE(review): header=false treats the first line as data. If airlines.csv has
# a header line, that line will be appended as an airline row.
airlines_schema = StructType([
    StructField("IATA_CODE", StringType()),
    StructField("AIRLINE", StringType()),
])

dff = spark.read.format("csv") \
    .schema(airlines_schema) \
    .option("header", "false") \
    .load("Files/airlines.csv")

dff.show()

In [None]:
# Append every row loaded from airlines.csv to the airline.airlines Delta table.
dff.write.saveAsTable("airline.airlines", format="delta", mode="append")

In [None]:
# Preview up to 1000 rows of the airlines table after the batch append.
# Bound to its own name instead of `df`, so the flights DataFrame loaded at the
# top of the notebook is not silently replaced. Reusing `df` would make
# re-running the flights cells act on airline rows.
airlines_preview = spark.sql("SELECT * FROM PowerLakehouse.Airline.airlines LIMIT 1000")
display(airlines_preview)