# Finance #

## Load and initialize data ##

In [11]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

# Assemble the Spark configuration as one chained expression (each SparkConf
# setter returns the conf object itself, so the calls can be chained).
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Data Preprocessing + Finance Pipeline")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# The SparkSession is the entry point to the Spark SQL engine; an already
# running session is reused instead of creating a second one.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Register the Google Cloud Storage connector classes on the Hadoop
# configuration so that paths using the gs:// scheme can be resolved.
conf = spark.sparkContext._jsc.hadoopConfiguration()
for hadoop_key, connector_class in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(hadoop_key, connector_class)

# Google Cloud Storage locations of the four input CSV files
delay_path = 'gs://de_as2_data/flights_delaydata.csv'
time_path = 'gs://de_as2_data/flights_timedata.csv'
info_path = 'gs://de_as2_data/flights_infodata.csv'
finance_path = 'gs://de_as2_data/flight_financedata.csv'


def _read_csv_with_header(path):
    """Read the CSV at `path` into a DataFrame, using its header row and inferring column types."""
    reader = spark.read.format("csv").options(header="true", inferSchema="true")
    return reader.load(path)


# Load each file and print the inferred schema right after the load
df_delay = _read_csv_with_header(delay_path)
df_delay.printSchema()

df_time = _read_csv_with_header(time_path)
df_time.printSchema()

df_info = _read_csv_with_header(info_path)
df_info.printSchema()

df_finance = _read_csv_with_header(finance_path)
df_finance.printSchema()


# inferSchema makes Spark infer each column's data type from the file contents
# APPOINT WORKER TO EACH PIPELINE

