In [12]:
import yaml
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, max, min, count, broadcast, desc, asc, when, rank, round
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, IntegerType, FloatType

In [13]:
def control_null(dataframe):
    """Print, for every column of ``dataframe``, how many null values it holds.

    All per-column null counts are computed in one Spark aggregation instead
    of one ``filter(...).count()`` job per column, so the data is scanned a
    single time regardless of the number of columns.

    Args:
        dataframe: Spark DataFrame to inspect.

    Returns:
        None. One line per column is printed to stdout.
    """
    columns = dataframe.columns
    if not columns:
        # Nothing to aggregate or report for a frame without columns.
        return

    # count() ignores nulls and when() without otherwise() yields null for
    # non-matching rows, so each expression counts exactly the null cells.
    null_counters = [count(when(col(name).isNull(), 1)) for name in columns]

    # A global aggregation always returns exactly one row, even on empty data.
    null_counts = dataframe.agg(*null_counters).first()

    # Values are read by position, in the same order as `columns`.
    for name, df_null in zip(columns, null_counts):
        if df_null > 0:
            print(f"{name} : {df_null} null")
        else:
            print(f"{name} no tiene null")

In [14]:
# Create (or reuse) the Spark session. The driver JVM option points log4j at
# the project's configuration file.
spark = SparkSession.builder.appName("Spark").config(
    "spark.driver.extraJavaOptions",
    r'-Dlog4j.configurationFile=file:/home/illidan/proyecto_desde0/ETL/log4j.properties',
).getOrCreate()

# Emit Spark's own messages from INFO level upwards.
spark.sparkContext.setLogLevel("INFO")

# Logger obtained from the JVM-side log4j LogManager through the session's
# `_jvm` handle, named after the current Python module.
logger = spark._jvm.org.apache.log4j.LogManager.getLogger(__name__)
logger.info("Log de ejemplo guardado en archivo y consola.")

# Starts empty; nothing in the visible cells appends to it.
errores_detectados = list()


15:20:49.667 [Thread-3] INFO  __main__ - Log de ejemplo guardado en archivo y consola.


In [15]:
try:
    # Read the YAML configuration file into a dictionary.
    with open("/home/illidan/proyecto_desde0/Config_file/Config.Yaml", "r") as file:
        config = yaml.safe_load(file)
except Exception as error:
    logger.error("Error ruta .yaml:" + str(error))
    # Re-raise after logging: if `config` is missing, every later cell would
    # only fail with a NameError that hides this original cause.
    raise

try:
    # Input parquet locations, taken from the "Parquet_file" section.
    read_parquet_airline = config["Parquet_file"]["df_airline"]
    read_parquet_flights = config["Parquet_file"]["df_flights"]
    read_parquet_airports = config["Parquet_file"]["df_airports"]
except Exception as error:
    logger.error("Error ruta .yaml:" + str(error))
    # A missing section or key must stop the run; the readers below need all
    # three locations.
    raise


In [16]:
try:
    # Load the three input datasets from the locations read from the config.
    df_flights = spark.read.parquet(read_parquet_flights)
    df_airline = spark.read.parquet(read_parquet_airline)
    df_airports = spark.read.parquet(read_parquet_airports)
except Exception as error:
    logger.error("Error read.parquet:" + str(error))
    # Re-raise after logging: an unreadable input would otherwise leave the
    # DataFrames undefined and surface later as unrelated NameErrors.
    raise

15:20:49.730 [Thread-3] INFO  org.apache.spark.sql.execution.datasources.InMemoryFileIndex - It took 4 ms to list leaf files for 1 paths.
15:20:49.773 [Thread-3] INFO  org.apache.spark.SparkContext - Starting job: parquet at NativeMethodAccessorImpl.java:0
15:20:49.774 [dag-scheduler-event-loop] INFO  org.apache.spark.scheduler.DAGScheduler - Got job 25 (parquet at NativeMethodAccessorImpl.java:0) with 1 output partitions
15:20:49.775 [dag-scheduler-event-loop] INFO  org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 38 (parquet at NativeMethodAccessorImpl.java:0)
15:20:49.775 [dag-scheduler-event-loop] INFO  org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
15:20:49.775 [dag-scheduler-event-loop] INFO  org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
15:20:49.775 [dag-scheduler-event-loop] INFO  org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 38 (MapPartitionsRDD[86] at parquet at NativeMethodAccessorImpl.java:0)

