In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [3]:
!tar xf /content/spark-3.1.2-bin-hadoop2.7.tgz

In [4]:
# Environment variables: where the JDK and the unpacked Spark distribution live.
import os

os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop2.7",
})

In [5]:
!pip install -q findspark

In [6]:
# Point findspark at the Spark distribution named by SPARK_HOME so that
# `pyspark` becomes importable in this kernel.
import findspark

findspark.init(spark_home=os.environ["SPARK_HOME"])

In [7]:
# Create (or reuse) the Spark session for this notebook: local master,
# application name 'streaming_ETL_pipeline'.
from pyspark.sql import SparkSession

_session_builder = SparkSession.builder.master('local').appName('streaming_ETL_pipeline')
spark = _session_builder.getOrCreate()

In [8]:
# Mount Google Drive so the dataset under /content/drive is readable.
from google.colab import drive

drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [9]:
# List every file in the airlines dataset folder on the mounted Drive.
!ls /content/drive/MyDrive/airlines-datasets/

part-00000.csv	part-00004.csv	part-00008.csv	part-00012.csv
part-00001.csv	part-00005.csv	part-00009.csv	part-00013.csv
part-00002.csv	part-00006.csv	part-00010.csv	part-00014.csv
part-00003.csv	part-00007.csv	part-00011.csv	part-00015.csv


In [10]:
# Batch-read the first part file, taking column names from its header line and
# letting Spark infer the column types.
part = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/drive/MyDrive/airlines-datasets/part-00000.csv")
)

In [11]:
# Print the column names and the types Spark inferred for the sample file.
part.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

In [12]:
# Preview the first five rows of the sample file.
part.show(n=5)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|IsArrDelayed|IsDepDelayed|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+------------+------------+
|1987|   10|    

In [13]:
# Use part-00000.csv as the input for deriving the schema of the stream. Afterwards,
# each file will be read one by one, simulating a data streaming system.
dataSchema = part.schema
dataSchema

StructType(List(StructField(Year,IntegerType,true),StructField(Month,IntegerType,true),StructField(DayofMonth,IntegerType,true),StructField(DayOfWeek,IntegerType,true),StructField(DepTime,StringType,true),StructField(CRSDepTime,IntegerType,true),StructField(ArrTime,StringType,true),StructField(CRSArrTime,IntegerType,true),StructField(UniqueCarrier,StringType,true),StructField(FlightNum,IntegerType,true),StructField(TailNum,StringType,true),StructField(ActualElapsedTime,StringType,true),StructField(CRSElapsedTime,IntegerType,true),StructField(AirTime,StringType,true),StructField(ArrDelay,StringType,true),StructField(DepDelay,StringType,true),StructField(Origin,StringType,true),StructField(Dest,StringType,true),StructField(Distance,StringType,true),StructField(TaxiIn,StringType,true),StructField(TaxiOut,StringType,true),StructField(Cancelled,IntegerType,true),StructField(CancellationCode,StringType,true),StructField(Diverted,IntegerType,true),StructField(CarrierDelay,StringType,true),Str

In [14]:
# Streaming source over the dataset folder, limited to one file per trigger so
# the files are consumed one at a time.
streaming = (
    spark.readStream.schema(dataSchema)
    # The batch read of part-00000.csv needed header=True, so the files carry a
    # header line; without this option that line is parsed as a data row.
    # NOTE(review): assumes every part file has the same header line — confirm.
    .option("header", True)
    .option("maxFilesPerTrigger", 1)
    .csv("/content/drive/MyDrive/airlines-datasets/")
)

In [15]:
# Transformation: rank each (flight number, origin, destination) combination by
# its mean arrival delay, highest first, keeping only combinations with more
# than 20 flights.
#
# The source view is named 'flights_stream' so it does not share a name with the
# memory-sink query "ranking_arrDelay_table" started later, which registers its
# own temp view under that name and would replace this one.
streaming.createOrReplaceTempView('flights_stream')

# HAVING filters on the aggregate directly and ORDER BY is the outermost clause,
# so the ordering applies to the final result rather than to a subquery.
# ArrDelay is a string column in the inferred schema, hence the explicit cast.
ranking_arrDelay = spark.sql('''
    SELECT FlightNum,
           Origin,
           Dest,
           count(*) AS total_flights,
           round(mean(CAST(ArrDelay AS DOUBLE)), 2) AS mean_ArrDelay_in_minutes
    FROM flights_stream
    GROUP BY FlightNum, Origin, Dest
    HAVING count(*) > 20
    ORDER BY mean_ArrDelay_in_minutes DESC
''')

In [16]:
import time

# Start the streaming query. The memory sink exposes its result as the table
# "ranking_arrDelay_table"; "complete" mode rewrites the whole aggregated result
# on every trigger.
activityQuery = (
    ranking_arrDelay.writeStream.queryName("ranking_arrDelay_table")
    .format("memory")
    .outputMode("complete")
    .start()
)

# Poll the in-memory table once per second, 15 times, printing a snapshot
# whenever it holds rows.
for poll in range(15):
    _df = spark.sql("SELECT * FROM ranking_arrDelay_table")
    # Count once: each count() runs a Spark job, and the table can change
    # between two calls, so the printed value must be the one that was checked.
    total_rows = _df.count()
    if total_rows > 0:
        print("Total rows:", total_rows)
        _df.show(10)
    time.sleep(1)

Total rows: 16542
+---------+------+----+-------------+------------------------+
|FlightNum|Origin|Dest|total_flights|mean_ArrDelay_in_minutes|
+---------+------+----+-------------+------------------------+
|      355|   DEN| SEA|           30|                   73.66|
|       81|   LAX| HNL|           34|                   66.03|
|      398|   LAS| ABQ|           36|                   65.81|
|      960|   SJU| EWR|           81|                   64.23|
|      103|   ATL| PHX|           25|                   58.92|
|      399|   MSP| SMF|           27|                    58.1|
|     1105|   DEN| SFO|           69|                   57.93|
|      345|   SJC| SFO|           28|                   56.81|
|      325|   MCI| SFO|           85|                   55.93|
|     1637|   IAD| DEN|           54|                   55.44|
+---------+------+----+-------------+------------------------+
only showing top 10 rows

Total rows: 18677
+---------+------+----+-------------+-------------------

In [17]:
# Check whether the streaming query is still running. The query handle is used
# directly: indexing spark.streams.active[0] raises IndexError when no stream
# is active and is not guaranteed to refer to this particular query.
activityQuery.isActive

True

In [18]:
# Show the current status of the streaming query.
activityQuery.status

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

In [19]:
# Stop the streaming query.
activityQuery.stop()