root
 |-- id: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- sched_dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- sched_arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- time_hour: string (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- name: string (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- average_ticket_price1: double (null

## Data Preprocessing ##
In the next section, we clean the data by removing rows with missing values, removing duplicate rows, and removing extreme outliers.

In [12]:
# Report the number of rows in each DataFrame before any cleaning
for dataset_name, frame in (
    ("flights_delaydata", df_delay),
    ("flights_timedata", df_time),
    ("flights_infodata", df_info),
    ("flights_financedata", df_finance),
):
    print(f"Original row count {dataset_name}:", frame.count())


Original row count flights_delaydata: 336776
Original row count flights_timedata: 336776
Original row count flights_infodata: 336776
Original row count flights_financedata: 336776


We drop rows with missing values to ensure consistency.

In [13]:
# Discard every row that has a missing value in any column
df_delay, df_time, df_info, df_finance = (
    frame.dropna() for frame in (df_delay, df_time, df_info, df_finance)
)

Next, we remove rows containing duplicate information.

In [14]:
# Keep only one copy of rows that are identical across all columns
deduplicated = [
    frame.dropDuplicates()
    for frame in (df_delay, df_time, df_info, df_finance)
]
df_delay, df_time, df_info, df_finance = deduplicated

In [18]:
# Single flight used to eyeball the table before and after the column clean-up.
# `id` is an integer column, so it is compared against an integer instead of a
# whitespace-padded string that only matches through an implicit cast.
sample_flight_id = 235657

df_finance.filter(df_finance.id == sample_flight_id).show()

# Keep a single ticket-price column: drop 'average_ticket_price1' and expose
# 'average_ticket_price7' under the plain name 'average_ticket_price'.
df_finance = (
    df_finance
    .drop("average_ticket_price1")
    .withColumnRenamed("average_ticket_price7", "average_ticket_price")
)

df_finance.filter(df_finance.id == sample_flight_id).show()

+------+---------------+-----------+---------------+-------------+-----------+--------------------+
|    id|passenger_count|  fuel_cost|average_revenue|flight_profit|time_of_day|average_ticket_price|
+------+---------------+-----------+---------------+-------------+-----------+--------------------+
|235657|            268|2141.746343|     50329.7516|  48188.00526|          9|         263.1536318|
+------+---------------+-----------+---------------+-------------+-----------+--------------------+

+------+---------------+-----------+---------------+-------------+-----------+--------------------+
|    id|passenger_count|  fuel_cost|average_revenue|flight_profit|time_of_day|average_ticket_price|
+------+---------------+-----------+---------------+-------------+-----------+--------------------+
|235657|            268|2141.746343|     50329.7516|  48188.00526|          9|         263.1536318|
+------+---------------+-----------+---------------+-------------+-----------+--------------------+

Lastly, we remove extreme outliers using the Interquartile Range (IQR) method, which is robust for detecting outliers. This is only necessary for the attribute 'average_ticket_price' in flight_financedata.csv.

In [19]:
from pyspark.sql.functions import col

# Filter extreme outliers out of average_ticket_price (finance data) using the IQR rule
column_price = "average_ticket_price"

# Approximate first and third quartiles (relative error 0.01)
quartiles = df_finance.approxQuantile(column_price, [0.25, 0.75], 0.01)
q1, q3 = quartiles
iqr = q3 - q1

# Values more than 1.5 * IQR outside the quartiles are treated as outliers
lower_bound, upper_bound = q1 - 1.5 * iqr, q3 + 1.5 * iqr

# between() is inclusive on both ends, so rows exactly on a bound are kept
df_finance = df_finance.filter(col(column_price).between(lower_bound, upper_bound))

In [9]:
# Report the number of rows left in each DataFrame after cleaning
for dataset_name, frame in (
    ("flights_delaydata", df_delay),
    ("flights_timedata", df_time),
    ("flights_infodata", df_info),
    ("flights_financedata", df_finance),
):
    print(f"Cleaned row count {dataset_name}:", frame.count())

Cleaned row count flights_delaydata: 327346
Cleaned row count flights_timedata: 327346
Cleaned row count flights_infodata: 334264
Cleaned row count flights_financedata: 334526


In [10]:
# Summary statistics (count, mean, stddev, min, max) of the ticket price after cleaning
price_summary = df_finance.select('average_ticket_price').describe()
price_summary.show()


+-------+--------------------+
|summary|average_ticket_price|
+-------+--------------------+
|  count|              334526|
|   mean|  301.17452448213646|
| stddev|  130.63841893106007|
|    min|         80.40255821|
|    max|         682.3777921|
+-------+--------------------+



## New variable creation: time_of_day ##

Convert time_hour to timestamp format, so that it can be used during logic operations.

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, hour, when


# Parse the 'time_hour' string column (pattern dd/MM/yyyy HH:mm) into a timestamp
df_time_new = df_time.withColumn(
    "time_hour_datetime",
    to_timestamp(col("time_hour"), "dd/MM/yyyy HH:mm"),
)

# Hour of day of the parsed timestamp; null when the string could not be parsed
dep_hour = hour(col("time_hour_datetime"))

# Bucket the departure hour into a categorical label. Every range is listed
# explicitly and there is no catch-all branch, so a row whose timestamp failed
# to parse stays null instead of being silently labelled "Night".
df_time_new = df_time_new.withColumn(
    "time_of_day_dep",
    when(dep_hour.between(0, 6), "Morning")
    .when(dep_hour.between(7, 12), "Day")
    .when(dep_hour.between(13, 18), "Afternoon")
    .when(dep_hour.between(19, 23), "Night"),
)

# Show the updated DataFrame
df_time_new.select("time_hour_datetime", "time_of_day_dep").show()

df_time_new.printSchema()

+-------------------+---------------+
| time_hour_datetime|time_of_day_dep|
+-------------------+---------------+
|2013-01-01 15:00:00|      Afternoon|
|2013-01-02 06:00:00|        Morning|
|2013-01-02 07:00:00|            Day|
|2013-01-02 09:00:00|            Day|
|2013-01-02 12:00:00|            Day|
|2013-01-02 14:00:00|      Afternoon|
|2013-01-02 16:00:00|      Afternoon|
|2013-01-02 18:00:00|      Afternoon|
|2013-01-02 19:00:00|          Night|
|2013-01-03 08:00:00|            Day|
|2013-01-04 08:00:00|            Day|
|2013-01-04 10:00:00|            Day|
|2013-01-04 08:00:00|            Day|
|2013-01-05 05:00:00|        Morning|
|2013-01-05 06:00:00|        Morning|
|2013-01-05 09:00:00|            Day|
|2013-01-05 12:00:00|            Day|
|2013-01-05 15:00:00|      Afternoon|
|2013-01-05 18:00:00|      Afternoon|
|2013-01-06 07:00:00|            Day|
+-------------------+---------------+
only showing top 20 rows

root
 |-- id: integer (nullable = true)
 |-- year: integer (nu

Drop `time_hour`, as it is redundant now that `time_hour_datetime` holds the parsed value.

In [22]:
# Remove the raw string column, then preview the first ten rows of the result
df_time_new = df_time_new.drop("time_hour")
df_time_new.show(n=10)

+----+----+-----+---+--------+----+------+-------------------+---------------+
|  id|year|month|day|air_time|hour|minute| time_hour_datetime|time_of_day_dep|
+----+----+-----+---+--------+----+------+-------------------+---------------+
| 473|2013|    1|  1|     163|  15|    30|2013-01-01 15:00:00|      Afternoon|
| 874|2013|    1|  2|     167|   6|    10|2013-01-02 06:00:00|        Morning|
|1022|2013|    1|  2|     198|   7|    51|2013-01-02 07:00:00|            Day|
|1072|2013|    1|  2|     160|   9|     4|2013-01-02 09:00:00|            Day|
|1294|2013|    1|  2|     159|  12|    40|2013-01-02 12:00:00|            Day|
|1318|2013|    1|  2|     154|  14|     0|2013-01-02 14:00:00|      Afternoon|
|1453|2013|    1|  2|     166|  16|     5|2013-01-02 16:00:00|      Afternoon|
|1630|2013|    1|  2|     238|  18|    15|2013-01-02 18:00:00|      Afternoon|
|1711|2013|    1|  2|      53|  19|    42|2013-01-02 19:00:00|          Night|
|1983|2013|    1|  3|      97|   8|    55|2013-01-03

## Join Dataframes ##

In [23]:
# Combine the finance, flight-info and time data on the shared `id` column.
# Inner joins keep only the ids that are present in all three DataFrames.
# The whole chain is wrapped in parentheses, so no backslash continuations
# (and no dangling one before a blank line) are needed.
joined_df_finance = (
    df_finance
    .join(df_info, on="id", how="inner")
    .join(df_time_new, on="id", how="inner")
    # Drop the 'time_of_day' column that comes from the finance data
    # (presumably superseded by 'time_of_day_dep' -- confirm).
    .drop("time_of_day")
)

# Preview the joined DataFrame
joined_df_finance.show(10)

+---+---------------+-----------+---------------+-------------+--------------------+-------+------+-------+------+----+--------+--------------------+----+-----+---+--------+----+------+----------------+
| id|passenger_count|  fuel_cost|average_revenue|flight_profit|average_ticket_price|carrier|flight|tailnum|origin|dest|distance|                name|year|month|day|air_time|hour|minute|       time_hour|
+---+---------------+-----------+---------------+-------------+--------------------+-------+------+-------+------+----+--------+--------------------+----+-----+---+--------+----+------+----------------+
|  1|            205|1032.268511|    98458.57313|  97426.30461|         622.6722533|     UA|  1714| N24211|   LGA| IAH|    1416|United Air Lines ...|2013|    1|  1|     227|   5|    29|01/01/2013 05:00|
|  3|            220|4325.253537|    74681.94661|  70356.69307|         293.0288722|     B6|   725| N804JB|   JFK| BQN|    1576|     JetBlue Airways|2013|    1|  1|     183|   5|    45|01/

# Upload new Dataframe to bucket #

In [25]:
# Write the joined DataFrame to the bucket as CSV with a header row,
# replacing whatever a previous run left at this location.
joined_df_finance.write.csv(
    "gs://de_as2_data/joined_flights_financedata_final",
    mode="overwrite",
    header=True,
)

## Upload to BigQuery ##

Ensure all necessary dependencies are installed.

In [97]:
# Install the BigQuery client library with the interpreter that runs this
# kernel (sys.executable), so the package is importable from the notebook.
# NOTE(review): no version is pinned, so the installed version may differ between runs.
import sys
!{sys.executable} -m pip install google-cloud-bigquery



In [29]:
from google.cloud import bigquery

# BigQuery client bound to the target project
client = bigquery.Client(project="chromatic-pride-435508-i5")

# Fully qualified destination: <project>.<dataset>.<table>
dataset_id = "chromatic-pride-435508-i5.flights"
table_id = f"{dataset_id}.Finance_Pipeline"

# Load-job settings: CSV input, skip the header row, let BigQuery detect the
# schema, and replace the existing table contents.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# The wildcard picks up every CSV file in the output folder
gcs_path = "gs://de_as2_data/joined_flights_financedata_final/*.csv"

# Submit the load job and block until it has finished
load_job = client.load_table_from_uri(gcs_path, table_id, job_config=job_config)
load_job.result()

print("Data successfully uploaded to BigQuery, and the table was overwritten.")


Data successfully uploaded to BigQuery, and the table was overwritten.


### Stop spark session ###

In [134]:
# Stop the Spark session now that the pipeline has finished
spark.stop()