In [17]:
try:
    # Average flight DISTANCE per airline, rounded to 2 decimals and sorted
    # from the largest average to the smallest. `round` is the Spark column
    # function imported at the top of the notebook, not the Python builtin.
    avg_flight = (
        df_flights
        # df_airline is broadcast and matched on the airline IATA code.
        .join(broadcast(df_airline), df_flights.AIRLINE == df_airline.IATA_CODE)
        .groupBy(df_flights.AIRLINE, df_airline.AIRLINE)
        .agg(round(avg(df_flights.DISTANCE), 2).alias("avg_DISTANCE"))
        .orderBy(desc("avg_DISTANCE"))
        # Only df_airline's AIRLINE column is kept, exposed as AIRLINE_Name.
        .select(df_airline.AIRLINE.alias("AIRLINE_Name"), "avg_DISTANCE")
    )
except Exception as error:
    logger.error("Error variable avg_flight:" + str(error))


In [18]:

try:
    # Number of flights per (destination airport, airline) pair, sorted by
    # destination airport in ascending order.
    # NOTE(review): df_airports is joined on ORIGIN_AIRPORT although the
    # grouping is by DESTINATION_AIRPORT and no df_airports column is selected;
    # as an inner join it only affects which flight rows are kept. Confirm
    # that filtering on the origin airport is what is intended here.
    airline_in_airport = (
        df_flights
        .join(broadcast(df_airports), df_flights.ORIGIN_AIRPORT == df_airports.IATA_CODE)
        .join(broadcast(df_airline), df_flights.AIRLINE == df_airline.IATA_CODE)
        .repartition(df_flights.DESTINATION_AIRPORT)
        .groupBy(df_flights.DESTINATION_AIRPORT, df_airline.AIRLINE)
        # count() of a column counts its non-null values.
        .agg(count(df_flights.AIRLINE).alias("Count_visit_per_airline"))
        .orderBy(asc(df_flights.DESTINATION_AIRPORT))
        .select(
            df_flights.DESTINATION_AIRPORT,
            df_airline.AIRLINE,
            "Count_visit_per_airline",
        )
    )
except Exception as error:
    logger.error("Error variable airline_in_airport:" + str(error))

# Rank the airlines within each destination airport by their visit count,
# highest count first.
try:
    # One window per destination airport, ordered by descending visit count.
    window_spec = (
        Window
        .partitionBy("DESTINATION_AIRPORT")
        .orderBy(desc("Count_visit_per_airline"))
    )

    # rank() gives tied counts the same position and leaves gaps after ties.
    Rank_airline_in_airport = airline_in_airport.withColumn(
        "ranking", rank().over(window_spec)
    )
except Exception as error:
    logger.error("Error Rank_airline_in_airport:" + str(error))


15:20:50.333 [dispatcher-BlockManagerMaster] INFO  org.apache.spark.storage.BlockManagerInfo - Removed broadcast_40_piece0 on 172.23.57.81:37441 in memory (size: 36.9 KiB, free: 433.9 MiB)
15:20:50.338 [dispatcher-BlockManagerMaster] INFO  org.apache.spark.storage.BlockManagerInfo - Removed broadcast_42_piece0 on 172.23.57.81:37441 in memory (size: 36.9 KiB, free: 433.9 MiB)
15:20:50.343 [dispatcher-BlockManagerMaster] INFO  org.apache.spark.storage.BlockManagerInfo - Removed broadcast_41_piece0 on 172.23.57.81:37441 in memory (size: 36.9 KiB, free: 434.0 MiB)
15:20:50.350 [dispatcher-BlockManagerMaster] INFO  org.apache.spark.storage.BlockManagerInfo - Removed broadcast_39_piece0 on 172.23.57.81:37441 in memory (size: 92.4 KiB, free: 434.1 MiB)


