In [1]:
from pyspark.sql import SparkSession
import pyspark as ps
import pyspark.sql.functions as F
from pyspark.sql.functions import col, lit, when, desc, asc
from pyspark.sql import types as T
import time
import pandas as pd


In [2]:
#abrindo sessão
spark=(SparkSession
       .builder
       .appName("Python Spark SQL basic example") 
        .config("spark.some.config.option", "some-value") 
        .getOrCreate())
conf = ps.SparkConf().setMaster("yarn-client").setAppName("sparK-mer")
conf.set("spark.executor.heartbeatInterval","3600s")

<pyspark.conf.SparkConf at 0x26786f49810>

In [3]:
#variáveis
flights=spark.read.parquet('2015_flights.parquet')
flights.printSchema()

root
 |-- DEPARTURE_DELAY: double (nullable = true)
 |-- ARRIVAL_DELAY: double (nullable = true)
 |-- DISTANCE: long (nullable = true)
 |-- SCHEDULED_DEPARTURE: double (nullable = true)



In [4]:
#numero de linhas
flights.count()

5819079

In [5]:
flights.show()

+---------------+-------------+--------+-------------------+
|DEPARTURE_DELAY|ARRIVAL_DELAY|DISTANCE|SCHEDULED_DEPARTURE|
+---------------+-------------+--------+-------------------+
|          -11.0|        -22.0|    1448|0.08333333333333333|
|           -8.0|         -9.0|    2330|0.16666666666666666|
|           -2.0|          5.0|    2296| 0.3333333333333333|
|           -5.0|         -9.0|    2342| 0.3333333333333333|
|           -1.0|        -21.0|    1448| 0.4166666666666667|
|           -5.0|          8.0|    1589| 0.4166666666666667|
|           -6.0|        -17.0|    1299| 0.4166666666666667|
|           14.0|        -10.0|    2125|                0.5|
|          -11.0|        -13.0|    1464|                0.5|
|            3.0|        -15.0|    1747|                0.5|
|           -6.0|        -30.0|    1199|                0.5|
|           -8.0|        -10.0|    2174| 0.5833333333333334|
|            0.0|         -4.0|    1535| 0.5833333333333334|
|           -6.0|       

In [7]:
#ajuste das horas
flights=flights.withColumn('HORAS',F.floor(flights.SCHEDULED_DEPARTURE))
flights=flights.withColumn('MINUTOS', F.round((flights.SCHEDULED_DEPARTURE - F.floor(flights.HORAS)) * 60))
flights.show(10)

+---------------+-------------+--------+-------------------+-----+-------+
|DEPARTURE_DELAY|ARRIVAL_DELAY|DISTANCE|SCHEDULED_DEPARTURE|HORAS|MINUTOS|
+---------------+-------------+--------+-------------------+-----+-------+
|          -11.0|        -22.0|    1448|0.08333333333333333|    0|    5.0|
|           -8.0|         -9.0|    2330|0.16666666666666666|    0|   10.0|
|           -2.0|          5.0|    2296| 0.3333333333333333|    0|   20.0|
|           -5.0|         -9.0|    2342| 0.3333333333333333|    0|   20.0|
|           -1.0|        -21.0|    1448| 0.4166666666666667|    0|   25.0|
|           -5.0|          8.0|    1589| 0.4166666666666667|    0|   25.0|
|           -6.0|        -17.0|    1299| 0.4166666666666667|    0|   25.0|
|           14.0|        -10.0|    2125|                0.5|    0|   30.0|
|          -11.0|        -13.0|    1464|                0.5|    0|   30.0|
|            3.0|        -15.0|    1747|                0.5|    0|   30.0|
+---------------+--------

In [8]:
#groupBy
flights.groupBy('HORAS').count().orderBy('HORAS').show(24)



+-----+------+
|HORAS| count|
+-----+------+
|    0| 14664|
|    1|  5159|
|    2|  1414|
|    3|   778|
|    4|   531|
|    5|118051|
|    6|406940|
|    7|393947|
|    8|381014|
|    9|351403|
|   10|371644|
|   11|358084|
|   12|355611|
|   13|363509|
|   14|329715|
|   15|367760|
|   16|334153|
|   17|390362|
|   18|334380|
|   19|331338|
|   20|259432|
|   21|187467|
|   22|117551|
|   23| 44172|
+-----+------+



