In [3]:
pip install amadeus pandas boto3 python-dotenv


Collecting python-dotenv
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Downloading python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1


In [4]:
import boto3
import pandas as pd
from amadeus import Client, ResponseError
import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, to_timestamp, unix_timestamp
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from dotenv import load_dotenv
import os
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


In [6]:

load_dotenv('//content/drive/MyDrive/Big Data Engineering/2024/Project 2/Final/secrets.env')


AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')
AMADEUS_CLIENT_ID = os.getenv('AMADEUS_CLIENT_ID')
AMADEUS_CLIENT_SECRET = os.getenv('AMADEUS_CLIENT_SECRET')

amadeus = Client(
    client_id=AMADEUS_CLIENT_ID,
    client_secret=AMADEUS_CLIENT_SECRET
)

# Initialize Spark session
spark = SparkSession.builder.appName("RealTimeFlightDataProcessing").getOrCreate()

# Currency conversion function
def convert_currency(amount, from_currency, to_currency="ZAR"):
    url = f"https://api.exchangerate-api.com/v4/latest/{from_currency}"
    response = requests.get(url)
    data = response.json()
    conversion_rate = data['rates'].get(to_currency)
    if conversion_rate:
        return amount * conversion_rate
    return amount


def get_flights(origin, destination, date):
    try:
        response = amadeus.shopping.flight_offers_search.get(
            originLocationCode=origin,
            destinationLocationCode=destination,
            departureDate=date,
            adults=1
        )
        flights = response.data
        return flights
    except ResponseError as error:
        print("Error:", error)
        return None


flights_data = get_flights("CPT", "LAX", "2024-11-15")  # Example date

# I
if flights_data:
    #
    flight_records = []
    for flight in flights_data:
        for itinerary in flight['itineraries']:
            for segment in itinerary['segments']:
                flight_info = {
                    'origin': segment['departure']['iataCode'],
                    'destination': segment['arrival']['iataCode'],
                    'departure_time': segment['departure']['at'],
                    'arrival_time': segment['arrival']['at'],
                    'carrier_code': segment['carrierCode'],
                    'flight_number': segment['number'],
                    'price': float(flight['price']['grandTotal']),
                    'num_stops': segment.get('stopQuantity', 0)  # number of stops
                }
                # Convert price from USD (or other currencies) to ZAR
                flight_info['price_in_zar'] = convert_currency(flight_info['price'], "USD")
                flight_records.append(flight_info)


    pd_df = pd.DataFrame(flight_records)


    spark_df = spark.createDataFrame(pd_df)

    # Handle Missing or Null Values
    spark_df = spark_df.fillna({'price_in_zar': 0, 'departure_time': 'unknown'})

    # Remove Duplicates
    spark_df = spark_df.dropDuplicates()

    # Standardize Date Formats
    spark_df = spark_df.withColumn("departure_time", to_timestamp("departure_time", "yyyy-MM-dd'T'HH:mm:ss"))
    spark_df = spark_df.withColumn("arrival_time", to_timestamp("arrival_time", "yyyy-MM-dd'T'HH:mm:ss"))

    # Feature Engineering: Add New Features
    # Add a price category column
    spark_df = spark_df.withColumn("price_category",
                                   when(col("price_in_zar") < 10000, "Cheap")
                                   .when(col("price_in_zar").between(10000, 20000), "Moderate")
                                   .otherwise("Expensive"))

    # Create a duration column (in hours) between departure and arrival times
    spark_df = spark_df.withColumn("duration",
                                   (unix_timestamp("arrival_time") - unix_timestamp("departure_time")) / 3600)

    # Feature Engineering: Add a Column for Non-Stop Flights
    spark_df = spark_df.withColumn("is_non_stop", when(col("num_stops") == 0, 1).otherwise(0))

    indexer = StringIndexer(inputCol="carrier_code", outputCol="carrier_index")
    spark_df = indexer.fit(spark_df).transform(spark_df)

    # Normalize the Price (using Min-Max Scaler)
    assembler = VectorAssembler(inputCols=["price_in_zar"], outputCol="price_features")
    df_assembled = assembler.transform(spark_df)

    scaler = MinMaxScaler(inputCol="price_features", outputCol="scaled_price")
    scaler_model = scaler.fit(df_assembled)
    df_scaled = scaler_model.transform(df_assembled)

    # Convert Spark DataFrame to Pandas DataFrame
    final_df = df_scaled.toPandas()

    # Save the DataFrame to an Excel file
    file_name = "flights_data.xlsx"
    final_df.to_excel(file_name, index=False)

    print(f"Data saved to Excel: {file_name}")

    # Upload the file to S3
    def upload_to_s3(file_name, bucket_name):
        s3_client = boto3.client(
            's3',
            aws_access_key_id=AWS_ACCESS_KEY,
            aws_secret_access_key=AWS_SECRET_KEY
        )
        try:
            s3_client.upload_file(file_name, bucket_name, file_name)
            print(f"File '{file_name}' uploaded successfully to S3 bucket '{bucket_name}'.")
        except Exception as e:
            print(f"Failed to upload file to S3: {e}")

    upload_to_s3(file_name, 'weatherflightroad')
else:
    print("No flight data found.")


Data saved to Excel: flights_data.xlsx
File 'flights_data.xlsx' uploaded successfully to S3 bucket 'weatherflightroad'.


In [7]:
df_scaled.show()

+------+-----------+-------------------+-------------------+------------+-------------+-------+---------+------------------+--------------+------------------+-----------+-------------+--------------------+--------------------+
|origin|destination|     departure_time|       arrival_time|carrier_code|flight_number|  price|num_stops|      price_in_zar|price_category|          duration|is_non_stop|carrier_index|      price_features|        scaled_price|
+------+-----------+-------------------+-------------------+------------+-------------+-------+---------+------------------+--------------+------------------+-----------+-------------+--------------------+--------------------+
|   CPT|        ADD|2024-11-15 14:35:00|2024-11-15 22:00:00|          ET|          846|1226.68|        0|         21528.234|     Expensive| 7.416666666666667|          1|          1.0|         [21528.234]|[0.08281679793970...|
|   ADD|        IAD|2024-11-15 22:50:00|2024-11-16 08:00:00|          ET|          500|1085.