In [19]:
try:
    # Cancelled flights per (airline, destination airport) pair, keeping the
    # first 100 pairs when sorted by destination airport in descending order.
    flights_per_cancell = (
        df_flights
        .select(col("AIRLINE"), col("CANCELLED"), col("DESTINATION_AIRPORT"))
        # NOTE(review): CANCELLED is compared with the string "1"; confirm the
        # column type in the parquet schema matches this comparison.
        .filter(col("CANCELLED") == "1")
        .repartition(col("AIRLINE"), col("DESTINATION_AIRPORT"))
        .groupBy(col("AIRLINE"), col("DESTINATION_AIRPORT"))
        .agg(count(col("AIRLINE")).alias("count_airlines_cancel"))
        # DESTINATION_AIRPORT alone is not unique (one row per airline), so
        # AIRLINE is added as a tie-breaker. Without it the rows kept by
        # limit(100) at the cutoff could differ from run to run.
        .orderBy(desc("DESTINATION_AIRPORT"), asc("AIRLINE"))
        .limit(100)
    )
except Exception as error:
    logger.error("Error variable flights_per_cancell:" + str(error))


In [20]:
try:
    # Destination airports that appear in df_flights but have no matching
    # IATA_CODE in df_airports, with the number of flight rows for each one
    # (stored in the Count_airports column).
    airport_notin_thelist = (
        df_flights
        # left_anti keeps only the flight rows without a match in df_airports.
        .join(
            broadcast(df_airports),
            df_flights.DESTINATION_AIRPORT == df_airports.IATA_CODE,
            "left_anti",
        )
        .select("DESTINATION_AIRPORT")
        .repartition("DESTINATION_AIRPORT")
        .groupBy("DESTINATION_AIRPORT")
        .agg(count("*").alias("Count_airports"))
    )
except Exception as error:
    logger.error("Error variable airport_notin_thelist:" + str(error))


In [21]:
# Write every result as parquet, replacing any previous output. The target
# locations come from the "Parquet_file" section of the configuration.

try:
    avg_flight.write.parquet(config["Parquet_file"]["avg_flight"], mode="overwrite")

    Rank_airline_in_airport.write.parquet(config["Parquet_file"]["airline_in_airport"], mode="overwrite")

    flights_per_cancell.write.parquet(config["Parquet_file"]["flights_per_cancell"], mode="overwrite")

    airport_notin_thelist.write.parquet(config["Parquet_file"]["airport_notin_thelist"], mode="overwrite")
except Exception as error:
    logger.error("Error write parquet files:" + str(error))
    # Re-raise after logging: the first failure already skips the remaining
    # writes, so the run must not look successful with outputs missing.
    raise


