In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/local/openjdk-8"
os.environ["SPARK_HOME"] = "/user_data/spark-3.3.0-bin-hadoop2"

import findspark
findspark.init('spark-3.3.0-bin-hadoop2')

In [7]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as F
import matplotlib.pyplot as plt

spark = (
    SparkSession.builder.appName("ac2")
    .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
    .config("spark.sql.catalogImplementation", "hive")
    .getOrCreate()
)

24/04/27 11:54:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [8]:
dataframe = spark.read.csv("hdfs://spark-master:9000/datasets/flight/Combined_Flights_2022.csv", header=True, inferSchema=True)
use_cols = [
    'Month',
    'FlightDate',
    'Airline',
    'Origin',
     'Dest',
     'Cancelled',
     'Diverted',
     'CRSDepTime',
     'DepTime',
     'DepDelayMinutes', 
     'OriginAirportID',
     'OriginStateName',
     'DestAirportID',
     'DestCityName',
     'DestStateName',
    'Marketing_Airline_Network',
    'ArrDelayMinutes'
]
dataframe = dataframe.select(*use_cols)
lines_count = dataframe.count()
print(f"Número de linhas no DataFrame: {lines_count}")

[Stage 7:>                                                        (0 + 16) / 16]

Número de linhas no DataFrame: 4078318


                                                                                

In [9]:
dataframe.printSchema()

root
 |-- Month: integer (nullable = true)
 |-- FlightDate: timestamp (nullable = true)
 |-- Airline: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Cancelled: boolean (nullable = true)
 |-- Diverted: boolean (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepTime: double (nullable = true)
 |-- DepDelayMinutes: double (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- OriginStateName: string (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- DestCityName: string (nullable = true)
 |-- DestStateName: string (nullable = true)
 |-- Marketing_Airline_Network: string (nullable = true)
 |-- ArrDelayMinutes: double (nullable = true)



In [10]:
# dataframe.describe().show()

In [11]:
dataframe.show(5)

+-----+-------------------+--------------------+------+----+---------+--------+----------+-------+---------------+---------------+---------------+-------------+-------------------+-------------+-------------------------+---------------+
|Month|         FlightDate|             Airline|Origin|Dest|Cancelled|Diverted|CRSDepTime|DepTime|DepDelayMinutes|OriginAirportID|OriginStateName|DestAirportID|       DestCityName|DestStateName|Marketing_Airline_Network|ArrDelayMinutes|
+-----+-------------------+--------------------+------+----+---------+--------+----------+-------+---------------+---------------+---------------+-------------+-------------------+-------------+-------------------------+---------------+
|    4|2022-04-04 00:00:00|Commutair Aka Cha...|   GJT| DEN|    false|   false|      1133| 1123.0|            0.0|          11921|       Colorado|        11292|         Denver, CO|     Colorado|                       UA|            0.0|
|    4|2022-04-04 00:00:00|Commutair Aka Cha...|   H

Como o número de células vazias no nosso dataset é de cerca de 3%, o que é uma quantidade baixa, iremos apenas eliminar esses dados, ao invés de tratarmos.

In [12]:
total_count = dataframe.count()
not_null_count = dataframe.dropna().count() 
percentage_not_null = not_null_count * 100 / total_count
print(f"Porcentagem não nula: {percentage_not_null}")
dataframe = dataframe.dropna()

[Stage 14:>                                                       (0 + 16) / 16]

Porcentagem não nula: 96.72899464926472


                                                                                

- Analisando os voos cancelados, foi observado que 3% da base de dados foram cancelados.
- O mês de janeiro (1) apresentou a maior quantidade de voos cancelados, em seguida de fevereiro (2) e junho (6)
- A companhia áerea AA, seguida da UA e DL apresentaram maior quantidades de voos cancelados.

In [None]:
# Proporção de voos cancelados
cancelled_ratio = (
    dataframe.select(
        F.count(
            F.when(
                F.col("Cancelled") == 1, 1
            )
        )
    ).collect()[0][0] / dataframe.count()
) * 100

print(f"Proporção de Voos Cancelados: {cancelled_ratio:.2f}%")

# Distribuição de voos cancelados por mês
df_cancelado_por_mes = dataframe.groupBy("Month").agg(
    F.count(
        F.when(
            F.col("Cancelled") == 1, 1
        )
    ).alias("Voos cancelados")
).orderBy("Month")
df_cancelado_por_mes.show()

# Distribuição de voos cancelados por companhia aérea
cancelled_by_network = dataframe.groupBy("Marketing_Airline_Network").agg(
    F.count(
        F.when(
            F.col("Cancelled") == 1, 1
        )
    ).alias("Voos cancelados")
).orderBy("Voos cancelados")
cancelled_by_network.show()

                                                                                

- Analisando o tempo de atraso, foi observado uma média de 15 minutos de atraso.
- O mês de julho (7) e junho (6) apresentaram um tempo médio de atraso maior
- As companias B6, G4 e F9 apresentaram maior tempo de atraso médio

In [None]:
# Tempo médio de atraso (minutos)
average_delay = dataframe.select(F.avg("ArrDelayMinutes")).collect()[0][0]
print(f"Tempo Médio de Atraso: {average_delay:.2f} minutos")

# Distribuição de atrasos por mês
delay_by_month = dataframe.groupBy("Month").agg(F.avg("ArrDelayMinutes").alias("atraso_medio")).orderBy("Month")
delay_by_month.show()

# Distribuição de atrasos por companhia aérea
delay_by_company = dataframe.groupBy("Marketing_Airline_Network").agg(F.avg("ArrDelayMinutes").alias("atraso_medio")).orderBy("atraso_medio", ascending=False)
delay_by_company.show()

In [None]:
# Apresenta a contagem de atrasos a cada range de 10 minutos
dataframe.groupBy("ArrDelayMinutes").agg(F.count("*").alias("count")).orderBy("ArrDelayMinutes").withColumn("bin", F.floor(F.col("ArrDelayMinutes") / 10)).groupBy("bin").agg(F.sum("count").alias("count")).sort("bin").withColumn("delay_range", F.concat(F.col("bin") * 10, F.lit(" - "), F.col("bin") * 10 + 9)).show(truncate=False)

In [None]:
import matplotlib.pyplot as plt

arr_delay_values = dataframe.select("ArrDelayMinutes").rdd.flatMap(lambda x: x).collect()
plt.hist(arr_delay_values, bins=50, color="skyblue", edgecolor="black")
plt.xlabel("Atraso na chegada")
plt.ylabel("Frequência")
plt.title("Distribuição de atraso na chegada")
plt.show()

In [None]:
allowed_types = ['int', 'double']

# Filtrando colunas com tipos permitidos
selected_columns = [col_name for col_name, col_type in dataframe.dtypes if any(data_type in col_type for data_type in allowed_types)]

# Selecionando apenas as colunas permitidas
df_filtered = dataframe.select(*selected_columns)

# Mostrando o esquema do DataFrame resultante
df_filtered.printSchema()

# Pré processamento

In [None]:
target = "OriginAirportID"
feature_columns = df_filtered.columns
feature_columns.remove(target)
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_vector = vector_assembler.transform(df_filtered)

In [None]:
# Dividindo o conjunto de dados entre treino e teste
train_data, test_data = df_vector.randomSplit(
    [0.8, 0.2], # 80% treino e 20% teste
    seed=42 # Semente de aleatoriedade
)

# Model

In [None]:
rf_classifier = RandomForestClassifier(labelCol=target, featuresCol="features", numTrees=10)
pipeline = Pipeline(stages=[rf_classifier])
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="OriginAirportID", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)