In [9]:
#groupBy agg
(flights.groupBy('HORAS').agg(F.round(F.mean('DEPARTURE_DELAY'),1).alias('MEDIA_ATRASO_PARTIDA'),
                            F.median('ARRIVAL_DELAY').alias('MEDIANA_ATRASO_CHEGADA'),
                            F.min('DISTANCE').alias('MINIMO_DISTANCIA'),
                            F.max('DISTANCE').alias('MAXIMO_DISTANCIA')).orderBy('HORAS')).withColumn('SALDO_ATRASO'
                                ,F.round(F.col('MEDIA_ATRASO_PARTIDA')+F.col('MEDIANA_ATRASO_CHEGADA'))).show()

+-----+--------------------+----------------------+----------------+----------------+------------+
|HORAS|MEDIA_ATRASO_PARTIDA|MEDIANA_ATRASO_CHEGADA|MINIMO_DISTANCIA|MAXIMO_DISTANCIA|SALDO_ATRASO|
+-----+--------------------+----------------------+----------------+----------------+------------+
|    0|                 7.2|                  -6.0|             108|            2846|         1.0|
|    1|                 8.1|                  -5.0|             237|            2762|         3.0|
|    2|                 7.3|                  -7.0|             868|            2072|         0.0|
|    3|                 8.9|                  -9.0|             982|            2072|         0.0|
|    4|                10.3|                  -6.0|             257|            1674|         4.0|
|    5|                 1.9|                  -9.0|              67|            2704|        -7.0|
|    6|                 2.1|                  -8.0|              67|            3801|        -6.0|
|    7|   

In [11]:
#groupBy pivot
flights.groupBy('HORAS').pivot('MINUTOS').count().orderBy('HORAS').show(24)

+-----+------+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+
|HORAS|   0.0| 1.0| 2.0| 3.0| 4.0|  5.0| 6.0| 7.0| 8.0| 9.0| 10.0|11.0|12.0|13.0|14.0| 15.0|16.0|17.0|18.0|19.0| 20.0|21.0|22.0|23.0|24.0| 25.0|26.0|27.0|28.0|29.0| 30.0|31.0|32.0|33.0|34.0| 35.0|36.0|37.0|38.0|39.0| 40.0|41.0|42.0|43.0|44.0| 45.0|46.0|47.0|48.0|49.0| 50.0|51.0|52.0|53.0|54.0| 55.0|56.0|57.0|58.0|59.0|
+-----+------+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+-----+----+----+----+----+
|    0|  NULL|  13|  11|  93|   5|  6

In [12]:
flights.withColumn('SAIDA',when(col('DEPARTURE_DELAY')>0,'ATRASO')
                            .when(col('DEPARTURE_DELAY')<0,'ANTECIPADO')
                            .otherwise('NO HORARIO')).withColumn('CHEGADA',when(col('ARRIVAL_DELAY')>0,'ATRASO')
                            .when(col('ARRIVAL_DELAY')<0,'ANTECIPADO')
                            .otherwise('NO HORARIO')).groupBy('CHEGADA').pivot('SAIDA').count().show()

+----------+----------+-------+----------+
|   CHEGADA|ANTECIPADO| ATRASO|NO HORARIO|
+----------+----------+-------+----------+
|ANTECIPADO|   2708248| 559437|    233214|
|NO HORARIO|     75576|  58034|     97674|
|    ATRASO|    494124|1508147|     84625|
+----------+----------+-------+----------+



In [13]:
#ordenação avançada

flights.orderBy(F.asc('HORAS'), F.asc('MINUTOS'), F.desc('DEPARTURE_DELAY')).show()

+---------------+-------------+--------+--------------------+-----+-------+
|DEPARTURE_DELAY|ARRIVAL_DELAY|DISTANCE| SCHEDULED_DEPARTURE|HORAS|MINUTOS|
+---------------+-------------+--------+--------------------+-----+-------+
|          483.0|        500.0|    2174|0.016666666666666666|    0|    1.0|
|           28.0|         19.0|     409|0.016666666666666666|    0|    1.0|
|           27.0|         53.0|     627|0.016666666666666666|    0|    1.0|
|           25.0|          8.0|     678|0.016666666666666666|    0|    1.0|
|            7.0|         -9.0|     783|0.016666666666666666|    0|    1.0|
|            6.0|         -2.0|    1085|0.016666666666666666|    0|    1.0|
|            1.0|        -17.0|     455|0.016666666666666666|    0|    1.0|
|            0.0|         12.0|     763|0.016666666666666666|    0|    1.0|
|           -3.0|          0.0|    2342|0.016666666666666666|    0|    1.0|
|           -4.0|        -26.0|    1626|0.016666666666666666|    0|    1.0|
|           