15:20:50.639 [Thread-3] INFO  org.apache.spark.sql.execution.datasources.FileSourceStrategy - Pushed Filters: IsNotNull(AIRLINE)
15:20:50.639 [Thread-3] INFO  org.apache.spark.sql.execution.datasources.FileSourceStrategy - Post-Scan Filters: isnotnull(AIRLINE#807)
15:20:50.640 [Thread-3] INFO  org.apache.spark.sql.execution.datasources.FileSourceStrategy - Pushed Filters: IsNotNull(IATA_CODE)
15:20:50.640 [Thread-3] INFO  org.apache.spark.sql.execution.datasources.FileSourceStrategy - Post-Scan Filters: isnotnull(IATA_CODE#865)
15:20:50.673 [broadcast-exchange-4] INFO  org.apache.spark.storage.memory.MemoryStore - Block broadcast_43 stored as values in memory (estimated size 200.3 KiB, free 426.3 MiB)
15:20:50.703 [broadcast-exchange-4] INFO  org.apache.spark.storage.memory.MemoryStore - Block broadcast_43_piece0 stored as bytes in memory (estimated size 34.6 KiB, free 426.3 MiB)
15:20:50.704 [dispatcher-BlockManagerMaster] INFO  org.apache.spark.storage.BlockManagerInfo - Added broadc

[Stage 42:>                                                         (0 + 8) / 8]

15:20:51.879 [Executor task launch worker for task 6.0 in stage 42.0 (TID 88)] INFO  org.apache.spark.executor.Executor - Finished task 6.0 in stage 42.0 (TID 88). 3926 bytes result sent to driver
15:20:51.884 [Executor task launch worker for task 7.0 in stage 42.0 (TID 89)] INFO  org.apache.spark.executor.Executor - Finished task 7.0 in stage 42.0 (TID 89). 3926 bytes result sent to driver
15:20:51.886 [task-result-getter-0] INFO  org.apache.spark.scheduler.TaskSetManager - Finished task 6.0 in stage 42.0 (TID 88) in 1047 ms on 172.23.57.81 (executor driver) (1/8)
15:20:51.886 [task-result-getter-0] INFO  org.apache.spark.scheduler.TaskSetManager - Finished task 7.0 in stage 42.0 (TID 89) in 1046 ms on 172.23.57.81 (executor driver) (2/8)
15:20:51.902 [Executor task launch worker for task 3.0 in stage 42.0 (TID 85)] INFO  org.apache.spark.executor.Executor - Finished task 3.0 in stage 42.0 (TID 85). 3926 bytes result sent to driver
15:20:51.903 [Executor task launch worker for task 1.

                                                                                

15:20:52.085 [Thread-3] INFO  org.apache.spark.sql.execution.adaptive.ShufflePartitionsUtil - For shuffle(10), advisory target size: 67108864, actual target size 1048576, minimum partition size: 1048576
15:20:52.099 [Thread-3] INFO  org.apache.spark.sql.execution.datasources.parquet.ParquetUtils - Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
15:20:52.101 [Thread-3] INFO  org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - File Output Committer Algorithm version is 1
15:20:52.101 [Thread-3] INFO  org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
15:20:52.102 [Thread-3] INFO  org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol - Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
15:20:52.102 [Thread-3] INFO  org.apache.hadoop.mapreduce.lib.output.

[Stage 52:>                                                         (0 + 8) / 8]

15:20:53.612 [Executor task launch worker for task 6.0 in stage 52.0 (TID 101)] INFO  org.apache.spark.executor.Executor - Finished task 6.0 in stage 52.0 (TID 101). 2494 bytes result sent to driver
15:20:53.620 [task-result-getter-3] INFO  org.apache.spark.scheduler.TaskSetManager - Finished task 6.0 in stage 52.0 (TID 101) in 1022 ms on 172.23.57.81 (executor driver) (1/8)




15:20:53.967 [Executor task launch worker for task 7.0 in stage 52.0 (TID 102)] INFO  org.apache.spark.executor.Executor - Finished task 7.0 in stage 52.0 (TID 102). 2451 bytes result sent to driver
15:20:53.969 [task-result-getter-2] INFO  org.apache.spark.scheduler.TaskSetManager - Finished task 7.0 in stage 52.0 (TID 102) in 1370 ms on 172.23.57.81 (executor driver) (2/8)
15:20:54.087 [Executor task launch worker for task 4.0 in stage 52.0 (TID 99)] INFO  org.apache.spark.executor.Executor - Finished task 4.0 in stage 52.0 (TID 99). 2451 bytes result sent to driver
15:20:54.089 [task-result-getter-1] INFO  org.apache.spark.scheduler.TaskSetManager - Finished task 4.0 in stage 52.0 (TID 99) in 1492 ms on 172.23.57.81 (executor driver) (3/8)
15:20:54.142 [Executor task launch worker for task 1.0 in stage 52.0 (TID 96)] INFO  org.apache.spark.executor.Executor - Finished task 1.0 in stage 52.0 (TID 96). 2494 bytes result sent to driver
15:20:54.143 [task-result-getter-0] INFO  org.apac



15:20:54.182 [Executor task launch worker for task 2.0 in stage 52.0 (TID 97)] INFO  org.apache.spark.executor.Executor - Finished task 2.0 in stage 52.0 (TID 97). 2451 bytes result sent to driver
15:20:54.183 [task-result-getter-1] INFO  org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 52.0 (TID 97) in 1586 ms on 172.23.57.81 (executor driver) (7/8)
15:20:54.197 [Executor task launch worker for task 0.0 in stage 52.0 (TID 95)] INFO  org.apache.spark.executor.Executor - Finished task 0.0 in stage 52.0 (TID 95). 2494 bytes result sent to driver
15:20:54.198 [task-result-getter-0] INFO  org.apache.spark.scheduler.TaskSetManager - Finished task 0.0 in stage 52.0 (TID 95) in 1602 ms on 172.23.57.81 (executor driver) (8/8)
15:20:54.198 [task-result-getter-0] INFO  org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 52.0, whose tasks have all completed, from pool 
15:20:54.199 [dag-scheduler-event-loop] INFO  org.apache.spark.scheduler.DAGScheduler - ShuffleM

                                                                                

15:20:54.938 [dispatcher-BlockManagerMaster] INFO  org.apache.spark.storage.BlockManagerInfo - Removed broadcast_59_piece0 on 172.23.57.81:37441 in memory (size: 26.0 KiB, free: 434.2 MiB)
15:20:54.981 [Executor task launch worker for task 7.0 in stage 56.0 (TID 120)] INFO  org.apache.spark.executor.Executor - Finished task 7.0 in stage 56.0 (TID 120). 7505 bytes result sent to driver
15:20:54.982 [dispatcher-event-loop-7] INFO  org.apache.spark.scheduler.TaskSetManager - Starting task 8.0 in stage 56.0 (TID 121) (172.23.57.81, executor driver, partition 8, NODE_LOCAL, 8988 bytes) 
15:20:54.982 [Executor task launch worker for task 8.0 in stage 56.0 (TID 121)] INFO  org.apache.spark.executor.Executor - Running task 8.0 in stage 56.0 (TID 121)
15:20:54.985 [task-result-getter-3] INFO  org.apache.spark.scheduler.TaskSetManager - Finished task 7.0 in stage 56.0 (TID 120) in 151 ms on 172.23.57.81 (executor driver) (1/10)
15:20:54.996 [Executor task launch worker for task 8.0 in stage 56.0



15:20:58.390 [Executor task launch worker for task 9.0 in stage 56.0 (TID 122)] INFO  org.apache.spark.executor.Executor - Finished task 9.0 in stage 56.0 (TID 122). 7505 bytes result sent to driver
15:20:58.391 [task-result-getter-3] INFO  org.apache.spark.scheduler.TaskSetManager - Finished task 9.0 in stage 56.0 (TID 122) in 3193 ms on 172.23.57.81 (executor driver) (10/10)
15:20:58.391 [task-result-getter-3] INFO  org.apache.spark.scheduler.TaskSchedulerImpl - Removed TaskSet 56.0, whose tasks have all completed, from pool 
15:20:58.391 [dag-scheduler-event-loop] INFO  org.apache.spark.scheduler.DAGScheduler - ShuffleMapStage 56 (parquet at NativeMethodAccessorImpl.java:0) finished in 3.565 s
15:20:58.391 [dag-scheduler-event-loop] INFO  org.apache.spark.scheduler.DAGScheduler - looking for newly runnable stages
15:20:58.391 [dag-scheduler-event-loop] INFO  org.apache.spark.scheduler.DAGScheduler - running: Set()
15:20:58.391 [dag-scheduler-event-loop] INFO  org.apache.spark.schedu

                                                                                

15:20:59.266 [Executor task launch worker for task 4.0 in stage 60.0 (TID 128)] INFO  org.apache.spark.executor.Executor - Finished task 4.0 in stage 60.0 (TID 128). 2296 bytes result sent to driver
15:20:59.268 [task-result-getter-2] INFO  org.apache.spark.scheduler.TaskSetManager - Finished task 4.0 in stage 60.0 (TID 128) in 488 ms on 172.23.57.81 (executor driver) (1/8)
15:20:59.290 [Executor task launch worker for task 2.0 in stage 60.0 (TID 126)] INFO  org.apache.spark.executor.Executor - Finished task 2.0 in stage 60.0 (TID 126). 2296 bytes result sent to driver
15:20:59.291 [task-result-getter-0] INFO  org.apache.spark.scheduler.TaskSetManager - Finished task 2.0 in stage 60.0 (TID 126) in 511 ms on 172.23.57.81 (executor driver) (2/8)
15:20:59.306 [Executor task launch worker for task 7.0 in stage 60.0 (TID 131)] INFO  org.apache.spark.executor.Executor - Finished task 7.0 in stage 60.0 (TID 131). 2296 bytes result sent to driver
15:20:59.308 [task-result-getter-3] INFO  org.a