In [15]:
#join
flights_empresas = (spark.read.format("csv")
      .option("header","true")
      .option('delimiter',';')
      .load("2015_flights.csv"))
print(flights_empresas.printSchema())
flights_empresas=flights_empresas.withColumn('HORAS',flights_empresas['HORAS'].cast(T.IntegerType()))

flights_empresas.show(24)

root
 |-- HORAS: string (nullable = true)
 |-- EMPRESA: string (nullable = true)

None
+-----+-------+
|HORAS|EMPRESA|
+-----+-------+
|    0|    GOL|
|    1|  LATAM|
|    2|   AZUL|
|    3|    GOL|
|    4|  LATAM|
|    5|   AZUL|
|    6|    GOL|
|    7|  LATAM|
|    8|   AZUL|
|    9|    GOL|
|   10|  LATAM|
|   11|   AZUL|
|   12|    GOL|
|   13|  LATAM|
|   14|   AZUL|
|   15|    GOL|
|   16|  LATAM|
|   17|   AZUL|
|   18|    GOL|
|   19|  LATAM|
|   20|   AZUL|
|   21|    GOL|
|   22|  LATAM|
|   23|   AZUL|
+-----+-------+



In [16]:
flights.join(flights_empresas,'HORAS', 'left').show()

+-----+---------------+-------------+--------+-------------------+-------+-------+
|HORAS|DEPARTURE_DELAY|ARRIVAL_DELAY|DISTANCE|SCHEDULED_DEPARTURE|MINUTOS|EMPRESA|
+-----+---------------+-------------+--------+-------------------+-------+-------+
|    0|          -11.0|        -22.0|    1448|0.08333333333333333|    5.0|    GOL|
|    0|           -8.0|         -9.0|    2330|0.16666666666666666|   10.0|    GOL|
|    0|           -2.0|          5.0|    2296| 0.3333333333333333|   20.0|    GOL|
|    0|           -5.0|         -9.0|    2342| 0.3333333333333333|   20.0|    GOL|
|    0|           -1.0|        -21.0|    1448| 0.4166666666666667|   25.0|    GOL|
|    0|           -5.0|          8.0|    1589| 0.4166666666666667|   25.0|    GOL|
|    0|           -6.0|        -17.0|    1299| 0.4166666666666667|   25.0|    GOL|
|    0|           14.0|        -10.0|    2125|                0.5|   30.0|    GOL|
|    0|          -11.0|        -13.0|    1464|                0.5|   30.0|    GOL|
|   

In [18]:
#Cache
#normal
start = time.time()
for i in range(5000):
    flights.count()
end = time.time()
print(f'tempo normal: {(end-start):,.2f}')

#normal cache
flights.cache() #entrando em cache
start = time.time()
for i in range(5000):
    flights.count()
end = time.time()
print(f'tempo cache: {(end-start):,.2f}')
flights.unpersist() #sair do cache

tempo normal: 231.99
tempo cache: 196.48


DataFrame[DEPARTURE_DELAY: double, ARRIVAL_DELAY: double, DISTANCE: bigint, SCHEDULED_DEPARTURE: double, HORAS: bigint, MINUTOS: double]

In [20]:
flights.rdd.getNumPartitions()

7

In [21]:
flights_exp=flights.repartition(24, 'HORAS')
flights_exp.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange hashpartitioning(HORAS#41L, 24), REPARTITION_BY_NUM, [plan_id=479655]
   +- Project [DEPARTURE_DELAY#0, ARRIVAL_DELAY#1, DISTANCE#2L, SCHEDULED_DEPARTURE#3, HORAS#41L, round(((SCHEDULED_DEPARTURE#3 - cast(FLOOR(HORAS#41L) as double)) * 60.0), 0) AS MINUTOS#47]
      +- Project [DEPARTURE_DELAY#0, ARRIVAL_DELAY#1, DISTANCE#2L, SCHEDULED_DEPARTURE#3, FLOOR(SCHEDULED_DEPARTURE#3) AS HORAS#41L]
         +- FileScan parquet [DEPARTURE_DELAY#0,ARRIVAL_DELAY#1,DISTANCE#2L,SCHEDULED_DEPARTURE#3] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/c:/Users/Gabriel/Documents/Escola/Aulas/Git_aulas/Coder/2015_fli..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEPARTURE_DELAY:double,ARRIVAL_DELAY:double,DISTANCE:bigint,SCHEDULED_DEPARTURE:double>




In [22]:
flights_exp.rdd.getNumPartitions()

24

In [23]:
#Salvamento*
flights_exp.toPandas().to_parquet('2015_flights_2.parquet')

In [24]:
spark.stop()